smjn commented on code in PR #21246:
URL: https://github.com/apache/kafka/pull/21246#discussion_r2661703679


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -3411,6 +3413,95 @@ public void testSharePartitionLagOnShareCoordinatorMovement() {
         }
     }
 
+    @ClusterTest
+    public void testSharePartitionLagAfterAlterShareGroupOffsets() {
+        String groupId = "group1";
+        try (Producer<byte[], byte[]> producer = createProducer();
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            // Producing 100 records to the topic partition.
+            for (int i = 0; i < 100; i++) {
+                producer.send(record);
+            }
+            producer.flush();
+
+            // Create a new share consumer. Since share.auto.offset.reset is not altered, it should be latest by default.
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId, Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure it joins the group and subscribes to the topic.
+            waitedPoll(shareConsumer, 2500L, 0, true, groupId, List.of(new TopicPartition(tp.topic(), 0)));
+            // Producing 5 additional records to the topic partition.
+            for (int i = 0; i < 5; i++) {
+                producer.send(record);
+            }
+            producer.flush();
+            // Polling share consumer to make sure the records are consumed.
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 5);
+            assertEquals(5, records.count());
+            // Accept the records first to move the offset forward and register the state with the persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the records were consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Closing the share consumer so that the offsets can be altered.
+            shareConsumer.close();
+            // Alter the start offset of the share partition to 0.
+            alterShareGroupOffsets(adminClient, groupId, tp, 0L);
+            // After altering, the share partition start offset should be 0.
+            verifySharePartitionStartOffset(adminClient, groupId, tp, 0L);
+            // Verify that the lag is now 105, since the start offset was altered to 0 and there are 105 records in total in the partition.
+            verifySharePartitionLag(adminClient, groupId, tp, 105L);
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Test failed with exception: " + e.getMessage());
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagAfterDeleteShareGroupOffsets() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            // Producing 5 records to the topic partition.
+            for (int i = 0; i < 5; i++) {
+                producer.send(record);
+            }
+            producer.flush();
+            // Create a new share consumer. Since share.auto.offset.reset was altered to earliest above, consumption starts from the beginning of the partition.
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId, Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure it joins the group and consumes the produced records.
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 5);
+            assertEquals(5, records.count());
+            // Accept the records first to move the offset forward and register the state with the persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the records were consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Closing the share consumer so that the offsets can be deleted.
+            shareConsumer.close();
+            // Delete the start offset of the share partition to 0.

Review Comment:
   The comment does not make sense: what does "delete start offset" mean?
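   
   One possible rewording, assuming the KIP-932 semantics (deleting share group offsets removes the persisted share-partition state, so the start offset is re-initialized from share.auto.offset.reset the next time the partition is read):
   
   ```java
   // Delete the share group offsets for the topic. The persisted share-partition
   // state is removed, so the start offset is re-initialized on the next read.
   ```
   
   For illustration only, a minimal standalone sketch of driving the deletion through the Admin client. The bootstrap address, topic name, and the result's all() future are assumptions made for this sketch, not taken from this PR:
   
   ```java
   import java.util.Map;
   import java.util.Set;
   
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   
   public class DeleteShareGroupOffsetsSketch {
       public static void main(String[] args) throws Exception {
           // Assumed broker address for the sketch.
           try (Admin admin = Admin.create(Map.of(
                   AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
               // deleteShareGroupOffsets takes the group id and the set of topic
               // names whose persisted share-partition state should be removed.
               admin.deleteShareGroupOffsets("group1", Set.of("test-topic"))
                    .all()
                    .get();
           }
       }
   }
   ```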



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
