Forwarding 周瑞's message to a duplicate thread: After our analysis, we found a bug in the org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.resumeTransaction method The analysis process is as follows:
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction public void initializeState(FunctionInitializationContext context) throws Exception { state = context.getOperatorStateStore().getListState(stateDescriptor); boolean recoveredUserContext = false; if (context.isRestored()) { LOG.info("{} - restoring state", name()); for (State<TXN, CONTEXT> operatorState : state.get()) { userContext = operatorState.getContext(); List<TransactionHolder<TXN>> recoveredTransactions = operatorState.getPendingCommitTransactions(); List<TXN> handledTransactions = new ArrayList<>(recoveredTransactions.size() + 1); for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) { // If this fails to succeed eventually, there is actually data loss recoverAndCommitInternal(recoveredTransaction); handledTransactions.add(recoveredTransaction.handle); LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction); } { TXN transaction = operatorState.getPendingTransaction().handle; recoverAndAbort(transaction); handledTransactions.add(transaction); LOG.info( "{} aborted recovered transaction {}", name(), operatorState.getPendingTransaction()); } if (userContext.isPresent()) { finishRecoveringContext(handledTransactions); recoveredUserContext = true; } } } (1)recoverAndCommitInternal(recoveredTransaction); The previous transactionalid, producerId and epoch in the state are used to commit the transaction,However, we find that the producerIdAndEpoch of transactionManager is a static constant (ProducerIdAndEpoch.NONE), which pollutes the static constant ProducerIdAndEpoch.NONE @Override protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) { if (transaction.isTransactional()) { FlinkKafkaInternalProducer<byte[], byte[]> producer = null; try { producer = initTransactionalProducer(transaction.transactionalId, false); producer.resumeTransaction(transaction.producerId, transaction.epoch); producer.commitTransaction(); } catch (InvalidTxnStateException | ProducerFencedException ex) { // That means we have committed this transaction before. LOG.warn( "Encountered error {} while recovering transaction {}. " + "Presumably this transaction has been already committed before", ex, transaction); } finally { if (producer != null) { producer.close(0, TimeUnit.SECONDS); } } } } public void resumeTransaction(long producerId, short epoch) { synchronized (producerClosingLock) { ensureNotClosed(); Preconditions.checkState( producerId >= 0 && epoch >= 0, "Incorrect values for producerId %s and epoch %s", producerId, epoch); LOG.info( "Attempting to resume transaction {} with producerId {} and epoch {}", transactionalId, producerId, epoch); Object transactionManager = getField(kafkaProducer, "transactionManager"); synchronized (transactionManager) { Object topicPartitionBookkeeper = getField(transactionManager, "topicPartitionBookkeeper"); invoke( transactionManager, "transitionTo", getEnum( "org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING")); invoke(topicPartitionBookkeeper, "reset"); Object producerIdAndEpoch = getField(transactionManager, "producerIdAndEpoch"); setField(producerIdAndEpoch, "producerId", producerId); setField(producerIdAndEpoch, "epoch", epoch); invoke( transactionManager, "transitionTo", getEnum( "org.apache.kafka.clients.producer.internals.TransactionManager$State.READY")); invoke( transactionManager, "transitionTo", getEnum( "org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION")); setField(transactionManager, "transactionStarted", true); } } } public TransactionManager(LogContext logContext, String transactionalId, int transactionTimeoutMs, long retryBackoffMs, ApiVersions apiVersions) { this.producerIdAndEpoch = ProducerIdAndEpoch.NONE; this.transactionalId = transactionalId; this.log = logContext.logger(TransactionManager.class); this.transactionTimeoutMs = transactionTimeoutMs; this.transactionCoordinator = null; this.consumerGroupCoordinator = null; this.newPartitionsInTransaction = new HashSet<>(); this.pendingPartitionsInTransaction = new HashSet<>(); this.partitionsInTransaction = new HashSet<>(); this.pendingRequests = new PriorityQueue<>(10, Comparator.comparingInt(o -> o.priority().priority)); this.pendingTxnOffsetCommits = new HashMap<>(); this.partitionsWithUnresolvedSequences = new HashMap<>(); this.partitionsToRewriteSequences = new HashSet<>(); this.retryBackoffMs = retryBackoffMs; this.topicPartitionBookkeeper = new TopicPartitionBookkeeper(); this.apiVersions = apiVersions; } public class ProducerIdAndEpoch { public static final ProducerIdAndEpoch NONE = new ProducerIdAndEpoch(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH); public final long producerId; public final short epoch; public ProducerIdAndEpoch(long producerId, short epoch) { this.producerId = producerId; this.epoch = epoch; } public boolean isValid() { return RecordBatch.NO_PRODUCER_ID < producerId; } @Override public String toString() { return "(producerId=" + producerId + ", epoch=" + epoch + ")"; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ProducerIdAndEpoch that = (ProducerIdAndEpoch) o; if (producerId != that.producerId) return false; return epoch == that.epoch; } @Override public int hashCode() { int result = (int) (producerId ^ (producerId >>> 32)); result = 31 * result + (int) epoch; return result; } } (2)In the second step, recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState transaction), when initializing the transaction, producerId and epoch in the first step pollute ProducerIdAndEpoch.NONE. Therefore, when an initialization request is sent to Kafka, the values of the producerId and epoch variables in the request parameter ProducerIdAndEpoch.NONE are equal to the values of the producerId and epoch variables in the first transaction commit, not equal to - 1, - 1. So Kafka throws an exception: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker. at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352) at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260) at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564) at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) at java.lang.Thread.run(Thread.java:748) protected void recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState transaction) { if (transaction.isTransactional()) { FlinkKafkaInternalProducer<byte[], byte[]> producer = null; try { producer = initTransactionalProducer(transaction.transactionalId, false); producer.initTransactions(); } finally { if (producer != null) { producer.close(0, TimeUnit.SECONDS); } } } } public synchronized TransactionalRequestResult initializeTransactions() { return initializeTransactions(ProducerIdAndEpoch.NONE); } synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) { boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE; return handleCachedTransactionRequestResult(() -> { // If this is an epoch bump, we will transition the state as part of handling the EndTxnRequest if (!isEpochBump) { transitionTo(State.INITIALIZING); log.info("Invoking InitProducerId for the first time in order to acquire a producer ID"); } else { log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch", producerIdAndEpoch); } InitProducerIdRequestData requestData = new InitProducerIdRequestData() .setTransactionalId(transactionalId) .setTransactionTimeoutMs(transactionTimeoutMs) .setProducerId(producerIdAndEpoch.producerId) .setProducerEpoch(producerIdAndEpoch.epoch); InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData), isEpochBump); enqueueRequest(handler); return handler.result; }, State.INITIALIZING); } On Wed, Jun 2, 2021 at 3:55 PM Piotr Nowojski <pnowoj...@apache.org> wrote: > Hi, > > I think there is no generic way. If this error has happened indeed after > starting a second job from the same savepoint, or something like that, user > can change Sink's operator UID. > > If this is an issue of intentional recovery from an earlier > checkpoint/savepoint, maybe `FlinkKafkaProducer#setLogFailuresOnly` will be > helpful. > > Best, Piotrek > > wt., 1 cze 2021 o 15:16 Till Rohrmann <trohrm...@apache.org> napisał(a): > >> The error message says that we are trying to reuse a transaction id that >> is >> currently being used or has expired. >> >> I am not 100% sure how this can happen. My suspicion is that you have >> resumed a job multiple times from the same savepoint. Have you checked >> that >> there is no other job which has been resumed from the same savepoint and >> which is currently running or has run and completed checkpoints? >> >> @pnowojski <pnowoj...@apache.org> @Becket Qin <becket....@gmail.com> how >> does the transaction id generation ensures that we don't have a clash of >> transaction ids if we resume the same job multiple times from the same >> savepoint? From the code, I do see that we have a >> TransactionalIdsGenerator >> which is initialized with the taskName and the operator UID. >> >> fyi: @Arvid Heise <ar...@apache.org> >> >> Cheers, >> Till >> >> >> On Mon, May 31, 2021 at 11:10 AM 周瑞 <rui.z...@woqutech.com> wrote: >> >> > HI: >> > When "sink.semantic = exactly-once", the following exception is >> > thrown when recovering from svaepoint >> > >> > public static final String KAFKA_TABLE_FORMAT = >> > "CREATE TABLE "+TABLE_NAME+" (\n" + >> > " "+COLUMN_NAME+" STRING\n" + >> > ") WITH (\n" + >> > " 'connector' = 'kafka',\n" + >> > " 'topic' = '%s',\n" + >> > " 'properties.bootstrap.servers' = '%s',\n" + >> > " 'sink.semantic' = 'exactly-once',\n" + >> > " 'properties.transaction.timeout.ms' = >> > '900000',\n" + >> > " 'sink.partitioner' = >> > 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" + >> > " 'format' = 'dbz-json'\n" + >> > ")\n"; >> > [] - Source: TableSourceScan(table=[[default_catalog, >> default_database, >> > debezium_source]], fields=[data]) -> Sink: Sink >> > (table=[default_catalog.default_database.KafkaTable], fields=[data]) >> (1/1 >> > )#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to >> > FAILED with failure cause: org.apache.kafka.common.KafkaException: >> > Unexpected error in InitProducerIdResponse; Producer attempted an >> > operation with an old epoch. Either there is a newer producer with the >> > same transactionalId, or the producer's transaction has been expired by >> > the broker. >> > at org.apache.kafka.clients.producer.internals. >> > >> TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager >> > .java:1352) >> > at org.apache.kafka.clients.producer.internals. >> > TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java: >> > 1260) >> > at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse >> > .java:109) >> > at org.apache.kafka.clients.NetworkClient.completeResponses( >> > NetworkClient.java:572) >> > at >> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564) >> > at org.apache.kafka.clients.producer.internals.Sender >> > .maybeSendAndPollTransactionalRequest(Sender.java:414) >> > at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender >> > .java:312) >> > at >> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java: >> > 239) >> > at java.lang.Thread.run(Thread.java:748) >> > >> >