[ 
https://issues.apache.org/jira/browse/FLINK-37356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-37356:
----------------------------------
    Description: 
In my production environment, a READ_COMMITTED consumer could no longer consume 
any records. I found that the LSO (last stable offset) of the partition had not 
advanced for a long time. After searching all the logs in the Kafka cluster, I 
found a transaction that was missing an AddPartitionsToTxnRequest.
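
For reference, a READ_COMMITTED consumer only sees records below the LSO, so a stuck LSO blocks it completely. Below is a minimal sketch of such a consumer; the broker address, topic, and group id are placeholders, not values from the affected cluster.
{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReadCommittedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "lso-debug");                // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Only records below the LSO are visible; if an open transaction never
        // completes, the LSO stops advancing and poll() returns nothing new.
        props.put("isolation.level", "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic")); // placeholder topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> r : records) {
                System.out.printf("offset=%d value=%s%n", r.offset(), r.value());
            }
        }
    }
}
{code}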

 

At first, I thought the problem was caused by 
https://issues.apache.org/jira/browse/FLINK-31363, because the Kafka cluster log 
contains InvalidTxnStateException. However, even though that transaction is in an 
invalid state, no data is written to any Kafka topic partition by it (in that 
case the transaction is empty). It therefore does not affect any partition's LSO, 
so it alone cannot block the consumer.

 

Then I checked the Kafka client code. It seems there is no way to produce data 
before the AddPartitionsToTxnRequest step is done, because the `Sender` will 
refuse to dequeue batches from the accumulator until they have been added to the 
transaction.
{code:java}
// org.apache.kafka.clients.producer.KafkaProducer#doSend

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    // ... code omitted

    // Add the partition to the transaction (if in progress) after it has been successfully
    // appended to the accumulator. We cannot do it before because the partition may be
    // unknown or the initially selected partition may be changed when the batch is closed
    // (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse to dequeue
    // batches from the accumulator until they have been added to the transaction.
    if (transactionManager != null) {
        transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
    }

    // ... code omitted
}{code}
{code:java}
// org.apache.kafka.clients.producer.internals.RecordAccumulator#drainBatchesForOneNode

if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
    // there is a rare case that a single batch size is larger than the request size due to
    // compression; in this case we will still eventually send this batch in a single request
    break;
} else {
    if (shouldStopDrainBatchesForPartition(first, tp))
        break;
}
{code}
 

This led me to the idea that if a TransactionManager that has not cleared its 
partitionsInTransaction set is reused, the AddPartitionsToTxnRequest will not be 
sent again for those partitions, yet the Sender will still drain their batches.
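
To make this concrete, below is a minimal sketch of the bookkeeping involved. It is an illustration only, not the real Kafka client code: the class and the String-keyed partition handling are simplifications, and only the names partitionsInTransaction / newPartitionsInTransaction are borrowed from the TransactionManager.
{code:java}
import java.util.HashSet;
import java.util.Set;

// Illustrative stand-in for the producer-side transaction bookkeeping.
class SimplifiedTxnBookkeeping {
    // partitions the producer believes are already part of the current transaction
    private final Set<String> partitionsInTransaction = new HashSet<>();
    // partitions that still need an AddPartitionsToTxnRequest
    private final Set<String> newPartitionsInTransaction = new HashSet<>();

    // called from doSend(): only partitions not yet in the transaction are queued
    void maybeAddPartition(String topicPartition) {
        if (!partitionsInTransaction.contains(topicPartition)) {
            newPartitionsInTransaction.add(topicPartition);
        }
    }

    // whether an AddPartitionsToTxnRequest still has to be sent
    boolean needsAddPartitionsRequest() {
        return !newPartitionsInTransaction.isEmpty();
    }

    // called once the broker has acknowledged AddPartitionsToTxn
    void markPartitionsAdded() {
        partitionsInTransaction.addAll(newPartitionsInTransaction);
        newPartitionsInTransaction.clear();
    }

    // the Sender only drains batches for partitions that are in the transaction
    boolean canDrain(String topicPartition) {
        return partitionsInTransaction.contains(topicPartition);
    }

    // clearing happens when the transaction completes; if that step is skipped,
    // stale entries remain in partitionsInTransaction
    void completeTransaction() {
        partitionsInTransaction.clear();
        newPartitionsInTransaction.clear();
    }
}
{code}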
 
The Flink Kafka connector does reuse and recycle KafkaProducer instances: 
KafkaCommitter returns the producer to the producerPool after the transaction 
completes or throws, and KafkaWriter later takes it from the producerPool and 
reuses it.
{code:java}
// org.apache.flink.connector.kafka.sink.KafkaCommitter#commit

@Override
public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
        throws IOException, InterruptedException {
    for (CommitRequest<KafkaCommittable> request : requests) {
        // ... code that looks up the producer and recyclable omitted
        try {
            producer.commitTransaction();
            producer.flush();
            recyclable.ifPresent(Recyclable::close);
        } catch (RetriableException e) {
            request.retryLater();
        } catch (ProducerFencedException e) {
            recyclable.ifPresent(Recyclable::close);
            request.signalFailedWithKnownReason(e);
        } catch (InvalidTxnStateException e) {
            recyclable.ifPresent(Recyclable::close);
            request.signalFailedWithKnownReason(e);
        } catch (UnknownProducerIdException e) {
            // ... error logging omitted
            recyclable.ifPresent(Recyclable::close);
            request.signalFailedWithKnownReason(e);
        } catch (Exception e) {
            recyclable.ifPresent(Recyclable::close);
            request.signalFailedWithUnknownReason(e);
        }
    }
} {code}
 
If KafkaCommitter hits an exception and commitTransaction does not succeed, the 
partitionsInTransaction set in the TransactionManager is never cleared (the 
clearing only happens in 
org.apache.kafka.clients.producer.internals.TransactionManager#completeTransaction).
When KafkaWriter then reuses the same producer and sends data to the same 
partitions in the next transaction, no AddPartitionsToTxnRequest is sent.
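
Walking the failure sequence through the SimplifiedTxnBookkeeping sketch above (the partition name is a placeholder):
{code:java}
SimplifiedTxnBookkeeping txn = new SimplifiedTxnBookkeeping();

// Transaction #1: the partition is registered and the request is sent normally.
txn.maybeAddPartition("topic-0");
txn.needsAddPartitionsRequest();   // true -> AddPartitionsToTxnRequest goes out
txn.markPartitionsAdded();         // broker now knows about topic-0
// commitTransaction() fails, the producer is recycled, completeTransaction() never runs

// Transaction #2 with the recycled producer and the same partition:
txn.maybeAddPartition("topic-0");  // no-op: the partition looks already added
txn.needsAddPartitionsRequest();   // false -> no AddPartitionsToTxnRequest
txn.canDrain("topic-0");           // true -> the Sender drains the batch anyway
// The broker never learns that topic-0 belongs to the new transaction, so no
// commit/abort marker is ever written for it and the LSO gets stuck.
{code}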
 
Thus, in FlinkKafkaInternalProducer#setTransactionId, we should also clear the 
partition information of the TransactionManager (currently we only reset 
transactionalId and currentState):
{code:java}
public void setTransactionId(String transactionalId) {
    if (!transactionalId.equals(this.transactionalId)) {
        checkState(
                !inTransaction,
                String.format("Another transaction %s is still open.", transactionalId));
        LOG.debug("Change transaction id from {} to {}", this.transactionalId, transactionalId);
        Object transactionManager = getTransactionManager();
        synchronized (transactionManager) {
            setField(transactionManager, "transactionalId", transactionalId);
            setField(
                    transactionManager,
                    "currentState",
                    getTransactionManagerState("UNINITIALIZED"));
            this.transactionalId = transactionalId;
        }
    }
} {code}
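
A possible shape of the fix is sketched below. It is only a sketch: the field names partitionsInTransaction, newPartitionsInTransaction, and pendingPartitionsInTransaction are assumptions about the kafka-clients internals and may differ across client versions, and the clearPartitionsInTransaction/getField helpers are hypothetical reflection utilities alongside the setField utility already used above.
{code:java}
public void setTransactionId(String transactionalId) {
    if (!transactionalId.equals(this.transactionalId)) {
        checkState(
                !inTransaction,
                String.format("Another transaction %s is still open.", transactionalId));
        LOG.debug("Change transaction id from {} to {}", this.transactionalId, transactionalId);
        Object transactionManager = getTransactionManager();
        synchronized (transactionManager) {
            setField(transactionManager, "transactionalId", transactionalId);
            setField(
                    transactionManager,
                    "currentState",
                    getTransactionManagerState("UNINITIALIZED"));
            // Proposed addition: drop partition bookkeeping left over from a
            // previous (failed) transaction so that the next transaction sends
            // AddPartitionsToTxnRequest again.
            clearPartitionsInTransaction(transactionManager);
            this.transactionalId = transactionalId;
        }
    }
}

// Hypothetical helper; field names are assumptions about the client internals
// and may need to be adjusted per kafka-clients version.
private void clearPartitionsInTransaction(Object transactionManager) {
    ((java.util.Set<?>) getField(transactionManager, "partitionsInTransaction")).clear();
    ((java.util.Set<?>) getField(transactionManager, "newPartitionsInTransaction")).clear();
    ((java.util.Set<?>) getField(transactionManager, "pendingPartitionsInTransaction")).clear();
}
{code}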
 
 

 
 

> Recycled Kafka producer (whose commit failed) may send data without AddPartitionsToTxnRequest
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37356
>                 URL: https://issues.apache.org/jira/browse/FLINK-37356
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: kafka-3.4.0
>            Reporter: Hongshun Wang
>            Priority: Major
>


