[ https://issues.apache.org/jira/browse/KAFKA-6829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461754#comment-16461754 ]

ASF GitHub Bot commented on KAFKA-6829:
---------------------------------------

guozhangwang closed pull request #4948: KAFKA-6829: Retry commits on unknown topic or partition
URL: https://github.com/apache/kafka/pull/4948
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 3c99c966d54..eec070ec663 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -757,7 +757,8 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> futu
                         // raise the error to the user
                         future.raise(error);
                         return;
-                    } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+                    } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS
+                            || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                         // just retry
                         future.raise(error);
                         return;
@@ -774,9 +775,6 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> futu
                         resetGeneration();
                         future.raise(new CommitFailedException());
                         return;
-                    } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
-                        future.raise(new KafkaException("Topic or Partition " + tp + " does not exist"));
-                        return;
                     } else {
                         future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
                         return;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 3e3c423a428..030419075c6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1373,13 +1373,15 @@ public void run() {
         assertEquals(Arrays.asList(firstOffset, secondOffset), committedOffsets);
     }
 
-    @Test(expected = KafkaException.class)
-    public void testCommitUnknownTopicOrPartition() {
+    @Test
+    public void testRetryCommitUnknownTopicOrPartition() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_TOPIC_OR_PARTITION);
-        coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
+        client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
+
+        assertTrue(coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), 10000));
     }
 
     @Test(expected = OffsetMetadataTooLarge.class)


 



> Consumer should retry when encountering unknown topic or partition error
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-6829
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6829
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>            Reporter: Bill Bejeck
>            Assignee: Bill Bejeck
>            Priority: Minor
>             Fix For: 2.0.0
>
>
> For the UNKNOWN_TOPIC_OR_PARTITION error, we could change the consumer's 
> behavior to retry after this error.  This is a rare case, since the user 
> would not commit offsets for topics unless they had been able to fetch from 
> them, but the current behavior doesn't really handle the situation where the 
> broker hasn't received _any_ metadata updates.
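
The change in the diff above amounts to moving UNKNOWN_TOPIC_OR_PARTITION from the "fail the commit" branch into the "just retry" branch. The following is a minimal, self-contained sketch of that classification, not the actual ConsumerCoordinator code: the class CommitRetrySketch, the enum CommitError, and the methods isRetriable and commitWithRetry are hypothetical names introduced for illustration, and a bounded attempt count stands in for the timeout that commitOffsetsSync takes.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

public class CommitRetrySketch {
    // Hypothetical stand-in for the few error codes relevant to this patch.
    enum CommitError {
        NONE,
        COORDINATOR_LOAD_IN_PROGRESS,
        UNKNOWN_TOPIC_OR_PARTITION,
        OFFSET_METADATA_TOO_LARGE
    }

    // Mirrors the patched branch: both errors are treated as transient.
    static boolean isRetriable(CommitError error) {
        return error == CommitError.COORDINATOR_LOAD_IN_PROGRESS
                || error == CommitError.UNKNOWN_TOPIC_OR_PARTITION;
    }

    // Consumes scripted responses until one succeeds, a non-retriable error
    // appears, or the attempts (an assumption; the real client uses a timeout)
    // run out. Returns true only if a commit succeeded.
    static boolean commitWithRetry(Deque<CommitError> responses, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts && !responses.isEmpty(); attempt++) {
            CommitError error = responses.poll();
            if (error == CommitError.NONE) {
                return true;
            }
            if (!isRetriable(error)) {
                throw new IllegalStateException("Unexpected error in commit: " + error);
            }
            // Retriable error: fall through and try the next response.
        }
        return false;
    }

    public static void main(String[] args) {
        // Same script as the updated test: one unknown-topic error, then success.
        Deque<CommitError> scripted = new ArrayDeque<>(Arrays.asList(
                CommitError.UNKNOWN_TOPIC_OR_PARTITION, CommitError.NONE));
        System.out.println(commitWithRetry(scripted, 3));
    }
}
```

The scripted responses follow testRetryCommitUnknownTopicOrPartition from the diff: the first response carries the error, the second succeeds, and the commit as a whole reports success.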


