smjn commented on code in PR #21246:
URL: https://github.com/apache/kafka/pull/21246#discussion_r2661703679
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -3411,6 +3413,95 @@ public void
testSharePartitionLagOnShareCoordinatorMovement() {
}
}
+ @ClusterTest
+ public void testSharePartitionLagAfterAlterShareGroupOffsets() {
+ String groupId = "group1";
+ try (Producer<byte[], byte[]> producer = createProducer();
+ Admin adminClient = createAdminClient()) {
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"Message".getBytes());
+ // Producing 100 records to the topic partition.
+ for (int i = 0; i < 100; i++) {
+ producer.send(record);
+ }
+ producer.flush();
+
+ // Create a new share consumer. Since the share.auto.offset.reset
is not altered, it should be latest by default.
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(groupId,
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+ shareConsumer.subscribe(List.of(tp.topic()));
+ // Polling share consumer to make sure it joins the group and
subscribes to the topic.
+ waitedPoll(shareConsumer, 2500L, 0, true, groupId, List.of(new
TopicPartition(tp.topic(), 0)));
+ // Producing 5 additional records to the topic partition.
+ for (int i = 0; i < 5; i++) {
+ producer.send(record);
+ }
+ producer.flush();
+ // Polling share consumer to make sure the records are consumed.
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 5);
+ assertEquals(5, records.count());
+ // Accept the record first to move the offset forward and register
the state with persister.
+ records.forEach(r -> shareConsumer.acknowledge(r,
AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+ // After accepting, the lag should be 0 because the record is
consumed successfully.
+ verifySharePartitionLag(adminClient, groupId, tp, 0L);
+ // Closing the share consumer so that the offsets can be altered.
+ shareConsumer.close();
+ // Alter the start offset of the share partition to 0.
+ alterShareGroupOffsets(adminClient, groupId, tp, 0L);
+ // After altering, the share partition start offset should be 0.
+ verifySharePartitionStartOffset(adminClient, groupId, tp, 0L);
+ // Verify that the lag is now 105 since the start offset is
altered to 0 and there are total 105 records in the partition.
+ verifySharePartitionLag(adminClient, groupId, tp, 105L);
+ } catch (InterruptedException | ExecutionException e) {
+ fail("Test failed with exception: " + e.getMessage());
+ }
+ }
+
+ @ClusterTest
+ public void testSharePartitionLagAfterDeleteShareGroupOffsets() {
+ String groupId = "group1";
+ alterShareAutoOffsetReset(groupId, "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ Admin adminClient = createAdminClient()) {
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"Message".getBytes());
+ // Producing 5 records to the topic partition.
+ for (int i = 0; i < 5; i++) {
+ producer.send(record);
+ }
+ producer.flush();
+ // Create a new share consumer. Since the share.auto.offset.reset
is not altered, it should be latest by default.
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(groupId,
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+ shareConsumer.subscribe(List.of(tp.topic()));
+ // Polling share consumer to make sure it joins the group and
consumes the produced records.
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 5);
+ assertEquals(5, records.count());
+ // Accept the records first to move the offset forward and
register the state with persister.
+ records.forEach(r -> shareConsumer.acknowledge(r,
AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+ // After accepting, the lag should be 0 because the record is
consumed successfully.
+ verifySharePartitionLag(adminClient, groupId, tp, 0L);
+ // Closing the share consumer so that the offsets can be deleted.
+ shareConsumer.close();
+ // Delete the start offset of the share partition to 0.
Review Comment:
comment does not make sense, what does "delete start offset mean"
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]