Hi Xintong,

Unfortunately I cannot upgrade to 1.10.2, because EMR offers only 1.10.0
or 1.11.0.

About the overhead - it turns out I had already configured
taskmanager.memory.jvm-overhead.max to 2 gb instead of the default 1 gb.
Should I increase it further?

state.backend.rocksdb.memory.managed is already not explicitly configured.
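
If it helps, this is roughly what the relevant part of my flink-conf.yaml
looks like (a sketch from memory, not a paste of the exact file):

    taskmanager.memory.jvm-overhead.max: 2g
    # state.backend.rocksdb.memory.managed is not set, so it should be at
    # its default (true)

As far as I understand, the JVM overhead is derived from
taskmanager.memory.jvm-overhead.fraction and then clamped between the
configured min and max, and with a ~112g process the max is the binding
limit, so raising the max should directly grow the overhead - please
correct me if I have that wrong.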

Is there anything else I can do?



On Thu, Oct 29, 2020 at 1:24 PM Xintong Song <tonysong...@gmail.com> wrote:

> Hi Ori,
>
> RocksDB also uses managed memory. If the memory overuse indeed comes from
> RocksDB, then increasing managed memory fraction will not help. RocksDB
> will try to use as much memory as the configured managed memory size.
> Therefore, increasing the managed memory fraction also makes RocksDB try
> to use more memory. That is why I suggested increasing `jvm-overhead`
> instead.
>
> Please also make sure the configuration option
> `state.backend.rocksdb.memory.managed` is either not explicitly configured,
> or configured to `true`.
>
> In addition, I noticed that you are using Flink 1.10.0. You might want to
> upgrade to 1.10.2, to include the latest bug fixes on the 1.10 release.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Oct 29, 2020 at 4:41 PM Ori Popowski <ori....@gmail.com> wrote:
>
>> Hi,
>>
>> PID 20331 is indeed the Flink process, specifically the TaskManager
>> process.
>>
>> - Workload is a streaming workload reading from Kafka and writing to S3
>> using a custom Sink
>> - RocksDB state backend is used with default settings
>> - My external dependencies are:
>> -- logback
>> -- jackson
>> -- flatbuffers
>> -- jaxb-api
>> -- scala-java8-compat
>> -- apache commons-io
>> -- apache commons-compress
>> -- software.amazon.awssdk s3
>> - What do you mean by UDFs? I've implemented several operators like
>> KafkaDeserializationSchema, FlatMap, Map, ProcessFunction.
>>
>> We use a session window with a 30-minute gap, and a watermark with a
>> 10-minute delay.
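>>
>> To make that concrete, here is a rough, simplified sketch of the topology
>> (Event, its fields, and the names here are placeholders, not our real
>> classes; the real job reads from Kafka and writes to S3 with a custom
>> sink):
>>
>> import org.apache.flink.streaming.api.TimeCharacteristic
>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
>> import org.apache.flink.streaming.api.windowing.time.Time
>>
>> case class Event(key: String, timestamp: Long, payload: String)
>>
>> object SessionSketch {
>>   def main(args: Array[String]): Unit = {
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>
>>     // Placeholder source; the real job uses a FlinkKafkaConsumer with a
>>     // custom KafkaDeserializationSchema.
>>     val events: DataStream[Event] =
>>       env.fromElements(Event("user-1", 0L, "payload"))
>>
>>     events
>>       // 10-minute watermark delay
>>       .assignTimestampsAndWatermarks(
>>         new BoundedOutOfOrdernessTimestampExtractor[Event](Time.minutes(10)) {
>>           override def extractTimestamp(e: Event): Long = e.timestamp
>>         })
>>       .keyBy(_.key)
>>       // 30-minute session gap
>>       .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
>>       // Placeholder aggregation; the real job uses its own window function
>>       // and then the custom S3 sink instead of print().
>>       .reduce((a, b) => b)
>>       .print()
>>
>>     env.execute("session-sketch")
>>   }
>> }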
>>
>> We did confirm that some keys in our job keep receiving records
>> indefinitely, but I'm not sure why that would cause a managed memory leak,
>> since the state should be flushed to RocksDB and the memory freed. We have
>> a guard against this: we track the overall size of the records for each
>> key, and once it reaches 300mb we stop moving further records downstream,
>> which causes them to create a session and go through the sink.
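>>
>> In case it helps to see the shape of that guard, here is a simplified,
>> hypothetical version of it (not our actual code): a KeyedProcessFunction
>> that tracks accumulated bytes per key in ValueState and stops forwarding
>> records once a key crosses the threshold.
>>
>> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
>> import org.apache.flink.configuration.Configuration
>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction
>> import org.apache.flink.util.Collector
>>
>> // Event is the same placeholder type as in the sketch above.
>> class SizeGuard(maxBytesPerKey: Long = 300L * 1024 * 1024)
>>     extends KeyedProcessFunction[String, Event, Event] {
>>
>>   @transient private var accumulated: ValueState[java.lang.Long] = _
>>
>>   override def open(parameters: Configuration): Unit = {
>>     accumulated = getRuntimeContext.getState(
>>       new ValueStateDescriptor[java.lang.Long](
>>         "accumulated-bytes", classOf[java.lang.Long]))
>>   }
>>
>>   override def processElement(
>>       event: Event,
>>       ctx: KeyedProcessFunction[String, Event, Event]#Context,
>>       out: Collector[Event]): Unit = {
>>     val soFar = Option(accumulated.value()).map(_.longValue()).getOrElse(0L)
>>     // Payload length is used here as a rough proxy for record size.
>>     val updated = soFar + event.payload.length
>>     accumulated.update(updated)
>>     // Forward the record only while the key is under the size cap; past
>>     // the cap, records are dropped so the session can fire with what it
>>     // already has.
>>     if (updated <= maxBytesPerKey) {
>>       out.collect(event)
>>     }
>>   }
>> }
>>
>> It would sit between keyBy and the session window, e.g.
>> events.keyBy(_.key).process(new SizeGuard()).keyBy(_.key).window(...).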
>>
>> About what you suggested - I kind of did this by increasing the managed
>> memory fraction to 0.5, and it did postpone the problem (the TMs started
>> crashing after 10 days instead of 7). It looks like anything I do on that
>> front will only postpone the problem, not solve it.
>>
>> I am attaching the full job configuration.
>>
>>
>>
>> On Thu, Oct 29, 2020 at 10:09 AM Xintong Song <tonysong...@gmail.com>
>> wrote:
>>
>>> Hi Ori,
>>>
>>> It looks like Flink indeed uses more memory than expected. I assume the
>>> first item with PID 20331 is the flink process, right?
>>>
>>> It would be helpful if you can briefly introduce your workload.
>>> - What kind of workload are you running? Streaming or batch?
>>> - Do you use RocksDB state backend?
>>> - Any UDFs or 3rd party dependencies that might allocate significant
>>> native memory?
>>>
>>> Moreover, if the metrics show only 20% heap usage, I would suggest
>>> configuring a smaller `task.heap.size`, leaving more memory for off-heap.
>>> The reduced heap size does not necessarily all go to the managed memory.
>>> You can also try increasing the `jvm-overhead`, simply to leave more
>>> native memory in the container in case there are other significant native
>>> memory usages.
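>>>
>>> For example (purely illustrative numbers, not a recommendation for your
>>> exact setup), that could look like this in flink-conf.yaml:
>>>
>>>     taskmanager.memory.task.heap.size: 40g
>>>     taskmanager.memory.jvm-overhead.max: 4g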
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Wed, Oct 28, 2020 at 5:53 PM Ori Popowski <ori....@gmail.com> wrote:
>>>
>>>> Hi Xintong,
>>>>
>>>> See here:
>>>>
>>>> # Top memory users
>>>> ps auxwww --sort -rss | head -10
>>>> USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
>>>> yarn     20339 35.8 97.0 128600192 126672256 ? Sl   Oct15 5975:47
>>>> /etc/alternatives/jre/bin/java -Xmx54760833024 -Xms54760833024 -XX:Max
>>>> root      5245  0.1  0.4 5580484 627436 ?      Sl   Jul30 144:39
>>>> /etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
>>>> hadoop    5252  0.1  0.4 7376768 604772 ?      Sl   Jul30 153:22
>>>> /etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
>>>> yarn     26857  0.3  0.2 4214784 341464 ?      Sl   Sep17 198:43
>>>> /etc/alternatives/jre/bin/java -Dproc_nodemanager -Xmx2048m -XX:OnOutOf
>>>> root      5519  0.0  0.2 5658624 269344 ?      Sl   Jul30  45:21
>>>> /usr/bin/java -Xmx1500m -Xms300m -XX:+ExitOnOutOfMemoryError -XX:MinHea
>>>> root      1781  0.0  0.0 172644  8096 ?        Ss   Jul30   2:06
>>>> /usr/lib/systemd/systemd-journald
>>>> root      4801  0.0  0.0 2690260 4776 ?        Ssl  Jul30   4:42
>>>> /usr/bin/amazon-ssm-agent
>>>> root      6566  0.0  0.0 164672  4116 ?        R    00:30   0:00 ps
>>>> auxwww --sort -rss
>>>> root      6532  0.0  0.0 183124  3592 ?        S    00:30   0:00
>>>> /usr/sbin/CROND -n
>>>>
>>>> On Wed, Oct 28, 2020 at 11:34 AM Xintong Song <tonysong...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Ori,
>>>>>
>>>>> The error message suggests that there's not enough physical memory on
>>>>> the machine to satisfy the allocation. This does not necessarily mean a
>>>>> managed memory leak. A managed memory leak is only one of the possibilities.
>>>>> There are other potential reasons, e.g., another process/container on the
>>>>> machine used more memory than expected, Yarn NM is not configured with
>>>>> enough memory reserved for the system processes, etc.
>>>>>
>>>>> I would suggest first looking into the machine memory usage, to see
>>>>> whether the Flink process indeed uses more memory than expected. This
>>>>> could be achieved via:
>>>>> - Run the `top` command
>>>>> - Look into the `/proc/meminfo` file
>>>>> - Any container memory usage metrics that are available to your Yarn
>>>>> cluster
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Oct 27, 2020 at 6:21 PM Ori Popowski <ori....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> After the job has been running for 10 days in production, TaskManagers
>>>>>> start failing with:
>>>>>>
>>>>>> Connection unexpectedly closed by remote task manager
>>>>>>
>>>>>> Looking in the machine logs, I can see the following error:
>>>>>>
>>>>>> ============= Java processes for user hadoop =============
>>>>>> OpenJDK 64-Bit Server VM warning: INFO:
>>>>>> os::commit_memory(0x00007fb4f4010000, 1006567424, 0) failed; 
>>>>>> error='Cannot
>>>>>> allocate memory' (err
>>>>>> #
>>>>>> # There is insufficient memory for the Java Runtime Environment to
>>>>>> continue.
>>>>>> # Native memory allocation (mmap) failed to map 1006567424 bytes for
>>>>>> committing reserved memory.
>>>>>> # An error report file with more information is saved as:
>>>>>> # /mnt/tmp/hsperfdata_hadoop/hs_err_pid6585.log
>>>>>> =========== End java processes for user hadoop ===========
>>>>>>
>>>>>> In addition, the metrics for the TaskManager show very low Heap
>>>>>> memory consumption (20% of Xmx).
>>>>>>
>>>>>> Hence, I suspect there is a memory leak in the TaskManager's Managed
>>>>>> Memory.
>>>>>>
>>>>>> This my TaskManager's memory detail:
>>>>>> flink process 112g
>>>>>> framework.heap.size 0.2g
>>>>>> task.heap.size 50g
>>>>>> managed.size 54g
>>>>>> framework.off-heap.size 0.5g
>>>>>> task.off-heap.size 1g
>>>>>> network 2g
>>>>>> XX:MaxMetaspaceSize 1g
>>>>>>
>>>>>> As you can see, the managed memory is 54g, so it's already high (my
>>>>>> managed.fraction is set to 0.5).
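>>>>>>
>>>>>> Summing the components above (0.2 + 50 + 54 + 0.5 + 1 + 2, plus the 1g
>>>>>> of metaspace) gives roughly 108.7g, so only a few GB of the 112g
>>>>>> process are left for JVM overhead and any other native allocations.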
>>>>>>
>>>>>> I'm running Flink 1.10. Full job details attached.
>>>>>>
>>>>>> Can someone advise what would cause a managed memory leak?
>>>>>>
>>>>>>
>>>>>>
