Vicky Papavasileiou created FLINK-30945:
-------------------------------------------
             Summary: FTS does not support multiple writers into the same table and topic
                 Key: FLINK-30945
                 URL: https://issues.apache.org/jira/browse/FLINK-30945
             Project: Flink
          Issue Type: Bug
          Components: Table Store
            Reporter: Vicky Papavasileiou

When creating two different streaming jobs that INSERT INTO the same table and Kafka topic, the second job never makes progress: its transactions are constantly aborted because the producer gets fenced. FTS should set the transactionalIdPrefix so that transactions of different jobs do not clash.
{code:java}
2023-02-06 17:13:36,088 WARN org.apache.flink.runtime.taskmanager.Task [] - Writer -> Global Committer -> Sink: end (1/1)#0 (8cf4197af9716623c3c19e7fa3d7c071_b5c8d46f3e7b141acf271f12622e752b_0_0) switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:323)
    at org.apache.flink.table.store.connector.sink.StoreWriteOperator.notifyCheckpointComplete(StoreWriteOperator.java:175)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:479)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:413)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1412)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$15(StreamTask.java:1353)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$18(StreamTask.java:1392)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.table.store.shaded.org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
{code}
Sample queries:
{code:sql}
CREATE CATALOG table_store_catalog WITH (
    'type'='table-store',
    'warehouse'='s3://my-bucket/table-store'
);

USE CATALOG table_store_catalog;

SET 'execution.checkpointing.interval' = '10 s';

CREATE TABLE word_count_kafka (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt BIGINT
) WITH (
    'log.system' = 'kafka',
    'kafka.bootstrap.servers' = 'broker:9092',
    'kafka.topic' = 'word_count_log'
);

CREATE TEMPORARY TABLE word_table (
    word STRING
) WITH (
    'connector' = 'datagen',
    'fields.word.length' = '1'
);
{code}
And the two INSERT jobs:
{code:sql}
INSERT INTO word_count_kafka SELECT word, COUNT(*) FROM word_table GROUP BY word;
{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
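
As a rough illustration of the fix proposed in this issue (a transactionalIdPrefix that differs per job), the sketch below derives a prefix from a job identifier, the table name, and the topic. The class `TransactionalIdPrefixes`, the method `forJob`, the `table-store-` naming scheme, and the sample job identifiers are all hypothetical and are not taken from the Table Store code base. The resulting string would be handed to whatever hook the Kafka sink exposes for the prefix (for instance `KafkaSinkBuilder#setTransactionalIdPrefix` in Flink's Kafka connector); how Table Store would wire this in is not shown here.

```java
import java.util.Objects;

/**
 * Hypothetical helper, for illustration only: builds a Kafka transactional ID
 * prefix that differs between jobs writing to the same table and topic.
 */
final class TransactionalIdPrefixes {

    private TransactionalIdPrefixes() {}

    /**
     * Combines the job identifier with the table and topic names.
     * Assumption: jobId is unique per job and stays the same when that job restarts.
     */
    static String forJob(String jobId, String tableName, String topic) {
        Objects.requireNonNull(jobId, "jobId");
        Objects.requireNonNull(tableName, "tableName");
        Objects.requireNonNull(topic, "topic");
        return "table-store-" + tableName + "-" + topic + "-" + jobId;
    }

    public static void main(String[] args) {
        // Two jobs inserting into the same table and topic, as in the report.
        String first = forJob("job-a", "word_count_kafka", "word_count_log");
        String second = forJob("job-b", "word_count_kafka", "word_count_log");
        System.out.println(first);
        System.out.println(second);
        // Distinct prefixes mean the two producers no longer share a transactionalId.
        System.out.println("prefixes differ: " + !first.equals(second));
    }
}
```

A random suffix would also keep the two jobs apart, but it would change on every restart; a stable per-job identifier is assumed here so that a restarted job keeps using the same transactional IDs.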