dongnuo123 commented on code in PR #16845:
URL: https://github.com/apache/kafka/pull/16845#discussion_r1711618266


##########
tests/kafkatest/tests/client/consumer_test.py:
##########
@@ -416,8 +416,15 @@ def test_consumer_failure(self, clean_shutdown, 
enable_autocommit, metadata_quor
         consumer.start()
         self.await_all_members(consumer)
 
-        partition_owner = consumer.owner(partition)
-        assert partition_owner is not None
+        partition_owner_container = {partition: None}
+        def check_partition_owner(partition_owner_container):
+            partition_owner_container[partition] = consumer.owner(partition)
+            return partition_owner_container[partition] is not None
+
+        wait_until(lambda: check_partition_owner(partition_owner_container),
+                   timeout_sec=self.session_timeout_sec*2+5,
+                   err_msg="Timed out waiting for partition to be assigned.")
+        partition_owner = partition_owner_container[partition]

Review Comment:
   For new protocol consumer, it's possible that all members have joined but 
the members are not fully stablized and the partition currently doesn't have an 
owner. We need to wait for a while for the assertion.
   
   It's a bit complicated because the partition owner will be used on L433 so 
we want to keep a variable here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to