dongnuo123 commented on code in PR #16845: URL: https://github.com/apache/kafka/pull/16845#discussion_r1714495748
########## tests/kafkatest/services/verifiable_consumer.py: ########## @@ -176,6 +176,25 @@ def handle_partitions_assigned(self, event, node, logger): logger.debug("Partitions %s assigned to %s" % (assignment, node.account.hostname)) self.assignment.extend(assignment) +# This needs to be used for consumer protocol. +class ConsumerProtocolConsumerEventHandler(IncrementalAssignmentConsumerEventHandler): + def __init__(self, node, verify_offsets, idx): + super().__init__(node, verify_offsets, idx) + + def handle_partitions_revoked(self, event, node, logger): + self.revoked_count += 1 + self.position = {} + revoked = [] + + for topic_partition in event["partitions"]: + tp = _create_partition_from_dict(topic_partition) + # tp existing in self.assignment is not guaranteed in the new consumer + # if it shuts down when revoking partitions for reconciliation. Review Comment: We've seen failures when asserting the revoked partition exists in self.assignment. It was caused by a partition being shut down while running the reconciliation callback. (discussed here https://confluent.slack.com/archives/C03TR66G3BP/p1720537327894809) ########## tests/kafkatest/services/verifiable_consumer.py: ########## @@ -245,13 +264,22 @@ def __init__(self, context, num_nodes, kafka, topic, group_id, def java_class_name(self): return "VerifiableConsumer" + def create_event_handler(self, idx, node): + if self.is_consumer_group_protocol_enabled(): + return ConsumerProtocolConsumerEventHandler(node, self.verify_offsets, idx) + elif self.is_eager(): + return ConsumerEventHandler(node, self.verify_offsets, idx) + else: + return IncrementalAssignmentConsumerEventHandler(node, self.verify_offsets, idx) + def _worker(self, idx, node): with self.lock: if node not in self.event_handlers: - if self.is_eager(): - self.event_handlers[node] = ConsumerEventHandler(node, self.verify_offsets, idx) - else: - self.event_handlers[node] = IncrementalAssignmentConsumerEventHandler(node, self.verify_offsets, idx) + self.event_handlers[node] = self.create_event_handler(idx, node) + else: + new_event_handler = self.create_event_handler(idx, node) + if self.event_handlers[node].__class__.__name__ != new_event_handler.__class__.__name__: + self.event_handlers[node] = new_event_handler Review Comment: Yes it makes sense. Let me copy the states to the newly created handler. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org