[ https://issues.apache.org/jira/browse/FLINK-30945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jingsong Lee closed FLINK-30945. -------------------------------- Resolution: Fixed https://github.com/apache/incubator-paimon/issues/745 > FTS does not support multiple writers into the same table and topic > ------------------------------------------------------------------- > > Key: FLINK-30945 > URL: https://issues.apache.org/jira/browse/FLINK-30945 > Project: Flink > Issue Type: Bug > Components: Table Store > Reporter: Vicky Papavasileiou > Priority: Major > > When creating two different streaming jobs that INSERT INTO the same table > and kafka topic, the second job is never able to make progress as the > transaction gets constantly aborted due to the producer getting fenced. > FTS should set the transactionalIdPrefix to avoid transactions of different > jobs clashing. > {code:java} > 2023-02-06 17:13:36,088 WARN org.apache.flink.runtime.taskmanager.Task [] - > Writer -> Global Committer -> Sink: end (1/1)#0 > (8cf4197af9716623c3c19e7fa3d7c071_b5c8d46f3e7b141acf271f12622e752b_0_0) > switched from RUNNING to FAILED with failure cause: > org.apache.flink.util.FlinkRuntimeException: Committing one of transactions > failed, logging first encountered failure at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:323) > at > org.apache.flink.table.store.connector.sink.StoreWriteOperator.notifyCheckpointComplete(StoreWriteOperator.java:175) > at > org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:479) > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:413) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1412) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$15(StreamTask.java:1353) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$18(StreamTask.java:1392) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) at > java.lang.Thread.run(Thread.java:750) Caused by: > org.apache.flink.table.store.shaded.org.apache.kafka.common.errors.ProducerFencedException: > There is a newer producer with the same transactionalId which fences the > current one. {code} > Sample queries: > > > {code:java} > CREATE CATALOG table_store_catalog WITH ( > 'type'='table-store', > 'warehouse'='s3://my-bucket/table-store' > ); > USE CATALOG table_store_catalog; > SET 'execution.checkpointing.interval' = '10 s'; > CREATE TABLE word_count_kafka ( > word STRING PRIMARY KEY NOT ENFORCED, > cnt BIGINT > ) WITH ( > 'log.system' = 'kafka', > 'kafka.bootstrap.servers' = 'broker:9092', > 'kafka.topic' = 'word_count_log' > ); > CREATE TEMPORARY TABLE word_table ( > word STRING > ) WITH ( > 'connector' = 'datagen', > 'fields.word.length' = '1' > ); > {code} > > And the two INSERT jobs: > {code:java} > INSERT INTO word_count_kafka SELECT word, COUNT(*) FROM word_table GROUP BY > word;{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)