AndrewJSchofield commented on code in PR #20909:
URL: https://github.com/apache/kafka/pull/20909#discussion_r2539709848
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -3447,6 +3730,37 @@ private void alterShareIsolationLevel(String groupId, String newValue) {
         }
     }
+    private List<Integer> topicPartitionLeader(Admin adminClient, String topicName, int partition)
+        throws InterruptedException, ExecutionException {
+        return adminClient.describeTopics(List.of(topicName)).allTopicNames().get().get(topicName)
+            .partitions().stream()
+            .filter(info -> info.partition() == partition)
+            .map(info -> info.leader().id())
+            .filter(info -> info != -1)
+            .toList();
+    }
+
+    private SharePartitionOffsetInfo sharePartitionDescription(Admin adminClient, String groupId, TopicPartition tp)
Review Comment:
nit: Method name seems wrong. It's returning the share-partition offset
info, not the description.
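For example, just a sketch of the rename (hypothetical name, body left exactly as it is in the PR):

```java
private SharePartitionOffsetInfo sharePartitionOffsetInfo(Admin adminClient, String groupId, TopicPartition tp)
    throws InterruptedException, ExecutionException {
    // ... same lookup logic as the current sharePartitionDescription method ...
}
```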
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -3447,6 +3730,37 @@ private void alterShareIsolationLevel(String groupId, String newValue) {
         }
     }
+    private List<Integer> topicPartitionLeader(Admin adminClient, String topicName, int partition)
+        throws InterruptedException, ExecutionException {
Review Comment:
nit: The indentation on this method and the next is a bit odd because the
throws and the start of the method body are equally indented. I'd just put the
throws on the same line as the argument list for these methods.
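For example, something like this (same body as in the PR, just the throws clause moved up so the body indentation stands out):

```java
private List<Integer> topicPartitionLeader(Admin adminClient, String topicName, int partition) throws InterruptedException, ExecutionException {
    return adminClient.describeTopics(List.of(topicName)).allTopicNames().get().get(topicName)
        .partitions().stream()
        .filter(info -> info.partition() == partition)
        .map(info -> info.leader().id())
        .filter(info -> info != -1)
        .toList();
}
```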
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -3136,6 +3134,291 @@ private void verifyYammerMetricCount(String filterString, int count) {
         assertEquals(count, ((Meter) renewAck).count());
     }
+    @ClusterTest
+    public void testDescribeShareGroupOffsetsForEmptySharePartition() {
+        String groupId = "group1";
+        try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created.
+            shareConsumer.poll(Duration.ofMillis(2000));
+            SharePartitionOffsetInfo sharePartitionDescription = sharePartitionDescription(adminClient, groupId, tp);
+            // Since the partition is empty, and no records have been consumed, the share partition startOffset will be
+            // -1. Thus, there will be no description for the share partition.
+            assertNull(sharePartitionDescription);
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagForSingleShareConsumer() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            waitedPoll(shareConsumer, 2500L, 1);
+            // Acknowledge and commit the consumed record to update the share partition state.
+            shareConsumer.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because the only produced record has been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagForMultipleShareConsumers() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(groupId);
+             ShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            producer.flush();
+            shareConsumer1.subscribe(List.of(tp.topic()));
+            shareConsumer2.subscribe(List.of(tp.topic()));
+            // Polling share consumer 1 to make sure the share partition is created and the records are consumed.
+            waitedPoll(shareConsumer1, 2500L, 1);
+            // Acknowledge and commit the consumed records to update the share partition state.
+            shareConsumer1.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing more records to the share partition.
+            producer.send(record);
+            // Polling share consumer 2 this time.
+            waitedPoll(shareConsumer2, 2500L, 1);
+            // Since the consumed record hasn't been acknowledged yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            // Acknowledge and commit the consumed records to update the share partition state.
+            shareConsumer2.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagWithReleaseAcknowledgement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId, Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record first to move the offset forward and register the state with persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the record is consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // The produced record is consumed.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Now release the record - it should be available for redelivery.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.RELEASE));
+            shareConsumer.commitSync();
+            // After releasing, the lag should be 1 because the record is released for redelivery.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            // The record is now consumed again.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record to mark it as consumed.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting the record, the lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagWithRejectAcknowledgement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId, Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record first to move the offset forward and register the state with persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the record is consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // The produced record is consumed.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Now reject the record - it should not be available for redelivery.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.REJECT));
+            shareConsumer.commitSync();
+            // After rejecting, the lag should be 0 because the record is permanently rejected and the offset moves forward.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3")
+        }
+    )
+    public void testSharePartitionLagOnGroupCoordinatorMovement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            String topicName = "testTopicWithReplicas";
+            // Create a topic with replication factor 3
+            Uuid tpId = createTopic(topicName, 1, 3);
+            TopicPartition tp = new TopicPartition(topicName, 0);
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            // Produce first record and consume it
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            waitedPoll(shareConsumer, 2500L, 1);
+            // Acknowledge and commit the consumed record to update the share partition state.
+            shareConsumer.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because the only produced record has been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(tpId, tp));
+            int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
+            int consumerOffsetsTp = Utils.abs(groupId.hashCode()) % 3;
+            List<Integer> curGroupCoordNodeId;
+            // Find the broker which is the group coordinator for the share group.
+            curGroupCoordNodeId = topicPartitionLeader(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, consumerOffsetsTp);
+            assertEquals(1, curGroupCoordNodeId.size());
+            // Shut down the coordinator broker
+            KafkaBroker broker = cluster.brokers().get(curGroupCoordNodeId.get(0));
+            cluster.shutdownBroker(curGroupCoordNodeId.get(0));
+            // Wait for it to be completely shut down
+            broker.awaitShutdown();
+            // Wait for the leaders of share coordinator, group coordinator and topic partition to be elected, if needed, on a different broker.
+            TestUtils.waitForCondition(() -> {
+                List<Integer> newShareCoordNodeId = topicPartitionLeader(adminClient, Topic.SHARE_GROUP_STATE_TOPIC_NAME, shareGroupStateTp);
+                List<Integer> newGroupCoordNodeId = topicPartitionLeader(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, consumerOffsetsTp);
+                List<Integer> newTopicPartitionLeader = topicPartitionLeader(adminClient, tp.topic(), tp.partition());
+
+                return newShareCoordNodeId.size() == 1 && !Objects.equals(newShareCoordNodeId.get(0), curGroupCoordNodeId.get(0)) &&
+                    newGroupCoordNodeId.size() == 1 && !Objects.equals(newGroupCoordNodeId.get(0), curGroupCoordNodeId.get(0)) &&
+                    newTopicPartitionLeader.size() == 1 && !Objects.equals(newTopicPartitionLeader.get(0), curGroupCoordNodeId.get(0));
+            }, DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, () -> "Failed to elect new leaders after broker shutdown");
+            // After group coordinator shutdown, check that lag is still 1
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3")
+        }
+    )
+    public void testSharePartitionLagOnShareCoordinatorMovement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            String topicName = "testTopicWithReplicas";
+            // Create a topic with replication factor 3
+            Uuid tpId = createTopic(topicName, 1, 3);
+            TopicPartition tp = new TopicPartition(topicName, 0);
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            // Produce first record and consume it
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            waitedPoll(shareConsumer, 2500L, 1);
+            // Acknowledge and commit the consumed record to update the share partition state.
+            shareConsumer.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because the only produced record has been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(tpId, tp));
+            int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
+            int consumerOffsetsTp = Utils.abs(groupId.hashCode()) % 3;
+            List<Integer> curShareCoordNodeId;
+            // Find the broker which is the share coordinator for the share partition.
+            curShareCoordNodeId = topicPartitionLeader(adminClient, Topic.SHARE_GROUP_STATE_TOPIC_NAME, shareGroupStateTp);
+            assertEquals(1, curShareCoordNodeId.size());
+            // Shut down the coordinator broker
+            KafkaBroker broker = cluster.brokers().get(curShareCoordNodeId.get(0));
+            cluster.shutdownBroker(curShareCoordNodeId.get(0));
+            // Wait for it to be completely shut down
+            broker.awaitShutdown();
+            // Wait for the leaders of share coordinator, group coordinator and topic partition to be elected, if needed, on a different broker.
+            TestUtils.waitForCondition(() -> {
+                List<Integer> newShareCoordNodeId = topicPartitionLeader(adminClient, Topic.SHARE_GROUP_STATE_TOPIC_NAME, shareGroupStateTp);
+                List<Integer> newGroupCoordNodeId = topicPartitionLeader(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, consumerOffsetsTp);
+                List<Integer> newTopicPartitionLeader = topicPartitionLeader(adminClient, tp.topic(), tp.partition());
+
+                return newShareCoordNodeId.size() == 1 && !Objects.equals(newShareCoordNodeId.get(0), curShareCoordNodeId.get(0)) &&
+                    newGroupCoordNodeId.size() == 1 && !Objects.equals(newGroupCoordNodeId.get(0), curShareCoordNodeId.get(0)) &&
+                    newTopicPartitionLeader.size() == 1 && !Objects.equals(newTopicPartitionLeader.get(0), curShareCoordNodeId.get(0));
+            }, DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, () -> "Failed to elect new leaders after broker shutdown");
+            // After share coordinator shutdown and new leaderS election, check that lag is still 1
Review Comment:
nit: new "leader's" not "leaderS"
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -3136,6 +3134,291 @@ private void verifyYammerMetricCount(String filterString, int count) {
         assertEquals(count, ((Meter) renewAck).count());
     }
+    @ClusterTest
+    public void testDescribeShareGroupOffsetsForEmptySharePartition() {
+        String groupId = "group1";
+        try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created.
+            shareConsumer.poll(Duration.ofMillis(2000));
+            SharePartitionOffsetInfo sharePartitionDescription = sharePartitionDescription(adminClient, groupId, tp);
+            // Since the partition is empty, and no records have been consumed, the share partition startOffset will be
+            // -1. Thus, there will be no description for the share partition.
+            assertNull(sharePartitionDescription);
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagForSingleShareConsumer() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            waitedPoll(shareConsumer, 2500L, 1);
+            // Acknowledge and commit the consumed record to update the share partition state.
+            shareConsumer.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because the only produced record has been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagForMultipleShareConsumers() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(groupId);
+             ShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            producer.flush();
+            shareConsumer1.subscribe(List.of(tp.topic()));
+            shareConsumer2.subscribe(List.of(tp.topic()));
+            // Polling share consumer 1 to make sure the share partition is created and the records are consumed.
+            waitedPoll(shareConsumer1, 2500L, 1);
+            // Acknowledge and commit the consumed records to update the share partition state.
+            shareConsumer1.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing more records to the share partition.
+            producer.send(record);
+            // Polling share consumer 2 this time.
+            waitedPoll(shareConsumer2, 2500L, 1);
+            // Since the consumed record hasn't been acknowledged yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            // Acknowledge and commit the consumed records to update the share partition state.
+            shareConsumer2.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagWithReleaseAcknowledgement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId, Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record first to move the offset forward and register the state with persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the record is consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // The produced record is consumed.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Now release the record - it should be available for redelivery.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.RELEASE));
+            shareConsumer.commitSync();
+            // After releasing, the lag should be 1 because the record is released for redelivery.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            // The record is now consumed again.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record to mark it as consumed.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting the record, the lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagWithRejectAcknowledgement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId, Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record first to move the offset forward and register the state with persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the record is consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // The produced record is consumed.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Now reject the record - it should not be available for redelivery.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.REJECT));
+            shareConsumer.commitSync();
+            // After rejecting, the lag should be 0 because the record is permanently rejected and the offset moves forward.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3")
Review Comment:
I read the code from the bottom so my comment about not needing 3 partitions
for these internal topics stands here too. You do need replication factor 3.
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -3136,6 +3134,291 @@ private void verifyYammerMetricCount(String filterString, int count) {
         assertEquals(count, ((Meter) renewAck).count());
     }
+    @ClusterTest
+    public void testDescribeShareGroupOffsetsForEmptySharePartition() {
+        String groupId = "group1";
+        try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created.
+            shareConsumer.poll(Duration.ofMillis(2000));
+            SharePartitionOffsetInfo sharePartitionDescription = sharePartitionDescription(adminClient, groupId, tp);
+            // Since the partition is empty, and no records have been consumed, the share partition startOffset will be
+            // -1. Thus, there will be no description for the share partition.
+            assertNull(sharePartitionDescription);
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagForSingleShareConsumer() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            waitedPoll(shareConsumer, 2500L, 1);
+            // Acknowledge and commit the consumed record to update the share partition state.
+            shareConsumer.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because the only produced record has been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagForMultipleShareConsumers() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(groupId);
+             ShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            producer.flush();
+            shareConsumer1.subscribe(List.of(tp.topic()));
+            shareConsumer2.subscribe(List.of(tp.topic()));
+            // Polling share consumer 1 to make sure the share partition is created and the records are consumed.
+            waitedPoll(shareConsumer1, 2500L, 1);
+            // Acknowledge and commit the consumed records to update the share partition state.
+            shareConsumer1.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing more records to the share partition.
+            producer.send(record);
+            // Polling share consumer 2 this time.
+            waitedPoll(shareConsumer2, 2500L, 1);
+            // Since the consumed record hasn't been acknowledged yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            // Acknowledge and commit the consumed records to update the share partition state.
+            shareConsumer2.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagWithReleaseAcknowledgement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId, Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record first to move the offset forward and register the state with persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the record is consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // The produced record is consumed.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Now release the record - it should be available for redelivery.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.RELEASE));
+            shareConsumer.commitSync();
+            // After releasing, the lag should be 1 because the record is released for redelivery.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            // The record is now consumed again.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record to mark it as consumed.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting the record, the lag should be 0 because all the produced records have been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagWithRejectAcknowledgement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId, Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
+            // Accept the record first to move the offset forward and register the state with persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the record is consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // The produced record is consumed.
+            records = waitedPoll(shareConsumer, 2500L, 1);
+            // Now reject the record - it should not be available for redelivery.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.REJECT));
+            shareConsumer.commitSync();
+            // After rejecting, the lag should be 0 because the record is permanently rejected and the offset moves forward.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3")
+        }
+    )
+    public void testSharePartitionLagOnGroupCoordinatorMovement() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId);
+             Admin adminClient = createAdminClient()) {
+            String topicName = "testTopicWithReplicas";
+            // Create a topic with replication factor 3
+            Uuid tpId = createTopic(topicName, 1, 3);
+            TopicPartition tp = new TopicPartition(topicName, 0);
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            // Produce first record and consume it
+            producer.send(record);
+            producer.flush();
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure the share partition is created and the record is consumed.
+            waitedPoll(shareConsumer, 2500L, 1);
+            // Acknowledge and commit the consumed record to update the share partition state.
+            shareConsumer.commitSync();
+            // After the acknowledgement is successful, the share partition lag should be 0 because the only produced record has been consumed.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Producing another record to the share partition.
+            producer.send(record);
+            producer.flush();
+            // Since the new record has not been consumed yet, the share partition lag should be 1.
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+            SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(tpId, tp));
+            int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
+            int consumerOffsetsTp = Utils.abs(groupId.hashCode()) % 3;
+            List<Integer> curGroupCoordNodeId;
+            // Find the broker which is the group coordinator for the share group.
+            curGroupCoordNodeId = topicPartitionLeader(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, consumerOffsetsTp);
+            assertEquals(1, curGroupCoordNodeId.size());
+            // Shut down the coordinator broker
+            KafkaBroker broker = cluster.brokers().get(curGroupCoordNodeId.get(0));
+            cluster.shutdownBroker(curGroupCoordNodeId.get(0));
+            // Wait for it to be completely shut down
+            broker.awaitShutdown();
+            // Wait for the leaders of share coordinator, group coordinator and topic partition to be elected, if needed, on a different broker.
+            TestUtils.waitForCondition(() -> {
+                List<Integer> newShareCoordNodeId = topicPartitionLeader(adminClient, Topic.SHARE_GROUP_STATE_TOPIC_NAME, shareGroupStateTp);
+                List<Integer> newGroupCoordNodeId = topicPartitionLeader(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, consumerOffsetsTp);
+                List<Integer> newTopicPartitionLeader = topicPartitionLeader(adminClient, tp.topic(), tp.partition());
+
+                return newShareCoordNodeId.size() == 1 && !Objects.equals(newShareCoordNodeId.get(0), curGroupCoordNodeId.get(0)) &&
+                    newGroupCoordNodeId.size() == 1 && !Objects.equals(newGroupCoordNodeId.get(0), curGroupCoordNodeId.get(0)) &&
+                    newTopicPartitionLeader.size() == 1 && !Objects.equals(newTopicPartitionLeader.get(0), curGroupCoordNodeId.get(0));
+            }, DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, () -> "Failed to elect new leaders after broker shutdown");
+            // After group coordinator shutdown, check that lag is still 1
+            verifySharePartitionLag(adminClient, groupId, tp, 1L);
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClusterTest(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
Review Comment:
You do need replication factor 3, but you only need one partition for each
of these topics. Then you'd not have to fiddle around trying to calculate the
partitions used for the internal topics.
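For illustration, a sketch of the annotation this suggests (single partition for each internal topic, replication factor still 3):

```java
@ClusterTest(
    brokers = 3,
    serverProperties = {
        @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
        @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "3"),
        @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "1"),
        @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "3")
    }
)
```

With one partition per internal topic, shareGroupStateTp and consumerOffsetsTp are always 0, so the hashCode/modulo calculations in the test body would no longer be needed.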
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]