Hi Fred, ah yes, I think I understand the issue. The KafkaSink always creates a KafkaCommitter even if you are not using EXACTLY_ONCE. It's an unfortunate limitation of our Sink design. When I implemented the change, I was assuming that you are running EOS if there is a committer (because else its a no-op).
I will add a fix but I will not be able to release a bugfix release in a timely manner as I'm traveling. In the meantime, could you set unique prefixes per sink? The Table API option is called TRANSACTIONAL_ID_PREFIX. Best, Arvid On Mon, May 19, 2025 at 8:02 PM Teunissen, F.G.J. (Fred) < fred.teunis...@ing.com> wrote: > Hi David, > > The following stack trace is generated on the jobmanager: > > 2025-05-19 17:54:34,474 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: > inputTbl[1] -> (Calc[2] -> ConstraintEnforcer[3] -> outputTbl[3]: Writer -> > outputTbl[3]: Committer, Calc[4] -> ConstraintEnforcer[5] -> outputTbl[5]: > Writer -> outputTbl[5]: Committer) (1/1) > (eba27e06694485745d077382c2019b60_e3dfc0d7e9ecd8a43f85f0b68ebf3b80_0_0) > switched from INITIALIZING to FAILED on 172.19.0.6:46339-4464a6 @ > transactional-id-taskmanager-1.transactional-id_issue-transactional-id > (dataPort=35465). > java.lang.IllegalStateException: Found duplicate transactionalIdPrefix for > multiple Kafka sinks: null. Transactional id prefixes need to be unique. > You may experience memory leaks without fixing this. > at > org.apache.flink.connector.kafka.sink.internal.BackchannelFactory.getBackchannel(BackchannelFactory.java:113) > ~[flink-sql-connector-kafka-4.0.0-2.0.jar:4.0.0-2.0] > at > org.apache.flink.connector.kafka.sink.internal.BackchannelFactory.getWritableBackchannel(BackchannelFactory.java:93) > ~[flink-sql-connector-kafka-4.0.0-2.0.jar:4.0.0-2.0] > at > org.apache.flink.connector.kafka.sink.KafkaCommitter.<init>(KafkaCommitter.java:76) > ~[flink-sql-connector-kafka-4.0.0-2.0.jar:4.0.0-2.0] > at > org.apache.flink.connector.kafka.sink.KafkaSink.createCommitter(KafkaSink.java:117) > ~[flink-sql-connector-kafka-4.0.0-2.0.jar:4.0.0-2.0] > at > org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory.lambda$createStreamOperator$0(CommitterOperatorFactory.java:66) > ~[flink-dist-2.0.0.jar:2.0.0] > at > org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:127) > ~[flink-dist-2.0.0.jar:2.0.0] > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:142) > ~[flink-dist-2.0.0.jar:2.0.0] > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:304) > ~[flink-dist-2.0.0.jar:2.0.0] > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > ~[flink-dist-2.0.0.jar:2.0.0] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:857) > ~[flink-dist-2.0.0.jar:2.0.0] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:811) > ~[flink-dist-2.0.0.jar:2.0.0] > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > ~[flink-dist-2.0.0.jar:2.0.0] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:811) > ~[flink-dist-2.0.0.jar:2.0.0] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:770) > ~[flink-dist-2.0.0.jar:2.0.0] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963) > ~[flink-dist-2.0.0.jar:2.0.0] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) > ~[flink-dist-2.0.0.jar:2.0.0] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756) > ~[flink-dist-2.0.0.jar:2.0.0] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) > ~[flink-dist-2.0.0.jar:2.0.0] > at java.base/java.lang.Thread.run(Unknown Source) ~[?:?] > Caused by: java.lang.IllegalStateException: Writable backchannel already > exists. > at > org.apache.flink.connector.kafka.sink.internal.BackchannelImpl.createWritableBackchannel(BackchannelImpl.java:96) > ~[flink-sql-connector-kafka-4.0.0-2.0.jar:4.0.0-2.0] > at > org.apache.flink.connector.kafka.sink.internal.BackchannelFactory.getBackchannel(BackchannelFactory.java:110) > ~[flink-sql-connector-kafka-4.0.0-2.0.jar:4.0.0-2.0] > ... 18 more > > > Kind regards, Fred > > *From: *David Radley <david_rad...@uk.ibm.com> > *Date: *Monday, 19 May 2025 at 19:47 > *To: *dev@flink.apache.org <dev@flink.apache.org> > *Cc: *ar...@apache.org <ar...@apache.org> > *Subject: *Re: Issue with Duplicate transactionalIdPrefix in Flink 2.0 > Kafka Sinks > > Hi Fred, > I see. It looks like this check was added in > https://eur02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FFLINK-37282&data=05%7C02%7CFred.Teunissen%40ing.com%7C0d076ad25bef4485eb7808dd96fd385a%7C587b6ea13db94fe1a9d785d4c64ce5cc%7C0%7C0%7C638832736508648107%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0eU1hcGkiOnRydWUsIlYiOiIwLjAuMDAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIldUIjoyfQ%3D%3D%7C0%7C%7C%7C&sdata=41%2F7AgJff9nrLmPwZsu%2FVbbjTsGdya2erRbS5UPlbuU%3D&reserved=0 > <https://issues.apache.org/jira/browse/FLINK-37282> > > Do you have a stack trace for the exception – this could show how you got > to this code in the at least once case? > > By copy Arvid: any thoughts on this? > > Kind regards, David. > > From: Teunissen, F.G.J. (Fred) <fred.teunis...@ing.com.INVALID> > Date: Monday, 19 May 2025 at 17:33 > To: dev@flink.apache.org <dev@flink.apache.org> > Subject: [EXTERNAL] Re: Issue with Duplicate transactionalIdPrefix in > Flink 2.0 Kafka Sinks > Hi David, > > Depending on the flink version we use a different Kafka connector. > > * > flink:2.0.0 -> flink-connector-kafka:4.0.0-2.0 > * > flink:1.20.1 -> flink-connector-kafka:3.4.0-1.20 > * > flink:1.19.2 -> flink-connector-kafka:3.3.0-1.19 > > We are using the default for delivery-guarantee (at-least-once), so > according to the docs, the transactionalIdPrefix should not be required. > > kind regards, > Fred > > From: David Radley <david_rad...@uk.ibm.com> > Date: Monday, 19 May 2025 at 17:57 > To: dev@flink.apache.org <dev@flink.apache.org> > Subject: Re: Issue with Duplicate transactionalIdPrefix in Flink 2.0 Kafka > Sinks > > Hi, > I had a quick look at this. What version of the Flink Kafka connector are > you running? > I looked through recent commits in the Kafka connector and see > > https://eur02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fflink-connector-kafka%2Fcommit%2F7c112abe8bf78e0cd8a310aaa65b57f6a70ad30a&data=05%7C02%7CFred.Teunissen%40ing.com%7C0d076ad25bef4485eb7808dd96fd385a%7C587b6ea13db94fe1a9d785d4c64ce5cc%7C0%7C0%7C638832736508674283%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0eU1hcGkiOnRydWUsIlYiOiIwLjAuMDAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIldUIjoyfQ%3D%3D%7C0%7C%7C%7C&sdata=E7JXoE0xpvaoPQe3jscBm3Fr9nY%2FKRm0Bl4rHvJKJhc%3D&reserved=0<https://eur02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fflink-connector-kafka%2Fcommit%2F7c112abe8bf78e0cd8a310aaa65b57f6a70ad30a&data=05%7C02%7CFred.Teunissen%40ing.com%7C0d076ad25bef4485eb7808dd96fd385a%7C587b6ea13db94fe1a9d785d4c64ce5cc%7C0%7C0%7C638832736508692036%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0eU1hcGkiOnRydWUsIlYiOiIwLjAuMDAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIldUIjoyfQ%3D%3D%7C0%7C%7C%7C&sdata=7L0pMpt%2FW6Wra1AuU%2BD8qt012h%2BPXBhROCGL8gu%2B8iE%3D&reserved=0><https://eur02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fflink-connector-kafka%2Fcommit%2F7c112abe8bf78e0cd8a310aaa65b57f6a70ad30a%253chttps%3A%2Fgithub.com%2Fapache%2Fflink-connector-kafka%2Fcommit%2F7c112abe8bf78e0cd8a310aaa65b57f6a70ad30a%253e&data=05%7C02%7CFred.Teunissen%40ing.com%7C0d076ad25bef4485eb7808dd96fd385a%7C587b6ea13db94fe1a9d785d4c64ce5cc%7C0%7C0%7C638832736508713548%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0eU1hcGkiOnRydWUsIlYiOiIwLjAuMDAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIldUIjoyfQ%3D%3D%7C0%7C%7C%7C&sdata=9AXt2GUtVEy2eF8LbfBmT1s4uPCrLhE0FSJpXX%2FKTOo%3D&reserved=0> > <https://github.com/apache/flink-connector-kafka/commit/7c112abe8bf78e0cd8a310aaa65b57f6a70ad30a> > > for PR > https://eur02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fflink-connector-kafka%2Fpull%2F156&data=05%7C02%7CFred.Teunissen%40ing.com%7C0d076ad25bef4485eb7808dd96fd385a%7C587b6ea13db94fe1a9d785d4c64ce5cc%7C0%7C0%7C638832736508729669%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0eU1hcGkiOnRydWUsIlYiOiIwLjAuMDAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIldUIjoyfQ%3D%3D%7C0%7C%7C%7C&sdata=2hSeSKm4b55O8uR4q6TQ4J0fW%2FDCGVRnMRbm3i3kzSg%3D&reserved=0<https://eur02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fflink-connector-kafka%2Fpull%2F156&data=05%7C02%7CFred.Teunissen%40ing.com%7C0d076ad25bef4485eb7808dd96fd385a%7C587b6ea13db94fe1a9d785d4c64ce5cc%7C0%7C0%7C638832736508744778%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0eU1hcGkiOnRydWUsIlYiOiIwLjAuMDAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIldUIjoyfQ%3D%3D%7C0%7C%7C%7C&sdata=olRS6e3SuEInnpDYu2%2BRVBe4maTtz1vvOBdLhc2QjNI%3D&reserved=0><https://eur02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fflink-connector-kafka%2Fpull%2F156%253chttps%3A%2Fgithub.com%2Fapache%2Fflink-connector-kafka%2Fpull%2F156%253e&data=05%7C02%7CFred.Teunissen%40ing.com%7C0d076ad25bef4485eb7808dd96fd385a%7C587b6ea13db94fe1a9d785d4c64ce5cc%7C0%7C0%7C638832736508761138%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0eU1hcGkiOnRydWUsIlYiOiIwLjAuMDAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIldUIjoyfQ%3D%3D%7C0%7C%7C%7C&sdata=el%2B8%2BbCFniRa5ZRiorUKfxykoQYgMIueMFKElhh21bA%3D&reserved=0> > <https://github.com/apache/flink-connector-kafka/pull/156> > > This looks like a change in behaviour relating to policing the Duplicate > transactionalIdPrefix for sinks, > > Kind regards, David. > > From: Teunissen, F.G.J. (Fred) <fred.teunis...@ing.com.INVALID> > Date: Monday, 19 May 2025 at 13:45 > To: dev@flink.apache.org <dev@flink.apache.org> > Subject: [EXTERNAL] Issue with Duplicate transactionalIdPrefix in Flink > 2.0 Kafka Sinks > Hi everyone, > > I'm encountering an issue with Flink 2.0 when using the Table API. In > previous versions (1.19/1.20), I was able to create a Flink job with the > following setup: > > * > One Kafka topic-based input table > * > One Kafka topic-based output table > * > One statement set with two insert statements, both reading from the input > and writing to the output table > > However, with Flink 2.0, the JobManager generates the following error: > java.lang.IllegalStateException: Found duplicate transactionalIdPrefix for > multiple Kafka sinks: null. Transactional id prefixes need to be unique. > You may experience memory leaks without fixing this. > > Is this expected behavior in Flink 2.0, or should it still be possible to > use the same setup without encountering this error? > > You can find a reproducible example of this issue on GitHub< > https://eur02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2FFredTing%2Ftransactional-id-issue&data=05%7C02%7CFred.Teunissen%40ing.com%7C0d076ad25bef4485eb7808dd96fd385a%7C587b6ea13db94fe1a9d785d4c64ce5cc%7C0%7C0%7C638832736508777443%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0eU1hcGkiOnRydWUsIlYiOiIwLjAuMDAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIldUIjoyfQ%3D%3D%7C0%7C%7C%7C&sdata=%2FfVyw934Km3q3iUSbBeawEBwlVAK57vmC6%2FLB6xvJss%3D&reserved=0<https://eur02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2FFredTing%2Ftransactional-id-issue&data=05%7C02%7CFred.Teunissen%40ing.com%7C0d076ad25bef4485eb7808dd96fd385a%7C587b6ea13db94fe1a9d785d4c64ce5cc%7C0%7C0%7C638832736508795211%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0eU1hcGkiOnRydWUsIlYiOiIwLjAuMDAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIldUIjoyfQ%3D%3D%7C0%7C%7C%7C&sdata=oT6xHnEKwk2V7Ib0CIclRuhp3fQlDDmOPPQFhJdvrEI%3D&reserved=0<https://eur02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2FFredTing%2Ftransactional-id-issue%253chttps%3A%2Fgithub.com%2FFredTing%2Ftransactional-id-issue&data=05%7C02%7CFred.Teunissen%40ing.com%7C0d076ad25bef4485eb7808dd96fd385a%7C587b6ea13db94fe1a9d785d4c64ce5cc%7C0%7C0%7C638832736508811495%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0eU1hcGkiOnRydWUsIlYiOiIwLjAuMDAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIldUIjoyfQ%3D%3D%7C0%7C%7C%7C&sdata=PIY64sclw%2FQZffbqbWxttOBJvUxxLMxHuzfTTM0NPwg%3D&reserved=0 > <https://github.com/FredTing/transactional-id-issue>>>>. > > To reproduce the error, you can run the bash script > ./build-and-start-job.sh. It will compile the job and start a Docker > environment running the job. It also opens up the Flink dashboard, where > you can find the failing job (after a few seconds). > > Additionally, when you use 1.20 or 1.19 as an argument to the script, > the job runs successfully. > > Any insights or suggestions would be greatly appreciated! > > Best regards, > Fred Teunissen > > ----------------------------------------------------------------- > ATTENTION: > The information in this e-mail is confidential and only meant for the > intended recipient. If you are not the intended recipient, don't use or > disclose it in any way. Please let the sender know and delete the message > immediately. > ----------------------------------------------------------------- > > Unless otherwise stated above: > > IBM United Kingdom Limited > Registered in England and Wales with number 741598 > Registered office: Building C, IBM Hursley Office, Hursley Park Road, > Winchester, Hampshire SO21 2JN > > ----------------------------------------------------------------- > ATTENTION: > The information in this e-mail is confidential and only meant for the > intended recipient. If you are not the intended recipient, don't use or > disclose it in any way. Please let the sender know and delete the message > immediately. > ----------------------------------------------------------------- > > Unless otherwise stated above: > > IBM United Kingdom Limited > Registered in England and Wales with number 741598 > Registered office: Building C, IBM Hursley Office, Hursley Park Road, > Winchester, Hampshire SO21 2JN > > ----------------------------------------------------------------- > ATTENTION: > The information in this e-mail is confidential and only meant for the > intended recipient. If you are not the intended recipient, don't use or > disclose it in any way. Please let the sender know and delete the message > immediately. > ----------------------------------------------------------------- > >