AndrewJSchofield commented on code in PR #18718:
URL: https://github.com/apache/kafka/pull/18718#discussion_r1933731004


##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1832,83 +1835,129 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception {
             @ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
         }
     )
+    @Timeout(90)
     public void testShareConsumerAfterCoordinatorMovement() throws Exception {
         setup();
         String topicName = "multipart";
         String groupId = "multipartGrp";
         Uuid topicId = createTopic(topicName, 3, 3);
         alterShareAutoOffsetReset(groupId, "earliest");
+        ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
 
         try (Admin admin = createAdminClient()) {
             TopicPartition tpMulti = new TopicPartition(topicName, 0);
 
             // produce some messages
-            try (Producer<byte[], byte[]> producer = createProducer()) {
-                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
-                    tpMulti.topic(),
-                    tpMulti.partition(),
-                    null,
-                    "key".getBytes(),
-                    "value".getBytes()
-                );
-                IntStream.range(0, 10).forEach(__ -> producer.send(record));
-                producer.flush();
-            }
-
-            // consume messages
-            try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {
-                shareConsumer.subscribe(List.of(topicName));
-                ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
-                assertEquals(10, records.count());
-            }
+            ClientState prodState = new ClientState();
+            final Set<String> produced = new HashSet<>();
+            service.execute(() -> {
+                    int i = 0;
+                    try (Producer<String, String> producer = createProducer(Map.of(
+                        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+                        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()
+                    ))) {
+                        while (!prodState.done().get()) {
+                            String key = "key-" + (i++);
+                            ProducerRecord<String, String> record = new ProducerRecord<>(
+                                tpMulti.topic(),
+                                tpMulti.partition(),
+                                null,
+                                key,
+                                "value"
+                            );
+                            try {
+                                producer.send(record);
+                                producer.flush();
+                                // count only correctly produced records
+                                prodState.count().incrementAndGet();
+                                produced.add(key);
+                            } catch (Exception e) {
+                                // ignore
+                            }
+                        }
+                    }
+                }
+            );
 
-            // get current share coordinator node
-            SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, tpMulti));
-            int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
-            List<Integer> curShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
-                .partitions().stream()
-                .filter(info -> info.partition() == shareGroupStateTp)
-                .map(info -> info.leader().id())
-                .toList();
+            // consume messages - start after small delay
+            ClientState consState = new ClientState();
+            // using map here if we want to debug specific keys
+            Map<String, Integer> consumed = new HashMap<>();
+            service.schedule(() -> {
+                    try (ShareConsumer<String, String> shareConsumer = createShareConsumer(groupId, Map.of(
+                        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
+                        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()
+                    ))) {
+                        shareConsumer.subscribe(List.of(topicName));
+                        while (!consState.done().get()) {
+                            ConsumerRecords<String, String> records = shareConsumer.poll(Duration.ofMillis(2000L));
+                            consState.count().addAndGet(records.count());
+                            records.forEach(rec -> consumed.compute(rec.key(), (k, v) -> v == null ? 1 : v + 1));
+                            if (prodState.done().get() && records.count() == 0) {
+                                consState.done().set(true);
+                            }
+                        }
+                    }
+                }, 100L, TimeUnit.MILLISECONDS
+            );
 
-            assertEquals(1, curShareCoordNodeId.size());
+            service.schedule(() -> {
+                    // get current share coordinator node
+                    SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, tpMulti));
+                    int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
+                    List<Integer> curShareCoordNodeId = null;
+                    try {
+                        curShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
+                            .partitions().stream()
+                            .filter(info -> info.partition() == shareGroupStateTp)
+                            .map(info -> info.leader().id())
+                            .toList();
+                    } catch (Exception e) {
+                        fail(e);
+                    }
+                    assertEquals(1, curShareCoordNodeId.size());
+
+                    // shutdown the coordinator
+                    KafkaBroker broker = cluster.brokers().get(curShareCoordNodeId.get(0));
+                    cluster.shutdownBroker(curShareCoordNodeId.get(0));
+
+                    // give some breathing time
+                    broker.awaitShutdown();
+
+                    List<Integer> newShareCoordNodeId = null;
+                    try {
+                        newShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
+                            .partitions().stream()
+                            .filter(info -> info.partition() == shareGroupStateTp)
+                            .map(info -> info.leader().id())
+                            .toList();
+                    } catch (Exception e) {
+                        fail(e);
+                    }
 
-            // shutdown the coordinator
-            KafkaBroker broker = cluster.brokers().get(curShareCoordNodeId.get(0));
-            cluster.shutdownBroker(curShareCoordNodeId.get(0));
+                    assertEquals(1, newShareCoordNodeId.size());
+                    assertNotEquals(curShareCoordNodeId.get(0), newShareCoordNodeId.get(0));
+                }, 5L, TimeUnit.SECONDS
+            );
 
-            // give some breathing time
-            broker.awaitShutdown();
+            service.schedule(() -> {
+                    // stop the produce
+                    prodState.done().set(true);
+                }, 10L, TimeUnit.SECONDS
+            );
 
-            List<Integer> newShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
-                .partitions().stream()
-                .filter(info -> info.partition() == shareGroupStateTp)
-                .map(info -> info.leader().id())
-                .toList();
+            TestUtils.waitForCondition(

Review Comment:
   And another comment please such as "Stop the producers after a few seconds".
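   For example, the scheduled stop could read something like this (a sketch only, reusing the `prodState` and `service` variables already in the diff):

       // Stop the producer after a few seconds so the consumer can drain
       // the remaining records and the test can finish.
       service.schedule(() -> prodState.done().set(true), 10L, TimeUnit.SECONDS);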



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1832,83 +1835,129 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception {
             @ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
         }
     )
+    @Timeout(90)
     public void testShareConsumerAfterCoordinatorMovement() throws Exception {
         setup();
         String topicName = "multipart";
         String groupId = "multipartGrp";
         Uuid topicId = createTopic(topicName, 3, 3);
         alterShareAutoOffsetReset(groupId, "earliest");
+        ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
 
         try (Admin admin = createAdminClient()) {
             TopicPartition tpMulti = new TopicPartition(topicName, 0);
 
             // produce some messages
-            try (Producer<byte[], byte[]> producer = createProducer()) {
-                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
-                    tpMulti.topic(),
-                    tpMulti.partition(),
-                    null,
-                    "key".getBytes(),
-                    "value".getBytes()
-                );
-                IntStream.range(0, 10).forEach(__ -> producer.send(record));
-                producer.flush();
-            }
-
-            // consume messages
-            try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {
-                shareConsumer.subscribe(List.of(topicName));
-                ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
-                assertEquals(10, records.count());
-            }
+            ClientState prodState = new ClientState();
+            final Set<String> produced = new HashSet<>();
+            service.execute(() -> {
+                    int i = 0;
+                    try (Producer<String, String> producer = createProducer(Map.of(
+                        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+                        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()
+                    ))) {
+                        while (!prodState.done().get()) {
+                            String key = "key-" + (i++);
+                            ProducerRecord<String, String> record = new ProducerRecord<>(
+                                tpMulti.topic(),
+                                tpMulti.partition(),
+                                null,
+                                key,
+                                "value"
+                            );
+                            try {
+                                producer.send(record);
+                                producer.flush();
+                                // count only correctly produced records
+                                prodState.count().incrementAndGet();
+                                produced.add(key);
+                            } catch (Exception e) {
+                                // ignore
+                            }
+                        }
+                    }
+                }
+            );
 
-            // get current share coordinator node
-            SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, tpMulti));
-            int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
-            List<Integer> curShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
-                .partitions().stream()
-                .filter(info -> info.partition() == shareGroupStateTp)
-                .map(info -> info.leader().id())
-                .toList();
+            // consume messages - start after small delay
+            ClientState consState = new ClientState();
+            // using map here if we want to debug specific keys
+            Map<String, Integer> consumed = new HashMap<>();
+            service.schedule(() -> {
+                    try (ShareConsumer<String, String> shareConsumer = createShareConsumer(groupId, Map.of(
+                        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
+                        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()
+                    ))) {
+                        shareConsumer.subscribe(List.of(topicName));
+                        while (!consState.done().get()) {
+                            ConsumerRecords<String, String> records = shareConsumer.poll(Duration.ofMillis(2000L));
+                            consState.count().addAndGet(records.count());
+                            records.forEach(rec -> consumed.compute(rec.key(), (k, v) -> v == null ? 1 : v + 1));
+                            if (prodState.done().get() && records.count() == 0) {
+                                consState.done().set(true);
+                            }
+                        }
+                    }
+                }, 100L, TimeUnit.MILLISECONDS
+            );
 
-            assertEquals(1, curShareCoordNodeId.size());
+            service.schedule(() -> {

Review Comment:
   A comment above here please. Something like "Shut down the current share 
coordinator node after a short delay" just to aid with readability.
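   For instance, the scheduled block could be introduced like this (a sketch only, body elided; the delay and `service` are the ones already used in the diff):

       // Shut down the current share coordinator node after a short delay,
       // forcing the share coordinator to move while produce/consume are in flight.
       service.schedule(() -> {
           // locate the current coordinator, shut it down, and verify a new leader is elected
       }, 5L, TimeUnit.SECONDS);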



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1832,83 +1835,129 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception {
             @ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
         }
     )
+    @Timeout(90)
     public void testShareConsumerAfterCoordinatorMovement() throws Exception {
         setup();
         String topicName = "multipart";
         String groupId = "multipartGrp";
         Uuid topicId = createTopic(topicName, 3, 3);
         alterShareAutoOffsetReset(groupId, "earliest");
+        ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
 
         try (Admin admin = createAdminClient()) {
             TopicPartition tpMulti = new TopicPartition(topicName, 0);
 
             // produce some messages
-            try (Producer<byte[], byte[]> producer = createProducer()) {
-                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
-                    tpMulti.topic(),
-                    tpMulti.partition(),
-                    null,
-                    "key".getBytes(),
-                    "value".getBytes()
-                );
-                IntStream.range(0, 10).forEach(__ -> producer.send(record));
-                producer.flush();
-            }
-
-            // consume messages
-            try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {
-                shareConsumer.subscribe(List.of(topicName));
-                ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
-                assertEquals(10, records.count());
-            }
+            ClientState prodState = new ClientState();
+            final Set<String> produced = new HashSet<>();
+            service.execute(() -> {
+                    int i = 0;
+                    try (Producer<String, String> producer = createProducer(Map.of(
+                        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+                        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()
+                    ))) {
+                        while (!prodState.done().get()) {
+                            String key = "key-" + (i++);
+                            ProducerRecord<String, String> record = new ProducerRecord<>(
+                                tpMulti.topic(),
+                                tpMulti.partition(),
+                                null,
+                                key,
+                                "value"
+                            );
+                            try {
+                                producer.send(record);
+                                producer.flush();
+                                // count only correctly produced records
+                                prodState.count().incrementAndGet();
+                                produced.add(key);
+                            } catch (Exception e) {
+                                // ignore
+                            }
+                        }
+                    }
+                }
+            );
 
-            // get current share coordinator node
-            SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, tpMulti));
-            int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
-            List<Integer> curShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
-                .partitions().stream()
-                .filter(info -> info.partition() == shareGroupStateTp)
-                .map(info -> info.leader().id())
-                .toList();
+            // consume messages - start after small delay
+            ClientState consState = new ClientState();
+            // using map here if we want to debug specific keys
+            Map<String, Integer> consumed = new HashMap<>();
+            service.schedule(() -> {
+                    try (ShareConsumer<String, String> shareConsumer = createShareConsumer(groupId, Map.of(
+                        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
+                        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()
+                    ))) {
+                        shareConsumer.subscribe(List.of(topicName));
+                        while (!consState.done().get()) {
+                            ConsumerRecords<String, String> records = shareConsumer.poll(Duration.ofMillis(2000L));
+                            consState.count().addAndGet(records.count());
+                            records.forEach(rec -> consumed.compute(rec.key(), (k, v) -> v == null ? 1 : v + 1));
+                            if (prodState.done().get() && records.count() == 0) {
+                                consState.done().set(true);
+                            }
+                        }
+                    }
+                }, 100L, TimeUnit.MILLISECONDS
+            );
 
-            assertEquals(1, curShareCoordNodeId.size());
+            service.schedule(() -> {
+                    // get current share coordinator node
+                    SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, tpMulti));
+                    int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
+                    List<Integer> curShareCoordNodeId = null;
+                    try {
+                        curShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
+                            .partitions().stream()
+                            .filter(info -> info.partition() == shareGroupStateTp)
+                            .map(info -> info.leader().id())
+                            .toList();
+                    } catch (Exception e) {
+                        fail(e);
+                    }
+                    assertEquals(1, curShareCoordNodeId.size());
+
+                    // shutdown the coordinator
+                    KafkaBroker broker = cluster.brokers().get(curShareCoordNodeId.get(0));
+                    cluster.shutdownBroker(curShareCoordNodeId.get(0));
+
+                    // give some breathing time
+                    broker.awaitShutdown();
+
+                    List<Integer> newShareCoordNodeId = null;
+                    try {
+                        newShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
+                            .partitions().stream()
+                            .filter(info -> info.partition() == shareGroupStateTp)
+                            .map(info -> info.leader().id())
+                            .toList();
+                    } catch (Exception e) {
+                        fail(e);
+                    }
 
-            // shutdown the coordinator
-            KafkaBroker broker = cluster.brokers().get(curShareCoordNodeId.get(0));
-            cluster.shutdownBroker(curShareCoordNodeId.get(0));
+                    assertEquals(1, newShareCoordNodeId.size());
+                    assertNotEquals(curShareCoordNodeId.get(0), newShareCoordNodeId.get(0));
+                }, 5L, TimeUnit.SECONDS
+            );
 
-            // give some breathing time
-            broker.awaitShutdown();
+            service.schedule(() -> {
+                    // stop the produce
+                    prodState.done().set(true);
+                }, 10L, TimeUnit.SECONDS
+            );
 
-            List<Integer> newShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
-                .partitions().stream()
-                .filter(info -> info.partition() == shareGroupStateTp)
-                .map(info -> info.leader().id())
-                .toList();
+            TestUtils.waitForCondition(
+                () -> prodState.done().get() && consState.done().get(),
+                45_000L,
+                500L,
+                () -> "prod/cons not done yet"
+            );
 
-            assertEquals(1, newShareCoordNodeId.size());
-            assertNotEquals(curShareCoordNodeId.get(0), newShareCoordNodeId.get(0));
+            assertTrue(prodState.count().get() <= consState.count().get());

Review Comment:
   And we are making sure that we consumed at least as many records as were 
produced, not worrying about which records were redelivered (which I think is a 
perfectly good predicate for this test).
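   As a sketch of that predicate (names taken from the diff above; the per-key check is only a hypothetical extra):

       // At-least-once delivery: every successfully produced record is consumed
       // at least once, and redeliveries can only increase the consumed count.
       assertTrue(prodState.count().get() <= consState.count().get());
       // Hypothetical stricter check on key coverage, if ever needed for debugging:
       // produced.forEach(key -> assertTrue(consumed.containsKey(key)));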



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1832,83 +1835,129 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception {
             @ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
         }
     )
+    @Timeout(90)
     public void testShareConsumerAfterCoordinatorMovement() throws Exception {
         setup();
         String topicName = "multipart";
         String groupId = "multipartGrp";
         Uuid topicId = createTopic(topicName, 3, 3);
         alterShareAutoOffsetReset(groupId, "earliest");
+        ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
 
         try (Admin admin = createAdminClient()) {

Review Comment:
   I think the admin client is only needed in the runnable that moves the 
coordinator.
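   One possible shape, assuming createAdminClient() is safe to call from inside the scheduled task (sketch only, body elided):

       // Open the admin client only in the task that moves the coordinator,
       // so the producer and consumer tasks do not depend on it.
       service.schedule(() -> {
           try (Admin admin = createAdminClient()) {
               // describe the share group state topic, shut down its leader, then re-describe
           } catch (Exception e) {
               fail(e);
           }
       }, 5L, TimeUnit.SECONDS);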


