[ https://issues.apache.org/jira/browse/FLINK-25920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931359#comment-17931359 ]
Hongshun Wang edited comment on FLINK-25920 at 2/28/25 3:16 AM:
----------------------------------------------------------------

[~arvid] Hi, I have hit this more than once in the Kafka connector CI, which reports "Detected new producer leaks:". I then found this error:
{code:java}
2025-02-27T13:13:35.3229288Z 13:13:25,192 [Sink: Writer (4/4)#0] INFO org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter [] - Aborting lingering transactions from previous execution. Recovered states: [KafkaWriterState{, transactionalIdPrefix='firstPrefix'}].
2025-02-27T13:13:35.3231729Z 13:13:25,192 [Sink: Writer (4/4)#0] WARN org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter [] - Transactional id prefix from previous execution firstPrefix has changed to newPrefix.
2025-02-27T13:13:35.3235715Z 13:13:25,198 [Sink: Writer (1/4)#0] INFO org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer [] - Flushing new partitions
2025-02-27T13:13:35.3238141Z 13:13:25,203 [Sink: Committer (1/4)#0] WARN org.apache.flink.runtime.taskmanager.Task [] - Sink: Committer (1/4)#0 (596deffdee1f58ad0980f46ab402aecd_ec8a2487ab44c5a0231db1309b835b9f_0_0) switched from RUNNING to FAILED with failure cause:
2025-02-27T13:13:35.3240466Z java.lang.UnsupportedOperationException: Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. Please check the status of FLINK-25920
2025-02-27T13:13:35.3243123Z 	at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.upsertSummary(CheckpointCommittableManagerImpl.java:91) ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3246713Z 	at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addSummary(CommittableCollector.java:245) ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3249097Z 	at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:137) ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3251330Z 	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.processElement(CommitterOperator.java:207) ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3253495Z 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238) ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3255947Z 	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157) ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3258339Z 	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114) ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3260507Z 	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3262067Z 	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638) ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3263723Z 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3265540Z 	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973) ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3267061Z 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917) ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3268436Z 	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-runtime-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3269848Z 	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949) [flink-runtime-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3271106Z 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) [flink-runtime-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3272277Z 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-runtime-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3273103Z 	at java.lang.Thread.run(Thread.java:840) [?:?]
{code}
For more details, see: [https://github.com/apache/flink-connector-kafka/actions/runs/13567073626/job/37922471192]

Maybe we should bump the Flink version to 1.20.1?

> Allow receiving updates of CommittableSummary
> ---------------------------------------------
>
>                 Key: FLINK-25920
>                 URL: https://issues.apache.org/jira/browse/FLINK-25920
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / DataStream, Connectors / Common
>    Affects Versions: 1.15.0, 1.16.0
>            Reporter: Fabian Paul
>            Assignee: Arvid Heise
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.19.2, 1.20.1, 2.0-preview
>
>         Attachments: trace.zip, trace_again.zip
>
>
> In the case of unaligned checkpoints, it might happen that the checkpoint
> barrier overtakes the records and an empty committable summary is emitted
> that needs to be corrected at a later point when the records arrive.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)