Hi Xintong,

Unfortunately I cannot upgrade to 1.10.2, because EMR offers either 1.10.0 or 1.11.0.

About the overhead - it turns out I had already configured taskmanager.memory.jvm-overhead.max to 2 GB instead of the default 1 GB. Should I increase it further? state.backend.rocksdb.memory.managed is already not explicitly configured. Is there anything else I can do?
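For reference, the memory-related entries in my configuration currently look roughly like this (a simplified excerpt - the full job configuration is in the attachment mentioned below):

state.backend: rocksdb
# state.backend.rocksdb.memory.managed is not set, i.e. it stays at its default (true)
taskmanager.memory.task.heap.size: 50g
taskmanager.memory.task.off-heap.size: 1g
taskmanager.memory.managed.fraction: 0.5
taskmanager.memory.jvm-overhead.max: 2g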

On Thu, Oct 29, 2020 at 1:24 PM Xintong Song <tonysong...@gmail.com> wrote:

> Hi Ori,
>
> RocksDB also uses managed memory. If the memory overuse indeed comes from RocksDB, then increasing the managed memory fraction will not help. RocksDB will try to use as much memory as the configured managed memory size. Therefore, increasing the managed memory fraction also makes RocksDB try to use more memory. That is why I suggested increasing `jvm-overhead` instead.
>
> Please also make sure the configuration option `state.backend.rocksdb.memory.managed` is either not explicitly configured, or configured to `true`.
>
> In addition, I noticed that you are using Flink 1.10.0. You might want to upgrade to 1.10.2 to include the latest bug fixes on the 1.10 release.
>
> Thank you~
>
> Xintong Song
>
> On Thu, Oct 29, 2020 at 4:41 PM Ori Popowski <ori....@gmail.com> wrote:
>
>> Hi,
>>
>> PID 20331 is indeed the Flink process, specifically the TaskManager process.
>>
>> - The workload is a streaming job reading from Kafka and writing to S3 using a custom sink
>> - The RocksDB state backend is used with default settings
>> - My external dependencies are:
>> -- logback
>> -- jackson
>> -- flatbuffers
>> -- jaxb-api
>> -- scala-java8-compat
>> -- apache commons-io
>> -- apache commons-compress
>> -- software.amazon.awssdk s3
>> - What do you mean by UDFs? I've implemented several operators like KafkaDeserializationSchema, FlatMap, Map, ProcessFunction.
>>
>> We use a SessionWindow with a 30-minute gap, and a watermark with a 10-minute delay.
>>
>> We did confirm that we have some keys in our job which keep receiving records indefinitely, but I'm not sure why that would cause a managed memory leak, since this state should be flushed to RocksDB, freeing the memory used. We have a guard against this: we keep the overall size of all the records for each key, and when it reaches 300 MB we stop moving the records downstream, which causes them to form a session and go through the sink.
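>>
>> For what it's worth, that part of the job looks roughly like this in Scala (a simplified sketch, not our actual code - the event type, field names, and the keying are illustrative):
>>
>> import org.apache.flink.api.common.functions.RichFlatMapFunction
>> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
>> import org.apache.flink.configuration.Configuration
>> import org.apache.flink.streaming.api.TimeCharacteristic
>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
>> import org.apache.flink.streaming.api.windowing.time.Time
>> import org.apache.flink.util.Collector
>>
>> // Illustrative event type; the real record and its size accounting are more involved.
>> case class Event(sessionId: String, timestamp: Long, sizeInBytes: Long)
>>
>> // Guard: once ~300 MB of records have been seen for a key, stop forwarding them downstream.
>> class SizeGuard extends RichFlatMapFunction[Event, Event] {
>>   @transient private var seenBytes: ValueState[java.lang.Long] = _
>>
>>   override def open(parameters: Configuration): Unit =
>>     seenBytes = getRuntimeContext.getState(
>>       new ValueStateDescriptor[java.lang.Long]("seen-bytes", classOf[java.lang.Long]))
>>
>>   override def flatMap(event: Event, out: Collector[Event]): Unit = {
>>     val seen = Option(seenBytes.value()).map(_.longValue()).getOrElse(0L)
>>     if (seen < 300L * 1024 * 1024) {   // below the 300 MB cap: keep counting and forward
>>       seenBytes.update(seen + event.sizeInBytes)
>>       out.collect(event)
>>     }
>>     // above the cap: drop the record, so the session eventually closes and goes through the sink
>>   }
>> }
>>
>> object SessionJobSketch {
>>   def main(args: Array[String]): Unit = {
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>
>>     // Stub source to keep the sketch self-contained; the real job reads from Kafka.
>>     val events: DataStream[Event] = env.fromElements(Event("session-1", 0L, 100L))
>>
>>     val sessions = events
>>       .assignTimestampsAndWatermarks(
>>         new BoundedOutOfOrdernessTimestampExtractor[Event](Time.minutes(10)) { // 10-minute watermark delay
>>           override def extractTimestamp(e: Event): Long = e.timestamp
>>         })
>>       .keyBy(_.sessionId)
>>       .flatMap(new SizeGuard)
>>       .keyBy(_.sessionId)
>>       .window(EventTimeSessionWindows.withGap(Time.minutes(30)))             // 30-minute session gap
>>
>>     // sessions.process(...) aggregates each session; the result goes to the custom S3 sink.
>>     // env.execute("sessions")
>>   }
>> }
>>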
>> About what you suggested - I kind of did this by increasing the managed memory fraction to 0.5, and it did postpone the problem's occurrence (the TMs started crashing after 10 days instead of 7). It looks like anything I do on that front will only postpone the problem, not solve it.
>>
>> I am attaching the full job configuration.
>>
>> On Thu, Oct 29, 2020 at 10:09 AM Xintong Song <tonysong...@gmail.com> wrote:
>>
>>> Hi Ori,
>>>
>>> It looks like Flink indeed uses more memory than expected. I assume the first item with PID 20331 is the Flink process, right?
>>>
>>> It would be helpful if you could briefly introduce your workload:
>>> - What kind of workload are you running? Streaming or batch?
>>> - Do you use the RocksDB state backend?
>>> - Any UDFs or 3rd-party dependencies that might allocate significant native memory?
>>>
>>> Moreover, if the metrics show only 20% heap usage, I would suggest configuring a smaller `task.heap.size`, leaving more memory off-heap. The reduced heap size does not necessarily all go to the managed memory. You can also try increasing the `jvm-overhead`, simply to leave more native memory in the container in case there are other significant native memory usages.
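>>>
>>> For example, something along these lines in flink-conf.yaml (the numbers are purely illustrative - the point is to shift memory from the heap towards the overhead):
>>>
>>> taskmanager.memory.task.heap.size: 40g
>>> taskmanager.memory.jvm-overhead.max: 4g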
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>> On Wed, Oct 28, 2020 at 5:53 PM Ori Popowski <ori....@gmail.com> wrote:
>>>
>>>> Hi Xintong,
>>>>
>>>> See here:
>>>>
>>>> # Top memory users
>>>> ps auxwww --sort -rss | head -10
>>>> USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
>>>> yarn 20339 35.8 97.0 128600192 126672256 ? Sl Oct15 5975:47 /etc/alternatives/jre/bin/java -Xmx54760833024 -Xms54760833024 -XX:Max
>>>> root 5245 0.1 0.4 5580484 627436 ? Sl Jul30 144:39 /etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
>>>> hadoop 5252 0.1 0.4 7376768 604772 ? Sl Jul30 153:22 /etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
>>>> yarn 26857 0.3 0.2 4214784 341464 ? Sl Sep17 198:43 /etc/alternatives/jre/bin/java -Dproc_nodemanager -Xmx2048m -XX:OnOutOf
>>>> root 5519 0.0 0.2 5658624 269344 ? Sl Jul30 45:21 /usr/bin/java -Xmx1500m -Xms300m -XX:+ExitOnOutOfMemoryError -XX:MinHea
>>>> root 1781 0.0 0.0 172644 8096 ? Ss Jul30 2:06 /usr/lib/systemd/systemd-journald
>>>> root 4801 0.0 0.0 2690260 4776 ? Ssl Jul30 4:42 /usr/bin/amazon-ssm-agent
>>>> root 6566 0.0 0.0 164672 4116 ? R 00:30 0:00 ps auxwww --sort -rss
>>>> root 6532 0.0 0.0 183124 3592 ? S 00:30 0:00 /usr/sbin/CROND -n
>>>>
>>>> On Wed, Oct 28, 2020 at 11:34 AM Xintong Song <tonysong...@gmail.com> wrote:
>>>>
>>>>> Hi Ori,
>>>>>
>>>>> The error message suggests that there's not enough physical memory on the machine to satisfy the allocation. This does not necessarily mean a managed memory leak; a managed memory leak is only one of the possibilities. There are other potential reasons, e.g., another process/container on the machine used more memory than expected, the Yarn NM is not configured with enough memory reserved for the system processes, etc.
>>>>>
>>>>> I would suggest first looking into the machine's memory usage, to see whether the Flink process indeed uses more memory than expected. This could be checked via:
>>>>> - Running the `top` command
>>>>> - Looking into the `/proc/meminfo` file
>>>>> - Checking any container memory usage metrics that are available to your Yarn cluster
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>> On Tue, Oct 27, 2020 at 6:21 PM Ori Popowski <ori....@gmail.com> wrote:
>>>>>
>>>>>> After the job has been running for 10 days in production, TaskManagers start failing with:
>>>>>>
>>>>>> Connection unexpectedly closed by remote task manager
>>>>>>
>>>>>> Looking in the machine logs, I can see the following error:
>>>>>>
>>>>>> ============= Java processes for user hadoop =============
>>>>>> OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007fb4f4010000, 1006567424, 0) failed; error='Cannot allocate memory' (err
>>>>>> #
>>>>>> # There is insufficient memory for the Java Runtime Environment to continue.
>>>>>> # Native memory allocation (mmap) failed to map 1006567424 bytes for committing reserved memory.
>>>>>> # An error report file with more information is saved as:
>>>>>> # /mnt/tmp/hsperfdata_hadoop/hs_err_pid6585.log
>>>>>> =========== End java processes for user hadoop ===========
>>>>>>
>>>>>> In addition, the metrics for the TaskManager show very low heap memory consumption (20% of Xmx).
>>>>>>
>>>>>> Hence, I suspect there is a memory leak in the TaskManager's managed memory.
>>>>>>
>>>>>> This is my TaskManager's memory breakdown:
>>>>>> flink process 112g
>>>>>> framework.heap.size 0.2g
>>>>>> task.heap.size 50g
>>>>>> managed.size 54g
>>>>>> framework.off-heap.size 0.5g
>>>>>> task.off-heap.size 1g
>>>>>> network 2g
>>>>>> XX:MaxMetaspaceSize 1g
>>>>>>
>>>>>> As you can see, the managed memory is 54 GB, so it's already high (my managed.fraction is set to 0.5).
>>>>>>
>>>>>> I'm running Flink 1.10. Full job details attached.
>>>>>>
>>>>>> Can someone advise what would cause a managed memory leak?