[ https://issues.apache.org/jira/browse/KAFKA-6829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461754#comment-16461754 ]
ASF GitHub Bot commented on KAFKA-6829: --------------------------------------- guozhangwang closed pull request #4948: KAFKA-6829: Retry commits on unknown topic or partition URL: https://github.com/apache/kafka/pull/4948 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 3c99c966d54..eec070ec663 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -757,7 +757,8 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> futu // raise the error to the user future.raise(error); return; - } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) { + } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS + || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { // just retry future.raise(error); return; @@ -774,9 +775,6 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> futu resetGeneration(); future.raise(new CommitFailedException()); return; - } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { - future.raise(new KafkaException("Topic or Partition " + tp + " does not exist")); - return; } else { future.raise(new KafkaException("Unexpected error in commit: " + error.message())); return; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 3e3c423a428..030419075c6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -1373,13 +1373,15 @@ public void run() { assertEquals(Arrays.asList(firstOffset, secondOffset), committedOffsets); } - @Test(expected = KafkaException.class) - public void testCommitUnknownTopicOrPartition() { + @Test + public void testRetryCommitUnknownTopicOrPartition() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); - prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_TOPIC_OR_PARTITION); - coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE); + client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION))); + client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.NONE))); + + assertTrue(coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), 10000)); } @Test(expected = OffsetMetadataTooLarge.class) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Consumer should retry when encountering unknown topic or partition error > ------------------------------------------------------------------------ > > Key: KAFKA-6829 > URL: https://issues.apache.org/jira/browse/KAFKA-6829 > Project: Kafka > Issue Type: Bug > Components: clients > Reporter: Bill Bejeck > Assignee: Bill Bejeck > Priority: Minor > Fix For: 2.0.0 > > > For the UNKNOWN_TOPIC_OR_PARTITION error, we could change the consumer's > behavior to retry after this error. While this is a rare case since the user > would not commit offsets for topics unless they had been able to fetch from > them, but this doesn't really handle the situation where the broker hasn't > received _any_ metadata updates. -- This message was sent by Atlassian JIRA (v7.6.3#76005)