showuon commented on code in PR #13665: URL: https://github.com/apache/kafka/pull/13665#discussion_r1230345113
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ########## @@ -127,6 +127,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // hold onto request&future for committed offset requests to enable async calls. private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null; + // holds the offset metadata for assigned partitions to reduce remote calls thus speeding up fetching partition metadata + private final Map<TopicPartition, OffsetAndMetadata> committedTopicPartitionOffsetsCache; Review Comment: nit: the comment above should mention this is the `committed offset metadata` ########## clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java: ########## @@ -922,13 +922,9 @@ public void testCommitsFetchedDuringAssign() { // fetch offset for two topics Map<TopicPartition, Long> offsets = new HashMap<>(); - offsets.put(tp0, offset1); - client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator); - assertEquals(offset1, consumer.committed(Collections.singleton(tp0)).get(tp0).offset()); - - offsets.remove(tp0); offsets.put(tp1, offset2); Review Comment: Could we add a comment above about why we only need to respond with `tp1, offset2`? Something like: the offset for `tp0` has already been cached by the previous committed-offset fetch. 
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() { AtomicBoolean success = new AtomicBoolean(false); - Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello")); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello"); + Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, offsetAndMetadata); coordinator.commitOffsetsAsync(offsets, callback(offsets, success)); coordinator.invokeCompletedOffsetCommitCallbacks(); assertTrue(success.get()); assertEquals(coordinator.inFlightAsyncCommits.get(), 0); + Map<TopicPartition, OffsetAndMetadata> cache = coordinator.committedOffsetsCache(); + assertEquals(cache.size(), 1); + assertEquals(cache.get(t1p), offsetAndMetadata); + } + + @Test + public void testCommitOffsetMetadataSync() { Review Comment: Thanks for adding the sync test ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() { AtomicBoolean success = new AtomicBoolean(false); - Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello")); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello"); + Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, offsetAndMetadata); coordinator.commitOffsetsAsync(offsets, callback(offsets, success)); coordinator.invokeCompletedOffsetCommitCallbacks(); assertTrue(success.get()); assertEquals(coordinator.inFlightAsyncCommits.get(), 0); + Map<TopicPartition, OffsetAndMetadata> cache = coordinator.committedOffsetsCache(); + assertEquals(cache.size(), 1); + assertEquals(cache.get(t1p), offsetAndMetadata); Review Comment: assertEquals method signature is `assertEquals(int expected, int actual)`. 
Putting the parameters in the correct order will produce a meaningful error message if the assertion fails. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() { AtomicBoolean success = new AtomicBoolean(false); - Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello")); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello"); + Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, offsetAndMetadata); coordinator.commitOffsetsAsync(offsets, callback(offsets, success)); coordinator.invokeCompletedOffsetCommitCallbacks(); assertTrue(success.get()); assertEquals(coordinator.inFlightAsyncCommits.get(), 0); + Map<TopicPartition, OffsetAndMetadata> cache = coordinator.committedOffsetsCache(); + assertEquals(cache.size(), 1); + assertEquals(cache.get(t1p), offsetAndMetadata); Review Comment: Also, could we assert cache is empty before we `commitOffsetsAsync`? i.e. ``` assertTrue(cache.isEmpty()); coordinator.commitOffsetsAsync(...) ... ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ########## @@ -351,6 +359,10 @@ private Exception invokePartitionsRevoked(final SortedSet<TopicPartition> revoke final long startMs = time.milliseconds(); listener.onPartitionsRevoked(revokedPartitions); sensors.revokeCallbackSensor.record(time.milliseconds() - startMs); + // remove the offset metadata cache for revoked partitions + for (TopicPartition revokedPartition: revokedPartitions) { + this.committedTopicPartitionOffsetsCache.remove(revokedPartition); + } Review Comment: The revoke and lost partition cases should also be tested, to verify we indeed remove them from the cache. 
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -3114,6 +3137,70 @@ public void testFetchCommittedOffsets() { assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(t1p)); } + @Test + public void testPopulatingOffsetCacheForAssignedPartition() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache(); + // committedOffsetsCache should be empty + assertEquals(committedOffsetsCache.size(), 0); + + long offset = 500L; + String metadata = "blahblah"; + Optional<Integer> leaderEpoch = Optional.of(15); + OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch, + metadata, Errors.NONE); + + client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data))); + subscriptions.assignFromUser(singleton(t1p)); + Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p), + time.timer(Long.MAX_VALUE)); + + assertNotNull(fetchedOffsets); + OffsetAndMetadata expected = new OffsetAndMetadata(offset, leaderEpoch, metadata); + assertEquals(expected, fetchedOffsets.get(t1p)); + + // check committedOffsetsCache is populated + assertEquals(committedOffsetsCache.size(), 1); + assertEquals(expected, committedOffsetsCache.get(t1p)); + } + + @Test + public void testReturningCachedOffsetForAssignedPartition() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache(); + + long offset = 500L; + String metadata = "blahblah"; + Optional<Integer> leaderEpoch = Optional.of(15); + OffsetAndMetadata offsetAndMetadata = new 
OffsetAndMetadata(offset, leaderEpoch, metadata); + committedOffsetsCache.put(t1p, offsetAndMetadata); + Review Comment: nit: there is an extra blank line here — please remove it ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -3114,6 +3137,70 @@ public void testFetchCommittedOffsets() { assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(t1p)); } + @Test + public void testPopulatingOffsetCacheForAssignedPartition() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache(); + // committedOffsetsCache should be empty + assertEquals(committedOffsetsCache.size(), 0); + + long offset = 500L; + String metadata = "blahblah"; + Optional<Integer> leaderEpoch = Optional.of(15); + OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch, + metadata, Errors.NONE); + + client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data))); + subscriptions.assignFromUser(singleton(t1p)); + Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p), + time.timer(Long.MAX_VALUE)); + + assertNotNull(fetchedOffsets); + OffsetAndMetadata expected = new OffsetAndMetadata(offset, leaderEpoch, metadata); + assertEquals(expected, fetchedOffsets.get(t1p)); + + // check committedOffsetsCache is populated + assertEquals(committedOffsetsCache.size(), 1); + assertEquals(expected, committedOffsetsCache.get(t1p)); + } + + @Test + public void testReturningCachedOffsetForAssignedPartition() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache(); + + long offset = 500L; + 
String metadata = "blahblah"; + Optional<Integer> leaderEpoch = Optional.of(15); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, leaderEpoch, metadata); + committedOffsetsCache.put(t1p, offsetAndMetadata); Review Comment: I'm thinking we can merge this test with the above `testPopulatingOffsetCacheForAssignedPartition` test, so that we don't have to initialize all the data again. Something like this: ``` // check committedOffsetsCache is populated assertEquals(committedOffsetsCache.size(), 1); assertEquals(expected, committedOffsetsCache.get(t1p)); client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t2p, data))); // fetch again with t1p + t2p, but will send fetch for t2p since t1p is in cache Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(new HashSet<>(Arrays.asList(t1p, t2p)), time.timer(Long.MAX_VALUE)); assertNotNull(fetchedOffsets); // return 2 results assertEquals(fetchedOffsets.size(), 2); // the cache size is still 1 assertEquals(committedOffsetsCache.size(), 1); ..... ``` WDYT? 
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() { AtomicBoolean success = new AtomicBoolean(false); - Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello")); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello"); + Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, offsetAndMetadata); coordinator.commitOffsetsAsync(offsets, callback(offsets, success)); coordinator.invokeCompletedOffsetCommitCallbacks(); assertTrue(success.get()); assertEquals(coordinator.inFlightAsyncCommits.get(), 0); + Map<TopicPartition, OffsetAndMetadata> cache = coordinator.committedOffsetsCache(); + assertEquals(cache.size(), 1); + assertEquals(cache.get(t1p), offsetAndMetadata); Review Comment: The same comments apply to the tests below -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org