[ https://issues.apache.org/jira/browse/FLINK-36569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17892455#comment-17892455 ]
Hongshun Wang commented on FLINK-36569:
---------------------------------------

[~ft20082] Thanks for your explanation. It seems one FlinkKafkaInternalProducer is enough for all the offset commits.

> flink kafka connector do not close kafka produer when it checkpoint success
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-36569
>                 URL: https://issues.apache.org/jira/browse/FLINK-36569
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.19.0, 1.20.0
>         Environment: flink: 1.20
>                      flink kafka connector: 3.3.0-1.20
>            Reporter: Jake.zhang
>            Priority: Major
>         Attachments: image-2024-10-18-13-31-39-253.png,
>                      image-2024-10-21-14-02-41-823.png
>
>
> The Flink Kafka connector does not close the FlinkKafkaInternalProducer
> after a checkpoint succeeds in Flink 1.19/1.20. It creates one
> FlinkKafkaInternalProducer per checkpoint.
>
> FlinkKafkaInternalProducer is not closed automatically, so the number of
> Kafka producer network threads keeps growing.
> `getRecoveryProducer` is called on every commit because the `recyclable`
> object is always empty, so `recyclable.ifPresent(Recyclable::close)` does nothing.
> `org.apache.flink.connector.kafka.sink.KafkaCommitter`
> {code:java}
> producer =
>         recyclable
>                 .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
>                 .orElseGet(() -> getRecoveryProducer(committable));
> producer.commitTransaction();
> producer.flush();
> recyclable.ifPresent(Recyclable::close);{code}
>
> !image-2024-10-21-14-02-41-823.png!
>
> !image-2024-10-18-13-31-39-253.png!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
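
The leak described in the quoted issue can be reproduced in isolation with a small sketch. Everything here is hypothetical and stands in for the real classes: `FakeProducer` replaces `FlinkKafkaInternalProducer`, a plain `Optional<FakeProducer>` replaces the `Recyclable` wrapper, and `new FakeProducer()` replaces `getRecoveryProducer(committable)`. The second method shows one possible remedy (closing a producer that was created on the recovery path); it is an assumption for illustration, not necessarily the change adopted for this ticket.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch only: simplified stand-ins, not the Flink Kafka connector classes. */
class ProducerLeakSketch {
    /** Number of producers created but not yet closed. */
    static final AtomicInteger OPEN = new AtomicInteger();

    /** Hypothetical stand-in for a producer that owns a network thread. */
    static final class FakeProducer implements AutoCloseable {
        FakeProducer() { OPEN.incrementAndGet(); }
        void commitTransaction() { /* no-op in this sketch */ }
        @Override public void close() { OPEN.decrementAndGet(); }
    }

    /** Mirrors the reported pattern: only a recycled producer is released. */
    static void commitLeaky(Optional<FakeProducer> recyclable) {
        FakeProducer producer = recyclable.orElseGet(FakeProducer::new);
        producer.commitTransaction();
        recyclable.ifPresent(FakeProducer::close); // does nothing when empty
    }

    /** Assumed remedy: also close a producer created on the recovery path. */
    static void commitAndClose(Optional<FakeProducer> recyclable) {
        FakeProducer producer = recyclable.orElseGet(FakeProducer::new);
        try {
            producer.commitTransaction();
        } finally {
            if (recyclable.isPresent()) {
                recyclable.get().close();
            } else {
                producer.close(); // recovery producer is not tracked elsewhere
            }
        }
    }

    /** Simulates N checkpoints with an empty recyclable; returns open producers. */
    static int openAfter(int checkpoints, boolean leaky) {
        OPEN.set(0);
        for (int i = 0; i < checkpoints; i++) {
            if (leaky) {
                commitLeaky(Optional.empty());
            } else {
                commitAndClose(Optional.empty());
            }
        }
        return OPEN.get();
    }

    public static void main(String[] args) {
        System.out.println("open after 5 leaky commits: " + openAfter(5, true));
        System.out.println("open after 5 closing commits: " + openAfter(5, false));
    }
}
```

With an empty `recyclable`, the leaky variant leaves one open producer per simulated checkpoint, which matches the growing thread count in the report, while the closing variant leaves none. Reusing a single producer across commits, as the comment above suggests, would be an alternative to closing each one.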