[ https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17773739#comment-17773739 ]
Tzu-Li (Gordon) Tai commented on FLINK-33231:
---------------------------------------------

Making this a blocker for the upcoming Kafka connector releases.

Memory leak in KafkaSourceReader if no data in consumed topic
--------------------------------------------------------------

                Key: FLINK-33231
                URL: https://issues.apache.org/jira/browse/FLINK-33231
            Project: Flink
         Issue Type: Bug
         Components: Connectors / Kafka
   Affects Versions: 1.17.1
           Reporter: Lauri Suurväli
           Priority: Blocker
            Fix For: kafka-3.0.1, kafka-3.1.0
        Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png

*Problem description*

Our Flink streaming job's TaskManager heap fills up when the job has nothing to consume and process.

It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. When there are no messages in the source topic, TaskManager heap usage keeps increasing until the job exits after receiving a SIGTERM signal. We are running the job on AWS EMR with YARN.

The problems with TaskManager heap usage do not occur when there is data to process. It is also worth noting that sending a single message to the source topic of a streaming job that has been sitting idle and suffers from the memory leak clears the heap. However, this does not resolve the problem, since heap usage starts increasing again immediately after the message has been processed.

!Screenshot 2023-10-10 at 12.49.37.png!

TaskManager heap used percentage is calculated by

{code:java}
flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / flink.taskmanager.Status.JVM.Memory.Heap.Max
{code}

I was able to take heap dumps of the TaskManager processes during a period of high heap usage. Heap dump analysis detected 912,355 instances of empty java.util.HashMap collections retaining >= 43,793,040 bytes.

!Screenshot 2023-10-09 at 14.13.43.png!

The retained heap appeared to be located at

{code:java}
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit
{code}

!Screenshot 2023-10-09 at 13.02.34.png!

*Possible hints:*

An empty HashMap is added to the offsetsToCommit map during the snapshotState method if one does not already exist for the given checkpoint. [KafkaSourceReader line 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]

{code:java}
Map<TopicPartition, OffsetAndMetadata> offsetsMap =
        offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
{code}

If the startingOffset for the given split is >= 0, a new entry is added to the map from the previous step. [KafkaSourceReader line 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]

{code:java}
if (split.getStartingOffset() >= 0) {
    offsetsMap.put(
            split.getTopicPartition(),
            new OffsetAndMetadata(split.getStartingOffset()));
}
{code}

If the starting offset is smaller than 0, the offsetsMap created in step 1 is left empty; a minimal sketch of the resulting accumulation follows below.
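To make the accumulation concrete, here is a minimal, self-contained sketch of the pattern described above. The class and method bodies are illustrative only (not the actual connector code); it merely models a per-checkpoint map that is created via computeIfAbsent but never populated or removed when the starting offset is negative:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Illustrative model of the per-checkpoint bookkeeping described above; not the real reader.
public class OffsetsToCommitLeakSketch {

    private final SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit =
            new TreeMap<>();

    // Mimics snapshotState: called once per checkpoint for a single split.
    void snapshotState(long checkpointId, TopicPartition partition, long startingOffset) {
        Map<TopicPartition, OffsetAndMetadata> offsetsMap =
                offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
        // With startingOffset == -3 this branch is never taken, so offsetsMap stays empty.
        if (startingOffset >= 0) {
            offsetsMap.put(partition, new OffsetAndMetadata(startingOffset));
        }
    }

    // Mimics the commit path: an empty map means nothing is committed and nothing is removed.
    void notifyCheckpointComplete(long checkpointId) {
        Map<TopicPartition, OffsetAndMetadata> toCommit = offsetsToCommit.get(checkpointId);
        if (toCommit == null || toCommit.isEmpty()) {
            return; // early return, the empty entry stays behind
        }
        // ... commit to Kafka, then drop entries up to and including checkpointId ...
    }

    public static void main(String[] args) {
        OffsetsToCommitLeakSketch reader = new OffsetsToCommitLeakSketch();
        TopicPartition partition = new TopicPartition("source-events", 44);
        for (long checkpointId = 1; checkpointId <= 100_000; checkpointId++) {
            reader.snapshotState(checkpointId, partition, -3L);
            reader.notifyCheckpointComplete(checkpointId);
        }
        // Prints 100000: one retained empty HashMap per completed checkpoint.
        System.out.println("Retained checkpoint entries: " + reader.offsetsToCommit.size());
    }
}
{code}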
We can see from the logs that the startingOffset is -3 when the splits are added to the reader:

{code:java}
Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, StoppingOffset: -9223372036854775808], [Partition: source-events-44, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: source-events-36, StartingOffset: 1, StoppingOffset: -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: source-events-28, StartingOffset: -3, StoppingOffset: -9223372036854775808]]
{code}

Entries are removed from the offsetsToCommit map once their offsets have been committed to Kafka, which happens in the callback function passed to the KafkaSourceFetcherManager.commitOffsets method from KafkaSourceReader.notifyCheckpointComplete. However, if the offsets map for the given checkpoint is empty, the KafkaSourceFetcherManager.commitOffsets method returns early. [KafkaSourceFetcherManager line 78|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L78]

{code:java}
if (offsetsToCommit.isEmpty()) {
    return;
}
{code}

We can observe from the logs that an empty map is indeed encountered at this step:

{code:java}
Committing offsets {}
{code}

*Conclusion*

It seems that an empty map is added to the offsetsToCommit map for each checkpoint. Since the startingOffset in our case is -3, these maps never get filled. During the offset commit phase the offsets for these checkpoints are ignored, since there is nothing to commit, but there is no cleanup either, so the empty maps keep accumulating.
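One possible direction for a fix (an assumption on my part, not a confirmed patch for this ticket) would be to drop the per-checkpoint entry whenever there is nothing to commit for it, so that idle checkpoints do not leave empty maps behind. In terms of the illustrative sketch above:

{code:java}
// Hypothetical variant of the sketch's notifyCheckpointComplete: instead of returning and
// leaving the empty entry behind, remove it together with any older, already-obsolete entries.
void notifyCheckpointComplete(long checkpointId) {
    Map<TopicPartition, OffsetAndMetadata> toCommit = offsetsToCommit.get(checkpointId);
    if (toCommit == null || toCommit.isEmpty()) {
        // headMap(checkpointId + 1) covers all entries up to and including this checkpoint.
        offsetsToCommit.headMap(checkpointId + 1).clear();
        return;
    }
    // ... commit to Kafka, then drop entries up to and including checkpointId ...
}
{code}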