guozhangwang commented on a change in pull request #10880:
URL: https://github.com/apache/kafka/pull/10880#discussion_r654615386
##########
File path:
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
##########
@@ -2670,6 +2671,139 @@ public void testTransactionalRequestsSentOnShutdown() {
}
}
+ @Test
+ public void testRecordsFlushedImmediatelyOnTransactionCompletion() throws
Exception {
+ try (Metrics m = new Metrics()) {
+ int lingerMs = 50;
+ SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+ TransactionManager txnManager = new TransactionManager(logContext,
"txnId", 6000, 100, apiVersions);
+ setupWithTransactionState(txnManager, lingerMs);
+
+ Sender sender = new Sender(logContext, client, metadata,
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+ 1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS,
txnManager, apiVersions);
+
+ // Begin a transaction and successfully add one partition to it.
+ ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
+ doInitTransactions(txnManager, producerIdAndEpoch);
+ txnManager.beginTransaction();
+ addPartitionToTxn(sender, txnManager, tp0);
+
+ // Send a couple records and assert that they are not sent
immediately (due to linger).
+ appendToAccumulator(tp0);
+ appendToAccumulator(tp0);
+ sender.runOnce();
+ assertFalse(client.hasInFlightRequests());
+
+ // Now begin the commit and assert that the Produce request is
sent immediately
+ // without waiting for the linger.
+ txnManager.beginCommit();
+ runUntil(sender, client::hasInFlightRequests);
+
+ // Respond to the produce request and wait for the EndTxn request
to be sent.
+ respondToProduce(tp0, Errors.NONE, 1L);
+ runUntil(sender, txnManager::hasInFlightRequest);
+
+ // Respond to the expected EndTxn request.
+ respondToEndTxn(Errors.NONE);
+ runUntil(sender, txnManager::isReady);
+
+ // Finally, we want to assert that the linger time is still
effective
+ // when the new transaction begins.
+ txnManager.beginTransaction();
+ addPartitionToTxn(sender, txnManager, tp0);
+
+ appendToAccumulator(tp0);
+ appendToAccumulator(tp0);
+ time.sleep(lingerMs - 1);
+ sender.runOnce();
+ assertFalse(client.hasInFlightRequests());
+ assertTrue(accumulator.hasUndrained());
+
+ time.sleep(1);
+ runUntil(sender, client::hasInFlightRequests);
+ assertFalse(accumulator.hasUndrained());
+ }
+ }
+
+ @Test
+ public void testAwaitPendingRecordsBeforeCommittingTransaction() throws
Exception {
+ try (Metrics m = new Metrics()) {
+ SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+ TransactionManager txnManager = new TransactionManager(logContext,
"txnId", 6000, 100, apiVersions);
+ setupWithTransactionState(txnManager);
+
+ Sender sender = new Sender(logContext, client, metadata,
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+ 1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS,
txnManager, apiVersions);
+
+ // Begin a transaction and successfully add one partition to it.
+ ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
+ doInitTransactions(txnManager, producerIdAndEpoch);
+ txnManager.beginTransaction();
+ addPartitionToTxn(sender, txnManager, tp0);
+
+ // Send one Produce request.
+ appendToAccumulator(tp0);
+ runUntil(sender, () -> client.requests().size() == 1);
+ assertFalse(accumulator.hasUndrained());
+ assertTrue(client.hasInFlightRequests());
+ assertTrue(txnManager.hasInflightBatches(tp0));
+
+ // Enqueue another record and then commit the transaction. We
expect the unsent record to
+ // get sent before the transaction can be completed.
+ appendToAccumulator(tp0);
+ txnManager.beginCommit();
+ runUntil(sender, () -> client.requests().size() == 2);
+
+ assertTrue(txnManager.isCompleting());
+ assertFalse(txnManager.hasInFlightRequest());
+ assertTrue(txnManager.hasInflightBatches(tp0));
+
+ // Now respond to the pending Produce requests.
+ respondToProduce(tp0, Errors.NONE, 0L);
+ respondToProduce(tp0, Errors.NONE, 1L);
+ runUntil(sender, txnManager::hasInFlightRequest);
+
+ // Finally, respond to the expected EndTxn request.
+ respondToEndTxn(Errors.NONE);
+ runUntil(sender, txnManager::isReady);
+ }
+ }
+
+ private void addPartitionToTxn(Sender sender, TransactionManager
txnManager, TopicPartition tp) {
+ txnManager.maybeAddPartitionToTransaction(tp);
+ client.prepareResponse(new AddPartitionsToTxnResponse(0,
Collections.singletonMap(tp, Errors.NONE)));
+ runUntil(sender, () -> txnManager.isPartitionAdded(tp));
+ assertFalse(txnManager.hasInFlightRequest());
+ }
+
+ private void respondToProduce(TopicPartition tp, Errors error, long
offset) {
+ client.respond(
+ request -> request instanceof ProduceRequest,
+ produceResponse(tp, offset, error, 0)
+ );
+
+ }
+
+ private void respondToEndTxn(Errors error) {
+ client.respond(
+ request -> request instanceof EndTxnRequest,
+ new EndTxnResponse(new EndTxnResponseData()
+ .setErrorCode(error.code())
+ .setThrottleTimeMs(0))
+ );
+ }
+
+ private void runUntil(Sender sender, Supplier<Boolean> condition) {
Review comment:
I remember we have similar utils in `TransactionManagerTest`; could we
consolidate them?
##########
File path:
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
##########
@@ -2670,6 +2671,139 @@ public void testTransactionalRequestsSentOnShutdown() {
}
}
+ @Test
+ public void testRecordsFlushedImmediatelyOnTransactionCompletion() throws
Exception {
+ try (Metrics m = new Metrics()) {
+ int lingerMs = 50;
+ SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+ TransactionManager txnManager = new TransactionManager(logContext,
"txnId", 6000, 100, apiVersions);
+ setupWithTransactionState(txnManager, lingerMs);
+
+ Sender sender = new Sender(logContext, client, metadata,
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+ 1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS,
txnManager, apiVersions);
+
+ // Begin a transaction and successfully add one partition to it.
+ ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
+ doInitTransactions(txnManager, producerIdAndEpoch);
+ txnManager.beginTransaction();
+ addPartitionToTxn(sender, txnManager, tp0);
+
+ // Send a couple records and assert that they are not sent
immediately (due to linger).
+ appendToAccumulator(tp0);
+ appendToAccumulator(tp0);
+ sender.runOnce();
+ assertFalse(client.hasInFlightRequests());
+
+ // Now begin the commit and assert that the Produce request is
sent immediately
+ // without waiting for the linger.
+ txnManager.beginCommit();
+ runUntil(sender, client::hasInFlightRequests);
+
+ // Respond to the produce request and wait for the EndTxn request
to be sent.
+ respondToProduce(tp0, Errors.NONE, 1L);
+ runUntil(sender, txnManager::hasInFlightRequest);
+
+ // Respond to the expected EndTxn request.
+ respondToEndTxn(Errors.NONE);
+ runUntil(sender, txnManager::isReady);
+
+ // Finally, we want to assert that the linger time is still
effective
+ // when the new transaction begins.
+ txnManager.beginTransaction();
+ addPartitionToTxn(sender, txnManager, tp0);
+
+ appendToAccumulator(tp0);
+ appendToAccumulator(tp0);
+ time.sleep(lingerMs - 1);
+ sender.runOnce();
+ assertFalse(client.hasInFlightRequests());
+ assertTrue(accumulator.hasUndrained());
+
+ time.sleep(1);
+ runUntil(sender, client::hasInFlightRequests);
+ assertFalse(accumulator.hasUndrained());
+ }
+ }
+
+ @Test
+ public void testAwaitPendingRecordsBeforeCommittingTransaction() throws
Exception {
+ try (Metrics m = new Metrics()) {
+ SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+ TransactionManager txnManager = new TransactionManager(logContext,
"txnId", 6000, 100, apiVersions);
+ setupWithTransactionState(txnManager);
+
+ Sender sender = new Sender(logContext, client, metadata,
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+ 1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS,
txnManager, apiVersions);
+
+ // Begin a transaction and successfully add one partition to it.
+ ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
+ doInitTransactions(txnManager, producerIdAndEpoch);
+ txnManager.beginTransaction();
+ addPartitionToTxn(sender, txnManager, tp0);
+
+ // Send one Produce request.
+ appendToAccumulator(tp0);
+ runUntil(sender, () -> client.requests().size() == 1);
+ assertFalse(accumulator.hasUndrained());
+ assertTrue(client.hasInFlightRequests());
+ assertTrue(txnManager.hasInflightBatches(tp0));
+
+ // Enqueue another record and then commit the transaction. We
expect the unsent record to
+ // get sent before the transaction can be completed.
+ appendToAccumulator(tp0);
+ txnManager.beginCommit();
+ runUntil(sender, () -> client.requests().size() == 2);
+
+ assertTrue(txnManager.isCompleting());
+ assertFalse(txnManager.hasInFlightRequest());
+ assertTrue(txnManager.hasInflightBatches(tp0));
+
+ // Now respond to the pending Produce requests.
+ respondToProduce(tp0, Errors.NONE, 0L);
+ respondToProduce(tp0, Errors.NONE, 1L);
+ runUntil(sender, txnManager::hasInFlightRequest);
+
+ // Finally, respond to the expected EndTxn request.
+ respondToEndTxn(Errors.NONE);
+ runUntil(sender, txnManager::isReady);
+ }
+ }
+
+ private void addPartitionToTxn(Sender sender, TransactionManager
txnManager, TopicPartition tp) {
+ txnManager.maybeAddPartitionToTransaction(tp);
+ client.prepareResponse(new AddPartitionsToTxnResponse(0,
Collections.singletonMap(tp, Errors.NONE)));
+ runUntil(sender, () -> txnManager.isPartitionAdded(tp));
+ assertFalse(txnManager.hasInFlightRequest());
+ }
+
+ private void respondToProduce(TopicPartition tp, Errors error, long
offset) {
+ client.respond(
+ request -> request instanceof ProduceRequest,
+ produceResponse(tp, offset, error, 0)
+ );
+
+ }
+
+ private void respondToEndTxn(Errors error) {
+ client.respond(
+ request -> request instanceof EndTxnRequest,
+ new EndTxnResponse(new EndTxnResponseData()
+ .setErrorCode(error.code())
+ .setThrottleTimeMs(0))
+ );
+ }
+
+ private void runUntil(Sender sender, Supplier<Boolean> condition) {
Review comment:
Yup, I think that's also fine.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org