jolshan commented on code in PR #12915: URL: https://github.com/apache/kafka/pull/12915#discussion_r1038368961
########## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ########## @@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept assertTrue(secondResponseFuture.isDone()); } + @ParameterizedTest + @ValueSource(ints = {3, 7, 14, 51}) + public void testRetriableErrors(int errorCode) { + // Tests Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS, + // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION + // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic. + Errors error = Errors.forCode((short) errorCode); + + // Ensure FindCoordinator retries. + TransactionalRequestResult result = transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId); + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); + runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null); + assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION)); + + // Ensure InitPid retries. + prepareInitPidResponse(error, false, producerId, epoch); + prepareInitPidResponse(Errors.NONE, false, producerId, epoch); + runUntil(transactionManager::hasProducerId); + + result.await(); + transactionManager.beginTransaction(); + + // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is handled differently here, we substitute. + Errors addPartitionsToTxnError = errorCode == 51 ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error; + transactionManager.maybeAddPartition(tp0); + prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, producerId); + prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId); + runUntil(() -> transactionManager.transactionContainsPartition(tp0)); + + // Ensure txnOffsetCommit retries is tested in testRetriableErrorInTxnOffsetCommit. + + // Ensure EndTxn retries. + TransactionalRequestResult abortResult = transactionManager.beginCommit(); + prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, epoch); + prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch); + runUntil(abortResult::isCompleted); + assertTrue(abortResult.isSuccessful()); + } + + @Test + public void testCoordinatorNotAvailable() { Review Comment: I suppose we have `testLookupCoordinatorOnNotCoordinatorError` Is this what you were thinking of? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org