[ https://issues.apache.org/jira/browse/FLINK-37356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17928756#comment-17928756 ]
Arvid Heise commented on FLINK-37356:
-------------------------------------

Yes, that makes more sense. I just couldn't connect it to empty transactions.

So my proposal is to close the producer and open a new one in this case. It should be rather rare. I'd avoid tinkering around with the internal state of the TransactionManager even more, because I fear that it could break when users use a different Kafka client version (that's already fragile).

Do you mind if I try to fix it?
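To make that concrete, here is a minimal sketch of the proposal (illustrative only: the method name, the `getTransactionalId()` accessor, and the pool handling are my assumptions, not existing connector API):

{code:java}
// Hypothetical sketch, not existing connector code: a producer whose commit
// failed is closed instead of being recycled, and a fresh instance (with an
// empty TransactionManager) is created for the same transactional id.
// Assumes java.time.Duration, java.util.Properties and an slf4j LOG in scope.
private FlinkKafkaInternalProducer<byte[], byte[]> replaceFailedProducer(
        FlinkKafkaInternalProducer<byte[], byte[]> failedProducer,
        Properties producerConfig) {
    String transactionalId = failedProducer.getTransactionalId();
    try {
        // drop the producer together with all of its internal transaction
        // state, including the stale partitionsInTransaction bookkeeping
        failedProducer.close(Duration.ZERO);
    } catch (Exception e) {
        LOG.warn("Error while closing producer for {}", transactionalId, e);
    }
    // the new producer starts from scratch, so its next transaction sends an
    // AddPartitionsToTxnRequest for every partition it writes to
    return new FlinkKafkaInternalProducer<>(producerConfig, transactionalId);
}
{code}

The committer would then hand this fresh instance back to the producerPool in place of the used one.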
> Recycle use of kafka producer(which commit error) maybe send data without
> AddPartitionsToTxnRequest
> ---------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37356
>                 URL: https://issues.apache.org/jira/browse/FLINK-37356
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: kafka-3.4.0
>            Reporter: Hongshun Wang
>            Priority: Major
>
> In my production environment, a READ_COMMITTED consumer could no longer consume
> any records. I found that the LSO of the partition had not changed for a long
> time. I searched all the logs in the Kafka cluster and found a transaction
> lacking an AddPartitionsToTxnRequest.
>
> At first I thought the problem was caused by
> https://issues.apache.org/jira/browse/FLINK-31363, because my Kafka cluster log
> contains an InvalidTxnStateException. However, even though that transaction is
> in an invalid state, no data is written into the Kafka topic partition in that
> transaction (in this case, the transaction is empty). It therefore does not
> influence any partition's LSO, so consumers are not blocked by it.
>
> Then I checked the Kafka client code; there seems to be no way to produce data
> without the AddPartitionsToTxnRequest being done, because the `Sender` will
> refuse to dequeue batches from the accumulator until they have been added to
> the transaction.
> {code:java}
> // org.apache.kafka.clients.producer.KafkaProducer#doSend
> private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
>     // ... code ignored ...
>     // Add the partition to the transaction (if in progress) after it has been successfully
>     // appended to the accumulator. We cannot do it before because the partition may be
>     // unknown or the initially selected partition may be changed when the batch is closed
>     // (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse to dequeue
>     // batches from the accumulator until they have been added to the transaction.
>     if (transactionManager != null) {
>         transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
>     }
>     // ... code ignored ...
> }
> {code}
> {code:java}
> // org.apache.kafka.clients.producer.internals.RecordAccumulator#drainBatchesForOneNode
> if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
>     // there is a rare case that a single batch size is larger than the request size due to
>     // compression; in this case we will still eventually send this batch in a single request
>     break;
> } else {
>     if (shouldStopDrainBatchesForPartition(first, tp))
>         break;
> }
> {code}
> Then I had an idea: if a TransactionManager whose partitionsInTransaction set
> has not been cleared is reused, the AddPartitionsToTxnRequest will not be sent
> again. This can happen in the Flink Kafka connector:
>
> 1. The Flink Kafka connector reuses and recycles the KafkaProducer:
> KafkaCommitter recycles the producer into the producerPool after the
> transaction completes or fails, and KafkaWriter then takes it from the
> producerPool again.
> {code:java}
> // org.apache.flink.connector.kafka.sink.KafkaCommitter#commit
> @Override
> public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
>         throws IOException, InterruptedException {
>     for (CommitRequest<KafkaCommittable> request : requests) {
>         // ... obtain producer and recyclable from the committable ...
>         try {
>             producer.commitTransaction();
>             producer.flush();
>             recyclable.ifPresent(Recyclable::close);
>         } catch (RetriableException e) {
>             request.retryLater();
>         } catch (ProducerFencedException e) {
>             recyclable.ifPresent(Recyclable::close);
>             request.signalFailedWithKnownReason(e);
>         } catch (InvalidTxnStateException e) {
>             recyclable.ifPresent(Recyclable::close);
>             request.signalFailedWithKnownReason(e);
>         } catch (UnknownProducerIdException e) {
>             LOG.error(...); // log call truncated in the original
>             recyclable.ifPresent(Recyclable::close);
>             request.signalFailedWithKnownReason(e);
>         } catch (Exception e) {
>             recyclable.ifPresent(Recyclable::close);
>             request.signalFailedWithUnknownReason(e);
>         }
>     }
> }
> {code}
> 2. If KafkaCommitter hits an exception and does not succeed in committing the
> transaction, partitionsInTransaction in the TransactionManager is not cleared
> (see org.apache.kafka.clients.producer.internals.TransactionManager#completeTransaction).
>
> 3. If KafkaWriter then reuses the same producer and sends data to the same
> partitions in the next transaction, the AddPartitionsToTxnRequest will not be
> sent.
>
> Thus, in FlinkKafkaInternalProducer#setTransactionId, we should also clear the
> partition information of the TransactionManager (currently we only set
> transactionalId and currentState):
> {code:java}
> public void setTransactionId(String transactionalId) {
>     if (!transactionalId.equals(this.transactionalId)) {
>         checkState(
>                 !inTransaction,
>                 String.format("Another transaction %s is still open.", transactionalId));
>         LOG.debug("Change transaction id from {} to {}", this.transactionalId, transactionalId);
>         Object transactionManager = getTransactionManager();
>         synchronized (transactionManager) {
>             setField(transactionManager, "transactionalId", transactionalId);
>             setField(
>                     transactionManager,
>                     "currentState",
>                     getTransactionManagerState("UNINITIALIZED"));
>             this.transactionalId = transactionalId;
>         }
>     }
> }
> {code}
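For comparison, a minimal sketch of the clearing approach suggested in the description, under two loudly stated assumptions: that a reflective `getField` counterpart to the existing `setField` helper is available, and that the Kafka client version in use keeps its bookkeeping in fields named `partitionsInTransaction`, `newPartitionsInTransaction`, and `pendingPartitionsInTransaction` (these names vary across client versions, which is exactly the fragility the comment above warns about):

{code:java}
// Hypothetical sketch, not existing connector code: reset the partition
// bookkeeping of the reflected TransactionManager so that a reused producer
// sends AddPartitionsToTxnRequest again in its next transaction. The field
// names below are assumptions about the Kafka client internals and may not
// exist in other client versions; getField is an assumed reflective helper.
private void clearPartitionsInTransaction(Object transactionManager) {
    synchronized (transactionManager) {
        for (String fieldName :
                new String[] {
                    "partitionsInTransaction",
                    "newPartitionsInTransaction",
                    "pendingPartitionsInTransaction"
                }) {
            ((java.util.Set<?>) getField(transactionManager, fieldName)).clear();
        }
    }
}
{code}

This would be called from FlinkKafkaInternalProducer#setTransactionId next to the existing setField calls; the close-and-recreate approach sketched above avoids relying on these internals altogether.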