[ https://issues.apache.org/jira/browse/FLINK-36569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891376#comment-17891376 ]
Jake.zhang commented on FLINK-36569: ------------------------------------ Yes, it create `getRecoveryProducer` each time, `recyclable` object always null, so `recyclable.ifPresent(Recyclable::close)` not work. {code:java} producer = recyclable .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject) .orElseGet(() -> getRecoveryProducer(committable)); producer.commitTransaction(); producer.flush(); recyclable.ifPresent(Recyclable::close);{code} > flink kafka connector do not close kafka produer when it checkpoint success > --------------------------------------------------------------------------- > > Key: FLINK-36569 > URL: https://issues.apache.org/jira/browse/FLINK-36569 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka > Affects Versions: 1.19.0, 1.20.0 > Environment: flink: 1.20 > flink kafka connector: 3.3.0-1.20 > Reporter: Jake.zhang > Priority: Major > Attachments: image-2024-10-18-13-31-39-253.png > > > flink kafka connector do't close FlinkKafkaInternalProducer when flink > checkpoint success in flink 1.20/1.19 . it will create one > FlinkKafkaInternalProducer per checkpoint. > > FlinkKafkaInternalProducer do not close automatic. so kafka producer network > thread will more and more > > !image-2024-10-18-13-31-39-253.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)