dongnuo123 commented on code in PR #16845:
URL: https://github.com/apache/kafka/pull/16845#discussion_r1714495748


##########
tests/kafkatest/services/verifiable_consumer.py:
##########
@@ -176,6 +176,25 @@ def handle_partitions_assigned(self, event, node, logger):
         logger.debug("Partitions %s assigned to %s" % (assignment, 
node.account.hostname))
         self.assignment.extend(assignment)
 
+# This needs to be used for consumer protocol.
+class 
ConsumerProtocolConsumerEventHandler(IncrementalAssignmentConsumerEventHandler):
+    def __init__(self, node, verify_offsets, idx):
+        super().__init__(node, verify_offsets, idx)
+
+    def handle_partitions_revoked(self, event, node, logger):
+        self.revoked_count += 1
+        self.position = {}
+        revoked = []
+
+        for topic_partition in event["partitions"]:
+            tp = _create_partition_from_dict(topic_partition)
+            # tp existing in self.assignment is not guaranteed in the new 
consumer
+            # if it shuts down when revoking partitions for reconciliation.

Review Comment:
   We've seen failures when asserting the revoked partition exists in 
self.assignment. It was caused by a partition being shut down while running the 
reconciliation callback. (discussed here 
https://confluent.slack.com/archives/C03TR66G3BP/p1720537327894809)



##########
tests/kafkatest/services/verifiable_consumer.py:
##########
@@ -245,13 +264,22 @@ def __init__(self, context, num_nodes, kafka, topic, 
group_id,
     def java_class_name(self):
         return "VerifiableConsumer"
 
+    def create_event_handler(self, idx, node):
+        if self.is_consumer_group_protocol_enabled():
+            return ConsumerProtocolConsumerEventHandler(node, 
self.verify_offsets, idx)
+        elif self.is_eager():
+            return ConsumerEventHandler(node, self.verify_offsets, idx)
+        else:
+            return IncrementalAssignmentConsumerEventHandler(node, 
self.verify_offsets, idx)
+
     def _worker(self, idx, node):
         with self.lock:
             if node not in self.event_handlers:
-                if self.is_eager():
-                    self.event_handlers[node] = ConsumerEventHandler(node, 
self.verify_offsets, idx)
-                else:
-                    self.event_handlers[node] = 
IncrementalAssignmentConsumerEventHandler(node, self.verify_offsets, idx)
+                self.event_handlers[node] = self.create_event_handler(idx, 
node)
+            else:
+                new_event_handler = self.create_event_handler(idx, node)
+                if self.event_handlers[node].__class__.__name__ != 
new_event_handler.__class__.__name__:
+                    self.event_handlers[node] = new_event_handler

Review Comment:
   Yes it makes sense. Let me copy the states to the newly created handler.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to