Thanks for the update. Skimming over the code, it indeed looks like we are
overwriting the values of the static constant ProducerIdAndEpoch.NONE. I am
not 100% sure how this causes the observed problem, though. I am also not a
Flink Kafka connector or Kafka expert, so I would appreciate it if someone
more familiar with this part of the code could double-check it.

Concerning the required change of an operator's UID, Piotr: is this a known
issue, and is it documented somewhere? From a user's point of view I find it
rather surprising.

Cheers,
Till

On Thu, Jun 3, 2021 at 8:53 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Forwarding 周瑞's message from a duplicate thread:
>
> After our analysis, we found a bug in the
> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.resumeTransaction
> method. The analysis process is as follows:
>
>
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction:
>
> public void initializeState(FunctionInitializationContext context) throws Exception {
>     state = context.getOperatorStateStore().getListState(stateDescriptor);
>     boolean recoveredUserContext = false;
>     if (context.isRestored()) {
>         LOG.info("{} - restoring state", name());
>         for (State<TXN, CONTEXT> operatorState : state.get()) {
>             userContext = operatorState.getContext();
>             List<TransactionHolder<TXN>> recoveredTransactions =
>                     operatorState.getPendingCommitTransactions();
>             List<TXN> handledTransactions = new ArrayList<>(recoveredTransactions.size() + 1);
>             for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
>                 // If this fails to succeed eventually, there is actually data loss
>                 recoverAndCommitInternal(recoveredTransaction);
>                 handledTransactions.add(recoveredTransaction.handle);
>                 LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction);
>             }
>
>             {
>                 TXN transaction = operatorState.getPendingTransaction().handle;
>                 recoverAndAbort(transaction);
>                 handledTransactions.add(transaction);
>                 LOG.info(
>                         "{} aborted recovered transaction {}",
>                         name(),
>                         operatorState.getPendingTransaction());
>             }
>
>             if (userContext.isPresent()) {
>                 finishRecoveringContext(handledTransactions);
>                 recoveredUserContext = true;
>             }
>         }
>     }
>
> (1) recoverAndCommitInternal(recoveredTransaction):
> The transactionalId, producerId, and epoch stored in the state are used to
> commit the transaction. However, the transactionManager's producerIdAndEpoch
> field initially points to the static constant ProducerIdAndEpoch.NONE, and
> resumeTransaction writes the recovered producerId and epoch into that shared
> instance via reflection, thereby polluting ProducerIdAndEpoch.NONE.
>
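> To make the aliasing concrete, here is a minimal, self-contained
> reproduction. This is our illustration, not code from either project; the
> nested class below only mimics Kafka's ProducerIdAndEpoch, and the value
> 4711 is an arbitrary stand-in for a recovered producerId.
>
> import java.lang.reflect.Field;
>
> public class StaticConstantPollution {
>     // Mimics Kafka's ProducerIdAndEpoch for illustration only.
>     static final class ProducerIdAndEpoch {
>         static final ProducerIdAndEpoch NONE = new ProducerIdAndEpoch(-1L, (short) -1);
>         final long producerId;
>         final short epoch;
>
>         ProducerIdAndEpoch(long producerId, short epoch) {
>             this.producerId = producerId;
>             this.epoch = epoch;
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         // The TransactionManager's field initially aliases the shared constant...
>         ProducerIdAndEpoch managerState = ProducerIdAndEpoch.NONE;
>
>         // ...and resumeTransaction writes through that alias via reflection,
>         // just like the setField helper in FlinkKafkaInternalProducer.
>         Field producerId = ProducerIdAndEpoch.class.getDeclaredField("producerId");
>         producerId.setAccessible(true);
>         producerId.setLong(managerState, 4711L);
>
>         // The "constant" is now polluted for every later user of NONE;
>         // on a typical JVM this prints 4711, not -1.
>         System.out.println(ProducerIdAndEpoch.NONE.producerId);
>     }
> }
>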
> @Override
> protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) {
>     if (transaction.isTransactional()) {
>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>         try {
>             producer = initTransactionalProducer(transaction.transactionalId, false);
>             producer.resumeTransaction(transaction.producerId, transaction.epoch);
>             producer.commitTransaction();
>         } catch (InvalidTxnStateException | ProducerFencedException ex) {
>             // That means we have committed this transaction before.
>             LOG.warn(
>                     "Encountered error {} while recovering transaction {}. "
>                             + "Presumably this transaction has been already committed before",
>                     ex,
>                     transaction);
>         } finally {
>             if (producer != null) {
>                 producer.close(0, TimeUnit.SECONDS);
>             }
>         }
>     }
> }
>
> public void resumeTransaction(long producerId, short epoch) {
>     synchronized (producerClosingLock) {
>         ensureNotClosed();
>         Preconditions.checkState(
>                 producerId >= 0 && epoch >= 0,
>                 "Incorrect values for producerId %s and epoch %s",
>                 producerId,
>                 epoch);
>         LOG.info(
>                 "Attempting to resume transaction {} with producerId {} and epoch {}",
>                 transactionalId,
>                 producerId,
>                 epoch);
>
>         Object transactionManager = getField(kafkaProducer, "transactionManager");
>         synchronized (transactionManager) {
>             Object topicPartitionBookkeeper =
>                     getField(transactionManager, "topicPartitionBookkeeper");
>
>             invoke(
>                     transactionManager,
>                     "transitionTo",
>                     getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
>             invoke(topicPartitionBookkeeper, "reset");
>
>             Object producerIdAndEpoch = getField(transactionManager, "producerIdAndEpoch");
>             setField(producerIdAndEpoch, "producerId", producerId);
>             setField(producerIdAndEpoch, "epoch", epoch);
>
>             invoke(
>                     transactionManager,
>                     "transitionTo",
>                     getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
>
>             invoke(
>                     transactionManager,
>                     "transitionTo",
>                     getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
>             setField(transactionManager, "transactionStarted", true);
>         }
>     }
> }
>
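> If this analysis is correct, one possible fix (just a sketch, untested)
> would be for resumeTransaction to stop writing through the possibly shared
> instance and instead swap a brand-new object into the transactionManager,
> assuming ProducerIdAndEpoch is visible on the connector's classpath
> (otherwise it would have to be created reflectively as well):
>
>             // Instead of:
>             //   Object producerIdAndEpoch = getField(transactionManager, "producerIdAndEpoch");
>             //   setField(producerIdAndEpoch, "producerId", producerId);  // may mutate the shared NONE
>             //   setField(producerIdAndEpoch, "epoch", epoch);            // may mutate the shared NONE
>             // replace the field itself, leaving ProducerIdAndEpoch.NONE untouched:
>             setField(
>                     transactionManager,
>                     "producerIdAndEpoch",
>                     new ProducerIdAndEpoch(producerId, epoch));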
>
> public TransactionManager(LogContext logContext,
>                           String transactionalId,
>                           int transactionTimeoutMs,
>                           long retryBackoffMs,
>                           ApiVersions apiVersions) {
>     this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
>     this.transactionalId = transactionalId;
>     this.log = logContext.logger(TransactionManager.class);
>     this.transactionTimeoutMs = transactionTimeoutMs;
>     this.transactionCoordinator = null;
>     this.consumerGroupCoordinator = null;
>     this.newPartitionsInTransaction = new HashSet<>();
>     this.pendingPartitionsInTransaction = new HashSet<>();
>     this.partitionsInTransaction = new HashSet<>();
>     this.pendingRequests = new PriorityQueue<>(10, Comparator.comparingInt(o -> o.priority().priority));
>     this.pendingTxnOffsetCommits = new HashMap<>();
>     this.partitionsWithUnresolvedSequences = new HashMap<>();
>     this.partitionsToRewriteSequences = new HashSet<>();
>     this.retryBackoffMs = retryBackoffMs;
>     this.topicPartitionBookkeeper = new TopicPartitionBookkeeper();
>     this.apiVersions = apiVersions;
> }
>
>
>
> public class ProducerIdAndEpoch {
>     public static final ProducerIdAndEpoch NONE =
>             new ProducerIdAndEpoch(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH);
>
>     public final long producerId;
>     public final short epoch;
>
>     public ProducerIdAndEpoch(long producerId, short epoch) {
>         this.producerId = producerId;
>         this.epoch = epoch;
>     }
>
>     public boolean isValid() {
>         return RecordBatch.NO_PRODUCER_ID < producerId;
>     }
>
>     @Override
>     public String toString() {
>         return "(producerId=" + producerId + ", epoch=" + epoch + ")";
>     }
>
>     @Override
>     public boolean equals(Object o) {
>         if (this == o) return true;
>         if (o == null || getClass() != o.getClass()) return false;
>
>         ProducerIdAndEpoch that = (ProducerIdAndEpoch) o;
>
>         if (producerId != that.producerId) return false;
>         return epoch == that.epoch;
>     }
>
>     @Override
>     public int hashCode() {
>         int result = (int) (producerId ^ (producerId >>> 32));
>         result = 31 * result + (int) epoch;
>         return result;
>     }
>
> }
>
> (2) In the second step,
> recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState transaction) calls
> initTransactions(), which builds its InitProducerId request from
> ProducerIdAndEpoch.NONE. Because step (1) polluted that constant, the
> producerId and epoch in the request carry the values from the first
> transaction commit instead of -1/-1 (see also the note after the
> initializeTransactions listing below). So Kafka throws an exception:
> Unexpected error in InitProducerIdResponse; Producer attempted an operation
> with an old epoch. Either there is a newer producer with the same
> transactionalId, or the producer's transaction has been expired by the broker.
>     at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
>     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
>     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>     at java.lang.Thread.run(Thread.java:748)
>
> protected void recoverAndAbort(FlinkKafkaProducer.KafkaTransactionState transaction) {
>     if (transaction.isTransactional()) {
>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>         try {
>             producer = initTransactionalProducer(transaction.transactionalId, false);
>             producer.initTransactions();
>         } finally {
>             if (producer != null) {
>                 producer.close(0, TimeUnit.SECONDS);
>             }
>         }
>     }
> }
>
> public synchronized TransactionalRequestResult initializeTransactions() {
>     return initializeTransactions(ProducerIdAndEpoch.NONE);
> }
>
> synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
>     boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;
>     return handleCachedTransactionRequestResult(() -> {
>         // If this is an epoch bump, we will transition the state as part of handling the EndTxnRequest
>         if (!isEpochBump) {
>             transitionTo(State.INITIALIZING);
>             log.info("Invoking InitProducerId for the first time in order to acquire a producer ID");
>         } else {
>             log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch", producerIdAndEpoch);
>         }
>         InitProducerIdRequestData requestData = new InitProducerIdRequestData()
>                 .setTransactionalId(transactionalId)
>                 .setTransactionTimeoutMs(transactionTimeoutMs)
>                 .setProducerId(producerIdAndEpoch.producerId)
>                 .setProducerEpoch(producerIdAndEpoch.epoch);
>         InitProducerIdHandler handler = new InitProducerIdHandler(
>                 new InitProducerIdRequest.Builder(requestData), isEpochBump);
>         enqueueRequest(handler);
>         return handler.result;
>     }, State.INITIALIZING);
> }
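>
> Note that the isEpochBump guard in initializeTransactions compares object
> references, so the polluted NONE still takes the !isEpochBump branch; it is
> only the mutated field values that leak into the request. Illustration
> (ours, not project code; 4711 again stands in for the recovered producerId):
>
>     ProducerIdAndEpoch none = ProducerIdAndEpoch.NONE;      // same object, fields now polluted
>     boolean isEpochBump = none != ProducerIdAndEpoch.NONE;  // false: treated as first init
>     long sentProducerId = none.producerId;                  // e.g. 4711 is sent, not -1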
>
> On Wed, Jun 2, 2021 at 3:55 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hi,
>>
>> I think there is no generic way. If this error has indeed happened after
>> starting a second job from the same savepoint, or something like that, the
>> user can change the Sink's operator UID.
>>
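>> For example (just a sketch; the sink construction is elided and the UID
>> string is an arbitrary new value):
>>
>>     stream.addSink(kafkaProducer)   // the existing FlinkKafkaProducer sink
>>           .uid("kafka-sink-v2");    // any new, unique UID string
>>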
>> If this is an issue of intentional recovery from an earlier
>> checkpoint/savepoint, maybe `FlinkKafkaProducer#setLogFailuresOnly` will be
>> helpful.
>>
>> Best, Piotrek
>>
>> On Tue, Jun 1, 2021 at 3:16 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> The error message says that we are trying to reuse a transaction id that
>>> is
>>> currently being used or has expired.
>>>
>>> I am not 100% sure how this can happen. My suspicion is that you have
>>> resumed a job multiple times from the same savepoint. Have you checked
>>> that
>>> there is no other job which has been resumed from the same savepoint and
>>> which is currently running or has run and completed checkpoints?
>>>
>>> @pnowojski <pnowoj...@apache.org> @Becket Qin <becket....@gmail.com> how
>>> does the transaction id generation ensure that we don't have a clash of
>>> transaction ids if we resume the same job multiple times from the same
>>> savepoint? From the code, I do see that we have a
>>> TransactionalIdsGenerator
>>> which is initialized with the taskName and the operator UID.
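>>>
>>> Purely as a hypothetical illustration (not Flink's actual generator): if
>>> the transactional id were a pure function of static job properties, two
>>> jobs restored from the same savepoint would derive identical ids and
>>> fence each other on the broker:
>>>
>>>     // hypothetical scheme; names and format are made up for illustration
>>>     static String transactionalId(String taskName, String operatorUid, int subtask, long counter) {
>>>         return taskName + "-" + operatorUid + "-" + subtask + "-" + counter;
>>>     }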
>>>
>>> fyi: @Arvid Heise <ar...@apache.org>
>>>
>>> Cheers,
>>> Till
>>>
>>>
>>> On Mon, May 31, 2021 at 11:10 AM 周瑞 <rui.z...@woqutech.com> wrote:
>>>
>>> > Hi,
>>> >       When "sink.semantic = exactly-once", the following exception is
>>> > thrown when recovering from a savepoint:
>>> >
>>> >        public static final String KAFKA_TABLE_FORMAT =
>>> >             "CREATE TABLE "+TABLE_NAME+" (\n" +
>>> >                     "  "+COLUMN_NAME+" STRING\n" +
>>> >                     ") WITH (\n" +
>>> >                     "   'connector' = 'kafka',\n" +
>>> >                     "   'topic' = '%s',\n" +
>>> >                     "   'properties.bootstrap.servers' = '%s',\n" +
>>> >                     "   'sink.semantic' = 'exactly-once',\n" +
>>> >                     "   'properties.transaction.timeout.ms' = '900000',\n" +
>>> >                     "   'sink.partitioner' = 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
>>> >                     "   'format' = 'dbz-json'\n" +
>>> >                     ")\n";
>>> > [] - Source: TableSourceScan(table=[[default_catalog, default_database,
>>> > debezium_source]], fields=[data]) -> Sink: Sink
>>> > (table=[default_catalog.default_database.KafkaTable], fields=[data])
>>> > (1/1)#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING
>>> > to FAILED with failure cause: org.apache.kafka.common.KafkaException:
>>> > Unexpected error in InitProducerIdResponse; Producer attempted an
>>> > operation with an old epoch. Either there is a newer producer with the
>>> > same transactionalId, or the producer's transaction has been expired by
>>> > the broker.
>>> >     at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
>>> >     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
>>> >     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>>> >     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
>>> >     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>>> >     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
>>> >     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
>>> >     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>>> >     at java.lang.Thread.run(Thread.java:748)
>>> >
>>>
>>
