[ https://issues.apache.org/jira/browse/FLINK-6301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15966797#comment-15966797 ]
Rahul Yadav commented on FLINK-6301:
------------------------------------

Linking to the underlying Kafka issue

> Flink KafkaConnector09 leaks memory on reading compressed messages due to a Kafka consumer bug
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6301
>                 URL: https://issues.apache.org/jira/browse/FLINK-6301
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.2.0, 1.1.3, 1.1.4
>            Reporter: Rahul Yadav
>         Attachments: jeprof.24611.1228.i1228.heap.svg, jeprof.24611.1695.i1695.heap.svg, jeprof.24611.265.i265.heap.svg, jeprof.24611.3138.i3138.heap.svg, jeprof.24611.595.i595.heap.svg, jeprof.24611.705.i705.heap.svg, jeprof.24611.81.i81.heap.svg
>
> Hi,
> We are running Flink on a standalone cluster with 8 TaskManagers, each with 8 vCPUs and 8 slots. Each host has 16 GB of RAM.
> In our jobs:
> # We consume gzip-compressed messages from Kafka using *KafkaConnector09* and use the *RocksDB* backend for checkpoint storage.
> # To debug the leak, we used *jemalloc* and *jeprof* to profile the sources of malloc calls from the Java process; attached are the profiles generated at various stages of the job. As the profiles show, apart from os.malloc and rocksDB.allocateNewBlock, there are additional malloc calls coming from the inflate() method of java.util.zip.Inflater. These calls are innocuous as long as Inflater.end() is called after use (a minimal sketch of the leaky pattern follows the stack trace below).
> # To find the callers of inflate(), we used BTrace on the running process to dump the caller stack on each inflate() call. This is the stack trace we got:
> {code}
> java.util.zip.Inflater.inflate(Inflater.java)
> java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
> java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117)
> java.io.DataInputStream.readFully(DataInputStream.java:195)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:253)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.innerDone(MemoryRecords.java:282)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:233)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:563)
> org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:69)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:136)
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:227)
> {code}
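>
> To make the failure mode concrete, here is a minimal, self-contained sketch of the leaky pattern (illustrative only, not Kafka code; the class name and sizes are made up). Every GZIPInputStream owns a native zlib Inflater whose off-heap state is released only by end(), and end() is reached only through close() or the finalizer:
> {code}
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.util.zip.GZIPInputStream;
> import java.util.zip.GZIPOutputStream;
>
> public class InflaterLeakSketch {
>
>     public static void main(String[] args) throws IOException {
>         byte[] payload = gzip(new byte[1024]);
>         byte[] buf = new byte[4096];
>         for (int i = 0; i < 100_000; i++) {
>             GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(payload));
>             while (in.read(buf) != -1) {
>                 // drain the stream, as DataInputStream.readFully() does above
>             }
>             // in.close() is never called: Inflater.end() does not run, so the
>             // native zlib buffers stay allocated until the finalizer runs (if ever).
>         }
>     }
>
>     private static byte[] gzip(byte[] data) throws IOException {
>         ByteArrayOutputStream bos = new ByteArrayOutputStream();
>         try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
>             gz.write(data);
>         }
>         return bos.toByteArray();
>     }
> }
> {code}
> Because each abandoned stream is tiny on the Java heap, GC (and hence finalization) runs rarely, so native memory keeps growing while the heap looks healthy, which is consistent with the attached jemalloc profiles.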
> The end() method on Inflater is called inside the close() method of *InflaterInputStream* (extended by *GZIPInputStream*), but looking through the Kafka consumer code, we found that RecordsIterator does not close the compressor stream after use, and this is what causes the memory leak (the safe pattern is sketched at the end of this description):
> https://github.com/apache/kafka/blob/23c69d62a0cabf06c4db8b338f0ca824dc6d81a7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L210
> https://issues.apache.org/jira/browse/KAFKA-3937 was filed for this, and the issue was fixed in 0.10.1.0 but not back-ported to previous versions.
> So, I would assume that we have two paths from here:
> 1. Wait for the fix to be back-ported to the 0.9.x Kafka consumer and then update the kafka-clients dependency:
> https://github.com/apache/flink/blob/release-1.2/flink-connectors/flink-connector-kafka-0.9/pom.xml#L40
> 2. Update kafka-connector10 to use the 0.10.1.0 clients library instead of 0.10.0.1:
> https://github.com/apache/flink/blob/release-1.2/flink-connectors/flink-connector-kafka-0.10/pom.xml#L40
> Apart from master, also back-port the changes to 1.2.x for Kafka connector 10, and to all the 1.x dependencies for Kafka connector 09.
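>
> For reference, the essence of the KAFKA-3937 fix is to guarantee that the decompression stream is closed once the iterator is done with it, so that Inflater.end() releases the native buffers deterministically. A minimal sketch of the safe pattern (a hypothetical helper, not the actual Kafka patch; CompressedRecordReader, readAll, and uncompressedSize are made-up names):
> {code}
> import java.io.ByteArrayInputStream;
> import java.io.DataInputStream;
> import java.io.IOException;
> import java.util.zip.GZIPInputStream;
>
> public final class CompressedRecordReader {
>
>     // The point is the try-with-resources block, which guarantees
>     // close() -> InflaterInputStream.close() -> Inflater.end().
>     public static byte[] readAll(byte[] gzippedPayload, int uncompressedSize) throws IOException {
>         byte[] out = new byte[uncompressedSize];
>         try (DataInputStream in = new DataInputStream(
>                 new GZIPInputStream(new ByteArrayInputStream(gzippedPayload)))) {
>             in.readFully(out); // the same readFully() call that appears in the stack trace
>         } // close() runs here and frees the Inflater's native memory
>         return out;
>     }
> }
> {code}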