chirag-wadhwa5 commented on code in PR #21246:
URL: https://github.com/apache/kafka/pull/21246#discussion_r2663649779


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -3411,6 +3413,95 @@ public void 
testSharePartitionLagOnShareCoordinatorMovement() {
         }
     }
 
+    @ClusterTest
+    public void testSharePartitionLagAfterAlterShareGroupOffsets() {
+        String groupId = "group1";
+        try (Producer<byte[], byte[]> producer = createProducer();
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"Message".getBytes());
+            // Producing 100 records to the topic partition.
+            for (int i = 0; i < 100; i++) {
+                producer.send(record);
+            }
+            producer.flush();
+
+            // Create a new share consumer. Since the share.auto.offset.reset 
is not altered, it should be latest by default.
+            ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(groupId, 
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure it joins the group and 
subscribes to the topic.
+            waitedPoll(shareConsumer, 2500L, 0, true, groupId, List.of(new 
TopicPartition(tp.topic(), 0)));
+            // Producing 5 additional records to the topic partition.
+            for (int i = 0; i < 5; i++) {
+                producer.send(record);
+            }
+            producer.flush();
+            // Polling share consumer to make sure the records are consumed.
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 5);
+            assertEquals(5, records.count());
+            // Accept the record first to move the offset forward and register 
the state with persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, 
AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the record is 
consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Closing the share consumer so that the offsets can be altered.
+            shareConsumer.close();
+            // Alter the start offset of the share partition to 40.
+            alterShareGroupOffsets(adminClient, groupId, tp, 40L);
+            // After altering, the share partition start offset should be 40.
+            verifySharePartitionStartOffset(adminClient, groupId, tp, 40L);
+            // Verify that the lag is now 65 since the start offset is altered 
to 40 and there are total 105 records in the partition.
+            verifySharePartitionLag(adminClient, groupId, tp, 65L);
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Test failed with exception: " + e.getMessage());
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagAfterDeleteShareGroupOffsets() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"Message".getBytes());
+            // Producing 5 records to the topic partition.
+            for (int i = 0; i < 5; i++) {
+                producer.send(record);
+            }
+            producer.flush();
+            // Create a new share consumer. Since the share.auto.offset.reset 
is not altered, it should be latest by default.
+            ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(groupId, 
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure it joins the group and 
consumes the produced records.
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 5);
+            assertEquals(5, records.count());
+            // Accept the records first to move the offset forward and 
register the state with persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, 
AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the record is 
consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Closing the share consumer so that the offsets can be deleted.
+            shareConsumer.close();
+            // Delete the share group offsets.
+            deleteShareGroupOffsets(adminClient, groupId, tp.topic());
+            // Create a new share consumer.

Review Comment:
   Thanks for the review. The issue with that is although 
DescribeShareGroupOffsets would return default value for startOffset (-1) after 
the offsets are deleted, but adminClient.listOffsets does not return such 
partitions. It returns null for such partitions. I can maybe have a check that 
the future value returned for this particular partition Is null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to