[ 
https://issues.apache.org/jira/browse/FLINK-36569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17891735#comment-17891735
 ] 

Jake.zhang edited comment on FLINK-36569 at 10/22/24 6:34 AM:
--------------------------------------------------------------

As a workaround, add a scheduled close service in the
`org.apache.flink.connector.kafka.sink.KafkaWriter` constructor that keeps
only the producers of the 5 most recent checkpoints. It works.

Note that the service still needs to wait out the Kafka transaction timeout
before a producer can be closed safely.
{code:java}
    initFlinkMetrics();
    // Scheduled task: check whether there are producers for old checkpoint
    // ids that can be closed.
    this.autoCloseService = Executors.newSingleThreadScheduledExecutor();
    this.autoCloseService.scheduleWithFixedDelay(
            () -> {
                try {
                    LOG.info("last checkpoint id: {}", lastCheckpointId);
                    // Keep the producers up to the previous checkpoint, i.e.
                    // successCheckpointId - 1 is the newest transaction kept.
                    FlinkKafkaInternalProducer flinkKafkaInternalProducer;
                    while ((flinkKafkaInternalProducer =
                                    (FlinkKafkaInternalProducer) this.producerCloseables.peek())
                            != null) {
                        String transactionId = flinkKafkaInternalProducer.getTransactionalId();
                        assert transactionId != null;
                        String[] transactionIdArr = transactionId.split("-");
                        long itemCheckpointId =
                                Long.parseLong(transactionIdArr[transactionIdArr.length - 1]);
                        if (lastCheckpointId - 5 > itemCheckpointId) {
                            // Poll the producer off the queue and close it.
                            try {
                                FlinkKafkaInternalProducer closeable =
                                        (FlinkKafkaInternalProducer)
                                                this.producerCloseables.poll();
                                closeable.close();
                                LOG.info(
                                        "close producer transaction id: {}",
                                        closeable.getTransactionalId());
                            } catch (Exception e) {
                                LOG.warn("FlinkKafkaInternalProducer close error", e);
                            }
                        } else {
                            // Nothing old enough to close; wait for the next check.
                            break;
                        }
                    }
                } catch (Exception e) {
                    LOG.warn("schedule auto close producer error", e);
                }
            },
            60,
            60,
            TimeUnit.SECONDS);
} {code}
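For reference, the checkpoint-id extraction above relies on the KafkaSink
transactional-id layout, which appears to be `<prefix>-<subtaskId>-<checkpointId>`,
so the last `-`-separated token is the checkpoint id even when the prefix itself
contains dashes. A minimal, self-contained sketch of that parsing (the class name
and sample id are illustrative, not from the connector):

```java
// Sketch of the checkpoint-id parsing used by the scheduled closer above.
// Assumes ids of the form "<prefix>-<subtaskId>-<checkpointId>"; the prefix
// may itself contain '-' characters, so only the last token matters.
public class TransactionalIdParsing {

    static long checkpointIdOf(String transactionalId) {
        String[] parts = transactionalId.split("-");
        // The checkpoint id is always the final dash-separated token.
        return Long.parseLong(parts[parts.length - 1]);
    }

    public static void main(String[] args) {
        String sampleId = "my-sink-prefix-3-17"; // illustrative id
        System.out.println(checkpointIdOf(sampleId)); // prints 17
    }
}
```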



> flink kafka connector does not close kafka producer when checkpoint succeeds
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-36569
>                 URL: https://issues.apache.org/jira/browse/FLINK-36569
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.19.0, 1.20.0
>         Environment: flink: 1.20
> flink kafka connector: 3.3.0-1.20 
>            Reporter: Jake.zhang
>            Priority: Major
>         Attachments: image-2024-10-18-13-31-39-253.png, 
> image-2024-10-21-14-02-41-823.png
>
>
> flink kafka connector does not close FlinkKafkaInternalProducer when a flink
> checkpoint succeeds in flink 1.20/1.19. It creates one
> FlinkKafkaInternalProducer per checkpoint.
>  
> FlinkKafkaInternalProducer is never closed automatically, so the number of
> kafka producer network threads keeps growing.
> `org.apache.flink.connector.kafka.sink.KafkaCommitter` calls
> `getRecoveryProducer` each time because the `recyclable` object is always
> empty, so `recyclable.ifPresent(Recyclable::close)` never runs:
> {code:java}
> producer =
>         recyclable
>                 .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
>                 .orElseGet(() -> getRecoveryProducer(committable));
> producer.commitTransaction();
> producer.flush();
> recyclable.ifPresent(Recyclable::close);
> {code}
>  
> !image-2024-10-21-14-02-41-823.png!
>  
> !image-2024-10-18-13-31-39-253.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
