AndrewJSchofield commented on code in PR #18679:
URL: https://github.com/apache/kafka/pull/18679#discussion_r1928461498


##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1806,6 +1816,181 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception {
         }
     }
 
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
+            @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+            @ClusterConfigProperty(key = "group.share.enable", value = "true"),
+            @ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
+            @ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),

Review Comment:
   It seems to me that this override is unnecessary with 3 brokers and the 
default of 2 is better.



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1806,6 +1816,181 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception {
         }
     }
 
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
+            @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+            @ClusterConfigProperty(key = "group.share.enable", value = "true"),
+            @ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
+            @ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
+        }
+    )
+    public void testShareConsumerAfterCoordinatorMovement() throws Exception {
+        setup();
+        String topicName = "multipart";
+        String groupId = "multipartGrp";
+        createTopic(topicName, 3, 3);

Review Comment:
   Actually this `createTopic` method knows the topic ID and discards it. I 
suggest changing the signature of this method to return a Uuid and then the 
topic ID is known without the extra call to `Admin.describeTopics`.
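   For example, roughly along these lines (sketch only, assuming the helper already calls `Admin.createTopics` internally; adjust to the actual helper):
   ```java
   private Uuid createTopic(String topicName, int numPartitions, int replicationFactor) throws Exception {
       try (Admin admin = createAdminClient()) {
           // return the topic ID assigned at creation time instead of discarding it,
           // so callers don't need a follow-up describeTopics call
           return admin.createTopics(List.of(new NewTopic(topicName, numPartitions, (short) replicationFactor)))
               .topicId(topicName)
               .get();
       }
   }
   ```
   and then in the test simply `Uuid topicId = createTopic(topicName, 3, 3);`.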



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1806,6 +1816,181 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception {
         }
     }
 
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
+            @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+            @ClusterConfigProperty(key = "group.share.enable", value = "true"),
+            @ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
+            @ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
+        }
+    )
+    public void testShareConsumerAfterCoordinatorMovement() throws Exception {
+        setup();
+        String topicName = "multipart";
+        String groupId = "multipartGrp";
+        createTopic(topicName, 3, 3);
+
+        try (Admin admin = createAdminClient()) {
+            TopicPartition tpMulti = new TopicPartition(topicName, 0);
+
+            // get topic id
+            Uuid topicId = admin.describeTopics(List.of(topicName)).topicNameValues().get(topicName).get().topicId();
+
+            // produce some messages
+            try (Producer<byte[], byte[]> producer = createProducer()) {
+                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+                    tpMulti.topic(),
+                    tpMulti.partition(),
+                    null,
+                    "key".getBytes(),
+                    "value".getBytes()
+                );
+                IntStream.range(0, 10).forEach(__ -> producer.send(record));
+                producer.flush();
+            }
+
+            // consume messages
+            try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {
+                shareConsumer.subscribe(List.of(topicName));
+                alterShareAutoOffsetReset(groupId, "earliest");
+                ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
+                assertEquals(10, records.count());
+            }
+
+            // get current share coordinator node
+            SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, tpMulti));
+            int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
+            List<Integer> curShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).topicNameValues().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME).get()
+                .partitions().stream()
+                .filter(info -> info.partition() == shareGroupStateTp)
+                .map(info -> info.leader().id())
+                .toList();
+
+            assertEquals(1, curShareCoordNodeId.size());
+
+            // shutdown the coordinator
+            cluster.shutdownBroker(curShareCoordNodeId.get(0));
+
+            // give some breathing time

Review Comment:
   Could you refactor this to wait for the condition rather than sleep for 2 
seconds and expect immediate compliance?
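   For example, something along these lines (untested sketch, using `TestUtils.waitForCondition` from `org.apache.kafka.test`):
   ```java
   // wait until leadership of the __share_group_state partition has actually moved
   // off the stopped broker, rather than sleeping and assuming the election is done
   TestUtils.waitForCondition(() -> {
       try {
           TopicDescription desc = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME))
               .allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME);
           return desc.partitions().stream()
               .filter(info -> info.partition() == shareGroupStateTp)
               .anyMatch(info -> info.leader() != null && info.leader().id() != curShareCoordNodeId.get(0));
       } catch (Exception e) {
           return false;
       }
   }, 30000L, "Share coordinator did not move to a new broker");
   ```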



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -2015,4 +2204,88 @@ private void alterShareAutoOffsetReset(String groupId, String newValue) {
                 .get(60, TimeUnit.SECONDS), "Failed to alter configs");
         }
     }
+
+    private static class ComplexShareConsumer<K, V> implements Runnable {

Review Comment:
   This really needs a comment about what it's up to.
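   For instance, a class-level comment along these lines (wording to be adjusted to what the class actually does):
   ```java
   /**
    * A Runnable share consumer used by testComplexShareConsumer. It subscribes to the
    * given topic and keeps polling (up to POLL_TIMEOUT_MS per poll) while records are
    * still being produced concurrently, so consumption, acknowledgement and redelivery
    * (bounded by MAX_DELIVERY_COUNT) are exercised under load.
    */
   private static class ComplexShareConsumer<K, V> implements Runnable {
   ```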



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1806,6 +1816,181 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception {
         }
     }
 
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
+            @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+            @ClusterConfigProperty(key = "group.share.enable", value = "true"),
+            @ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
+            @ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),

Review Comment:
   Leave this as the default of 2 also I think.



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1806,6 +1816,181 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception {
         }
     }
 
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
+            @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+            @ClusterConfigProperty(key = "group.share.enable", value = "true"),
+            @ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
+            @ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "3"),

Review Comment:
   Actually, the default replication factor of all three of these internal 
topics is 3 also, which should be ideal for this test with 3 brokers.



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1806,6 +1816,181 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception {
         }
     }
 
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
+            @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+            @ClusterConfigProperty(key = "group.share.enable", value = "true"),
+            @ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
+            @ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
+        }
+    )
+    public void testShareConsumerAfterCoordinatorMovement() throws Exception {
+        setup();
+        String topicName = "multipart";
+        String groupId = "multipartGrp";
+        createTopic(topicName, 3, 3);
+
+        try (Admin admin = createAdminClient()) {
+            TopicPartition tpMulti = new TopicPartition(topicName, 0);
+
+            // get topic id
+            Uuid topicId = admin.describeTopics(List.of(topicName)).topicNameValues().get(topicName).get().topicId();
+
+            // produce some messages
+            try (Producer<byte[], byte[]> producer = createProducer()) {
+                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+                    tpMulti.topic(),
+                    tpMulti.partition(),
+                    null,
+                    "key".getBytes(),
+                    "value".getBytes()
+                );
+                IntStream.range(0, 10).forEach(__ -> producer.send(record));
+                producer.flush();
+            }
+
+            // consume messages
+            try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {
+                shareConsumer.subscribe(List.of(topicName));
+                alterShareAutoOffsetReset(groupId, "earliest");

Review Comment:
   Experience with other tests tells us that it's best to set the auto-offset 
reset earlier in the test. It's an asynchronous action that may or may not take 
effect by the time consumption begins.
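   For example (sketch), move the call up to just after topic creation:
   ```java
   createTopic(topicName, 3, 3);
   // set the reset policy before any produce/consume activity so the asynchronous
   // config change has taken effect by the time the first poll happens
   alterShareAutoOffsetReset(groupId, "earliest");
   ```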



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1806,6 +1816,181 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception {
         }
     }
 
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
+            @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+            @ClusterConfigProperty(key = "group.share.enable", value = "true"),
+            @ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
+            @ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
+        }
+    )
+    public void testShareConsumerAfterCoordinatorMovement() throws Exception {
+        setup();
+        String topicName = "multipart";
+        String groupId = "multipartGrp";
+        createTopic(topicName, 3, 3);
+
+        try (Admin admin = createAdminClient()) {
+            TopicPartition tpMulti = new TopicPartition(topicName, 0);
+
+            // get topic id
+            Uuid topicId = admin.describeTopics(List.of(topicName)).topicNameValues().get(topicName).get().topicId();
+
+            // produce some messages
+            try (Producer<byte[], byte[]> producer = createProducer()) {
+                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+                    tpMulti.topic(),
+                    tpMulti.partition(),
+                    null,
+                    "key".getBytes(),
+                    "value".getBytes()
+                );
+                IntStream.range(0, 10).forEach(__ -> producer.send(record));
+                producer.flush();
+            }
+
+            // consume messages
+            try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {
+                shareConsumer.subscribe(List.of(topicName));
+                alterShareAutoOffsetReset(groupId, "earliest");
+                ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
+                assertEquals(10, records.count());
+            }
+
+            // get current share coordinator node
+            SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, tpMulti));
+            int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
+            List<Integer> curShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).topicNameValues().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME).get()

Review Comment:
   I'd prefer `admin.describeTopics(...).allTopicNames().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)`. I think that yields the same result more tersely.
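   Roughly (sketch):
   ```java
   TopicDescription desc = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME))
       .allTopicNames()
       .get()  // resolve the KafkaFuture<Map<String, TopicDescription>>
       .get(Topic.SHARE_GROUP_STATE_TOPIC_NAME);
   List<Integer> curShareCoordNodeId = desc.partitions().stream()
       .filter(info -> info.partition() == shareGroupStateTp)
       .map(info -> info.leader().id())
       .toList();
   ```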



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -2015,4 +2204,88 @@ private void alterShareAutoOffsetReset(String groupId, String newValue) {
                 .get(60, TimeUnit.SECONDS), "Failed to alter configs");
         }
     }
+
+    private static class ComplexShareConsumer<K, V> implements Runnable {
+        public static final int POLL_TIMEOUT_MS = 15000;
+        public static final int MAX_DELIVERY_COUNT = 5;

Review Comment:
   This 5 is really `ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT`.
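   e.g. (assuming `ShareGroupConfig` is accessible from this test's classpath):
   ```java
   public static final int MAX_DELIVERY_COUNT = ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT;
   ```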



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1806,6 +1816,181 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception {
         }
     }
 
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
+            @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+            @ClusterConfigProperty(key = "group.share.enable", value = "true"),
+            @ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
+            @ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
+        }
+    )
+    public void testShareConsumerAfterCoordinatorMovement() throws Exception {
+        setup();
+        String topicName = "multipart";
+        String groupId = "multipartGrp";
+        createTopic(topicName, 3, 3);
+
+        try (Admin admin = createAdminClient()) {
+            TopicPartition tpMulti = new TopicPartition(topicName, 0);
+
+            // get topic id
+            Uuid topicId = admin.describeTopics(List.of(topicName)).topicNameValues().get(topicName).get().topicId();
+
+            // produce some messages
+            try (Producer<byte[], byte[]> producer = createProducer()) {
+                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+                    tpMulti.topic(),
+                    tpMulti.partition(),
+                    null,
+                    "key".getBytes(),
+                    "value".getBytes()
+                );
+                IntStream.range(0, 10).forEach(__ -> producer.send(record));
+                producer.flush();
+            }
+
+            // consume messages
+            try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {
+                shareConsumer.subscribe(List.of(topicName));
+                alterShareAutoOffsetReset(groupId, "earliest");
+                ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
+                assertEquals(10, records.count());
+            }
+
+            // get current share coordinator node
+            SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, tpMulti));
+            int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
+            List<Integer> curShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).topicNameValues().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME).get()
+                .partitions().stream()
+                .filter(info -> info.partition() == shareGroupStateTp)
+                .map(info -> info.leader().id())
+                .toList();
+
+            assertEquals(1, curShareCoordNodeId.size());
+
+            // shutdown the coordinator
+            cluster.shutdownBroker(curShareCoordNodeId.get(0));
+
+            // give some breathing time
+            TimeUnit.SECONDS.sleep(2L);
+
+            List<Integer> newShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).topicNameValues().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME).get()
+                .partitions().stream()
+                .filter(info -> info.partition() == shareGroupStateTp)
+                .map(info -> info.leader().id())
+                .toList();
+
+            assertEquals(1, newShareCoordNodeId.size());
+            assertNotEquals(curShareCoordNodeId.get(0), newShareCoordNodeId.get(0));
+
+            // again produce to same topic partition
+            try (Producer<byte[], byte[]> producer = createProducer()) {
+                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+                    tpMulti.topic(),
+                    tpMulti.partition(),
+                    null,
+                    "key".getBytes(),
+                    "value".getBytes()
+                );
+                IntStream.range(0, 10).forEach(__ -> producer.send(record));
+                producer.flush();
+            }
+
+            // consume messages should only be possible if partition and share coord has moved
+            // from shutdown broker since we are only producing to partition 0 of topic.
+            try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {
+                shareConsumer.subscribe(List.of(topicName));
+                alterShareAutoOffsetReset(groupId, "earliest");
+                ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
+                assertEquals(20, records.count());
+            }
+
+            verifyShareGroupStateTopicRecordsProduced();
+        }
+    }
+
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
+            @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+            @ClusterConfigProperty(key = "group.share.enable", value = "true"),
+            @ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
+            @ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),

Review Comment:
   Same comments as the previous test regarding min.isr and replication factor: the defaults are appropriate without overrides.



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1806,6 +1816,181 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception {
         }
     }
 
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
+            @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+            @ClusterConfigProperty(key = "group.share.enable", value = "true"),
+            @ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
+            @ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
+        }
+    )
+    public void testShareConsumerAfterCoordinatorMovement() throws Exception {
+        setup();
+        String topicName = "multipart";
+        String groupId = "multipartGrp";
+        createTopic(topicName, 3, 3);
+
+        try (Admin admin = createAdminClient()) {
+            TopicPartition tpMulti = new TopicPartition(topicName, 0);
+
+            // get topic id
+            Uuid topicId = admin.describeTopics(List.of(topicName)).topicNameValues().get(topicName).get().topicId();
+
+            // produce some messages
+            try (Producer<byte[], byte[]> producer = createProducer()) {
+                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+                    tpMulti.topic(),
+                    tpMulti.partition(),
+                    null,
+                    "key".getBytes(),
+                    "value".getBytes()
+                );
+                IntStream.range(0, 10).forEach(__ -> producer.send(record));
+                producer.flush();
+            }
+
+            // consume messages
+            try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {
+                shareConsumer.subscribe(List.of(topicName));
+                alterShareAutoOffsetReset(groupId, "earliest");
+                ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
+                assertEquals(10, records.count());
+            }
+
+            // get current share coordinator node
+            SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, tpMulti));
+            int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
+            List<Integer> curShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).topicNameValues().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME).get()
+                .partitions().stream()
+                .filter(info -> info.partition() == shareGroupStateTp)
+                .map(info -> info.leader().id())
+                .toList();
+
+            assertEquals(1, curShareCoordNodeId.size());
+
+            // shutdown the coordinator
+            cluster.shutdownBroker(curShareCoordNodeId.get(0));
+
+            // give some breathing time
+            TimeUnit.SECONDS.sleep(2L);
+
+            List<Integer> newShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).topicNameValues().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME).get()
+                .partitions().stream()
+                .filter(info -> info.partition() == shareGroupStateTp)
+                .map(info -> info.leader().id())
+                .toList();
+
+            assertEquals(1, newShareCoordNodeId.size());
+            assertNotEquals(curShareCoordNodeId.get(0), newShareCoordNodeId.get(0));
+
+            // again produce to same topic partition
+            try (Producer<byte[], byte[]> producer = createProducer()) {
+                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+                    tpMulti.topic(),
+                    tpMulti.partition(),
+                    null,
+                    "key".getBytes(),
+                    "value".getBytes()
+                );
+                IntStream.range(0, 10).forEach(__ -> producer.send(record));
+                producer.flush();
+            }
+
+            // consume messages should only be possible if partition and share coord has moved
+            // from shutdown broker since we are only producing to partition 0 of topic.
+            try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {
+                shareConsumer.subscribe(List.of(topicName));
+                alterShareAutoOffsetReset(groupId, "earliest");
+                ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
+                assertEquals(20, records.count());
+            }
+
+            verifyShareGroupStateTopicRecordsProduced();
+        }
+    }
+
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
+            @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+            @ClusterConfigProperty(key = "group.share.enable", value = "true"),
+            @ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
+            @ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
+        }
+    )
+    public void testComplexShareConsumer() throws Exception {
+        setup();
+        String topicName = "multipart";
+        String groupId = "multipartGrp";
+        createTopic(topicName, 3, 3);
+        TopicPartition multiTp = new TopicPartition(topicName, 0);
+
+        ExecutorService executer = Executors.newCachedThreadPool();
+
+        AtomicBoolean prodDone = new AtomicBoolean(false);
+        AtomicInteger sentCount = new AtomicInteger(0);
+
+        // produce messages until we want
+        executer.execute(() -> {
+            while (!prodDone.get()) {

Review Comment:
   This makes a separate producer per loop iteration. I suggest re-using the 
producer across loop iterations. You can still use try-with-resources.
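   For example (sketch; the loop body is guessed from the earlier test, so adjust as needed):
   ```java
   executer.execute(() -> {
       // one producer shared across all iterations, still closed via try-with-resources
       try (Producer<byte[], byte[]> producer = createProducer()) {
           while (!prodDone.get()) {
               ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                   multiTp.topic(), multiTp.partition(), null, "key".getBytes(), "value".getBytes());
               producer.send(record);
               sentCount.incrementAndGet();
           }
           producer.flush();
       }
   });
   ```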



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
