[ https://issues.apache.org/jira/browse/FLINK-37356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hongshun Wang updated FLINK-37356:
----------------------------------
    Summary: Recycle use of kafka producer(which commit error) may send data 
without AddPartitionsToTxnRequest  (was: Recycle use of kafka producer(which 
commit error) will send data without AddPartitionsToTxnRequest)

> Recycle use of kafka producer(which commit error) may send data without 
> AddPartitionsToTxnRequest
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37356
>                 URL: https://issues.apache.org/jira/browse/FLINK-37356
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: kafka-3.4.0
>            Reporter: Hongshun Wang
>            Priority: Major
>
> In my production environment, a READ_COMMITTED consumer could no longer 
> consume any records. I then found that the LSO (last stable offset) of the 
> partition had not advanced for a long time. After going through all the logs 
> in the Kafka cluster, I found a transaction that was missing its 
> AddPartitionsToTxnRequest.
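> To see why one bad transaction can block a READ_COMMITTED consumer, here is a 
> minimal sketch of how a partition's LSO depends on open transactions. It is a 
> simplified model, not broker code: the class and record names are hypothetical, 
> the high watermark is assumed to equal the log end offset, and a transaction 
> counts as open on a partition until a COMMIT/ABORT control marker for it is 
> written there. Because the transaction coordinator writes markers only to 
> partitions registered through AddPartitionsToTxnRequest, data written to an 
> unregistered partition leaves that transaction open there and pins the LSO.
> {code:java}
> import java.util.LinkedHashMap;
> import java.util.List;
> import java.util.Map;
>
> // Simplified model of one partition's log (hypothetical, not Kafka broker code).
> public class LsoSketch {
>     // TXN_DATA: a transactional record; MARKER: a COMMIT/ABORT control record
>     // that ends the producer's transaction on this partition.
>     public enum Kind { TXN_DATA, MARKER }
>
>     public record Entry(Kind kind, long producerId) {}
>
>     // LSO = first offset of the earliest transaction still open on this partition,
>     // or the log end offset if none is open (high watermark assumed equal to it).
>     public static long lastStableOffset(List<Entry> log) {
>         Map<Long, Long> firstOffsetOfOpenTxn = new LinkedHashMap<>();
>         for (int offset = 0; offset < log.size(); offset++) {
>             Entry e = log.get(offset);
>             if (e.kind() == Kind.TXN_DATA) {
>                 firstOffsetOfOpenTxn.putIfAbsent(e.producerId(), (long) offset);
>             } else {
>                 firstOffsetOfOpenTxn.remove(e.producerId());
>             }
>         }
>         return firstOffsetOfOpenTxn.values().stream()
>                 .mapToLong(Long::longValue)
>                 .min()
>                 .orElse(log.size());
>     }
>
>     public static void main(String[] args) {
>         List<Entry> log = List.of(
>                 new Entry(Kind.TXN_DATA, 1), // offset 0: producer 1
>                 new Entry(Kind.MARKER, 1),   // offset 1: producer 1 commits
>                 new Entry(Kind.TXN_DATA, 2), // offset 2: producer 2, partition never registered
>                 new Entry(Kind.TXN_DATA, 1), // offset 3: producer 1, next transaction
>                 new Entry(Kind.MARKER, 1));  // offset 4: producer 1 commits
>         // Producer 2 never gets a marker here, so the LSO stays at 2.
>         System.out.println(lastStableOffset(log)); // prints 2
>     }
> }
> {code}
> In this model, producer 1's committed record at offset 3 is also invisible to 
> READ_COMMITTED consumers, because they only read below the LSO.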
>  
> At first, I thought the problem was caused by 
> https://issues.apache.org/jira/browse/FLINK-31363, because my Kafka cluster 
> log contains InvalidTxnStateException. However, although that transaction is 
> in an invalid state, no data is written to any Kafka topic partition in it 
> (in that case, the transaction is empty), so it cannot affect any partition's 
> LSO. Why, then, can data no longer be consumed in READ_COMMITTED mode?
>  
> Then I checked the Kafka client code. It seems impossible to produce data 
> before the AddPartitionsToTxnRequest has completed, because the `Sender` 
> refuses to dequeue batches from the accumulator until they have been added 
> to the transaction.
> {code:java}
> // org.apache.kafka.clients.producer.KafkaProducer#doSend
> private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
>     // ... (code omitted)
>     // Add the partition to the transaction (if in progress) after it has been successfully
>     // appended to the accumulator. We cannot do it before because the partition may be
>     // unknown or the initially selected partition may be changed when the batch is closed
>     // (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse to dequeue
>     // batches from the accumulator until they have been added to the transaction.
>     if (transactionManager != null) {
>         transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
>     }
>     // ... (code omitted)
> }
> {code}
> {code:java}
> // org.apache.kafka.clients.producer.internals.RecordAccumulator#drainBatchesForOneNode
> if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
>     // there is a rare case that a single batch size is larger than the request size due to
>     // compression; in this case we will still eventually send this batch in a single request
>     break;
> } else {
>     // Stops draining while a transactional partition is not yet part of the transaction.
>     if (shouldStopDrainBatchesForPartition(first, tp))
>         break;
> }
> {code}
>  
> Then I had an idea: if a TransactionManager that has not cleared 
> partitionsInTransaction is reused, the AddPartitionsToTxnRequest will not be 
> sent again.
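> The reasoning above can be illustrated with a minimal, hypothetical model of the 
> producer-side bookkeeping. It only loosely mirrors Kafka's TransactionManager: 
> the class name, the String partition ids, and the instant acknowledgement of 
> AddPartitionsToTxnRequest are simplifications, and the real client tracks new 
> and pending partitions in separate sets before they reach 
> partitionsInTransaction.
> {code:java}
> import java.util.ArrayList;
> import java.util.HashSet;
> import java.util.List;
> import java.util.Set;
>
> // Hypothetical, simplified model of the producer's transaction bookkeeping.
> public class TxnBookkeepingSketch {
>     // Partitions the coordinator has acknowledged for the current transaction.
>     final Set<String> partitionsInTransaction = new HashSet<>();
>     // AddPartitionsToTxnRequests this model has queued, in order.
>     final List<String> addPartitionsRequests = new ArrayList<>();
>
>     // Like maybeAddPartition: a partition already recorded as added is skipped,
>     // so no AddPartitionsToTxnRequest is queued for it.
>     void maybeAddPartition(String tp) {
>         if (partitionsInTransaction.contains(tp)) {
>             return;
>         }
>         addPartitionsRequests.add(tp);
>         partitionsInTransaction.add(tp); // assume the coordinator acknowledges at once
>     }
>
>     // Like the Sender's drain check: batches for a partition leave the accumulator
>     // only once the partition is part of the transaction.
>     boolean canDrain(String tp) {
>         return partitionsInTransaction.contains(tp);
>     }
>
>     // Like completeTransaction(): reached only when commit/abort finishes normally.
>     void completeTransaction() {
>         partitionsInTransaction.clear();
>     }
>
>     public static void main(String[] args) {
>         TxnBookkeepingSketch tm = new TxnBookkeepingSketch();
>         tm.maybeAddPartition("topic-0"); // transaction 1 registers topic-0
>         // The commit fails, so completeTransaction() is never called.
>         tm.maybeAddPartition("topic-0"); // transaction 2 on the reused producer
>         System.out.println(tm.addPartitionsRequests.size()); // prints 1
>         System.out.println(tm.canDrain("topic-0"));          // prints true
>     }
> }
> {code}
> In the normal path, completeTransaction() empties the set, so the next 
> transaction queues a fresh request for the same partition, and canDrain() 
> stays false until that partition has been added.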
>  
> The Flink Kafka connector also reuses and recycles KafkaProducer instances: 
> KafkaCommitter returns the producer to the producerPool after the transaction 
> completes or fails, and KafkaWriter then reuses it from the producerPool.
> {code:java}
> // org.apache.flink.connector.kafka.sink.KafkaCommitter#commit (excerpt)
> @Override
> public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
>         throws IOException, InterruptedException {
>     for (CommitRequest<KafkaCommittable> request : requests) {
>         // ... (lookup of `recyclable` and `producer` omitted)
>         try {
>             producer.commitTransaction();
>             producer.flush();
>             // Success: the producer is returned to the pool.
>             recyclable.ifPresent(Recyclable::close);
>         } catch (RetriableException e) {
>             request.retryLater();
>         } catch (ProducerFencedException e) {
>             // Failure: the producer is returned to the pool as well.
>             recyclable.ifPresent(Recyclable::close);
>             request.signalFailedWithKnownReason(e);
>         } catch (InvalidTxnStateException e) {
>             recyclable.ifPresent(Recyclable::close);
>             request.signalFailedWithKnownReason(e);
>         } catch (UnknownProducerIdException e) {
>             // LOG.error(...) call omitted
>             recyclable.ifPresent(Recyclable::close);
>             request.signalFailedWithKnownReason(e);
>         } catch (Exception e) {
>             recyclable.ifPresent(Recyclable::close);
>             request.signalFailedWithUnknownReason(e);
>         }
>     }
> }
> {code}
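> A minimal sketch of why the recycled producer carries stale state: the pool and 
> producer below are hypothetical stand-ins for producerPool and 
> FlinkKafkaInternalProducer, and a failing commit is modelled as an 
> IllegalStateException. As in the excerpt above, the producer goes back to the 
> pool on both the success and the failure path, but its partition bookkeeping 
> is only reset when the commit succeeds.
> {code:java}
> import java.util.ArrayDeque;
> import java.util.Deque;
> import java.util.HashSet;
> import java.util.Set;
>
> // Hypothetical stand-ins for the connector's producerPool and pooled producer.
> public class PoolReuseSketch {
>     static class FakeProducer {
>         final Set<String> partitionsInTransaction = new HashSet<>();
>         boolean failOnCommit;
>
>         void send(String tp) {
>             partitionsInTransaction.add(tp);
>         }
>
>         void commitTransaction() {
>             if (failOnCommit) {
>                 // Stands in for e.g. InvalidTxnStateException.
>                 throw new IllegalStateException("commit failed");
>             }
>             partitionsInTransaction.clear(); // bookkeeping reset only on success
>         }
>     }
>
>     final Deque<FakeProducer> producerPool = new ArrayDeque<>();
>
>     FakeProducer borrow() {
>         FakeProducer pooled = producerPool.poll();
>         return pooled != null ? pooled : new FakeProducer();
>     }
>
>     // Same shape as KafkaCommitter#commit: recycle on success AND on failure.
>     void commit(FakeProducer producer) {
>         try {
>             producer.commitTransaction();
>             producerPool.push(producer);
>         } catch (IllegalStateException e) {
>             producerPool.push(producer); // recycled with stale partition state
>         }
>     }
>
>     public static void main(String[] args) {
>         PoolReuseSketch sink = new PoolReuseSketch();
>         FakeProducer first = sink.borrow();
>         first.send("topic-0");
>         first.failOnCommit = true;
>         sink.commit(first);
>
>         FakeProducer next = sink.borrow(); // what the next KafkaWriter would get
>         System.out.println(next == first);                // prints true
>         System.out.println(next.partitionsInTransaction); // prints [topic-0]
>     }
> }
> {code}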
>  
> If KafkaCommitter hits an exception and fails to commit the transaction, 
> partitionsInTransaction in the TransactionManager is not cleared (clearing 
> happens in 
> org.apache.kafka.clients.producer.internals.TransactionManager#completeTransaction). 
> If a KafkaWriter then reuses the same producer and sends data to the same 
> partitions in the next transaction, the AddPartitionsToTxnRequest will not be 
> sent.
>  
> Therefore, FlinkKafkaInternalProducer#setTransactionId should also clear the 
> partition information of the TransactionManager (currently it only sets 
> transactionalId and currentState).
> {code:java}
> // FlinkKafkaInternalProducer#setTransactionId (current implementation)
> public void setTransactionId(String transactionalId) {
>     if (!transactionalId.equals(this.transactionalId)) {
>         checkState(
>                 !inTransaction,
>                 String.format("Another transaction %s is still open.", transactionalId));
>         LOG.debug("Change transaction id from {} to {}", this.transactionalId, transactionalId);
>         Object transactionManager = getTransactionManager();
>         synchronized (transactionManager) {
>             setField(transactionManager, "transactionalId", transactionalId);
>             setField(
>                     transactionManager,
>                     "currentState",
>                     getTransactionManagerState("UNINITIALIZED"));
>             this.transactionalId = transactionalId;
>         }
>     }
> }
> {code}
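> One possible shape for the suggested fix is sketched below. It is not the 
> connector's code: FakeTransactionManager is a stand-in, resetPartitionState and 
> clearSetField are hypothetical helpers, and the field names 
> newPartitionsInTransaction, pendingPartitionsInTransaction and 
> partitionsInTransaction are assumed to match the private fields of the Kafka 
> client version in use (they are internal and may change between versions). 
> Like the existing setField calls, it relies on reflection and holds the 
> transactionManager lock, so it could be called from setTransactionId next to 
> those calls.
> {code:java}
> import java.lang.reflect.Field;
> import java.util.HashSet;
> import java.util.Set;
>
> public class ClearPartitionsSketch {
>     // Stand-in for org.apache.kafka.clients.producer.internals.TransactionManager.
>     // ASSUMPTION: field names match the Kafka client in use (private there).
>     static class FakeTransactionManager {
>         final Set<String> newPartitionsInTransaction = new HashSet<>();
>         final Set<String> pendingPartitionsInTransaction = new HashSet<>();
>         final Set<String> partitionsInTransaction = new HashSet<>();
>     }
>
>     // Hypothetical helper: clears a Set-typed field via reflection.
>     static void clearSetField(Object target, String fieldName) {
>         try {
>             Field field = target.getClass().getDeclaredField(fieldName);
>             field.setAccessible(true);
>             ((Set<?>) field.get(target)).clear();
>         } catch (NoSuchFieldException | IllegalAccessException e) {
>             throw new IllegalStateException("Cannot clear field " + fieldName, e);
>         }
>     }
>
>     // Hypothetical extra step for setTransactionId: drop partition bookkeeping
>     // left over from a transaction that never reached completeTransaction().
>     static void resetPartitionState(Object transactionManager) {
>         synchronized (transactionManager) {
>             clearSetField(transactionManager, "newPartitionsInTransaction");
>             clearSetField(transactionManager, "pendingPartitionsInTransaction");
>             clearSetField(transactionManager, "partitionsInTransaction");
>         }
>     }
>
>     public static void main(String[] args) {
>         FakeTransactionManager tm = new FakeTransactionManager();
>         tm.partitionsInTransaction.add("topic-0"); // stale state from a failed commit
>         resetPartitionState(tm);
>         System.out.println(tm.partitionsInTransaction.isEmpty()); // prints true
>     }
> }
> {code}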



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
