hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1038362105


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() 
throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})

Review Comment:
   I think another way to do this is like this:
   ```java
   @EnumSource(names = { 
     "UNKNOWN_TOPIC_OR_PARTITION", 
     "REQUEST_TIMED_OUT", 
     "COORDINATOR_LOAD_IN_PROGRESS", 
     "CONCURRENT_TRANSACTIONS" 
   })
   ```



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() 
throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})
+    public void testRetriableErrors(int errorCode) {
+        // Tests Errors.CONCURRENT_TRANSACTIONS, 
Errors.COORDINATOR_LOAD_IN_PROGRESS,
+        // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION
+        // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic.
+        Errors error = Errors.forCode((short) errorCode);
+
+        // Ensure FindCoordinator retries.
+        TransactionalRequestResult result = 
transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(error, false, 
CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, 
transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        // Ensure InitPid retries.
+        prepareInitPidResponse(error, false, producerId, epoch);
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+        transactionManager.beginTransaction();
+
+        // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is 
handled differently here, we substitute.
+        Errors addPartitionsToTxnError = errorCode == 51 ? 
Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
+        transactionManager.maybeAddPartition(tp0);
+        prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, 
producerId);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
+
+        // Ensure txnOffsetCommit retries is tested in 
testRetriableErrorInTxnOffsetCommit.
+
+        // Ensure EndTxn retries.
+        TransactionalRequestResult abortResult = 
transactionManager.beginCommit();
+        prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, 
epoch);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 
producerId, epoch);
+        runUntil(abortResult::isCompleted);
+        assertTrue(abortResult.isSuccessful());
+    }
+
+    @Test
+    public void testCoordinatorNotAvailable() {
+        // Ensure FindCoordinator with COORDINATOR_NOT_AVAILABLE error retries.
+        TransactionalRequestResult result = 
transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, 
false, CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, 
transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+    }
+

Review Comment:
   nit: extra new line



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() 
throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})
+    public void testRetriableErrors(int errorCode) {
+        // Tests Errors.CONCURRENT_TRANSACTIONS, 
Errors.COORDINATOR_LOAD_IN_PROGRESS,
+        // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION
+        // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic.
+        Errors error = Errors.forCode((short) errorCode);
+
+        // Ensure FindCoordinator retries.
+        TransactionalRequestResult result = 
transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(error, false, 
CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, 
transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        // Ensure InitPid retries.
+        prepareInitPidResponse(error, false, producerId, epoch);
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+        transactionManager.beginTransaction();
+
+        // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is 
handled differently here, we substitute.
+        Errors addPartitionsToTxnError = errorCode == 51 ? 
Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
+        transactionManager.maybeAddPartition(tp0);
+        prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, 
producerId);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
+
+        // Ensure txnOffsetCommit retries is tested in 
testRetriableErrorInTxnOffsetCommit.
+
+        // Ensure EndTxn retries.
+        TransactionalRequestResult abortResult = 
transactionManager.beginCommit();
+        prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, 
epoch);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 
producerId, epoch);
+        runUntil(abortResult::isCompleted);
+        assertTrue(abortResult.isSuccessful());
+    }
+
+    @Test
+    public void testCoordinatorNotAvailable() {

Review Comment:
   Do we have another test for `NOT_COORDINATOR`? I wonder if we can 
parameterize this one as well and cover both cases?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to