hachikuji commented on code in PR #12915: URL: https://github.com/apache/kafka/pull/12915#discussion_r1038362105
########## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ########## @@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept assertTrue(secondResponseFuture.isDone()); } + @ParameterizedTest + @ValueSource(ints = {3, 7, 14, 51}) Review Comment: I think another way to do this is like this: ```java @EnumSource(names = { "UNKNOWN_TOPIC_OR_PARTITION", "REQUEST_TIMED_OUT", "COORDINATOR_LOAD_IN_PROGRESS", "CONCURRENT_TRANSACTIONS" }) ``` ########## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ########## @@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept assertTrue(secondResponseFuture.isDone()); } + @ParameterizedTest + @ValueSource(ints = {3, 7, 14, 51}) + public void testRetriableErrors(int errorCode) { + // Tests Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS, + // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION + // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic. + Errors error = Errors.forCode((short) errorCode); + + // Ensure FindCoordinator retries. + TransactionalRequestResult result = transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId); + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); + runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null); + assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION)); + + // Ensure InitPid retries. + prepareInitPidResponse(error, false, producerId, epoch); + prepareInitPidResponse(Errors.NONE, false, producerId, epoch); + runUntil(transactionManager::hasProducerId); + + result.await(); + transactionManager.beginTransaction(); + + // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is handled differently here, we substitute. + Errors addPartitionsToTxnError = errorCode == 51 ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error; + transactionManager.maybeAddPartition(tp0); + prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, producerId); + prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId); + runUntil(() -> transactionManager.transactionContainsPartition(tp0)); + + // Ensure txnOffsetCommit retries is tested in testRetriableErrorInTxnOffsetCommit. + + // Ensure EndTxn retries. + TransactionalRequestResult abortResult = transactionManager.beginCommit(); + prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, epoch); + prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch); + runUntil(abortResult::isCompleted); + assertTrue(abortResult.isSuccessful()); + } + + @Test + public void testCoordinatorNotAvailable() { + // Ensure FindCoordinator with COORDINATOR_NOT_AVAILABLE error retries. + TransactionalRequestResult result = transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, false, CoordinatorType.TRANSACTION, transactionalId); + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); + runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null); + assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION)); + + prepareInitPidResponse(Errors.NONE, false, producerId, epoch); + runUntil(transactionManager::hasProducerId); + + result.await(); + } + Review Comment: nit: extra new line ########## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ########## @@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept assertTrue(secondResponseFuture.isDone()); } + @ParameterizedTest + @ValueSource(ints = {3, 7, 14, 51}) + public void testRetriableErrors(int errorCode) { + // Tests Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS, + // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION + // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic. + Errors error = Errors.forCode((short) errorCode); + + // Ensure FindCoordinator retries. + TransactionalRequestResult result = transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId); + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); + runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null); + assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION)); + + // Ensure InitPid retries. + prepareInitPidResponse(error, false, producerId, epoch); + prepareInitPidResponse(Errors.NONE, false, producerId, epoch); + runUntil(transactionManager::hasProducerId); + + result.await(); + transactionManager.beginTransaction(); + + // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is handled differently here, we substitute. + Errors addPartitionsToTxnError = errorCode == 51 ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error; + transactionManager.maybeAddPartition(tp0); + prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, producerId); + prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId); + runUntil(() -> transactionManager.transactionContainsPartition(tp0)); + + // Ensure txnOffsetCommit retries is tested in testRetriableErrorInTxnOffsetCommit. + + // Ensure EndTxn retries. + TransactionalRequestResult abortResult = transactionManager.beginCommit(); + prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, epoch); + prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch); + runUntil(abortResult::isCompleted); + assertTrue(abortResult.isSuccessful()); + } + + @Test + public void testCoordinatorNotAvailable() { Review Comment: Do we have another test for `NOT_COORDINATOR`? I wonder if we can parameterize this one as well and cover both cases? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org