Responded as part of the following discussion https://lists.apache.org/x/thread.html/re85a1fb3f17b5ef1af2844fc1a07b6d2f5e6237bf4c33059e13890ee@%3Cuser.flink.apache.org%3E. Let's continue the discussion there.
Cheers, Till On Mon, May 31, 2021 at 11:02 AM 周瑞 <rui.z...@woqutech.com> wrote: > HI: > When "sink.semantic = exactual-only", the following exception is > thrown when recovering from svaepoint > > public static final String KAFKA_TABLE_FORMAT = > "CREATE TABLE "+TABLE_NAME+" (\n" + > " "+COLUMN_NAME+" STRING\n" + > ") WITH (\n" + > " 'connector' = 'kafka',\n" + > " 'topic' = '%s',\n" + > " 'properties.bootstrap.servers' = '%s',\n" + > " 'sink.semantic' = 'exactly-once',\n" + > " 'properties.transaction.timeout.ms' = > '900000',\n" + > " 'sink.partitioner' = > 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" + > " 'format' = 'dbz-json'\n" + > ")\n"; > [] - Source: TableSourceScan(table=[[default_catalog, default_database, > debezium_source]], fields=[data]) -> Sink: Sink > (table=[default_catalog.default_database.KafkaTable], fields=[data]) (1/1 > )#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to > FAILED with failure cause: org.apache.kafka.common.KafkaException: > Unexpected error in InitProducerIdResponse; Producer attempted an > operation with an old epoch. Either there is a newer producer with the > same transactionalId, or the producer's transaction has been expired by > the broker. > at org.apache.kafka.clients.producer.internals. > TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager > .java:1352) > at org.apache.kafka.clients.producer.internals. > TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java: > 1260) > at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse > .java:109) > at org.apache.kafka.clients.NetworkClient.completeResponses( > NetworkClient.java:572) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564) > at org.apache.kafka.clients.producer.internals.Sender > .maybeSendAndPollTransactionalRequest(Sender.java:414) > at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender > .java:312) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java: > 239) > at java.lang.Thread.run(Thread.java:748) >