jolshan commented on code in PR #12915: URL: https://github.com/apache/kafka/pull/12915#discussion_r1038385341
########## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ########## @@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept assertTrue(secondResponseFuture.isDone()); } + @ParameterizedTest + @ValueSource(ints = {3, 7, 14, 51}) + public void testRetriableErrors(int errorCode) { + // Tests Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS, + // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION + // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic. + Errors error = Errors.forCode((short) errorCode); + + // Ensure FindCoordinator retries. + TransactionalRequestResult result = transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId); + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); + runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null); + assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION)); + + // Ensure InitPid retries. + prepareInitPidResponse(error, false, producerId, epoch); + prepareInitPidResponse(Errors.NONE, false, producerId, epoch); + runUntil(transactionManager::hasProducerId); + + result.await(); + transactionManager.beginTransaction(); + + // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is handled differently here, we substitute. + Errors addPartitionsToTxnError = errorCode == 51 ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error; + transactionManager.maybeAddPartition(tp0); + prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, producerId); + prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId); + runUntil(() -> transactionManager.transactionContainsPartition(tp0)); + + // Ensure txnOffsetCommit retries is tested in testRetriableErrorInTxnOffsetCommit. + + // Ensure EndTxn retries. + TransactionalRequestResult abortResult = transactionManager.beginCommit(); + prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, epoch); + prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch); + runUntil(abortResult::isCompleted); + assertTrue(abortResult.isSuccessful()); + } + + @Test + public void testCoordinatorNotAvailable() { Review Comment: Oops, I added it to that test. I will revert that change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org