jeffkbkim commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1336348269
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -90,4 +92,29 @@ void validateOffsetFetch(
int memberEpoch,
long lastCommittedOffset
) throws KafkaException;
+
+ /**
+ * Validates the OffsetDelete request.
+ */
+ void validateOffsetDelete() throws KafkaException;
+
+ /**
+ * Validates the GroupDelete request.
+ */
+ void validateGroupDelete() throws KafkaException;
+
+ /**
+ * Returns true if the group is actively subscribed to the topic.
+ *
+ * @param topic The topic name.
+ * @return whether the group is subscribed to the topic.
+ */
+ boolean isSubscribedToTopic(String topic);
+
+ /**
+ * Creates tombstone(s) for deleting the group.
+ *
+ * @return The list of tombstone record(s).
+ */
+ List<Record> createMetadataTombstoneRecords();
Review Comment:
I wonder if createGroupTombstoneRecords() would make more sense as the method name.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,6 +114,70 @@ public void testCommitOffset() {
assertEquals(result, coordinator.commitOffset(context, request));
}
+ @Test
+ public void testDeleteGroup() {
Review Comment:
nit: rename to testDeleteGroups.
Also, can we verify the number of method invocations and test that we
append records correctly for multiple groups?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1561,6 +1567,156 @@ public void
testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
() -> context.fetchAllOffsets("group", "member", 10,
Long.MAX_VALUE));
}
+ private void testOffsetDeleteWith(
Review Comment:
should this be a static method?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java:
##########
@@ -1026,6 +1028,30 @@ public void testValidateOffsetCommit() {
() -> group.validateOffsetCommit("member-id", "new-instance-id",
1));
}
+ @Test
+ public void testValidateOffsetDelete() {
+ group.transitionTo(PREPARING_REBALANCE);
+ assertThrows(GroupNotEmptyException.class, () ->
group.validateOffsetDelete());
+ group.transitionTo(COMPLETING_REBALANCE);
+ assertThrows(GroupNotEmptyException.class, () ->
group.validateOffsetDelete());
+ group.transitionTo(STABLE);
+ assertThrows(GroupNotEmptyException.class, () ->
group.validateOffsetDelete());
+ group.transitionTo(DEAD);
+ assertThrows(GroupIdNotFoundException.class, () ->
group.validateOffsetDelete());
Review Comment:
Should we add a test case for the EMPTY state? Same for testValidateGroupDelete.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,44 @@ public HeartbeatResponseData genericGroupHeartbeat(
);
}
+ /**
+ * Handles a GroupDelete request.
Review Comment:
nit: this should say "DeleteGroups" request, so that it reflects the actual
ApiKeys#DELETE_GROUPS name.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,6 +114,70 @@ public void testCommitOffset() {
assertEquals(result, coordinator.commitOffset(context, request));
}
+ @Test
+ public void testDeleteGroup() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ groupMetadataManager,
+ offsetMetadataManager
+ );
+
+ RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+ List<String> groupIds = Collections.singletonList("group-id");
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection = new
DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id"));
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id",
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord("group-id")
+ );
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> expectedResult = new CoordinatorResult<>(
+ expectedRecords,
+ expectedResultCollection
+ );
+
+
doNothing().when(groupMetadataManager).validateGroupDelete(ArgumentMatchers.eq("group-id"));
+ doAnswer(invocation -> {
+ List<Record> records = invocation.getArgument(1);
+
records.add(RecordHelpers.newOffsetCommitTombstoneRecord("group-id",
"topic-name", 0));
+ return null;
+
}).when(offsetMetadataManager).deleteAllOffsets(ArgumentMatchers.eq("group-id"),
anyList());
+ doAnswer(invocation -> {
+ List<Record> records = invocation.getArgument(1);
+
records.add(RecordHelpers.newGroupMetadataTombstoneRecord("group-id"));
+ return null;
+
}).when(groupMetadataManager).deleteGroup(ArgumentMatchers.eq("group-id"),
anyList());
+
+ assertEquals(expectedResult, coordinator.deleteGroups(context,
groupIds));
+ }
+
+ @Test
+ public void testDeleteInvalidGroup() {
Review Comment:
nit: rename to testDeleteGroupsInvalidGroupId.
Can we also add a valid group id and verify that the first result stores the
INVALID_GROUP_ID error and the second stores NONE?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,81 @@ public CoordinatorResult<OffsetCommitResponseData, Record>
commitOffset(
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Handles an OffsetDelete request.
+ *
+ * @param request The OffsetDelete request.
+ *
+ * @return A Result containing the OffsetDeleteResponseData response and
+ * a list of records to update the state machine.
+ */
+ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+ OffsetDeleteRequestData request
+ ) throws ApiException {
+ final Group group = validateOffsetDelete(request);
+ final List<Record> records = new ArrayList<>();
+ final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+ final OffsetDeleteResponseData response = new
OffsetDeleteResponseData();
+ final TimelineHashMap<String, TimelineHashMap<Integer,
OffsetAndMetadata>> offsetsByTopic =
+ offsetsByGroup.get(request.groupId());
+
+ request.topics().forEach(topic -> {
+ final
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
responsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+ final boolean subscribedToTopic =
group.isSubscribedToTopic(topic.name());
Review Comment:
we can inline this to L380
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -673,4 +675,32 @@ public void testValidateOffsetFetch() {
// This should succeed.
group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE);
}
+
+ @Test
+ public void testValidateGroupDelete() {
+ Uuid fooTopicId = Uuid.randomUuid();
Review Comment:
This variable (fooTopicId) looks unused and can be removed.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -673,4 +675,32 @@ public void testValidateOffsetFetch() {
// This should succeed.
group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE);
}
+
+ @Test
+ public void testValidateGroupDelete() {
+ Uuid fooTopicId = Uuid.randomUuid();
+ ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY,
consumerGroup.state());
+ assertDoesNotThrow(() -> consumerGroup.validateGroupDelete());
Review Comment:
We can use a method reference, `consumerGroup::validateGroupDelete`, here and
for the other invocations in the test.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +939,206 @@ public void
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
assertEquals(expectedResponse, future.get());
}
+
+ @Test
+ public void testDeleteOffsets() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ );
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("group")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
responsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+ responsePartitionCollection.add(
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+ );
+ OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+ responseTopicCollection.add(
+ new
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+ );
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setTopics(responseTopicCollection);
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offset"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteOffsetsInvalidGroupId() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ );
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offset"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteOffsetsCoordinatorNotAvailableException() throws
Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ );
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("group")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offset"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(
+ new CoordinatorLoadInProgressException(null)
+ ));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteGroups() throws Exception {
Review Comment:
Can we add a test with three __consumer_offsets topic partitions where one
finishes immediately, another takes a while, and the last coordinator throws
an exception?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]