jeffkbkim commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1343258721
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3071,6 +3071,33 @@ private void removeCurrentMemberFromGenericGroup(
group.remove(member.memberId());
}
+ /**
+ * Handles a DeleteGroups request.
+ * Populates the record list passed in with record to update the state
machine.
+ * Validations are done in {@link
GroupCoordinatorShard#deleteGroups(RequestContext, List)} by
+ * calling {@link GroupMetadataManager#validateDeleteGroup(String)}.
+ *
+ * @param groupId The ID of the group to be deleted. It has been checked
in {@link GroupMetadataManager#validateDeleteGroup}.
+ * @param records The record list to populate.
+ */
+ public void deleteGroup(
+ String groupId,
+ List<Record> records
+ ) {
+ // In this method, we only populate records with tombstone records, so
we don't expect an exception to be thrown here.
Review Comment:
"At this point, we have already validated the group id so we know that the
group exists and that no exception will be thrown."
how's this?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -341,6 +384,22 @@ public CoordinatorResult<LeaveGroupResponseData, Record>
genericGroupLeave(
return groupMetadataManager.genericGroupLeave(context, request);
}
+ /**
+ * Handles a OffsetDelete request.
Review Comment:
nit: an
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +349,94 @@ public CoordinatorResult<OffsetCommitResponseData, Record>
commitOffset(
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Handles an OffsetDelete request.
+ *
+ * @param request The OffsetDelete request.
+ *
+ * @return A Result containing the OffsetDeleteResponseData response and
+ * a list of records to update the state machine.
+ */
+ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+ OffsetDeleteRequestData request
+ ) throws ApiException {
+ final Group group = validateOffsetDelete(request);
+ final List<Record> records = new ArrayList<>();
+ final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
Review Comment:
what's the benefit of using final variables here?
##########
clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsRequestTest.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DeleteGroupsRequestData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static
org.apache.kafka.common.requests.DeleteGroupsRequest.getErrorResultCollection;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DeleteGroupsRequestTest {
+
+ protected static String groupId1 = "group-id-1";
+ protected static String groupId2 = "group-id-2";
Review Comment:
we can move these into the test as well
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3071,6 +3071,33 @@ private void removeCurrentMemberFromGenericGroup(
group.remove(member.memberId());
}
+ /**
+ * Handles a DeleteGroups request.
+ * Populates the record list passed in with record to update the state
machine.
+ * Validations are done in {@link
GroupCoordinatorShard#deleteGroups(RequestContext, List)} by
+ * calling {@link GroupMetadataManager#validateDeleteGroup(String)}.
+ *
+ * @param groupId The ID of the group to be deleted. It has been checked
in {@link GroupMetadataManager#validateDeleteGroup}.
Review Comment:
nit: can we change all usages of "ID" to "id"?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +938,253 @@ public void
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
assertEquals(expectedResponse, future.get());
}
+
+ @Test
+ public void testDeleteOffsets() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ ).iterator());
+ OffsetDeleteRequestData request = new OffsetDeleteRequestData()
+ .setGroupId("group")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
responsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(Collections.singletonList(
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+ ).iterator());
+ OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList(
+ new
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+ ).iterator());
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setTopics(responseTopicCollection);
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteOffsetsInvalidGroupId() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ ).iterator());
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ private static Stream<Arguments> testDeleteOffsetsWithExceptionSource() {
+ return testConsumerGroupHeartbeatWithExceptionSource();
+ }
+
+ @ParameterizedTest
+ @MethodSource("testDeleteOffsetsWithExceptionSource")
+ public void testDeleteOffsetsWithException(
+ Throwable exception,
+ short expectedErrorCode
+ ) throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ ).iterator());
+ OffsetDeleteRequestData request = new OffsetDeleteRequestData()
+ .setGroupId("group")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setErrorCode(expectedErrorCode);
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(exception));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteGroups() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-1");
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection2 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ DeleteGroupsResponseData.DeletableGroupResult result2 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-2");
+ resultCollection2.add(result2);
+
+ DeleteGroupsResponseData.DeletableGroupResult result3 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-3")
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.addAll(Arrays.asList(
+ new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code()),
+ result2.duplicate(),
+ result3.duplicate(),
+ result1.duplicate()
+ ));
+
+ when(runtime.partitions()).thenReturn(Sets.newSet(
+ new TopicPartition("__consumer_offsets", 0),
+ new TopicPartition("__consumer_offsets", 1),
+ new TopicPartition("__consumer_offsets", 2)
+ ));
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+ CompletableFuture<Object> resultCollectionFuture = new
CompletableFuture<>();
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(resultCollectionFuture);
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
+ ArgumentMatchers.any()
+
)).thenReturn(FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception()));
+
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2",
"group-id-3", null);
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS),
groupIds, BufferSupplier.NO_CACHING);
+
+ assertFalse(future.isDone());
+ resultCollectionFuture.complete(resultCollection2);
+
Review Comment:
can we assert true that the future is done?
##########
clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsRequestTest.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DeleteGroupsRequestData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static
org.apache.kafka.common.requests.DeleteGroupsRequest.getErrorResultCollection;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DeleteGroupsRequestTest {
+
+ protected static String groupId1 = "group-id-1";
+ protected static String groupId2 = "group-id-2";
+
+ private static DeleteGroupsRequestData data;
+
+ @BeforeEach
+ public void setUp() {
+ data = new DeleteGroupsRequestData()
+ .setGroupsNames(Arrays.asList(groupId1, groupId2));
+ }
Review Comment:
was this addressed?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +938,253 @@ public void
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
assertEquals(expectedResponse, future.get());
}
+
+ @Test
+ public void testDeleteOffsets() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ ).iterator());
+ OffsetDeleteRequestData request = new OffsetDeleteRequestData()
+ .setGroupId("group")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
responsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(Collections.singletonList(
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+ ).iterator());
+ OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList(
+ new
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+ ).iterator());
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setTopics(responseTopicCollection);
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteOffsetsInvalidGroupId() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ ).iterator());
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ private static Stream<Arguments> testDeleteOffsetsWithExceptionSource() {
+ return testConsumerGroupHeartbeatWithExceptionSource();
+ }
+
+ @ParameterizedTest
+ @MethodSource("testDeleteOffsetsWithExceptionSource")
+ public void testDeleteOffsetsWithException(
+ Throwable exception,
+ short expectedErrorCode
+ ) throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ ).iterator());
+ OffsetDeleteRequestData request = new OffsetDeleteRequestData()
+ .setGroupId("group")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setErrorCode(expectedErrorCode);
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(exception));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteGroups() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-1");
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection2 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ DeleteGroupsResponseData.DeletableGroupResult result2 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-2");
+ resultCollection2.add(result2);
+
+ DeleteGroupsResponseData.DeletableGroupResult result3 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-3")
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.addAll(Arrays.asList(
+ new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code()),
+ result2.duplicate(),
+ result3.duplicate(),
+ result1.duplicate()
+ ));
+
+ when(runtime.partitions()).thenReturn(Sets.newSet(
+ new TopicPartition("__consumer_offsets", 0),
+ new TopicPartition("__consumer_offsets", 1),
+ new TopicPartition("__consumer_offsets", 2)
+ ));
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+ CompletableFuture<Object> resultCollectionFuture = new
CompletableFuture<>();
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(resultCollectionFuture);
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
+ ArgumentMatchers.any()
+
)).thenReturn(FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception()));
+
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2",
"group-id-3", null);
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS),
groupIds, BufferSupplier.NO_CACHING);
+
+ assertFalse(future.isDone());
+ resultCollectionFuture.complete(resultCollection2);
+
+ assertEquals(expectedResultCollection, future.get());
+ }
+
+ private static Stream<Arguments> testDeleteGroupsWithExceptionSource() {
+ return testConsumerGroupHeartbeatWithExceptionSource();
+ }
+
+ @ParameterizedTest
+ @MethodSource("testDeleteGroupsWithExceptionSource")
Review Comment:
can we use
```
@MethodSource("testConsumerGroupHeartbeatWithExceptionSource")
```
and remove the helper?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +507,50 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ final
List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>>
futures =
+ new ArrayList<>(groupIds.size());
+
+ final Map<TopicPartition, List<String>> groupsByTopicPartition = new
HashMap<>();
+ groupIds.forEach(groupId -> {
+ // For backwards compatibility, we support DeleteGroups for the
empty group id.
+ if (groupId == null) {
+
futures.add(CompletableFuture.completedFuture(DeleteGroupsRequest.getErrorResultCollection(
+ Collections.singletonList(null),
+ Errors.INVALID_GROUP_ID
+ )));
+ } else {
+ final TopicPartition topicPartition =
topicPartitionFor(groupId);
+ groupsByTopicPartition
+ .computeIfAbsent(topicPartition, __ -> new ArrayList<>())
+ .add(groupId);
+ }
+ });
+
+ groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ runtime.scheduleWriteOperation(
+ "delete-groups",
+ topicPartition,
+ coordinator -> coordinator.deleteGroups(context, groupList)
+ ).exceptionally(exception ->
+ DeleteGroupsRequest.getErrorResultCollection(groupList,
getErrorsForException(exception))
+ );
+
+ futures.add(future);
+ });
+
+ final CompletableFuture<Void> allFutures =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+ return allFutures.thenApply(v -> {
Review Comment:
we can remove the "v"
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -90,4 +92,28 @@ void validateOffsetFetch(
int memberEpoch,
long lastCommittedOffset
) throws KafkaException;
+
+ /**
+ * Validates the OffsetDelete request.
+ */
+ void validateOffsetDelete() throws KafkaException;
+
+ /**
+ * Validates the DeleteGroups request.
+ */
+ void validateDeleteGroup() throws KafkaException;
+
+ /**
+ * Returns true if the group is actively subscribed to the topic.
+ *
+ * @param topic The topic name.
+ * @return Whether the group is subscribed to the topic.
+ */
+ boolean isSubscribedToTopic(String topic);
+ /**
Review Comment:
nit: newline
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +526,84 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ final
List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>>
futures =
+ new ArrayList<>(groupIds.size());
+
+ final Map<TopicPartition, List<String>> groupsByTopicPartition = new
HashMap<>();
+ groupIds.forEach(groupId -> {
+ // For backwards compatibility, we support DeleteGroups for the
empty group id.
+ if (groupId == null) {
+
futures.add(CompletableFuture.completedFuture(DeleteGroupsRequest.getErrorResultCollection(
+ Collections.singletonList(null),
+ Errors.INVALID_GROUP_ID
+ )));
+ } else {
+ final TopicPartition topicPartition =
topicPartitionFor(groupId);
+ groupsByTopicPartition
+ .computeIfAbsent(topicPartition, __ -> new ArrayList<>())
+ .add(groupId);
+ }
+ });
+
+ groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ runtime.scheduleWriteOperation(
+ "delete-groups",
+ topicPartition,
+ coordinator -> coordinator.deleteGroups(context, groupList)
+ ).exceptionally(exception -> {
+ if (exception instanceof UnknownTopicOrPartitionException
||
+ exception instanceof NotEnoughReplicasException) {
+ return DeleteGroupsRequest.getErrorResultCollection(
+ groupList,
+ Errors.COORDINATOR_NOT_AVAILABLE
+ );
+ }
+
+ if (exception instanceof NotLeaderOrFollowerException ||
+ exception instanceof KafkaStorageException) {
+ return DeleteGroupsRequest.getErrorResultCollection(
+ groupList,
+ Errors.NOT_COORDINATOR
+ );
+ }
+
+ if (exception instanceof RecordTooLargeException ||
+ exception instanceof RecordBatchTooLargeException ||
+ exception instanceof InvalidFetchSizeException) {
+ return DeleteGroupsRequest.getErrorResultCollection(
+ groupList,
+ Errors.UNKNOWN_SERVER_ERROR
+ );
+ }
+
+ return DeleteGroupsRequest.getErrorResultCollection(
+ groupList,
+ Errors.forException(exception)
+ );
+ });
+
+ futures.add(future);
+ });
+
+ final CompletableFuture<Void> allFutures =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+ final
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
resFuture = allFutures.thenApply(v -> {
Review Comment:
also, we can remove the `v`
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +349,94 @@ public CoordinatorResult<OffsetCommitResponseData, Record>
commitOffset(
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Handles an OffsetDelete request.
+ *
+ * @param request The OffsetDelete request.
+ *
+ * @return A Result containing the OffsetDeleteResponseData response and
+ * a list of records to update the state machine.
+ */
+ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+ OffsetDeleteRequestData request
+ ) throws ApiException {
+ final Group group = validateOffsetDelete(request);
+ final List<Record> records = new ArrayList<>();
+ final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+ final TimelineHashMap<String, TimelineHashMap<Integer,
OffsetAndMetadata>> offsetsByTopic =
+ offsetsByGroup.get(request.groupId());
+
+ request.topics().forEach(topic -> {
+ final
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
responsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+
+ if (group.isSubscribedToTopic(topic.name())) {
+ topic.partitions().forEach(partition ->
+ responsePartitionCollection.add(new
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+ .setPartitionIndex(partition.partitionIndex())
+ .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code())
+ )
+ );
+ } else {
+ final TimelineHashMap<Integer, OffsetAndMetadata>
offsetsByPartition = offsetsByTopic == null ?
+ null : offsetsByTopic.get(topic.name());
+ if (offsetsByPartition != null) {
+ topic.partitions().forEach(partition -> {
+ if
(offsetsByPartition.containsKey(partition.partitionIndex())) {
+ responsePartitionCollection.add(new
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+ .setPartitionIndex(partition.partitionIndex())
+ );
+
records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+ request.groupId(),
+ topic.name(),
+ partition.partitionIndex()
+ ));
+ }
+ });
+ }
+ }
+
+ responseTopicCollection.add(new
OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+ .setName(topic.name())
+ .setPartitions(responsePartitionCollection)
+ );
+ });
+
+ return new CoordinatorResult<>(
+ records,
+ new OffsetDeleteResponseData().setTopics(responseTopicCollection)
+ );
+ }
+
+ /**
+ * Deletes offsets as part of a DeleteGroups request.
+ * Populates the record list passed in with records to update the state
machine.
+ * Validations are done in {@link
GroupCoordinatorShard#deleteGroups(RequestContext, List)}
+ *
+ * @param groupId The ID of the given group.
+ * @param records The record list to populate.
+ *
+ * @return The number of offsets to be deleted.
+ */
+ public int deleteAllOffsets(
+ String groupId,
+ List<Record> records
+ ) {
+ TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>
offsetsByTopic = offsetsByGroup.get(groupId);
+ AtomicInteger numDeletedOffsets = new AtomicInteger();
+
+ if (offsetsByTopic != null) {
+ offsetsByTopic.forEach((topic, offsetsByPartition) ->
Review Comment:
we can use
```
offsetsByPartition.keySet().forEach(partition ->
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -592,6 +607,33 @@ public void validateOffsetFetch(
validateMemberEpoch(memberEpoch, member.memberEpoch());
}
+ /**
+ * Validates the OffsetDelete request.
+ */
+ @Override
+ public void validateOffsetDelete() {}
+
+ /**
+ * Validates the DeleteGroups request.
+ */
+ @Override
+ public void validateDeleteGroup() throws ApiException {
+ if (state() != ConsumerGroupState.EMPTY) {
+ throw Errors.NON_EMPTY_GROUP.exception();
+ }
+ }
+
+ /**
+ * Populates the list of records with tombstone(s) for deleting the group.
+ *
+ * @param records The list of records.
+ */
+ public void createGroupTombstoneRecords(List<Record> records) {
Review Comment:
@Override?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,11 +119,126 @@ public void testCommitOffset() {
assertEquals(result, coordinator.commitOffset(context, request));
}
+ @Test
+ public void testDeleteGroups() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager
+ );
+
+ RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection = new
DeleteGroupsResponseData.DeletableGroupResultCollection();
+ List<Record> expectedRecords = new ArrayList<>();
+ for (String groupId : groupIds) {
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId));
+ expectedRecords.addAll(Arrays.asList(
+ RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord(groupId)
+ ));
+ }
+
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> expectedResult = new CoordinatorResult<>(
+ expectedRecords,
+ expectedResultCollection
+ );
+
+
doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());
Review Comment:
can we change all of the `doSomething...when...` to `when().doSomething`?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +938,253 @@ public void
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
assertEquals(expectedResponse, future.get());
}
+
+ @Test
+ public void testDeleteOffsets() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ ).iterator());
+ OffsetDeleteRequestData request = new OffsetDeleteRequestData()
+ .setGroupId("group")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
responsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(Collections.singletonList(
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+ ).iterator());
+ OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList(
+ new
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+ ).iterator());
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setTopics(responseTopicCollection);
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteOffsetsInvalidGroupId() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ ).iterator());
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ private static Stream<Arguments> testDeleteOffsetsWithExceptionSource() {
+ return testConsumerGroupHeartbeatWithExceptionSource();
+ }
+
+ @ParameterizedTest
+ @MethodSource("testDeleteOffsetsWithExceptionSource")
Review Comment:
can we use
```
@MethodSource("testConsumerGroupHeartbeatWithExceptionSource")
```
and remove the helper method?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]