Thanks for the update. Skimming over the code, it does indeed look like we are overwriting the fields of the static constant ProducerIdAndEpoch.NONE. I am not 100% sure how this causes the observed problem, though. I am also not an expert on the Flink Kafka connector or Kafka itself, so I would appreciate it if someone more familiar with this part of the code could double-check it.
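To make that suspicion concrete, here is a minimal, standalone sketch (demo types of my own, not the real Flink or Kafka classes) of how a reflective write into the fields of a shared NONE instance leaks into every other holder of that reference:

import java.lang.reflect.Field;

// Standalone demo: mutating the object behind a shared "NONE" constant, instead of
// replacing the reference, changes what everyone else sees. Demo types only.
public class SingletonPollutionDemo {

    static final class ProducerIdAndEpoch {
        static final ProducerIdAndEpoch NONE = new ProducerIdAndEpoch(-1L, (short) -1);

        final long producerId;
        final short epoch;

        ProducerIdAndEpoch(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    public static void main(String[] args) throws Exception {
        // A fresh TransactionManager starts out referencing the shared NONE instance.
        ProducerIdAndEpoch managerState = ProducerIdAndEpoch.NONE;

        // resumeTransaction() writes into that very object via reflection (the fields
        // are final, hence setAccessible(true); this works for non-static final fields
        // on current JDKs, which is what Flink's setField helper relies on).
        Field producerId = ProducerIdAndEpoch.class.getDeclaredField("producerId");
        producerId.setAccessible(true);
        producerId.setLong(managerState, 4711L); // producerId recovered from Flink state

        // Every later reader of the "constant" now observes the recovered value.
        System.out.println(ProducerIdAndEpoch.NONE.producerId); // prints 4711, not -1
    }
}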
Concerning the required change of an operator's UID, Piotr: is this a known issue, and is it documented somewhere? I find this rather surprising from a user's point of view.

Cheers,
Till

On Thu, Jun 3, 2021 at 8:53 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Forwarding 周瑞's message to a duplicate thread:
>
> After our analysis, we found a bug in the
> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.resumeTransaction
> method. The analysis process is as follows.
>
> In org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction:
>
> public void initializeState(FunctionInitializationContext context) throws Exception {
>     state = context.getOperatorStateStore().getListState(stateDescriptor);
>     boolean recoveredUserContext = false;
>     if (context.isRestored()) {
>         LOG.info("{} - restoring state", name());
>         for (State<TXN, CONTEXT> operatorState : state.get()) {
>             userContext = operatorState.getContext();
>             List<TransactionHolder<TXN>> recoveredTransactions =
>                     operatorState.getPendingCommitTransactions();
>             List<TXN> handledTransactions = new ArrayList<>(recoveredTransactions.size() + 1);
>             for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
>                 // If this fails to succeed eventually, there is actually data loss
>                 recoverAndCommitInternal(recoveredTransaction);
>                 handledTransactions.add(recoveredTransaction.handle);
>                 LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction);
>             }
>
>             {
>                 TXN transaction = operatorState.getPendingTransaction().handle;
>                 recoverAndAbort(transaction);
>                 handledTransactions.add(transaction);
>                 LOG.info(
>                         "{} aborted recovered transaction {}",
>                         name(),
>                         operatorState.getPendingTransaction());
>             }
>
>             if (userContext.isPresent()) {
>                 finishRecoveringContext(handledTransactions);
>                 recoveredUserContext = true;
>             }
>         }
>     }
> }
>
> (1) recoverAndCommitInternal(recoveredTransaction):
> The transactionalId, producerId, and epoch stored in the state are used to commit
> the transaction. However, a freshly constructed TransactionManager initializes its
> producerIdAndEpoch field to the static constant ProducerIdAndEpoch.NONE, and
> resumeTransaction writes the recovered values into that shared instance, thereby
> polluting ProducerIdAndEpoch.NONE:
>
> @Override
> protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) {
>     if (transaction.isTransactional()) {
>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>         try {
>             producer = initTransactionalProducer(transaction.transactionalId, false);
>             producer.resumeTransaction(transaction.producerId, transaction.epoch);
>             producer.commitTransaction();
>         } catch (InvalidTxnStateException | ProducerFencedException ex) {
>             // That means we have committed this transaction before.
>             LOG.warn(
>                     "Encountered error {} while recovering transaction {}. "
>                             + "Presumably this transaction has been already committed before",
>                     ex,
>                     transaction);
>         } finally {
>             if (producer != null) {
>                 producer.close(0, TimeUnit.SECONDS);
>             }
>         }
>     }
> }
>
> " > + "Presumably this transaction has been > already committed before", > ex, > transaction); > } finally { > if (producer != null) { > producer.close(0, TimeUnit.SECONDS); > } > } > } > } > > public void resumeTransaction(long producerId, short epoch) { > synchronized (producerClosingLock) { > ensureNotClosed(); > Preconditions.checkState( > producerId >= 0 && epoch >= 0, > "Incorrect values for producerId %s and epoch %s", > producerId, > epoch); > LOG.info( > "Attempting to resume transaction {} with producerId {} > and epoch {}", > transactionalId, > producerId, > epoch); > > Object transactionManager = getField(kafkaProducer, > "transactionManager"); > synchronized (transactionManager) { > Object topicPartitionBookkeeper = > getField(transactionManager, > "topicPartitionBookkeeper"); > > invoke( > transactionManager, > "transitionTo", > getEnum( > > "org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING")); > invoke(topicPartitionBookkeeper, "reset"); > > Object producerIdAndEpoch = getField(transactionManager, > "producerIdAndEpoch"); > setField(producerIdAndEpoch, "producerId", producerId); > setField(producerIdAndEpoch, "epoch", epoch); > > invoke( > transactionManager, > "transitionTo", > getEnum( > > "org.apache.kafka.clients.producer.internals.TransactionManager$State.READY")); > > invoke( > transactionManager, > "transitionTo", > getEnum( > > "org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION")); > setField(transactionManager, "transactionStarted", true); > } > } > } > > > public TransactionManager(LogContext logContext, > String transactionalId, > int transactionTimeoutMs, > long retryBackoffMs, > ApiVersions apiVersions) { > this.producerIdAndEpoch = ProducerIdAndEpoch.NONE; > this.transactionalId = transactionalId; > this.log = logContext.logger(TransactionManager.class); > this.transactionTimeoutMs = transactionTimeoutMs; > this.transactionCoordinator = null; > this.consumerGroupCoordinator = null; > this.newPartitionsInTransaction = new HashSet<>(); > this.pendingPartitionsInTransaction = new HashSet<>(); > this.partitionsInTransaction = new HashSet<>(); > this.pendingRequests = new PriorityQueue<>(10, > Comparator.comparingInt(o -> o.priority().priority)); > this.pendingTxnOffsetCommits = new HashMap<>(); > this.partitionsWithUnresolvedSequences = new HashMap<>(); > this.partitionsToRewriteSequences = new HashSet<>(); > this.retryBackoffMs = retryBackoffMs; > this.topicPartitionBookkeeper = new TopicPartitionBookkeeper(); > this.apiVersions = apiVersions; > } > > > > public class ProducerIdAndEpoch { > public static final ProducerIdAndEpoch NONE = new > ProducerIdAndEpoch(RecordBatch.NO_PRODUCER_ID, > RecordBatch.NO_PRODUCER_EPOCH); > > public final long producerId; > public final short epoch; > > public ProducerIdAndEpoch(long producerId, short epoch) { > this.producerId = producerId; > this.epoch = epoch; > } > > public boolean isValid() { > return RecordBatch.NO_PRODUCER_ID < producerId; > } > > @Override > public String toString() { > return "(producerId=" + producerId + ", epoch=" + epoch + ")"; > } > > @Override > public boolean equals(Object o) { > if (this == o) return true; > if (o == null || getClass() != o.getClass()) return false; > > ProducerIdAndEpoch that = (ProducerIdAndEpoch) o; > > if (producerId != that.producerId) return false; > return epoch == that.epoch; > } > > @Override > public int hashCode() { > int result = (int) (producerId ^ (producerId >>> 32)); > result = 31 * result + (int) 
> (2) In the second step, recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState transaction)
> initializes the transactions again. Because the producerId and epoch from the first
> step polluted ProducerIdAndEpoch.NONE, the InitProducerId request sent to Kafka now
> carries those values, i.e. the producerId and epoch of the first transaction commit
> instead of -1/-1. So Kafka throws an exception:
>
> Unexpected error in InitProducerIdResponse; Producer attempted an operation with an
> old epoch. Either there is a newer producer with the same transactionalId, or the
> producer's transaction has been expired by the broker.
>     at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
>     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
>     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>     at java.lang.Thread.run(Thread.java:748)
>
> protected void recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState transaction) {
>     if (transaction.isTransactional()) {
>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>         try {
>             producer = initTransactionalProducer(transaction.transactionalId, false);
>             producer.initTransactions();
>         } finally {
>             if (producer != null) {
>                 producer.close(0, TimeUnit.SECONDS);
>             }
>         }
>     }
> }
>
> public synchronized TransactionalRequestResult initializeTransactions() {
>     return initializeTransactions(ProducerIdAndEpoch.NONE);
> }
>
> synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
>     boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;
>     return handleCachedTransactionRequestResult(() -> {
>         // If this is an epoch bump, we will transition the state as part of handling the EndTxnRequest
>         if (!isEpochBump) {
>             transitionTo(State.INITIALIZING);
>             log.info("Invoking InitProducerId for the first time in order to acquire a producer ID");
>         } else {
>             log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch", producerIdAndEpoch);
>         }
>         InitProducerIdRequestData requestData = new InitProducerIdRequestData()
>                 .setTransactionalId(transactionalId)
>                 .setTransactionTimeoutMs(transactionTimeoutMs)
>                 .setProducerId(producerIdAndEpoch.producerId)
>                 .setProducerEpoch(producerIdAndEpoch.epoch);
>         InitProducerIdHandler handler = new InitProducerIdHandler(
>                 new InitProducerIdRequest.Builder(requestData), isEpochBump);
>         enqueueRequest(handler);
>         return handler.result;
>     }, State.INITIALIZING);
> }
>
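What makes this subtle is that initializeTransactions() detects the "first time" case by reference comparison (producerIdAndEpoch != ProducerIdAndEpoch.NONE), so the polluted NONE still takes the first-time branch, yet its fields now carry the stale producerId and epoch, which are copied into the InitProducerIdRequest instead of -1/-1. A standalone sketch of that interaction (demo types of my own, not Kafka code):

// Standalone demo of why the polluted singleton slips through: the "first time?"
// check compares references, not field values. Demo types only.
public class ReferenceCheckDemo {

    static final class ProducerIdAndEpoch {
        // Non-final here to keep the demo reflection-free; Kafka's fields are final.
        static ProducerIdAndEpoch NONE = new ProducerIdAndEpoch(-1L, (short) -1);
        long producerId;
        short epoch;
        ProducerIdAndEpoch(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    // Mirrors the shape of TransactionManager#initializeTransactions(ProducerIdAndEpoch).
    static void initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
        // Reference comparison: a polluted NONE still counts as "NONE" here.
        boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;
        System.out.println("isEpochBump=" + isEpochBump
                + ", sending producerId=" + producerIdAndEpoch.producerId
                + ", epoch=" + producerIdAndEpoch.epoch);
    }

    public static void main(String[] args) {
        // Step (1) polluted the shared instance with the recovered values.
        ProducerIdAndEpoch.NONE.producerId = 4711L;
        ProducerIdAndEpoch.NONE.epoch = 3;

        // Step (2) re-initializes: the first-time branch is taken (isEpochBump=false),
        // but the request now carries 4711/3 instead of -1/-1, and the broker rejects
        // it with "Producer attempted an operation with an old epoch".
        initializeTransactions(ProducerIdAndEpoch.NONE);
    }
}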
> On Wed, Jun 2, 2021 at 3:55 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
>> Hi,
>>
>> I think there is no generic way. If this error has indeed happened after
>> starting a second job from the same savepoint, or something like that, the
>> user can change the Sink operator's UID.
>>
>> If this is a case of intentional recovery from an earlier
>> checkpoint/savepoint, maybe `FlinkKafkaProducer#setLogFailuresOnly` will be
>> helpful.
>>
>> Best, Piotrek
>>
>> wt., 1 cze 2021 o 15:16 Till Rohrmann <trohrm...@apache.org> napisał(a):
>>
>>> The error message says that we are trying to reuse a transaction id that
>>> is currently in use or has expired.
>>>
>>> I am not 100% sure how this can happen. My suspicion is that you have
>>> resumed a job multiple times from the same savepoint. Have you checked
>>> that there is no other job which has been resumed from the same savepoint
>>> and which is currently running, or has run and completed checkpoints?
>>>
>>> @pnowojski <pnowoj...@apache.org> @Becket Qin <becket....@gmail.com> how
>>> does the transaction id generation ensure that we don't have a clash of
>>> transaction ids if we resume the same job multiple times from the same
>>> savepoint? From the code, I do see that we have a TransactionalIdsGenerator
>>> which is initialized with the taskName and the operator UID.
>>>
>>> fyi: @Arvid Heise <ar...@apache.org>
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, May 31, 2021 at 11:10 AM 周瑞 <rui.z...@woqutech.com> wrote:
>>>
>>> > Hi,
>>> > When "sink.semantic = exactly-once", the following exception is thrown
>>> > when recovering from a savepoint:
>>> >
>>> > public static final String KAFKA_TABLE_FORMAT =
>>> >         "CREATE TABLE " + TABLE_NAME + " (\n" +
>>> >         "  " + COLUMN_NAME + " STRING\n" +
>>> >         ") WITH (\n" +
>>> >         "  'connector' = 'kafka',\n" +
>>> >         "  'topic' = '%s',\n" +
>>> >         "  'properties.bootstrap.servers' = '%s',\n" +
>>> >         "  'sink.semantic' = 'exactly-once',\n" +
>>> >         "  'properties.transaction.timeout.ms' = '900000',\n" +
>>> >         "  'sink.partitioner' = 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
>>> >         "  'format' = 'dbz-json'\n" +
>>> >         ")\n";
>>> >
>>> > [] - Source: TableSourceScan(table=[[default_catalog, default_database,
>>> > debezium_source]], fields=[data]) -> Sink: Sink
>>> > (table=[default_catalog.default_database.KafkaTable], fields=[data])
>>> > (1/1)#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING
>>> > to FAILED with failure cause: org.apache.kafka.common.KafkaException:
>>> > Unexpected error in InitProducerIdResponse; Producer attempted an
>>> > operation with an old epoch. Either there is a newer producer with the
>>> > same transactionalId, or the producer's transaction has been expired by
>>> > the broker.
>>> >     at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
>>> >     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
>>> >     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>>> >     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
>>> >     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>>> >     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
>>> >     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
>>> >     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>>> >     at java.lang.Thread.run(Thread.java:748)
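For reference, Piotr's UID workaround applies most directly to DataStream jobs, where the sink's UID can be set explicitly; in a SQL/Table job like the one above, operator UIDs are derived from the plan, so there is no equally direct knob. A minimal DataStream sketch follows (topic, bootstrap servers, and the UID string are placeholders; for exactly-once you would use a FlinkKafkaProducer constructor variant that takes Semantic.EXACTLY_ONCE):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

// Sketch of the workaround Piotr describes: give the Kafka sink a fresh UID so a
// restore does not re-attach the old transactional state to it. All names below
// (topic, servers, UID) are placeholders.
public class UidWorkaroundSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("transaction.timeout.ms", "900000");

        DataStream<String> stream = env.fromElements("a", "b", "c");

        stream.addSink(new FlinkKafkaProducer<>(
                        "output-topic",
                        new SimpleStringSchema(),
                        props))
                // Changing this UID (e.g. bumping a version suffix) detaches the sink
                // from previously snapshotted transactional state on restore.
                .uid("kafka-sink-v2");

        env.execute("uid-workaround-sketch");
    }
}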