[ 
https://issues.apache.org/jira/browse/FLINK-25920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931359#comment-17931359
 ] 

Hongshun Wang edited comment on FLINK-25920 at 2/28/25 3:16 AM:
----------------------------------------------------------------

[~arvid] Hi, I have met this in kafka connector CI not only once which show 
"Detected new producer leaks:" And then I find this error:
{code:java}
2025-02-27T13:13:35.3229288Z 13:13:25,192 [Sink: Writer (4/4)#0] INFO  
org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter [] - Aborting 
lingering transactions from previous execution. Recovered states: 
[KafkaWriterState{, transactionalIdPrefix='firstPrefix'}].
2025-02-27T13:13:35.3231729Z 13:13:25,192 [Sink: Writer (4/4)#0] WARN  
org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter [] - Transactional 
id prefix from previous execution firstPrefix has changed to newPrefix.

 2025-02-27T13:13:35.3235715Z 13:13:25,198 [Sink: Writer (1/4)#0] INFO  
org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer [] - 
Flushing new partitions
2025-02-27T13:13:35.3238141Z 13:13:25,203 [Sink: Committer (1/4)#0] WARN  
org.apache.flink.runtime.taskmanager.Task                    [] - Sink: 
Committer (1/4)#0 
(596deffdee1f58ad0980f46ab402aecd_ec8a2487ab44c5a0231db1309b835b9f_0_0) 
switched from RUNNING to FAILED with failure cause:
2025-02-27T13:13:35.3240466Z java.lang.UnsupportedOperationException: Currently 
it is not supported to update the CommittableSummary for a checkpoint coming 
from the same subtask. Please check the status of FLINK-25920
2025-02-27T13:13:35.3243123Z     at 
org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.upsertSummary(CheckpointCommittableManagerImpl.java:91)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3246713Z     at 
org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addSummary(CommittableCollector.java:245)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3249097Z     at 
org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:137)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3251330Z     at 
org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.processElement(CommitterOperator.java:207)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3253495Z     at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3255947Z     at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3258339Z     at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3260507Z     at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3262067Z     at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3263723Z     at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3265540Z     at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3267061Z     at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917) 
~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3268436Z     at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
 ~[flink-runtime-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3269848Z     at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949) 
[flink-runtime-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3271106Z     at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) 
[flink-runtime-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3272277Z     at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 
[flink-runtime-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3273103Z     at java.lang.Thread.run(Thread.java:840) [?:?] 
{code}
More detail see: 

[https://github.com/apache/flink-connector-kafka/actions/runs/13567073626/job/37922471192]

 

Maybe we should flink version bump to 1.20.1?


was (Author: JIRAUSER298968):
[~arvid] Hi, I have met this in kafka connector CI not only once which show 
"Detected new producer leaks:" And then I find this error:
{code:java}
2025-02-27T13:13:35.3229288Z 13:13:25,192 [Sink: Writer (4/4)#0] INFO  
org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter [] - Aborting 
lingering transactions from previous execution. Recovered states: 
[KafkaWriterState{, transactionalIdPrefix='firstPrefix'}].
2025-02-27T13:13:35.3231729Z 13:13:25,192 [Sink: Writer (4/4)#0] WARN  
org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter [] - Transactional 
id prefix from previous execution firstPrefix has changed to newPrefix.

 2025-02-27T13:13:35.3235715Z 13:13:25,198 [Sink: Writer (1/4)#0] INFO  
org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer [] - 
Flushing new partitions
2025-02-27T13:13:35.3238141Z 13:13:25,203 [Sink: Committer (1/4)#0] WARN  
org.apache.flink.runtime.taskmanager.Task                    [] - Sink: 
Committer (1/4)#0 
(596deffdee1f58ad0980f46ab402aecd_ec8a2487ab44c5a0231db1309b835b9f_0_0) 
switched from RUNNING to FAILED with failure cause:
2025-02-27T13:13:35.3240466Z java.lang.UnsupportedOperationException: Currently 
it is not supported to update the CommittableSummary for a checkpoint coming 
from the same subtask. Please check the status of FLINK-25920
2025-02-27T13:13:35.3243123Z     at 
org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.upsertSummary(CheckpointCommittableManagerImpl.java:91)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3246713Z     at 
org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addSummary(CommittableCollector.java:245)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3249097Z     at 
org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:137)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3251330Z     at 
org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.processElement(CommitterOperator.java:207)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3253495Z     at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3255947Z     at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3258339Z     at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3260507Z     at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3262067Z     at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3263723Z     at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3265540Z     at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
 ~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3267061Z     at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917) 
~[flink-streaming-java-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3268436Z     at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
 ~[flink-runtime-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3269848Z     at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949) 
[flink-runtime-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3271106Z     at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) 
[flink-runtime-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3272277Z     at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 
[flink-runtime-1.20.0.jar:1.20.0]
2025-02-27T13:13:35.3273103Z     at java.lang.Thread.run(Thread.java:840) [?:?] 
{code}
More detail see: 

[https://github.com/apache/flink-connector-kafka/actions/runs/13567073626/job/37922471192]

> Allow receiving updates of CommittableSummary
> ---------------------------------------------
>
>                 Key: FLINK-25920
>                 URL: https://issues.apache.org/jira/browse/FLINK-25920
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / DataStream, Connectors / Common
>    Affects Versions: 1.15.0, 1.16.0
>            Reporter: Fabian Paul
>            Assignee: Arvid Heise
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.19.2, 1.20.1, 2.0-preview
>
>         Attachments: trace.zip, trace_again.zip
>
>
> In the case of unaligned checkpoints, it might happen that the checkpoint 
> barrier overtakes the records and an empty committable summary is emitted 
> that needs to be correct at a later point when the records arrive.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to