[ 
https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17773790#comment-17773790
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-33231:
---------------------------------------------

[~lauri.suurvali] I think that would work, but the issue is that in the 
callback, on success we log that a commit was successful and also bump the 
source reader metrics, which can be confusing if no offsets were actually 
committed. Moreover, with that approach we would be relying on internal 
details of the Kafka client that are hard to cover with tests (i.e. things 
might silently change such that a remote request is issued even if the 
provided offsets are empty, which is not ideal).

So, I think we can be a bit cleaner by short-cutting the 
{{notifyCheckpointComplete}} method such that if the offsets for a checkpoint 
are empty, we don't even attempt to use the fetcher manager to commit them.
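
A rough sketch of what that short-circuit could look like (illustration only, 
not the actual patch; the exact log message and cleanup of the empty entry are 
assumptions):

{code:java}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    Map<TopicPartition, OffsetAndMetadata> committedPartitions =
            offsetsToCommit.get(checkpointId);
    if (committedPartitions == null || committedPartitions.isEmpty()) {
        // Nothing was consumed for this checkpoint: skip the fetcher manager
        // entirely and drop the (empty) entry so it cannot accumulate.
        LOG.debug("No offsets to commit for checkpoint {}.", checkpointId);
        offsetsToCommit.remove(checkpointId);
        return;
    }
    // Existing path: delegate to KafkaSourceFetcherManager.commitOffsets(...)
    // and clean up offsetsToCommit in the success callback as before.
}
{code}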

> Memory leak in KafkaSourceReader if no data in consumed topic
> -------------------------------------------------------------
>
>                 Key: FLINK-33231
>                 URL: https://issues.apache.org/jira/browse/FLINK-33231
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.17.1
>            Reporter: Lauri Suurväli
>            Priority: Blocker
>             Fix For: kafka-3.0.1, kafka-3.1.0
>
>         Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 
> 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png
>
>
> *Problem description*
> Our Flink streaming job's TaskManager heap fills up when the job has nothing 
> to consume and process.
> It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink. 
> When there are no messages in the source topic, the TaskManager heap usage 
> starts increasing until the job exits after receiving a SIGTERM signal. We 
> are running the job on AWS EMR with YARN.
> The problems with the TaskManager heap usage do not occur when there is data 
> to process. It's also worth noting that sending a single message to the 
> source topic of a streaming job that has been sitting idle and suffering from 
> the memory leak will cause the heap to be cleared. However, this does not 
> resolve the problem, since the heap usage starts increasing again immediately 
> after the message is processed.
> !Screenshot 2023-10-10 at 12.49.37.png!
> The TaskManager heap usage percentage is calculated as 
>  
> {code:java}
> flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / 
> flink.taskmanager.Status.JVM.Memory.Heap.Max{code}
>  
>  
>  I was able to take heap dumps of the TaskManager processes during a period 
> of high heap usage. Heap dump analysis detected 912,355 empty 
> java.util.HashMap instances retaining >= 43,793,040 bytes.
> !Screenshot 2023-10-09 at 14.13.43.png!
> The retained heap seemed to be located at:
>  
> {code:java}
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code}
>  
> !Screenshot 2023-10-09 at 13.02.34.png!
>  
> *Possible hints:*
> An empty HashMap is added to the offsetsToCommit map during the snapshotState 
> method if no entry exists yet for the given checkpoint. [KafkaSourceReader 
> line 
> 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]
>  
> {code:java}
> Map<TopicPartition, OffsetAndMetadata> offsetsMap =
>         offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); 
> {code}
>  
> If the startingOffset for the given split is >= 0, a new entry is added to 
> the map from the previous step. [KafkaSourceReader line 
> 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]
> {code:java}
> if (split.getStartingOffset() >= 0) {
>     offsetsMap.put(
>         split.getTopicPartition(),
>         new OffsetAndMetadata(split.getStartingOffset()));
> }{code}
> If the starting offset is smaller than 0, the offsetsMap created in the 
> previous step is left empty. We can see from the logs that the startingOffset 
> is -3 when the splits are added to the reader.
>  
> {code:java}
> Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-44, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: 
> source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], 
> [Partition: source-events-36, StartingOffset: 1, StoppingOffset: 
> -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-28, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808]]{code}
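> (For reference, the StoppingOffset of -9223372036854775808 in the log above 
> is simply Long.MIN_VALUE, and the negative StartingOffset values appear to be 
> sentinels that the connector resolves at runtime rather than concrete Kafka 
> offsets; a trivial check, purely for illustration:)
> {code:java}
> // The StoppingOffset printed in the log is just Long.MIN_VALUE,
> // i.e. the split has no stopping offset (unbounded consumption).
> System.out.println(Long.MIN_VALUE); // prints -9223372036854775808
> {code}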
>  
>  
> Entries are removed from the offsetsToCommit map once their offsets have been 
> committed to Kafka, which happens in the callback function that 
> KafkaSourceReader.notifyCheckpointComplete passes to the 
> KafkaSourceFetcherManager.commitOffsets method.
> However, if committedPartitions is empty for the given checkpoint, 
> KafkaSourceFetcherManager.commitOffsets returns early and the cleanup 
> callback is never invoked. [KafkaSourceFetcherManager line 
> 78|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L78]
> {code:java}
> if (offsetsToCommit.isEmpty()) {
>     return;
> } {code}
> We can observe from the logs that indeed an empty map is encountered at this 
> step:
> {code:java}
> Committing offsets {}{code}
> *Conclusion*
> It seems that an empty map gets added to the offsetsToCommit map for each 
> checkpoint. Since the startingOffset in our case is -3, the empty map never 
> gets filled. During the offset commit phase the offsets for these checkpoints 
> are ignored, since there is nothing to commit; however, there isn't any 
> cleanup either, so the empty maps keep accumulating.
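> The accumulation can be reproduced outside of Flink with a minimal simulation 
> of the two steps above (illustration only; plain Java collections instead of 
> the actual KafkaSourceReader):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> import java.util.SortedMap;
> import java.util.TreeMap;
> 
> public class OffsetsToCommitLeakDemo {
>     public static void main(String[] args) {
>         // Stand-in for KafkaSourceReader#offsetsToCommit, keyed by checkpoint id.
>         SortedMap<Long, Map<String, Long>> offsetsToCommit = new TreeMap<>();
> 
>         for (long checkpointId = 1; checkpointId <= 1_000_000; checkpointId++) {
>             // snapshotState(): an entry is created even when nothing was consumed.
>             Map<String, Long> offsetsMap =
>                     offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
> 
>             // notifyCheckpointComplete(): the commit path bails out on an empty
>             // map, so the entry is never removed.
>             if (offsetsMap.isEmpty()) {
>                 continue;
>             }
>         }
> 
>         // Every idle checkpoint leaves behind one empty HashMap.
>         System.out.println("Retained empty maps: " + offsetsToCommit.size());
>     }
> }
> {code}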
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
