showuon commented on code in PR #13665: URL: https://github.com/apache/kafka/pull/13665#discussion_r1230868853
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -2521,11 +2524,36 @@ public void testCommitOffsetMetadata() { AtomicBoolean success = new AtomicBoolean(false); - Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello")); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello"); + Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, offsetAndMetadata); + Map<TopicPartition, OffsetAndMetadata> cache = coordinator.committedOffsetsCache(); + assertTrue(cache.isEmpty()); coordinator.commitOffsetsAsync(offsets, callback(offsets, success)); coordinator.invokeCompletedOffsetCommitCallbacks(); assertTrue(success.get()); assertEquals(coordinator.inFlightAsyncCommits.get(), 0); Review Comment: I know this is not your change, but please also update it. Thanks. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -3114,6 +3142,54 @@ public void testFetchCommittedOffsets() { assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(t1p)); } + @Test + public void testPopulatingOffsetCacheForAssignedPartition() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache(); + // committedOffsetsCache should be empty + assertTrue(committedOffsetsCache.isEmpty()); + + long offset = 500L; + String metadata = "blahblah"; + Optional<Integer> leaderEpoch = Optional.of(15); + OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch, + metadata, Errors.NONE); + + client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data))); + subscriptions.assignFromUser(singleton(t1p)); + Map<TopicPartition, 
OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p), + time.timer(Long.MAX_VALUE)); + + assertNotNull(fetchedOffsets); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, leaderEpoch, metadata); + assertEquals(offsetAndMetadata, fetchedOffsets.get(t1p)); + + // check committedOffsetsCache is populated + assertEquals(committedOffsetsCache.size(), 1); Review Comment: ` assertEquals( 1, committedOffsetsCache.size());` ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -3114,6 +3142,54 @@ public void testFetchCommittedOffsets() { assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(t1p)); } + @Test + public void testPopulatingOffsetCacheForAssignedPartition() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache(); + // committedOffsetsCache should be empty + assertTrue(committedOffsetsCache.isEmpty()); + + long offset = 500L; + String metadata = "blahblah"; + Optional<Integer> leaderEpoch = Optional.of(15); + OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch, + metadata, Errors.NONE); + + client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data))); + subscriptions.assignFromUser(singleton(t1p)); + Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p), + time.timer(Long.MAX_VALUE)); + + assertNotNull(fetchedOffsets); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, leaderEpoch, metadata); + assertEquals(offsetAndMetadata, fetchedOffsets.get(t1p)); + + // check committedOffsetsCache is populated + assertEquals(committedOffsetsCache.size(), 1); + assertEquals(offsetAndMetadata, 
committedOffsetsCache.get(t1p)); + + // fetch again with t1p + t2p, but will send fetch for t2p since t1p is in cache + long offsetPartition2 = 50L; + String metadataPartition2 = "foobar"; + Optional<Integer> leaderEpochPartition2 = Optional.of(19909); + data = new OffsetFetchResponse.PartitionData(offsetPartition2, leaderEpochPartition2, + metadataPartition2, Errors.NONE); + client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t2p, data))); + + fetchedOffsets = coordinator.fetchCommittedOffsets(new HashSet<>(Arrays.asList(t1p, t2p)), time.timer(Long.MAX_VALUE)); + + assertNotNull(fetchedOffsets); + + assertEquals(fetchedOffsets.size(), 2); // tp1 and tp2 should be returned with tp1 coming from cache + assertEquals(committedOffsetsCache.size(), 1); // cache size is still 1 since only tp1 is an owned partition Review Comment: Please fix the parameter order in these `assertEquals` calls — the expected value should come first: `assertEquals(expected, actual)`. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -3818,13 +3899,16 @@ private void gracefulCloseTest(ConsumerCoordinator coordinator, boolean shouldLe return commitRequest.data().groupId().equals(groupId); }, new OffsetCommitResponse(new OffsetCommitResponseData())); + // add t1p to the committed offset metadata cache, we'll then check that the cache is invalidated after revocation which happens during close + coordinator.committedOffsetsCache().put(t1p, new OffsetAndMetadata(1L)); Review Comment: I know you've added a test to verify the revoke case, but it covers the instance-close case. Could we add one more revoke test in `testRejoinGroup` for a usual revoke case? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org