Vicky Papavasileiou created FLINK-30945:
-------------------------------------------
Summary: FTS does not support multiple writers into the same table and topic
Key: FLINK-30945
URL: https://issues.apache.org/jira/browse/FLINK-30945
Project: Flink
Issue Type: Bug
Components: Table Store
Reporter: Vicky Papavasileiou
When two different streaming jobs INSERT INTO the same table, and therefore write to the same Kafka log topic, the second job never makes progress. Its transactions are repeatedly aborted because its Kafka producer gets fenced.
FTS should set a per-job transactionalIdPrefix so that transactions from different jobs do not clash.
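Here is a minimal sketch of why a per-job prefix helps. It assumes, without confirmation from this report, that the default prefix is derived only from the sink's task name and operator ID. In that case, two jobs compiled from the same INSERT statement compute identical transactional IDs. The class and methods below (`TransactionalIdSketch`, `defaultPrefix`, `jobScopedPrefix`, `transactionalIds`, `clash`) are hypothetical. They model only the ID arithmetic, not the Kafka or Flink APIs.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical model of transactional-ID derivation; not the FTS or Flink implementation.
public class TransactionalIdSketch {

    // Assumed default: the prefix depends only on the job topology, so two jobs
    // running the same INSERT statement compute the same prefix.
    static String defaultPrefix(String taskName, String operatorUid) {
        return taskName + "-" + operatorUid;
    }

    // Per-job prefix: a job-specific component (e.g. a configured prefix) is prepended.
    static String jobScopedPrefix(String jobComponent, String taskName, String operatorUid) {
        return jobComponent + "-" + defaultPrefix(taskName, operatorUid);
    }

    // Generates a pool of IDs: poolSize IDs per parallel subtask, numbered consecutively.
    static Set<String> transactionalIds(String prefix, int parallelism, int poolSize) {
        Set<String> ids = new LinkedHashSet<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            for (int slot = 0; slot < poolSize; slot++) {
                ids.add(prefix + "-" + (subtask * poolSize + slot));
            }
        }
        return ids;
    }

    // Two jobs fence each other if they share at least one transactional ID.
    static boolean clash(Set<String> a, Set<String> b) {
        Set<String> common = new LinkedHashSet<>(a);
        common.retainAll(b);
        return !common.isEmpty();
    }

    public static void main(String[] args) {
        String task = "Writer -> Global Committer -> Sink: end";
        String uid = "operator-uid-hash"; // placeholder, assumed identical for both jobs

        Set<String> jobA = transactionalIds(defaultPrefix(task, uid), 1, 5);
        Set<String> jobB = transactionalIds(defaultPrefix(task, uid), 1, 5);
        System.out.println("default prefixes clash: " + clash(jobA, jobB));

        Set<String> jobC = transactionalIds(jobScopedPrefix("job-a", task, uid), 1, 5);
        Set<String> jobD = transactionalIds(jobScopedPrefix("job-b", task, uid), 1, 5);
        System.out.println("job-scoped prefixes clash: " + clash(jobC, jobD));
    }
}
```

Running `main` prints `default prefixes clash: true` followed by `job-scoped prefixes clash: false`. Any job-specific component in the prefix (user-configured or derived from the job) is enough to keep the two ID pools disjoint.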
{code:java}
2023-02-06 17:13:36,088 WARN org.apache.flink.runtime.taskmanager.Task [] - Writer -> Global Committer -> Sink: end (1/1)#0 (8cf4197af9716623c3c19e7fa3d7c071_b5c8d46f3e7b141acf271f12622e752b_0_0) switched from RUNNING to FAILED with failure cause:
org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:323)
	at org.apache.flink.table.store.connector.sink.StoreWriteOperator.notifyCheckpointComplete(StoreWriteOperator.java:175)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:479)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:413)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1412)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$15(StreamTask.java:1353)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$18(StreamTask.java:1392)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.table.store.shaded.org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
{code}
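The ProducerFencedException at the bottom of the trace comes from Kafka's epoch-based fencing. Each time a producer initializes a transactional ID, the transaction coordinator bumps that ID's epoch. A producer still holding an older epoch is then rejected. The in-memory model below illustrates only that rule. `FencingSketch`, `Coordinator`, and `FencedException` are hypothetical names, not Kafka client or broker classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of epoch-based fencing; not the Kafka client or broker API.
public class FencingSketch {

    // Stands in for Kafka's ProducerFencedException in this sketch.
    static class FencedException extends RuntimeException {
        FencedException(String msg) {
            super(msg);
        }
    }

    // Tracks the latest epoch handed out per transactional ID.
    static class Coordinator {
        private final Map<String, Integer> epochs = new HashMap<>();

        // Registers a producer for this ID: first call yields epoch 0, each later call bumps it.
        int initTransactions(String transactionalId) {
            return epochs.merge(transactionalId, 0, (old, ignored) -> old + 1);
        }

        // Commits only if the caller still holds the latest epoch for this ID.
        void commit(String transactionalId, int epoch) {
            int current = epochs.getOrDefault(transactionalId, -1);
            if (epoch != current) {
                throw new FencedException("stale epoch " + epoch + " for " + transactionalId
                        + " (current epoch is " + current + ")");
            }
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        String sharedId = "same-prefix-0"; // hypothetical ID that both jobs derive

        int epochJobA = coordinator.initTransactions(sharedId); // job A starts: epoch 0
        int epochJobB = coordinator.initTransactions(sharedId); // job B starts: epoch 1, fences A

        coordinator.commit(sharedId, epochJobB); // job B holds the latest epoch, so this succeeds
        try {
            coordinator.commit(sharedId, epochJobA); // job A still holds epoch 0 and is rejected
        } catch (FencedException e) {
            System.out.println("job A fenced: " + e.getMessage());
        }
    }
}
```

Which job loses depends on which one initialized the shared ID most recently. Suppose the fenced job then restarts and initializes the same ID again. It would fence the other job in turn, which is consistent with (though not proof of) the repeated failures described in this report.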
Sample queries:
{code:sql}
CREATE CATALOG table_store_catalog WITH (
  'type' = 'table-store',
  'warehouse' = 's3://my-bucket/table-store'
);

USE CATALOG table_store_catalog;

SET 'execution.checkpointing.interval' = '10 s';

CREATE TABLE word_count_kafka (
  word STRING PRIMARY KEY NOT ENFORCED,
  cnt BIGINT
) WITH (
  'log.system' = 'kafka',
  'kafka.bootstrap.servers' = 'broker:9092',
  'kafka.topic' = 'word_count_log'
);

CREATE TEMPORARY TABLE word_table (
  word STRING
) WITH (
  'connector' = 'datagen',
  'fields.word.length' = '1'
);
{code}
Then submit the following INSERT statement twice, as two separate jobs:
{code:sql}
INSERT INTO word_count_kafka SELECT word, COUNT(*) FROM word_table GROUP BY word;
{code}