I cherry-picked your commits on top of the main branch and built a local 
version. I can confirm that this fix solves the issue.


From: Arvid Heise <ahe...@confluent.io>
Date: Tuesday, 20 May 2025 at 11:17
To: Teunissen, F.G.J. (Fred) <fred.teunis...@ing.com>
Cc: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: Issue with Duplicate transactionalIdPrefix in Flink 2.0 Kafka Sinks

I created https://issues.apache.org/jira/browse/FLINK-37818 and provided a fix. 
I will look for a reviewer to get it in ASAP.

Best

Arvid

On Tue, May 20, 2025, 10:46 Teunissen, F.G.J. (Fred) 
<fred.teunis...@ing.com> wrote:
Hi Arvid,

Our application picks up a set of DDL/DML SQL statements defined by end-users 
and uses the Table API to create a Flink job. So setting unique prefixes per 
sink is a bit tricky, or we would have to tell all end-users to create unique 
output table DDL statements (I would rather avoid that).
But we can build a local version of the Kafka connector and use that in our 
application. If you can add a fix, we can pick it up (and test it right away 😊).

Kind regards,
Fred

From: Arvid Heise <ahe...@confluent.io>
Date: Tuesday, 20 May 2025 at 09:12
To: Teunissen, F.G.J. (Fred) <fred.teunis...@ing.com>
Cc: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: Issue with Duplicate transactionalIdPrefix in Flink 2.0 Kafka Sinks

Hi Fred,

Ah yes, I think I understand the issue. The KafkaSink always creates a 
KafkaCommitter even if you are not using EXACTLY_ONCE. It's an unfortunate 
limitation of our sink design.
When I implemented the change, I assumed that you are running EOS if there 
is a committer (because otherwise it is a no-op).

I will add a fix, but I will not be able to publish a bugfix release in a timely 
manner as I'm traveling.
In the meantime, could you set unique prefixes per sink? The Table API option 
is called TRANSACTIONAL_ID_PREFIX.
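For reference, a minimal sketch of what that could look like in the output 
table DDL. The table name, schema, and topic here are made up, and the SQL 
option key is assumed to be 'sink.transactional-id-prefix'; please check the 
Kafka connector docs for your version:

```sql
-- Hypothetical output table: the schema and connector settings are
-- illustrative; 'sink.transactional-id-prefix' is the assumed option key.
CREATE TABLE outputTbl (
  id STRING,
  payload STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'output-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'sink.transactional-id-prefix' = 'outputTbl-sink-1'
);
```

Each sink table would then need a distinct prefix value.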

Best,

Arvid

On Mon, May 19, 2025 at 8:02 PM Teunissen, F.G.J. (Fred) 
<fred.teunis...@ing.com> wrote:
Hi David,

The following stack trace is generated on the JobManager:
2025-05-19 17:54:34,474 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: inputTbl[1] -> (Calc[2] -> ConstraintEnforcer[3] -> outputTbl[3]: Writer -> outputTbl[3]: Committer, Calc[4] -> ConstraintEnforcer[5] -> outputTbl[5]: Writer -> outputTbl[5]: Committer) (1/1) (eba27e06694485745d077382c2019b60_e3dfc0d7e9ecd8a43f85f0b68ebf3b80_0_0) switched from INITIALIZING to FAILED on 172.19.0.6:46339-4464a6 @ transactional-id-taskmanager-1.transactional-id_issue-transactional-id (dataPort=35465).
java.lang.IllegalStateException: Found duplicate transactionalIdPrefix for multiple Kafka sinks: null. Transactional id prefixes need to be unique. You may experience memory leaks without fixing this.
    at org.apache.flink.connector.kafka.sink.internal.BackchannelFactory.getBackchannel(BackchannelFactory.java:113) ~[flink-sql-connector-kafka-4.0.0-2.0.jar:4.0.0-2.0]
    at org.apache.flink.connector.kafka.sink.internal.BackchannelFactory.getWritableBackchannel(BackchannelFactory.java:93) ~[flink-sql-connector-kafka-4.0.0-2.0.jar:4.0.0-2.0]
    at org.apache.flink.connector.kafka.sink.KafkaCommitter.<init>(KafkaCommitter.java:76) ~[flink-sql-connector-kafka-4.0.0-2.0.jar:4.0.0-2.0]
    at org.apache.flink.connector.kafka.sink.KafkaSink.createCommitter(KafkaSink.java:117) ~[flink-sql-connector-kafka-4.0.0-2.0.jar:4.0.0-2.0]
    at org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory.lambda$createStreamOperator$0(CommitterOperatorFactory.java:66) ~[flink-dist-2.0.0.jar:2.0.0]
    at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:127) ~[flink-dist-2.0.0.jar:2.0.0]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:142) ~[flink-dist-2.0.0.jar:2.0.0]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:304) ~[flink-dist-2.0.0.jar:2.0.0]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-2.0.0.jar:2.0.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:857) ~[flink-dist-2.0.0.jar:2.0.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:811) ~[flink-dist-2.0.0.jar:2.0.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-2.0.0.jar:2.0.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:811) ~[flink-dist-2.0.0.jar:2.0.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:770) ~[flink-dist-2.0.0.jar:2.0.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963) ~[flink-dist-2.0.0.jar:2.0.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) ~[flink-dist-2.0.0.jar:2.0.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756) ~[flink-dist-2.0.0.jar:2.0.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) ~[flink-dist-2.0.0.jar:2.0.0]
    at java.base/java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.IllegalStateException: Writable backchannel already exists.
    at org.apache.flink.connector.kafka.sink.internal.BackchannelImpl.createWritableBackchannel(BackchannelImpl.java:96) ~[flink-sql-connector-kafka-4.0.0-2.0.jar:4.0.0-2.0]
    at org.apache.flink.connector.kafka.sink.internal.BackchannelFactory.getBackchannel(BackchannelFactory.java:110) ~[flink-sql-connector-kafka-4.0.0-2.0.jar:4.0.0-2.0]
    ... 18 more

Kind regards, Fred

From: David Radley <david_rad...@uk.ibm.com>
Date: Monday, 19 May 2025 at 19:47
To: dev@flink.apache.org <dev@flink.apache.org>
Cc: ar...@apache.org <ar...@apache.org>
Subject: Re: Issue with Duplicate transactionalIdPrefix in Flink 2.0 Kafka Sinks

Hi Fred,
I see. It looks like this check was added in 
https://issues.apache.org/jira/browse/FLINK-37282

Do you have a stack trace for the exception? It could show how you reached 
this code in the at-least-once case.

By copy Arvid: any thoughts on this?

   Kind regards, David.

From: Teunissen, F.G.J. (Fred) <fred.teunis...@ing.com.INVALID>
Date: Monday, 19 May 2025 at 17:33
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: [EXTERNAL] Re: Issue with Duplicate transactionalIdPrefix in Flink 2.0 
Kafka Sinks
Hi David,

Depending on the Flink version, we use a different Kafka connector:

  * flink:2.0.0 -> flink-connector-kafka:4.0.0-2.0
  * flink:1.20.1 -> flink-connector-kafka:3.4.0-1.20
  * flink:1.19.2 -> flink-connector-kafka:3.3.0-1.19

We are using the default delivery guarantee (at-least-once), so according 
to the docs, the transactionalIdPrefix should not be required.

Kind regards,
Fred

From: David Radley <david_rad...@uk.ibm.com>
Date: Monday, 19 May 2025 at 17:57
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: Issue with Duplicate transactionalIdPrefix in Flink 2.0 Kafka Sinks

Hi,
I had a quick look at this. What version of the Flink Kafka connector are you 
running?
I looked through recent commits in the Kafka connector and see
https://github.com/apache/flink-connector-kafka/commit/7c112abe8bf78e0cd8a310aaa65b57f6a70ad30a

for PR 
https://github.com/apache/flink-connector-kafka/pull/156

This looks like a change in behaviour related to policing duplicate 
transactionalIdPrefix values for sinks,

   Kind regards, David.

From: Teunissen, F.G.J. (Fred) <fred.teunis...@ing.com.INVALID>
Date: Monday, 19 May 2025 at 13:45
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: [EXTERNAL] Issue with Duplicate transactionalIdPrefix in Flink 2.0 
Kafka Sinks
Hi everyone,

I'm encountering an issue with Flink 2.0 when using the Table API. In previous 
versions (1.19/1.20), I was able to create a Flink job with the following setup:

  * One Kafka topic-based input table
  * One Kafka topic-based output table
  * One statement set with two insert statements, both reading from the input 
    table and writing to the output table
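A hypothetical sketch of such a statement set (the table names, schemas, and 
expressions here are illustrative only, not taken from the actual job):

```sql
-- Two inserts reading from the same input table and writing to the same
-- output table, under the default at-least-once delivery guarantee.
-- Both resulting Kafka sinks end up with a null transactionalIdPrefix.
EXECUTE STATEMENT SET
BEGIN
  INSERT INTO outputTbl SELECT id, UPPER(payload) FROM inputTbl;
  INSERT INTO outputTbl SELECT id, LOWER(payload) FROM inputTbl;
END;
```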

However, with Flink 2.0, the JobManager generates the following error:
java.lang.IllegalStateException: Found duplicate transactionalIdPrefix for 
multiple Kafka sinks: null. Transactional id prefixes need to be unique. You 
may experience memory leaks without fixing this.

Is this expected behavior in Flink 2.0, or should it still be possible to use 
the same setup without encountering this error?

You can find a reproducible example of this issue on GitHub: 
https://github.com/FredTing/transactional-id-issue

To reproduce the error, run the bash script ./build-and-start-job.sh. It will 
compile the job and start a Docker environment running it. It also opens the 
Flink dashboard, where you can find the failing job (after a few seconds).

Additionally, when you pass 1.20 or 1.19 as an argument to the script, the job 
runs successfully.

Any insights or suggestions would be greatly appreciated!

Best regards,
Fred Teunissen

-----------------------------------------------------------------
ATTENTION:
The information in this e-mail is confidential and only meant for the intended 
recipient. If you are not the intended recipient, don't use or disclose it in 
any way. Please let the sender know and delete the message immediately.
-----------------------------------------------------------------

Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: Building C, IBM Hursley Office, Hursley Park Road, 
Winchester, Hampshire SO21 2JN
