Hi,

I think there is no generic way. If this error indeed happened after
starting a second job from the same savepoint, or something similar, the
user can change the Sink operator's UID.

If this is a case of intentional recovery from an earlier
checkpoint/savepoint, `FlinkKafkaProducer#setLogFailuresOnly` may be
helpful.
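To illustrate the clash the thread is discussing, here is a minimal, self-contained sketch in plain Java. It is NOT Flink's actual TransactionalIdsGenerator code; the id format and names below are assumptions for illustration only. The point it demonstrates is the one mentioned above: if transactional ids are derived deterministically from the task name and operator UID, two jobs restored from the same savepoint compute identical ids, while changing the sink's UID yields disjoint ids.

```java
// Hypothetical sketch of deterministic transactional-id derivation.
// The real Flink generator is initialized with the taskName and the
// operator UID; the exact id format here is an assumption.
public class TxnIdClashDemo {

    // Derive a transactional id from inputs that are identical for any
    // job restored from the same savepoint with an unchanged sink UID.
    static String transactionalId(String taskName, String operatorUid,
                                  int subtaskIndex, long counter) {
        return taskName + "-" + operatorUid + "-" + subtaskIndex + "-" + counter;
    }

    public static void main(String[] args) {
        // Two jobs restored from the SAME savepoint: same taskName,
        // same operator UID -> identical transactional ids (the clash).
        String jobA = transactionalId("Sink: KafkaTable", "uid-1", 0, 0);
        String jobB = transactionalId("Sink: KafkaTable", "uid-1", 0, 0);
        System.out.println("clash: " + jobA.equals(jobB));

        // Changing the Sink operator's UID changes every derived id,
        // so the second job no longer collides with the first.
        String jobC = transactionalId("Sink: KafkaTable", "uid-2", 0, 0);
        System.out.println("clash after uid change: " + jobA.equals(jobC));
    }
}
```

Under this (simplified) model, changing the sink's UID before resuming the second job is what breaks the collision.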

Best, Piotrek

Tue, 1 Jun 2021 at 15:16 Till Rohrmann <trohrm...@apache.org> wrote:

> The error message says that we are trying to reuse a transaction id that is
> currently being used or has expired.
>
> I am not 100% sure how this can happen. My suspicion is that you have
> resumed a job multiple times from the same savepoint. Have you checked that
> there is no other job which has been resumed from the same savepoint and
> which is currently running or has run and completed checkpoints?
>
> @pnowojski <pnowoj...@apache.org> @Becket Qin <becket....@gmail.com> how
> does the transaction id generation ensure that we don't have a clash of
> transaction ids if we resume the same job multiple times from the same
> savepoint? From the code, I do see that we have a TransactionalIdsGenerator
> which is initialized with the taskName and the operator UID.
>
> fyi: @Arvid Heise <ar...@apache.org>
>
> Cheers,
> Till
>
>
> On Mon, May 31, 2021 at 11:10 AM 周瑞 <rui.z...@woqutech.com> wrote:
>
> > Hi:
> >       When "sink.semantic = exactly-once", the following exception is
> > thrown when recovering from a savepoint:
> >
> >        public static final String KAFKA_TABLE_FORMAT =
> >                "CREATE TABLE " + TABLE_NAME + " (\n" +
> >                "  " + COLUMN_NAME + " STRING\n" +
> >                ") WITH (\n" +
> >                "   'connector' = 'kafka',\n" +
> >                "   'topic' = '%s',\n" +
> >                "   'properties.bootstrap.servers' = '%s',\n" +
> >                "   'sink.semantic' = 'exactly-once',\n" +
> >                "   'properties.transaction.timeout.ms' = '900000',\n" +
> >                "   'sink.partitioner' = 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
> >                "   'format' = 'dbz-json'\n" +
> >                ")\n";
> >   [] - Source: TableSourceScan(table=[[default_catalog, default_database, debezium_source]], fields=[data]) -> Sink: Sink(table=[default_catalog.default_database.KafkaTable], fields=[data]) (1/1)#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to FAILED with failure cause:
> > org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
> >     at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
> >     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
> >     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> >     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
> >     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
> >     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
> >     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
> >     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
> >     at java.lang.Thread.run(Thread.java:748)
> >
>
