Hi,

I don't think there is a generic way. If this error indeed happened after starting a second job from the same savepoint (or something similar), the user can change the Sink operator's UID.
If this is a case of intentional recovery from an earlier checkpoint/savepoint, maybe `FlinkKafkaProducer#setLogFailuresOnly` will be helpful.

Best,
Piotrek

On Tue, Jun 1, 2021 at 15:16 Till Rohrmann <trohrm...@apache.org> wrote:

> The error message says that we are trying to reuse a transaction id that is
> currently being used or has expired.
>
> I am not 100% sure how this can happen. My suspicion is that you have
> resumed a job multiple times from the same savepoint. Have you checked that
> there is no other job which has been resumed from the same savepoint and
> which is currently running, or has run and completed checkpoints?
>
> @pnowojski <pnowoj...@apache.org> @Becket Qin <becket....@gmail.com> how
> does the transaction id generation ensure that we don't have a clash of
> transaction ids if we resume the same job multiple times from the same
> savepoint? From the code, I do see that we have a TransactionalIdsGenerator
> which is initialized with the taskName and the operator UID.
>
> fyi: @Arvid Heise <ar...@apache.org>
>
> Cheers,
> Till
>
> On Mon, May 31, 2021 at 11:10 AM 周瑞 <rui.z...@woqutech.com> wrote:
>
> > Hi:
> > When "sink.semantic = exactly-once", the following exception is
> > thrown when recovering from a savepoint:
> >
> > public static final String KAFKA_TABLE_FORMAT =
> >     "CREATE TABLE " + TABLE_NAME + " (\n" +
> >     "  " + COLUMN_NAME + " STRING\n" +
> >     ") WITH (\n" +
> >     "  'connector' = 'kafka',\n" +
> >     "  'topic' = '%s',\n" +
> >     "  'properties.bootstrap.servers' = '%s',\n" +
> >     "  'sink.semantic' = 'exactly-once',\n" +
> >     "  'properties.transaction.timeout.ms' = '900000',\n" +
> >     "  'sink.partitioner' = 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
> >     "  'format' = 'dbz-json'\n" +
> >     ")\n";
> >
> > [] - Source: TableSourceScan(table=[[default_catalog, default_database,
> > debezium_source]], fields=[data]) -> Sink: Sink
> > (table=[default_catalog.default_database.KafkaTable], fields=[data])
> > (1/1)#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING
> > to FAILED with failure cause: org.apache.kafka.common.KafkaException:
> > Unexpected error in InitProducerIdResponse; Producer attempted an
> > operation with an old epoch. Either there is a newer producer with the
> > same transactionalId, or the producer's transaction has been expired by
> > the broker.
> >     at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
> >     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
> >     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> >     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
> >     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
> >     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
> >     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
> >     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
> >     at java.lang.Thread.run(Thread.java:748)