Vicky Papavasileiou created FLINK-30945:
-------------------------------------------

             Summary: FTS does not support multiple writers into the same table and topic
                 Key: FLINK-30945
                 URL: https://issues.apache.org/jira/browse/FLINK-30945
             Project: Flink
          Issue Type: Bug
          Components: Table Store
            Reporter: Vicky Papavasileiou


When two different streaming jobs INSERT INTO the same table and Kafka topic,
the second job is never able to make progress: its transactions are constantly
aborted because its producer gets fenced.
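To make the failure mode concrete, here is a toy model of producer fencing. It is a simplified sketch, not Kafka's implementation, and ToyTransactionCoordinator, FencingDemo and the id string are hypothetical. It only captures the idea that each producer initializing transactions with a given transactional.id bumps an epoch for that id, and an older producer holding the same id is rejected when it later tries to commit, which is what the ProducerFencedException in the log reports.
{code:java}
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Kafka's code) of fencing producers that share a transactional.id. */
final class ToyTransactionCoordinator {
    // Latest epoch handed out per transactional.id.
    private final Map<String, Integer> epochs = new HashMap<>();

    /** A producer (re)initializing with an id bumps that id's epoch and gets the new value. */
    int initTransactions(String transactionalId) {
        return epochs.merge(transactionalId, 1, Integer::sum);
    }

    /** Only the producer holding the latest epoch for an id may commit. */
    void commit(String transactionalId, int producerEpoch) {
        Integer current = epochs.get(transactionalId);
        if (current == null) {
            throw new IllegalStateException("unknown transactionalId " + transactionalId);
        }
        if (producerEpoch != current) {
            throw new IllegalStateException(
                    "fenced: a newer producer uses transactionalId " + transactionalId);
        }
    }
}

final class FencingDemo {
    public static void main(String[] args) {
        ToyTransactionCoordinator coordinator = new ToyTransactionCoordinator();
        // Hypothetical id that two jobs with identical plans would both derive.
        String sharedId = "word_count_log-tx-0";
        int jobA = coordinator.initTransactions(sharedId); // epoch 1
        int jobB = coordinator.initTransactions(sharedId); // epoch 2: job A is now fenced
        coordinator.commit(sharedId, jobB);                // accepted
        try {
            coordinator.commit(sharedId, jobA);            // rejected
        } catch (IllegalStateException e) {
            System.out.println("job A: " + e.getMessage());
        }
    }
}
{code}
In this model whichever job initialized last holds the id, and the other job's commit fails; nothing in it depends on the table itself, only on the two jobs sharing a transactional id.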

FTS should set a distinct transactionalIdPrefix for each job so that the
transactions of different jobs do not clash.
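A minimal sketch of the proposed fix, under stated assumptions: the helper TransactionalIds, its "<prefix>-<subtaskIndex>" id layout and the job names are hypothetical and are not FTS or Kafka API. It assumes the default prefix depends only on the job's plan (the task name in the log is the same for both INSERT jobs), so two identical jobs derive identical ids, while appending something unique per job makes the two id sets disjoint.
{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper showing why a per-job prefix keeps transactional ids apart. */
final class TransactionalIds {
    private TransactionalIds() {}

    /** Assumed layout "<prefix>-<subtaskIndex>"; real sinks may use a different format. */
    static List<String> forSubtasks(String transactionalIdPrefix, int parallelism) {
        List<String> ids = new ArrayList<>(parallelism);
        for (int subtask = 0; subtask < parallelism; subtask++) {
            ids.add(transactionalIdPrefix + "-" + subtask);
        }
        return ids;
    }

    /** Hypothetical per-job prefix: the default base plus a name unique to each job. */
    static String perJobPrefix(String defaultPrefix, String jobName) {
        return defaultPrefix + "-" + jobName;
    }

    public static void main(String[] args) {
        // Both jobs run the same plan, so a prefix derived only from the plan is identical.
        String defaultPrefix = "Writer -> Global Committer -> Sink: end";
        List<String> jobA = forSubtasks(defaultPrefix, 2);
        List<String> jobB = forSubtasks(defaultPrefix, 2);
        System.out.println("shared prefix clashes: " + !Collections.disjoint(jobA, jobB));

        // With a per-job suffix no transactional id is shared between the jobs.
        List<String> fixedA = forSubtasks(perJobPrefix(defaultPrefix, "insert-job-a"), 2);
        List<String> fixedB = forSubtasks(perJobPrefix(defaultPrefix, "insert-job-b"), 2);
        System.out.println("per-job prefix clashes: " + !Collections.disjoint(fixedA, fixedB));
    }
}
{code}
A random value per run would also avoid the clash, but presumably a restarted job should keep the same prefix so it can recover or abort its own pending transactions, so a stable, user-chosen value per job is likely the safer choice.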
{code:java}
2023-02-06 17:13:36,088 WARN org.apache.flink.runtime.taskmanager.Task [] - Writer -> Global Committer -> Sink: end (1/1)#0 (8cf4197af9716623c3c19e7fa3d7c071_b5c8d46f3e7b141acf271f12622e752b_0_0) switched from RUNNING to FAILED with failure cause:
org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:323)
	at org.apache.flink.table.store.connector.sink.StoreWriteOperator.notifyCheckpointComplete(StoreWriteOperator.java:175)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:479)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:413)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1412)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$15(StreamTask.java:1353)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$18(StreamTask.java:1392)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.table.store.shaded.org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
{code}
Sample queries:
{code:java}
CREATE CATALOG table_store_catalog WITH (
    'type' = 'table-store',
    'warehouse' = 's3://my-bucket/table-store'
);
USE CATALOG table_store_catalog;
SET 'execution.checkpointing.interval' = '10 s';
CREATE TABLE word_count_kafka (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt BIGINT
) WITH (
    'log.system' = 'kafka',
    'kafka.bootstrap.servers' = 'broker:9092',
    'kafka.topic' = 'word_count_log'
);
CREATE TEMPORARY TABLE word_table (
    word STRING
) WITH (
    'connector' = 'datagen',
    'fields.word.length' = '1'
);
{code}

Both INSERT jobs submit the same statement:
{code:java}
INSERT INTO word_count_kafka SELECT word, COUNT(*) FROM word_table GROUP BY word;
{code}



--
This message was sent by Atlassian Jira (v8.20.10#820010)
