jolshan commented on code in PR #15237: URL: https://github.com/apache/kafka/pull/15237#discussion_r1470235881
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -2056,4 +2059,70 @@ public void testCompleteTransactionWithUnexpectedPartition() { assertFutureThrows(future, IllegalStateException.class); } + + @Test + public void testOnPartitionsDeleted() { + int partitionCount = 3; + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime, + new GroupCoordinatorMetrics() + ); + + service.startup(() -> partitionCount); + + when(runtime.partitions()).thenReturn( + IntStream + .range(0, partitionCount) + .mapToObj(i -> new TopicPartition("__consumer_offsets", i)) + .collect(Collectors.toSet()) + ); + + List<CompletableFuture<Void>> futures = IntStream + .range(0, partitionCount) + .mapToObj(__ -> new CompletableFuture<Void>()) + .collect(Collectors.toList()); + + IntStream.range(0, partitionCount).forEach(i -> { + CompletableFuture<Void> future = futures.get(i); + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("on-partition-deleted"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)), + ArgumentMatchers.eq(Duration.ofMillis(5000)), + ArgumentMatchers.any() + )).thenAnswer(__ -> future); + }); + + IntStream.range(0, partitionCount - 1).forEach(i -> { + futures.get(i).complete(null); + }); + + futures.get(partitionCount - 1).completeExceptionally(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception()); + + // The exception is logged and swallowed. + assertDoesNotThrow(() -> Review Comment: Should we have any other validations that the partition is deleted or not? 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -903,6 +906,45 @@ public boolean cleanupExpiredOffsets(String groupId, List<Record> records) { return allOffsetsExpired.get() && !openTransactionsByGroup.containsKey(groupId); } + /** + * Remove offsets of the partitions that have been deleted. + * + * @param topicPartitions The partitions that have been deleted. + * @return The list of tombstones (offset commit) to append. + */ + public List<Record> onPartitionsDeleted( + List<TopicPartition> topicPartitions + ) { + List<Record> records = new ArrayList<>(); + + Map<String, List<Integer>> partitionsByTopic = new HashMap<>(); + topicPartitions.forEach(tp -> partitionsByTopic + .computeIfAbsent(tp.topic(), __ -> new ArrayList<>()) + .add(tp.partition()) + ); + + Consumer<Offsets> delete = offsets -> { Review Comment: This confused me for a bit because the Consumer here is not a Kafka consumer but actually Java's `java.util.function.Consumer` functional interface. These lines are creating a lambda called delete that takes all the offsets and appends the tombstones they need. Offsets (the variable) is overloaded a few times so it is a little confusing. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -1001,8 +1002,26 @@ public void onTransactionCompleted( public void onPartitionsDeleted( List<TopicPartition> topicPartitions, BufferSupplier bufferSupplier - ) { + ) throws ExecutionException, InterruptedException { Review Comment: To clarify -- is onPartitionsDeleted only called when the topic behind the partition is deleted (not reassigned etc)? Or are there other cases? 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ########## @@ -3050,6 +3060,53 @@ public void testOffsetDeletionsSensor() { verify(context.metrics).record(OFFSET_DELETIONS_SENSOR_NAME, 2); } + @Test + public void testOnPartitionsDeleted() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Commit offsets. + context.commitOffset("grp-0", "foo", 1, 100, 1, context.time.milliseconds()); + context.commitOffset("grp-0", "foo", 2, 200, 1, context.time.milliseconds()); + context.commitOffset("grp-0", "foo", 3, 300, 1, context.time.milliseconds()); + + context.commitOffset("grp-1", "bar", 1, 100, 1, context.time.milliseconds()); + context.commitOffset("grp-1", "bar", 2, 200, 1, context.time.milliseconds()); + context.commitOffset("grp-1", "bar", 3, 300, 1, context.time.milliseconds()); + + context.commitOffset(100L, "grp-2", "foo", 1, 100, 1, context.time.milliseconds()); + context.commitOffset(100L, "grp-2", "foo", 2, 200, 1, context.time.milliseconds()); + context.commitOffset(100L, "grp-2", "foo", 3, 300, 1, context.time.milliseconds()); + + // Delete partitions. + List<Record> records = context.deletePartitions(Arrays.asList( + new TopicPartition("foo", 1), Review Comment: Any particular reason why this is tested with 3 partitions per topic? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ########## @@ -589,6 +589,23 @@ private void scheduleGroupMetadataExpiration() { ); } + /** + * Remove offsets of the partitions that have been deleted. + * + * @param topicPartitions The partitions that have been deleted. + * @return The list of tombstones (offset commit) to append. Review Comment: Why does the comment specify (offset commit)? Is it to distinguish from other tombstones? 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -2056,4 +2059,70 @@ public void testCompleteTransactionWithUnexpectedPartition() { assertFutureThrows(future, IllegalStateException.class); } + + @Test + public void testOnPartitionsDeleted() { + int partitionCount = 3; + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime, + new GroupCoordinatorMetrics() + ); + + service.startup(() -> partitionCount); + + when(runtime.partitions()).thenReturn( + IntStream + .range(0, partitionCount) + .mapToObj(i -> new TopicPartition("__consumer_offsets", i)) + .collect(Collectors.toSet()) + ); + + List<CompletableFuture<Void>> futures = IntStream + .range(0, partitionCount) + .mapToObj(__ -> new CompletableFuture<Void>()) + .collect(Collectors.toList()); + + IntStream.range(0, partitionCount).forEach(i -> { + CompletableFuture<Void> future = futures.get(i); + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("on-partition-deleted"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)), + ArgumentMatchers.eq(Duration.ofMillis(5000)), + ArgumentMatchers.any() + )).thenAnswer(__ -> future); + }); + + IntStream.range(0, partitionCount - 1).forEach(i -> { Review Comment: Is this trying to get all but the last partition to succeed and the last one to fail with an error? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org