Rahul Yadav created FLINK-6301:
----------------------------------

             Summary: Flink KafkaConnector09 leaks memory on reading compressed 
messages due to a Kafka consumer bug
                 Key: FLINK-6301
                 URL: https://issues.apache.org/jira/browse/FLINK-6301
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector
    Affects Versions: 1.1.4, 1.1.3, 1.2.0
            Reporter: Rahul Yadav


Hi

We are running Flink on a standalone cluster with 8 TaskManagers, each with 8 vCPUs 
and 8 slots. Each host has 16 GB of RAM.

In our jobs, 

# We consume gzip-compressed messages from Kafka using *KafkaConnector09* 
and use the *RocksDB* backend for checkpoint storage.
# To debug the leak, we used *jemalloc and jprof* to profile the sources of 
malloc calls from the Java process; attached are the profiles generated at 
various stages of the job. As we can see, apart from os.malloc and 
rocksDB.allocateNewBlock, there are additional malloc calls coming from the 
inflate() method of java.util.zip.Inflater. These calls are innocuous as long 
as Inflater.end() is called after use.
# To find the callers of inflate(), we ran BTrace against the running process 
to dump the caller stack on each call to the method (a sketch of the probe we 
used appears after the stack trace below). This is the stack trace we got: 
{code}
java.util.zip.Inflater.inflate(Inflater.java)
java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117)
java.io.DataInputStream.readFully(DataInputStream.java:195)
org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:253)
org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
org.apache.kafka.common.record.MemoryRecords$RecordsIterator.innerDone(MemoryRecords.java:282)
org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:233)
org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:563)
org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:69)
org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139)
org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:136)
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:227)
{code}
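
For reference, below is a minimal sketch of the kind of BTrace probe used to capture the stack above; the class name and output text are illustrative, not the exact script we ran:
{code}
import com.sun.btrace.annotations.BTrace;
import com.sun.btrace.annotations.OnMethod;
import static com.sun.btrace.BTraceUtils.jstack;
import static com.sun.btrace.BTraceUtils.println;

// Prints the caller stack every time java.util.zip.Inflater.inflate(...) is entered.
@BTrace
public class InflateCallers {
    @OnMethod(clazz = "java.util.zip.Inflater", method = "inflate")
    public static void onInflate() {
        println("--- Inflater.inflate() called from ---");
        jstack();
    }
}
{code}
The probe is attached to the TaskManager process with {{btrace <pid> InflateCallers.java}}.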
The end() method on Inflater is called inside the close() method of 
*InflaterInputStream* (extended by *GZIPInputStream*), but looking through the 
Kafka consumer code we found that the RecordsIterator never closes the 
compressor stream after use, which causes the memory leak:

https://github.com/apache/kafka/blob/23c69d62a0cabf06c4db8b338f0ca824dc6d81a7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L210
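
To illustrate the pattern (a hedged sketch with made-up method names, not the actual Kafka code): each Inflater holds native zlib buffers that are only released by Inflater.end(), which InflaterInputStream/GZIPInputStream invoke from close(). If the iterator abandons the stream without closing it, that native memory is reclaimed only when the finalizer eventually runs, if at all:
{code}
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;

public class CompressorStreamSketch {

    // Leaky pattern (similar in spirit to the 0.9.x RecordsIterator): the
    // GZIPInputStream is dropped once the payload is read, so Inflater.end()
    // is never called and the native zlib memory lingers until finalization.
    static byte[] decompressLeaky(byte[] compressed, int uncompressedSize) throws IOException {
        DataInputStream in = new DataInputStream(
                new GZIPInputStream(new ByteArrayInputStream(compressed)));
        byte[] out = new byte[uncompressedSize];
        in.readFully(out);
        return out; // stream never closed
    }

    // Fixed pattern: try-with-resources guarantees close(), which calls
    // Inflater.end() and frees the native memory immediately.
    static byte[] decompressClosed(byte[] compressed, int uncompressedSize) throws IOException {
        try (DataInputStream in = new DataInputStream(
                new GZIPInputStream(new ByteArrayInputStream(compressed)))) {
            byte[] out = new byte[uncompressedSize];
            in.readFully(out);
            return out;
        }
    }
}
{code}
The KAFKA-3937 fix essentially moves to the second pattern, releasing the compressor stream once the compressed payload has been fully iterated rather than relying on finalization.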

https://issues.apache.org/jira/browse/KAFKA-3937 was filed for this and the 
issue was fixed in 0.10.1.0, but the fix was not back-ported to earlier versions.

So, I would assume we have two paths from here: 
1. Wait for the fix to be back-ported to the 0.9.x Kafka consumer and then 
update the kafka-clients dependency:
https://github.com/apache/flink/blob/release-1.2/flink-connectors/flink-connector-kafka-0.9/pom.xml#L40

2. Update the Kafka 0.10 connector to use the 0.10.1.0 clients library instead 
of 0.10.0.1 (a sketch of a user-side dependency override is included at the end 
of this description).
https://github.com/apache/flink/blob/release-1.2/flink-connectors/flink-connector-kafka-0.10/pom.xml#L40

Apart from master, the changes should also be back-ported to 1.2.x for the 
Kafka 0.10 connector and to all the 1.x release lines for the Kafka 0.9 connector.
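
In the meantime, affected users may be able to work around the leak by overriding the transitive kafka-clients dependency in their job's own pom.xml. The snippet below is only a sketch under assumptions: the Scala 2.10 artifact suffix and Flink 1.2.0 are examples, and running the 0.10 connector against the 0.10.1.0 clients has not been verified here:
{code}
<!-- Sketch of a user-side override: force the patched kafka-clients
     (0.10.1.0, which contains the KAFKA-3937 fix) instead of the 0.10.0.1
     version pulled in transitively by the connector. -->
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.1.0</version>
    </dependency>
</dependencies>
{code}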



