chirag-wadhwa5 commented on code in PR #21246:
URL: https://github.com/apache/kafka/pull/21246#discussion_r2663649779
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -3411,6 +3413,95 @@ public void
testSharePartitionLagOnShareCoordinatorMovement() {
}
}
+ @ClusterTest
+ public void testSharePartitionLagAfterAlterShareGroupOffsets() {
+ String groupId = "group1";
+ try (Producer<byte[], byte[]> producer = createProducer();
+ Admin adminClient = createAdminClient()) {
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"Message".getBytes());
+ // Producing 100 records to the topic partition.
+ for (int i = 0; i < 100; i++) {
+ producer.send(record);
+ }
+ producer.flush();
+
+ // Create a new share consumer. Since the share.auto.offset.reset
is not altered, it should be latest by default.
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(groupId,
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+ shareConsumer.subscribe(List.of(tp.topic()));
+ // Polling share consumer to make sure it joins the group and
subscribes to the topic.
+ waitedPoll(shareConsumer, 2500L, 0, true, groupId, List.of(new
TopicPartition(tp.topic(), 0)));
+ // Producing 5 additional records to the topic partition.
+ for (int i = 0; i < 5; i++) {
+ producer.send(record);
+ }
+ producer.flush();
+ // Polling share consumer to make sure the records are consumed.
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 5);
+ assertEquals(5, records.count());
+ // Accept the record first to move the offset forward and register
the state with persister.
+ records.forEach(r -> shareConsumer.acknowledge(r,
AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+ // After accepting, the lag should be 0 because the record is
consumed successfully.
+ verifySharePartitionLag(adminClient, groupId, tp, 0L);
+ // Closing the share consumer so that the offsets can be altered.
+ shareConsumer.close();
+ // Alter the start offset of the share partition to 40.
+ alterShareGroupOffsets(adminClient, groupId, tp, 40L);
+ // After altering, the share partition start offset should be 40.
+ verifySharePartitionStartOffset(adminClient, groupId, tp, 40L);
+ // Verify that the lag is now 65 since the start offset is altered
to 40 and there are total 105 records in the partition.
+ verifySharePartitionLag(adminClient, groupId, tp, 65L);
+ } catch (InterruptedException | ExecutionException e) {
+ fail("Test failed with exception: " + e.getMessage());
+ }
+ }
+
+ @ClusterTest
+ public void testSharePartitionLagAfterDeleteShareGroupOffsets() {
+ String groupId = "group1";
+ alterShareAutoOffsetReset(groupId, "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ Admin adminClient = createAdminClient()) {
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"Message".getBytes());
+ // Producing 5 records to the topic partition.
+ for (int i = 0; i < 5; i++) {
+ producer.send(record);
+ }
+ producer.flush();
+ // Create a new share consumer. Since the share.auto.offset.reset
is not altered, it should be latest by default.
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(groupId,
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+ shareConsumer.subscribe(List.of(tp.topic()));
+ // Polling share consumer to make sure it joins the group and
consumes the produced records.
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 5);
+ assertEquals(5, records.count());
+ // Accept the records first to move the offset forward and
register the state with persister.
+ records.forEach(r -> shareConsumer.acknowledge(r,
AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+ // After accepting, the lag should be 0 because the record is
consumed successfully.
+ verifySharePartitionLag(adminClient, groupId, tp, 0L);
+ // Closing the share consumer so that the offsets can be deleted.
+ shareConsumer.close();
+ // Delete the share group offsets.
+ deleteShareGroupOffsets(adminClient, groupId, tp.topic());
+ // Create a new share consumer.
Review Comment:
Thanks for the review. The issue with that is although
DescribeShareGroupOffsets would return default value for startOffset (-1) after
the offsets are deleted, but adminClient.listOffsets does not return such
partitions. It returns null for such partitions. I can maybe have a check that
the future value returned for this particular partition Is null.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]