jolshan commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1038385341


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() 
throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})
+    public void testRetriableErrors(int errorCode) {
+        // Tests Errors.CONCURRENT_TRANSACTIONS, 
Errors.COORDINATOR_LOAD_IN_PROGRESS,
+        // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION
+        // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic.
+        Errors error = Errors.forCode((short) errorCode);
+
+        // Ensure FindCoordinator retries.
+        TransactionalRequestResult result = 
transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(error, false, 
CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, 
transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        // Ensure InitPid retries.
+        prepareInitPidResponse(error, false, producerId, epoch);
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+        transactionManager.beginTransaction();
+
+        // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is 
handled differently here, we substitute.
+        Errors addPartitionsToTxnError = errorCode == 51 ? 
Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
+        transactionManager.maybeAddPartition(tp0);
+        prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, 
producerId);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
+
+        // Ensure txnOffsetCommit retries is tested in 
testRetriableErrorInTxnOffsetCommit.
+
+        // Ensure EndTxn retries.
+        TransactionalRequestResult abortResult = 
transactionManager.beginCommit();
+        prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, 
epoch);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 
producerId, epoch);
+        runUntil(abortResult::isCompleted);
+        assertTrue(abortResult.isSuccessful());
+    }
+
+    @Test
+    public void testCoordinatorNotAvailable() {

Review Comment:
   Oops, I added it to that test. I will revert that change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to