I created https://issues.apache.org/jira/browse/FLINK-37818 and provided a
fix. I will look for a reviewer to get it merged ASAP.

Best

Arvid

On Tue, May 20, 2025, 10:46 Teunissen, F.G.J. (Fred) <fred.teunis...@ing.com>
wrote:

> Hi Arvid,
>
> Our application picks up a bunch of DDL/DML SQL statements defined by
> end-users and uses the Table API to create a Flink job. So setting unique
> prefixes per sink is a bit tricky, or we would have to tell all end-users to
> create unique output table DDL statements (I would rather avoid that).
> But we can build a local version of the Kafka Connector and use that in
> our application. If you can add a fix we can pick this up (and test this
> already 😊 ).
>
> Kind regards,
> Fred
>
> *From: *Arvid Heise <ahe...@confluent.io>
> *Date: *Tuesday, 20 May 2025 at 09:12
> *To: *Teunissen, F.G.J. (Fred) <fred.teunis...@ing.com>
> *Cc: *dev@flink.apache.org <dev@flink.apache.org>
> *Subject: *Re: Issue with Duplicate transactionalIdPrefix in Flink 2.0
> Kafka Sinks
>
> Hi Fred,
>
> ah yes, I think I understand the issue. The KafkaSink always creates a
> KafkaCommitter, even if you are not using EXACTLY_ONCE. It's an unfortunate
> limitation of our sink design.
> When I implemented the change, I assumed that you are running EOS if
> there is a committer (because otherwise it's a no-op).
>
> I will add a fix but I will not be able to release a bugfix release in a
> timely manner as I'm traveling.
> In the meantime, could you set unique prefixes per sink? The Table API
> option is called TRANSACTIONAL_ID_PREFIX.
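> As a sketch of that workaround, the DDL could look like the following. The
> option key is assumed to be 'sink.transactional-id-prefix' (please check the
> Kafka connector docs for your version); table names and schemas are
> illustrative. Note that each sink table needs its own distinct prefix:

```sql
-- Hypothetical sink tables; giving each one a distinct
-- transactional id prefix avoids the duplicate-prefix check.
CREATE TABLE outputTblA (msg STRING) WITH (
  'connector' = 'kafka',
  'topic' = 'output-a',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json',
  'sink.transactional-id-prefix' = 'output-a-sink'
);

CREATE TABLE outputTblB (msg STRING) WITH (
  'connector' = 'kafka',
  'topic' = 'output-b',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json',
  'sink.transactional-id-prefix' = 'output-b-sink'
);
```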
>
> Best,
>
> Arvid
>
> On Mon, May 19, 2025 at 8:02 PM Teunissen, F.G.J. (Fred) <
> fred.teunis...@ing.com> wrote:
>
> Hi David,
>
> The following stack trace is generated on the jobmanager:
>
> 2025-05-19 17:54:34,474 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
> inputTbl[1] -> (Calc[2] -> ConstraintEnforcer[3] -> outputTbl[3]: Writer ->
> outputTbl[3]: Committer, Calc[4] -> ConstraintEnforcer[5] -> outputTbl[5]:
> Writer -> outputTbl[5]: Committer) (1/1)
> (eba27e06694485745d077382c2019b60_e3dfc0d7e9ecd8a43f85f0b68ebf3b80_0_0)
> switched from INITIALIZING to FAILED on 172.19.0.6:46339-4464a6 @
> transactional-id-taskmanager-1.transactional-id_issue-transactional-id
> (dataPort=35465).
> java.lang.IllegalStateException: Found duplicate transactionalIdPrefix for
> multiple Kafka sinks: null. Transactional id prefixes need to be unique.
> You may experience memory leaks without fixing this.
> at org.apache.flink.connector.kafka.sink.internal.BackchannelFactory.getBackchannel(BackchannelFactory.java:113) ~[flink-sql-connector-kafka-4.0.0-2.0.jar:4.0.0-2.0]
> at org.apache.flink.connector.kafka.sink.internal.BackchannelFactory.getWritableBackchannel(BackchannelFactory.java:93) ~[flink-sql-connector-kafka-4.0.0-2.0.jar:4.0.0-2.0]
> at org.apache.flink.connector.kafka.sink.KafkaCommitter.<init>(KafkaCommitter.java:76) ~[flink-sql-connector-kafka-4.0.0-2.0.jar:4.0.0-2.0]
> at org.apache.flink.connector.kafka.sink.KafkaSink.createCommitter(KafkaSink.java:117) ~[flink-sql-connector-kafka-4.0.0-2.0.jar:4.0.0-2.0]
> at org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory.lambda$createStreamOperator$0(CommitterOperatorFactory.java:66) ~[flink-dist-2.0.0.jar:2.0.0]
> at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:127) ~[flink-dist-2.0.0.jar:2.0.0]
> at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:142) ~[flink-dist-2.0.0.jar:2.0.0]
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:304) ~[flink-dist-2.0.0.jar:2.0.0]
> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-2.0.0.jar:2.0.0]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:857) ~[flink-dist-2.0.0.jar:2.0.0]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:811) ~[flink-dist-2.0.0.jar:2.0.0]
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-2.0.0.jar:2.0.0]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:811) ~[flink-dist-2.0.0.jar:2.0.0]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:770) ~[flink-dist-2.0.0.jar:2.0.0]
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963) ~[flink-dist-2.0.0.jar:2.0.0]
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) ~[flink-dist-2.0.0.jar:2.0.0]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756) ~[flink-dist-2.0.0.jar:2.0.0]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) ~[flink-dist-2.0.0.jar:2.0.0]
> at java.base/java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: java.lang.IllegalStateException: Writable backchannel already exists.
> at org.apache.flink.connector.kafka.sink.internal.BackchannelImpl.createWritableBackchannel(BackchannelImpl.java:96) ~[flink-sql-connector-kafka-4.0.0-2.0.jar:4.0.0-2.0]
> at org.apache.flink.connector.kafka.sink.internal.BackchannelFactory.getBackchannel(BackchannelFactory.java:110) ~[flink-sql-connector-kafka-4.0.0-2.0.jar:4.0.0-2.0]
> ... 18 more
>
>
> Kind regards, Fred
>
> *From: *David Radley <david_rad...@uk.ibm.com>
> *Date: *Monday, 19 May 2025 at 19:47
> *To: *dev@flink.apache.org <dev@flink.apache.org>
> *Cc: *ar...@apache.org <ar...@apache.org>
> *Subject: *Re: Issue with Duplicate transactionalIdPrefix in Flink 2.0
> Kafka Sinks
>
> Hi Fred,
> I see. It looks like this check was added in
> https://issues.apache.org/jira/browse/FLINK-37282
>
> Do you have a stack trace for the exception? It could show how you got
> to this code in the at-least-once case.
>
> By copy Arvid: any thoughts on this?
>
>    Kind regards, David.
>
> From: Teunissen, F.G.J. (Fred) <fred.teunis...@ing.com.INVALID>
> Date: Monday, 19 May 2025 at 17:33
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: [EXTERNAL] Re: Issue with Duplicate transactionalIdPrefix in
> Flink 2.0 Kafka Sinks
> Hi David,
>
> Depending on the flink version we use a different Kafka connector.
>
>   * flink:2.0.0 -> flink-connector-kafka:4.0.0-2.0
>   * flink:1.20.1 -> flink-connector-kafka:3.4.0-1.20
>   * flink:1.19.2 -> flink-connector-kafka:3.3.0-1.19
>
> We are using the default for delivery-guarantee (at-least-once), so
> according to the docs, the transactionalIdPrefix should not be required.
>
> kind regards,
> Fred
>
> From: David Radley <david_rad...@uk.ibm.com>
> Date: Monday, 19 May 2025 at 17:57
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: Re: Issue with Duplicate transactionalIdPrefix in Flink 2.0 Kafka
> Sinks
>
> Hi,
> I had a quick look at this. What version of the Flink Kafka connector are
> you running?
> I looked through recent commits in the Kafka connector and see
>
> https://github.com/apache/flink-connector-kafka/commit/7c112abe8bf78e0cd8a310aaa65b57f6a70ad30a
>
> for PR
> https://github.com/apache/flink-connector-kafka/pull/156
>
> This looks like a change in behaviour relating to policing duplicate
> transactionalIdPrefix values for sinks,
>
>    Kind regards, David.
>
> From: Teunissen, F.G.J. (Fred) <fred.teunis...@ing.com.INVALID>
> Date: Monday, 19 May 2025 at 13:45
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: [EXTERNAL] Issue with Duplicate transactionalIdPrefix in Flink
> 2.0 Kafka Sinks
> Hi everyone,
>
> I'm encountering an issue with Flink 2.0 when using the Table API. In
> previous versions (1.19/1.20), I was able to create a Flink job with the
> following setup:
>
>   * One Kafka topic-based input table
>   * One Kafka topic-based output table
>   * One statement set with two insert statements, both reading from the
>     input table and writing to the output table
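> The setup above can be sketched roughly as follows (schemas, topic names,
> and queries are illustrative, not the exact statements from the linked
> repo):

```sql
CREATE TABLE inputTbl (msg STRING) WITH (
  'connector' = 'kafka',
  'topic' = 'input',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE outputTbl (msg STRING) WITH (
  'connector' = 'kafka',
  'topic' = 'output',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json'
  -- no delivery guarantee configured, so the default (at-least-once) applies
);

-- Two inserts into the same sink table within one statement set,
-- which creates two Kafka sinks in the job graph.
EXECUTE STATEMENT SET
BEGIN
  INSERT INTO outputTbl SELECT UPPER(msg) FROM inputTbl;
  INSERT INTO outputTbl SELECT LOWER(msg) FROM inputTbl;
END;
```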
>
> However, with Flink 2.0, the JobManager generates the following error:
> java.lang.IllegalStateException: Found duplicate transactionalIdPrefix for
> multiple Kafka sinks: null. Transactional id prefixes need to be unique.
> You may experience memory leaks without fixing this.
>
> Is this expected behavior in Flink 2.0, or should it still be possible to
> use the same setup without encountering this error?
>
> You can find a reproducible example of this issue on GitHub:
> https://github.com/FredTing/transactional-id-issue
>
> To reproduce the error, run the bash script ./build-and-start-job.sh. It
> compiles the job and starts a Docker environment running it. It also opens
> the Flink dashboard, where you can find the failing job (after a few
> seconds).
>
> Additionally, when you use 1.20 or 1.19 as an argument to the script,
> the job runs successfully.
>
> Any insights or suggestions would be greatly appreciated!
>
> Best regards,
> Fred Teunissen
>
> -----------------------------------------------------------------
> ATTENTION:
> The information in this e-mail is confidential and only meant for the
> intended recipient. If you are not the intended recipient, don't use or
> disclose it in any way. Please let the sender know and delete the message
> immediately.
> -----------------------------------------------------------------
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: Building C, IBM Hursley Office, Hursley Park Road,
> Winchester, Hampshire SO21 2JN
>
