I responded as part of the following discussion:
https://lists.apache.org/x/thread.html/re85a1fb3f17b5ef1af2844fc1a07b6d2f5e6237bf4c33059e13890ee@%3Cuser.flink.apache.org%3E.
Let's continue the discussion there.

Cheers,
Till

On Mon, May 31, 2021 at 11:02 AM 周瑞 <rui.z...@woqutech.com> wrote:

> Hi,
>       When 'sink.semantic' = 'exactly-once', the following exception is
> thrown when recovering from a savepoint:
>
>        public static final String KAFKA_TABLE_FORMAT =
>             "CREATE TABLE "+TABLE_NAME+" (\n" +
>                     "  "+COLUMN_NAME+" STRING\n" +
>                     ") WITH (\n" +
>                     "   'connector' = 'kafka',\n" +
>                     "   'topic' = '%s',\n" +
>                     "   'properties.bootstrap.servers' = '%s',\n" +
>                     "   'sink.semantic' = 'exactly-once',\n" +
>                     "   'properties.transaction.timeout.ms' = '900000',\n" +
>                     "   'sink.partitioner' = 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
>                     "   'format' = 'dbz-json'\n" +
>                     ")\n";
>   [] - Source: TableSourceScan(table=[[default_catalog, default_database, debezium_source]], fields=[data]) -> Sink: Sink(table=[default_catalog.default_database.KafkaTable], fields=[data]) (1/1)#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to FAILED with failure cause: org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
>     at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
>     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
>     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>     at java.lang.Thread.run(Thread.java:748)
>
