dajac commented on code in PR #14466: URL: https://github.com/apache/kafka/pull/14466#discussion_r1349894415
########## core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala: ########## @@ -63,4 +69,156 @@ class BaseAsyncConsumerTest extends AbstractConsumerTest { assertNull(committedOffset.get(tp)) assertTrue(consumer.assignment.contains(tp)) } + + @Test + def testCommitSyncAllConsumed(): Unit = { + val numRecords = 10000 + + val producer = createProducer() + val startingTimestamp = System.currentTimeMillis() + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + + val consumer = createConsumer() + consumer.assign(List(tp).asJava) + consumer.seek(tp, 0) + consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp) + + consumer.commitSync() + val committedOffset = consumer.committed(Set(tp).asJava) + assertNotNull(committedOffset) + assertNotNull(committedOffset.get(tp)) + assertEquals(numRecords, committedOffset.get(tp).offset()) + } + + @Test + def testSimpleConsume(): Unit = { + val numRecords = 10 + + val producer = createProducer() + val startingTimestamp = System.currentTimeMillis() + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + + val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) + consumer.assign(List(tp).asJava) + consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp) + + assertEquals(numRecords, consumer.position(tp)) + } + + @Test + def testSimpleConsumeSkippingPosition(): Unit = { + val numRecords = 10 + + val producer = createProducer() + val startingTimestamp = System.currentTimeMillis() + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + + val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) + consumer.assign(List(tp).asJava) + val offset = 1 + consumer.seek(tp, offset) + consumeAndVerifyRecords(consumer = consumer, numRecords - offset, startingOffset = offset, + startingKeyAndValueIndex = offset, startingTimestamp = startingTimestamp + offset) + + assertEquals(numRecords, consumer.position(tp)) + } + + @Test + def testSimpleConsumeWithLeaderChangeValidatingPositions(): Unit = { + val numRecords = 10 + val producer = createProducer() + val startingTimestamp = System.currentTimeMillis() + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) + consumer.assign(List(tp).asJava) + consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp) + + // Force leader epoch change to trigger position validation + var parts: mutable.Buffer[PartitionInfo] = null + while (parts == null) + parts = consumer.partitionsFor(tp.topic()).asScala + val leader = parts.head.leader().id() + this.servers(leader).shutdown() + this.servers(leader).startup() + + // Consume after leader change + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 10, + startingTimestamp = startingTimestamp) + } + + @Test + def testFetchCommittedOffsets(): Unit = { + val numRecords = 100 + val startingTimestamp = System.currentTimeMillis() + val producer = createProducer() + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + val consumer = createAsyncConsumer() + consumer.assign(List(tp).asJava) + // First consumer consumes and commits offsets + consumer.seek(tp, 0) + consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, + startingTimestamp = startingTimestamp) + consumer.commitSync() + assertEquals(numRecords, consumer.committed(Set(tp).asJava).get(tp).offset) + // We should see the committed offsets from another consumer + val anotherConsumer = createAsyncConsumer() + anotherConsumer.assign(List(tp).asJava) + assertEquals(numRecords, anotherConsumer.committed(Set(tp).asJava).get(tp).offset) + } + + @Test + def testConsumeFromCommittedOffsets(): Unit = { + val producer = createProducer() + val numRecords = 100 + val startingTimestamp = System.currentTimeMillis() + sendRecords(producer, numRecords = numRecords, tp, startingTimestamp = startingTimestamp) + + // Commit offset with first consumer + val consumer = createConsumerWithGroupId("group1") + consumer.assign(List(tp).asJava) + val offset = 10 + consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(offset))) + .asJava) + assertEquals(offset, consumer.committed(Set(tp).asJava).get(tp).offset) + consumer.close() + + // Consume from committed offsets with another consumer in same group + val anotherConsumer = createConsumerWithGroupId("group1") + assertEquals(offset, anotherConsumer.committed(Set(tp).asJava).get(tp).offset) + anotherConsumer.assign(List(tp).asJava) + consumeAndVerifyRecords(consumer = anotherConsumer, numRecords - offset, + startingOffset = offset, startingKeyAndValueIndex = offset, + startingTimestamp = startingTimestamp + offset) + } + + @Test + def testRetrievingCommittedOffsetsMultipleTimes(): Unit = { + val numRecords = 100 + val startingTimestamp = System.currentTimeMillis() + val producer = createProducer() + sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) + + val consumer = createAsyncConsumer() + consumer.assign(List(tp).asJava) + + // Consume and commit offsets + consumer.seek(tp, 0) + consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, + startingTimestamp = startingTimestamp) + consumer.commitSync() + + // Check committed offsets twice with same consumer + assertEquals(numRecords, consumer.committed(Set(tp).asJava).get(tp).offset) + assertEquals(numRecords, consumer.committed(Set(tp).asJava).get(tp).offset) + } + + @Test + def testEmptyGroupNotSupported(): Unit = { + // This replaces the existing testConsumingWithEmptyGroupId, given that empty group ID is not + // supported in the new consumer implementation + val consumer = createConsumerWithGroupId("") + consumer.assign(List(tp).asJava) + assertThrows(classOf[InvalidGroupIdException], () => consumer.commitSync()) Review Comment: I assume that this change about the empty group id was done on purpose? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org