[ 
https://issues.apache.org/jira/browse/FLINK-36569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17892455#comment-17892455
 ] 

Hongshun Wang commented on FLINK-36569:
---------------------------------------

[~ft20082]  Thanks for your explain, it seems one FlinkKafkaInternalProducer is 
enough for all the offset commit.

> flink kafka connector do not close kafka produer when it checkpoint success
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-36569
>                 URL: https://issues.apache.org/jira/browse/FLINK-36569
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.19.0, 1.20.0
>         Environment: flink: 1.20
> flink kafka connector: 3.3.0-1.20 
>            Reporter: Jake.zhang
>            Priority: Major
>         Attachments: image-2024-10-18-13-31-39-253.png, 
> image-2024-10-21-14-02-41-823.png
>
>
> flink kafka connector do't close FlinkKafkaInternalProducer when flink 
> checkpoint success in flink 1.20/1.19 .  it will create one  
> FlinkKafkaInternalProducer per checkpoint.
>  
> FlinkKafkaInternalProducer do not close automatic. so  kafka producer network 
> thread will more and more 
>  it create `getRecoveryProducer` each time,   `recyclable` object always 
> null, so  `recyclable.ifPresent(Recyclable::close)` not work.
> `org.apache.flink.connector.kafka.sink.KafkaCommitter`
> {code:java}
> producer =
> recyclable
> .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
> .orElseGet(() -> getRecoveryProducer(committable));
> producer.commitTransaction();
> producer.flush();
> recyclable.ifPresent(Recyclable::close);{code}
>  
> !image-2024-10-21-14-02-41-823.png!
>  
> !image-2024-10-18-13-31-39-253.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to