dongnuo123 commented on code in PR #16845: URL: https://github.com/apache/kafka/pull/16845#discussion_r1711618266
##########
tests/kafkatest/tests/client/consumer_test.py:
##########
@@ -416,8 +416,15 @@ def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quor
         consumer.start()
         self.await_all_members(consumer)
 
-        partition_owner = consumer.owner(partition)
-        assert partition_owner is not None
+        partition_owner_container = {partition: None}
+        def check_partition_owner(partition_owner_container):
+            partition_owner_container[partition] = consumer.owner(partition)
+            return partition_owner_container[partition] is not None
+
+        wait_until(lambda: check_partition_owner(partition_owner_container),
+                   timeout_sec=self.session_timeout_sec*2+5,
+                   err_msg="Timed out waiting for partition to be assigned.")
+        partition_owner = partition_owner_container[partition]

Review Comment:
With the new-protocol consumer, it's possible that all members have joined but the group has not fully stabilized, so the partition may not have an owner yet. We need to wait for an owner to be assigned instead of asserting immediately. It's a bit complicated because the partition owner is used again on L433, so we need to keep it in a variable here.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
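
For illustration only, the wait-then-keep-the-value pattern described in the review comment could be sketched as below. This is not the PR's code: `poll_until` is a local stand-in for a `wait_until`-style helper, and `FakeConsumer`, `await_partition_owner`, and the `"worker-1"` owner name are hypothetical names invented so that the example runs on its own.

```python
import time


def poll_until(condition, timeout_sec, backoff_sec=0.01, err_msg=""):
    """Hypothetical stand-in for a wait_until-style helper.

    Calls condition() repeatedly until it returns a truthy value,
    raising TimeoutError once timeout_sec has elapsed.
    """
    deadline = time.monotonic() + timeout_sec
    while True:
        if condition():
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(err_msg)
        time.sleep(backoff_sec)


class FakeConsumer:
    """Hypothetical consumer: the partition has no owner for the first few polls."""

    def __init__(self, polls_before_assignment):
        self._remaining = polls_before_assignment

    def owner(self, partition):
        if self._remaining > 0:
            self._remaining -= 1
            return None  # group joined, but assignment not settled yet
        return "worker-1"


def await_partition_owner(consumer, partition, timeout_sec):
    """Wait until the partition has an owner, then return that owner."""
    holder = {}  # mutable container so the polled value outlives the closure

    def owner_assigned():
        holder["owner"] = consumer.owner(partition)
        return holder["owner"] is not None

    poll_until(owner_assigned, timeout_sec,
               err_msg="Timed out waiting for partition to be assigned.")
    return holder["owner"]
```

The container exists only because the polling condition has to return a boolean while the caller still needs the owner value afterwards; returning it from a small helper like `await_partition_owner` keeps that detail out of the test body.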