[ https://issues.apache.org/jira/browse/FLINK-36569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891735#comment-17891735 ]
Jake.zhang edited comment on FLINK-36569 at 10/22/24 6:34 AM:
--------------------------------------------------------------

Added a scheduled close service in the `org.apache.flink.connector.kafka.sink.KafkaWriter` constructor that keeps only the producers of the 5 most recent checkpoints. It works, but the closed producers' transactions still need to wait for the Kafka transaction timeout.

{code:java}
initFlinkMetrics();
// Scheduled task: check whether there are producers from old checkpoints that need closing.
this.autoCloseService = Executors.newSingleThreadScheduledExecutor();
this.autoCloseService.scheduleWithFixedDelay(
        () -> {
            try {
                LOG.info("last checkpoint id: {}", lastCheckpointId);
                // Keep the producers up to the previous checkpoint, i.e.
                // successCheckpointId - 1 is the largest transaction to retain.
                FlinkKafkaInternalProducer flinkKafkaInternalProducer = null;
                while ((flinkKafkaInternalProducer =
                        (FlinkKafkaInternalProducer) this.producerCloseables.peek()) != null) {
                    String transactionId = flinkKafkaInternalProducer.getTransactionalId();
                    assert transactionId != null;
                    String[] transactionIdArr = transactionId.split("-");
                    long itemCheckpointId =
                            Long.parseLong(transactionIdArr[transactionIdArr.length - 1]);
                    if (lastCheckpointId - 5 > itemCheckpointId) {
                        // Poll the entry off the queue and close it.
                        try {
                            FlinkKafkaInternalProducer closeable =
                                    (FlinkKafkaInternalProducer) this.producerCloseables.poll();
                            closeable.close();
                            LOG.info(
                                    "close producer transaction id: {}",
                                    closeable.getTransactionalId());
                        } catch (Exception e) {
                            LOG.warn("fkip close error", e);
                        }
                    } else {
                        // Wait for the next check.
                        break;
                    }
                }
            } catch (Exception e) {
                LOG.warn("schedule auto close producer error", e);
            }
        },
        60, 60, TimeUnit.SECONDS);
}
{code}
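The cleanup above keys off the tail of the transactional id (the code assumes ids of the form `<prefix>-<subtaskId>-<checkpointId>`, joined with `-`, which is how Flink's KafkaSink constructs them). A minimal standalone sketch of just that parsing step, using a hypothetical `checkpointIdOf` helper:

{code:java}
public class TransactionalIdParser {

    // Extract the trailing checkpoint id from a transactional id of the
    // assumed form "<prefix>-<subtaskId>-<checkpointId>". Taking everything
    // after the last '-' tolerates prefixes that themselves contain dashes.
    static long checkpointIdOf(String transactionalId) {
        int idx = transactionalId.lastIndexOf('-');
        return Long.parseLong(transactionalId.substring(idx + 1));
    }

    public static void main(String[] args) {
        long lastCheckpointId = 50;
        // A producer from checkpoint 42 is older than lastCheckpointId - 5,
        // so the scheduled task above would poll and close it.
        long item = checkpointIdOf("my-sink-3-42");
        System.out.println(item);
        System.out.println(lastCheckpointId - 5 > item);
    }
}
{code}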
> flink kafka connector does not close kafka producer when its checkpoint succeeds
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-36569
>                 URL: https://issues.apache.org/jira/browse/FLINK-36569
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.19.0, 1.20.0
>         Environment: flink: 1.20
>            flink kafka connector: 3.3.0-1.20
>            Reporter: Jake.zhang
>            Priority: Major
>         Attachments: image-2024-10-18-13-31-39-253.png,
> image-2024-10-21-14-02-41-823.png
>
>
> The flink kafka connector does not close the FlinkKafkaInternalProducer when a flink
> checkpoint succeeds in flink 1.20/1.19. It creates one
> FlinkKafkaInternalProducer per checkpoint.
>
> The FlinkKafkaInternalProducer is never closed automatically,
> so the number of kafka producer network
> threads keeps growing.
> `org.apache.flink.connector.kafka.sink.KafkaCommitter` calls `getRecoveryProducer` on
> every commit, because the `recyclable` object is always empty; as a result
> `recyclable.ifPresent(Recyclable::close)` never closes anything.
> `org.apache.flink.connector.kafka.sink.KafkaCommitter`:
> {code:java}
> producer =
>         recyclable
>                 .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
>                 .orElseGet(() -> getRecoveryProducer(committable));
> producer.commitTransaction();
> producer.flush();
> recyclable.ifPresent(Recyclable::close);
> {code}
>
> !image-2024-10-21-14-02-41-823.png!
>
> !image-2024-10-18-13-31-39-253.png!

-- This message was sent by Atlassian Jira (v8.20.10#820010)