dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1340716345
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,44 @@ public HeartbeatResponseData genericGroupHeartbeat(
);
}
+ /**
+ * Handles a DeleteGroups request.
+ *
+ * @param context The request context.
+ * @param groupIds The groupIds of the groups to be deleted
+ * @return A Result containing the DeleteGroupsResponseData.DeletableGroupResultCollection response and
+ * a list of records to update the state machine.
+ */
+ public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> deleteGroups(
+ RequestContext context,
+ List<String> groupIds
+ ) throws ApiException {
+ final DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
Review Comment:
nit: Should we set the expected size here?
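A minimal sketch of the idea, using a plain `ArrayList` as a stand-in for the generated result collection. The class and method names are hypothetical, and whether the generated collection exposes a matching sizing constructor is an assumption to check against the generated code.

```java
import java.util.ArrayList;
import java.util.List;

final class PresizedResultsSketch {
    // Hypothetical helper: builds one result entry per requested group id.
    static List<String> buildResults(List<String> groupIds) {
        // The number of results is known up front, so the backing storage is
        // sized once instead of growing while entries are appended.
        List<String> results = new ArrayList<>(groupIds.size());
        for (String groupId : groupIds) {
            results.add("result:" + groupId);
        }
        return results;
    }
}
```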
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -849,6 +853,42 @@ public void validateOffsetFetch(
}
}
+ /**
+ * Validates the OffsetDelete request.
+ */
+ @Override
+ public void validateOffsetDelete() throws ApiException {
+ if (isInState(DEAD)) {
Review Comment:
@jeffkbkim Do we ever transition to Dead? If not, I wonder if we should just
remove this and remove the Dead state. What do you think?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,6 +115,107 @@ public void testCommitOffset() {
assertEquals(result, coordinator.commitOffset(context, request));
}
+ @Test
+ public void testDeleteGroups() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ groupMetadataManager,
+ offsetMetadataManager
+ );
+
+ RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection = new
DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1"));
Review Comment:
ditto for those two.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1561,6 +1604,156 @@ public void
testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
() -> context.fetchAllOffsets("group", "member", 10,
Long.MAX_VALUE));
}
+ static private void testOffsetDeleteWith(
+ OffsetMetadataManagerTestContext context,
+ String groupId,
+ String topic,
+ int partition,
+ Errors error
+ ) {
+ final OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
Review Comment:
We could also apply my formatting suggestion here.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -90,4 +92,29 @@ void validateOffsetFetch(
int memberEpoch,
long lastCommittedOffset
) throws KafkaException;
+
+ /**
+ * Validates the OffsetDelete request.
+ */
+ void validateOffsetDelete() throws KafkaException;
+
+ /**
+ * Validates the DeleteGroups request.
+ */
+ void validateDeleteGroup() throws KafkaException;
+
+ /**
+ * Returns true if the group is actively subscribed to the topic.
+ *
+ * @param topic The topic name.
+ * @return Whether the group is subscribed to the topic.
+ */
+ boolean isSubscribedToTopic(String topic);
+
+ /**
+ * Creates tombstone(s) for deleting the group.
+ *
+ * @return The list of tombstone record(s).
+ */
+ List<Record> createGroupTombstoneRecords();
Review Comment:
I wonder if we should rather pass the list of records as an argument in
order to avoid having to copy the records afterwards. Have you considered this?
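To make the trade-off concrete, here is a rough sketch of both shapes. Records are modelled as plain strings, and every name is a hypothetical stand-in rather than the actual `Group` interface.

```java
import java.util.ArrayList;
import java.util.List;

final class TombstoneAppendSketch {
    // Shape 1: returns a fresh list, which the caller then has to copy
    // into the list it is accumulating.
    static List<String> createGroupTombstoneRecords(String groupId) {
        List<String> records = new ArrayList<>();
        records.add("tombstone:" + groupId);
        return records;
    }

    // Shape 2: appends straight into the caller's list, so no intermediate
    // list is allocated or copied.
    static void createGroupTombstoneRecords(String groupId, List<String> records) {
        records.add("tombstone:" + groupId);
    }

    // Hypothetical caller that accumulates the records of several groups.
    static List<String> deleteAll(List<String> groupIds) {
        List<String> records = new ArrayList<>();
        for (String groupId : groupIds) {
            createGroupTombstoneRecords(groupId, records);
        }
        return records;
    }
}
```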
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3071,6 +3071,35 @@ private void removeCurrentMemberFromGenericGroup(
group.remove(member.memberId());
}
+ /**
+ * Handles a DeleteGroups request.
+ * Populates the record list passed in with record to update the state
machine.
+ * Validations are done in {@link
GroupCoordinatorShard#deleteGroups(RequestContext, List)} by
+ * calling {@link GroupMetadataManager#validateDeleteGroup(String)}.
+ *
+ * @param groupId The ID of the group to be deleted.
+ * @param records The record list to populate.
+ */
+ public void deleteGroup(
+ String groupId,
+ List<Record> records
+ ) {
+ // groupId has been checked in
GroupMetadataManager#validateDeleteGroup.
+ // In this method, we only populate records with tombstone records, so
we don't expect an exception to be thrown here.
+
Review Comment:
nit: Let's remove this empty line.
##########
clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java:
##########
@@ -78,4 +79,18 @@ public static DeleteGroupsRequest parse(ByteBuffer buffer,
short version) {
public DeleteGroupsRequestData data() {
return data;
}
+
+ public static DeleteGroupsResponseData.DeletableGroupResultCollection
getErrorResultCollection(
Review Comment:
nit: Could we refactor `getErrorResponse` to use this new method as well?
Should we also add a unit test for this one?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3071,6 +3071,35 @@ private void removeCurrentMemberFromGenericGroup(
group.remove(member.memberId());
}
+ /**
+ * Handles a DeleteGroups request.
+ * Populates the record list passed in with record to update the state
machine.
+ * Validations are done in {@link
GroupCoordinatorShard#deleteGroups(RequestContext, List)} by
+ * calling {@link GroupMetadataManager#validateDeleteGroup(String)}.
+ *
+ * @param groupId The ID of the group to be deleted.
+ * @param records The record list to populate.
+ */
+ public void deleteGroup(
+ String groupId,
+ List<Record> records
+ ) {
+ // groupId has been checked in
GroupMetadataManager#validateDeleteGroup.
Review Comment:
nit: Should we just add this to the documentation of the `groupId` parameter?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -849,6 +853,42 @@ public void validateOffsetFetch(
}
}
+ /**
+ * Validates the OffsetDelete request.
+ */
+ @Override
+ public void validateOffsetDelete() throws ApiException {
+ if (isInState(DEAD)) {
+ throw new GroupIdNotFoundException(String.format("Group %s is in
dead state.", groupId));
+ } else if (!usesConsumerGroupProtocol()
+ && (isInState(STABLE) || isInState(PREPARING_REBALANCE) ||
isInState(COMPLETING_REBALANCE))) {
+ throw Errors.NON_EMPTY_GROUP.exception();
+ }
+ }
+
+ /**
+ * Validates the DeleteGroups request.
+ */
+ @Override
+ public void validateDeleteGroup() throws ApiException {
+ if (isInState(DEAD)) {
+ throw new GroupIdNotFoundException(String.format("Group %s is in
dead state.", groupId));
+ } else if (isInState(STABLE)
Review Comment:
nit: ditto for the switch.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -849,6 +853,42 @@ public void validateOffsetFetch(
}
}
+ /**
+ * Validates the OffsetDelete request.
+ */
+ @Override
+ public void validateOffsetDelete() throws ApiException {
+ if (isInState(DEAD)) {
+ throw new GroupIdNotFoundException(String.format("Group %s is in
dead state.", groupId));
+ } else if (!usesConsumerGroupProtocol()
+ && (isInState(STABLE) || isInState(PREPARING_REBALANCE) ||
isInState(COMPLETING_REBALANCE))) {
Review Comment:
nit: I wonder if using a switch would be better here. Have you considered it?
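For illustration, a switch-based version of the same check could look roughly like this. The enum and the string return values are hypothetical stand-ins so that the sketch compiles on its own; the real method would throw the corresponding exceptions instead.

```java
final class OffsetDeleteValidationSketch {
    // Hypothetical stand-in for the generic group states used in the diff.
    enum State { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }

    // Returns the name of the error that would be raised, or "NONE" when
    // the offsets may be deleted.
    static String validateOffsetDelete(State state, boolean usesConsumerGroupProtocol) {
        switch (state) {
            case DEAD:
                return "GROUP_ID_NOT_FOUND";
            case STABLE:
            case PREPARING_REBALANCE:
            case COMPLETING_REBALANCE:
                // Only groups not using the consumer group protocol are rejected.
                return usesConsumerGroupProtocol ? "NONE" : "NON_EMPTY_GROUP";
            default:
                return "NONE";
        }
    }
}
```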
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +940,216 @@ public void
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
assertEquals(expectedResponse, future.get());
}
+
+ @Test
+ public void testDeleteOffsets() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ );
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("group")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
responsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+ responsePartitionCollection.add(
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+ );
+ OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+ responseTopicCollection.add(
+ new
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+ );
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setTopics(responseTopicCollection);
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteOffsetsInvalidGroupId() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ );
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteOffsetsCoordinatorNotAvailableException() throws
Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ );
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("group")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(
+ new CoordinatorLoadInProgressException(null)
+ ));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteGroups() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 3);
+ CountDownLatch latch = new CountDownLatch(1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-1");
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection2 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ DeleteGroupsResponseData.DeletableGroupResult result2 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-2");
+ resultCollection2.add(result2);
+
+ DeleteGroupsResponseData.DeletableGroupResult result3 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-3")
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.addAll(Arrays.asList(
+ new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code()),
+ result1.duplicate(),
+ result2.duplicate(),
+ result3.duplicate()
+ ));
+
+ when(runtime.partitions()).thenReturn(Sets.newSet(
+ new TopicPartition("__consumer_offsets", 0),
+ new TopicPartition("__consumer_offsets", 1),
+ new TopicPartition("__consumer_offsets", 2)
+ ));
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenAnswer(invocation -> CompletableFuture.supplyAsync(() -> {
+ try {
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ } catch (InterruptedException ignored) { }
+ return resultCollection2;
+ }));
Review Comment:
I am not sure I understand what you are trying to achieve here. Could you
elaborate?
If you want to delay the completion of the future, the best approach would be to
create a CompletableFuture, use thenReturn(future), and then complete the
future at L1149.
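A minimal sketch of that pattern without Mockito, so that it runs on its own. `scheduleWrite` is a hypothetical stand-in for the stubbed runtime call, and `combine` for whatever merges the per-partition results.

```java
import java.util.concurrent.CompletableFuture;

final class DelayedFutureSketch {
    // Hypothetical stand-in for the stubbed call: it hands back the future
    // that the test prepared, like thenReturn(future) would.
    static CompletableFuture<String> scheduleWrite(CompletableFuture<String> prepared) {
        return prepared;
    }

    // Merges two results once both futures have completed.
    static CompletableFuture<String> combine(
        CompletableFuture<String> first,
        CompletableFuture<String> second
    ) {
        return first.thenCombine(second, (a, b) -> a + "," + b);
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> combined = combine(
            CompletableFuture.completedFuture("group-id-1"),
            scheduleWrite(pending)
        );
        // The combined future cannot finish until the test completes `pending`.
        System.out.println(combined.isDone());
        pending.complete("group-id-2");
        System.out.println(combined.join());
    }
}
```

The test then controls exactly when the second result arrives, with no latch, extra thread, or timeout involved.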
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,6 +115,107 @@ public void testCommitOffset() {
assertEquals(result, coordinator.commitOffset(context, request));
}
+ @Test
+ public void testDeleteGroups() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ groupMetadataManager,
+ offsetMetadataManager
+ );
+
+ RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection = new
DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1"));
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-2"));
+ List<Record> expectedRecords = Arrays.asList(
Review Comment:
Could we also use `groupIds` to generate the list here?
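One possible way to derive the list, sketched with strings in place of `Record` objects. The helper name and the string format are hypothetical; the real test would call the `RecordHelpers` factory methods inside the lambda.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ExpectedRecordsSketch {
    // Emits, for each group id in order, an offset commit tombstone
    // followed by a group metadata tombstone.
    static List<String> expectedRecords(List<String> groupIds) {
        return groupIds.stream()
            .flatMap(groupId -> Stream.of(
                "offset-commit-tombstone:" + groupId + ":topic-name:0",
                "group-metadata-tombstone:" + groupId
            ))
            .collect(Collectors.toList());
    }
}
```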
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +940,216 @@ public void
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
assertEquals(expectedResponse, future.get());
}
+
+ @Test
+ public void testDeleteOffsets() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ );
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("group")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
responsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+ responsePartitionCollection.add(
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+ );
+ OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+ responseTopicCollection.add(
+ new
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+ );
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setTopics(responseTopicCollection);
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteOffsetsInvalidGroupId() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ );
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteOffsetsCoordinatorNotAvailableException() throws
Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ );
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("group")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offsets"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(
+ new CoordinatorLoadInProgressException(null)
Review Comment:
nit: The `null` here is not ideal. Could we pass a string instead? Or you
could use `Errors.COORDINATOR_LOAD_IN_PROGRESS.exception()`.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,6 +115,107 @@ public void testCommitOffset() {
assertEquals(result, coordinator.commitOffset(context, request));
}
+ @Test
+ public void testDeleteGroups() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ groupMetadataManager,
+ offsetMetadataManager
+ );
+
+ RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection = new
DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1"));
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-2"));
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1",
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"),
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id-2",
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord("group-id-2")
+ );
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> expectedResult = new CoordinatorResult<>(
+ expectedRecords,
+ expectedResultCollection
+ );
+
+
doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());
+ doAnswer(invocation -> {
Review Comment:
Is there a reason why you don't use when().thenAnswer(...)?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +940,216 @@ public void
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
assertEquals(expectedResponse, future.get());
}
+
+ @Test
+ public void testDeleteOffsets() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ );
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("group")
Review Comment:
nit: Could we put `setGroupId` on a new line as well?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,6 +115,107 @@ public void testCommitOffset() {
assertEquals(result, coordinator.commitOffset(context, request));
}
+ @Test
+ public void testDeleteGroups() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ groupMetadataManager,
+ offsetMetadataManager
+ );
+
+ RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection = new
DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1"));
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-2"));
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1",
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"),
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id-2",
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord("group-id-2")
+ );
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> expectedResult = new CoordinatorResult<>(
+ expectedRecords,
+ expectedResultCollection
+ );
+
+
doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());
+ doAnswer(invocation -> {
+ String groupId = invocation.getArgument(0);
+ List<Record> records = invocation.getArgument(1);
+ records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
"topic-name", 0));
+ return null;
+ }).when(offsetMetadataManager).deleteAllOffsets(anyString(),
anyList());
+ doAnswer(invocation -> {
+ String groupId = invocation.getArgument(0);
+ List<Record> records = invocation.getArgument(1);
+
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+ return null;
+ }).when(groupMetadataManager).deleteGroup(anyString(), anyList());
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> coordinatorResult =
+ coordinator.deleteGroups(context, groupIds);
+
+ for (String groupId : groupIds) {
+ verify(groupMetadataManager,
times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId));
+ verify(groupMetadataManager,
times(1)).deleteGroup(ArgumentMatchers.eq(groupId), anyList());
+ verify(offsetMetadataManager,
times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList());
+ }
+ assertEquals(expectedResult, coordinatorResult);
+ }
+
+ @Test
+ public void testDeleteGroupsInvalidGroupId() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ groupMetadataManager,
+ offsetMetadataManager
+ );
+
+ RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2",
"group-id-3");
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection = new
DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1"));
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-2")
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+ );
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-3"));
Review Comment:
small nit: It may be a bit easier to read if we create the expected response
as follows. What do you think? If you find it better, we could also update the
other test cases.
```
new DeleteGroupsResponseData.DeletableGroupResultCollection(Arrays.asList(
new DeleteGroupsResponseData.DeletableGroupResult()
.setGroupId("group-id-1"),
....
).iterator());
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,6 +115,107 @@ public void testCommitOffset() {
assertEquals(result, coordinator.commitOffset(context, request));
}
+ @Test
+ public void testDeleteGroups() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ groupMetadataManager,
+ offsetMetadataManager
+ );
+
+ RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection = new
DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1"));
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-2"));
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1",
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"),
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id-2",
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord("group-id-2")
+ );
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> expectedResult = new CoordinatorResult<>(
+ expectedRecords,
+ expectedResultCollection
+ );
+
+
doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());
+ doAnswer(invocation -> {
+ String groupId = invocation.getArgument(0);
+ List<Record> records = invocation.getArgument(1);
+ records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
"topic-name", 0));
+ return null;
+ }).when(offsetMetadataManager).deleteAllOffsets(anyString(),
anyList());
+ doAnswer(invocation -> {
+ String groupId = invocation.getArgument(0);
+ List<Record> records = invocation.getArgument(1);
+
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+ return null;
+ }).when(groupMetadataManager).deleteGroup(anyString(), anyList());
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> coordinatorResult =
Review Comment:
nit: Could we put an empty line before this one? I find the code a bit hard
to read because all the lines are packed together.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -177,6 +183,29 @@ public CoordinatorResult<OffsetCommitResponseData, Record>
commitOffset(
return result;
}
+ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+ OffsetDeleteRequestData request
+ ) {
+ snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset); // TODO: lastCommitedOffset or lastWrittenOffset?
Review Comment:
In the GroupMetadataManagerTestContext, we actually moved this to the replay
method. See
[here](https://github.com/apache/kafka/blob/ad7956170bcaf093ea8b2f725126d42cf7fb522b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java#L1144).
It may be better to do the same here. What do you think?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,6 +115,107 @@ public void testCommitOffset() {
assertEquals(result, coordinator.commitOffset(context, request));
}
+ @Test
+ public void testDeleteGroups() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ groupMetadataManager,
+ offsetMetadataManager
+ );
+
+ RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection = new
DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1"));
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-2"));
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1",
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"),
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id-2",
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord("group-id-2")
+ );
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> expectedResult = new CoordinatorResult<>(
+ expectedRecords,
+ expectedResultCollection
+ );
+
+
doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());
+ doAnswer(invocation -> {
+ String groupId = invocation.getArgument(0);
+ List<Record> records = invocation.getArgument(1);
+ records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
"topic-name", 0));
+ return null;
+ }).when(offsetMetadataManager).deleteAllOffsets(anyString(),
anyList());
+ doAnswer(invocation -> {
+ String groupId = invocation.getArgument(0);
+ List<Record> records = invocation.getArgument(1);
+
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+ return null;
+ }).when(groupMetadataManager).deleteGroup(anyString(), anyList());
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> coordinatorResult =
+ coordinator.deleteGroups(context, groupIds);
+
+ for (String groupId : groupIds) {
+ verify(groupMetadataManager,
times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId));
+ verify(groupMetadataManager,
times(1)).deleteGroup(ArgumentMatchers.eq(groupId), anyList());
+ verify(offsetMetadataManager,
times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList());
+ }
+ assertEquals(expectedResult, coordinatorResult);
+ }
+
+ @Test
+ public void testDeleteGroupsInvalidGroupId() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ groupMetadataManager,
+ offsetMetadataManager
+ );
+
+ RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2",
"group-id-3");
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection = new
DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1"));
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-2")
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+ );
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-3"));
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1",
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"),
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id-3",
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord("group-id-3")
+ );
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> expectedResult = new CoordinatorResult<>(
+ expectedRecords,
+ expectedResultCollection
+ );
+
+ doThrow(Errors.INVALID_GROUP_ID.exception())
+
.when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.eq("group-id-2"));
+ doAnswer(invocation -> {
+ String groupId = invocation.getArgument(0);
+ List<Record> records = invocation.getArgument(1);
+ records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
"topic-name", 0));
+ return null;
+ }).when(offsetMetadataManager).deleteAllOffsets(anyString(),
anyList());
+ doAnswer(invocation -> {
+ String groupId = invocation.getArgument(0);
+ List<Record> records = invocation.getArgument(1);
+
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+ return null;
+ }).when(groupMetadataManager).deleteGroup(anyString(), anyList());
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> coordinatorResult =
+ coordinator.deleteGroups(context, groupIds);
+
+ verify(groupMetadataManager,
times(3)).validateDeleteGroup(anyString());
+ verify(groupMetadataManager, times(2)).deleteGroup(anyString(),
anyList());
+ verify(offsetMetadataManager, times(2)).deleteAllOffsets(anyString(),
anyList());
Review Comment:
Should we at minimum verify the group ids here?
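To illustrate the difference between checking call counts and checking the
actual group ids, here is a minimal, Mockito-free sketch. `RecordingManager`
and `run` are hypothetical stand-ins for the mocked managers and for the loop
in `deleteGroups`; they are not part of the PR. In the real test, the same idea
would presumably be expressed with `verify(...)` and `ArgumentMatchers.eq(groupId)`,
as `testDeleteGroups` already does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class VerifyGroupIdsSketch {
    // Hypothetical recording fake standing in for the mocked managers.
    static final class RecordingManager {
        final List<String> validated = new ArrayList<>();
        final List<String> deleted = new ArrayList<>();

        void validateDeleteGroup(String groupId) {
            validated.add(groupId);
            // Assumption for this sketch: only "group-id-2" fails validation.
            if (groupId.equals("group-id-2")) {
                throw new IllegalArgumentException("invalid group id: " + groupId);
            }
        }

        void deleteGroup(String groupId) {
            deleted.add(groupId);
        }
    }

    // Simplified version of the loop under test: validate, then delete,
    // skipping any group whose validation fails.
    static RecordingManager run(List<String> groupIds) {
        RecordingManager manager = new RecordingManager();
        for (String groupId : groupIds) {
            try {
                manager.validateDeleteGroup(groupId);
                manager.deleteGroup(groupId);
            } catch (IllegalArgumentException e) {
                // The real code would add an error entry to the response here.
            }
        }
        return manager;
    }

    public static void main(String[] args) {
        RecordingManager manager = run(Arrays.asList("group-id-1", "group-id-2", "group-id-3"));
        // Comparing the recorded ids is stricter than comparing counts alone.
        System.out.println("validated: " + manager.validated);
        System.out.println("deleted:   " + manager.deleted);
    }
}
```

A count-only check would still pass if the wrong group were deleted, whereas
comparing the recorded ids would catch it.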
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -274,6 +303,20 @@ public void commitOffset(
));
}
+ public void deleteOffset(
+ String groupId,
+ String topic,
+ int partition
+ ) {
+ snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset); // TODO:
lastCommitedOffset or lastWrittenOffset?
+
+ replay(RecordHelpers.newOffsetCommitTombstoneRecord(
+ groupId,
+ topic,
+ partition
Review Comment:
nit: Indentation should be 4 spaces.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1561,6 +1604,156 @@ public void
testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
() -> context.fetchAllOffsets("group", "member", 10,
Long.MAX_VALUE));
}
+ static private void testOffsetDeleteWith(
Review Comment:
Should we move this method to the test context?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1561,6 +1604,156 @@ public void
testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
() -> context.fetchAllOffsets("group", "member", 10,
Long.MAX_VALUE));
}
+ static private void testOffsetDeleteWith(
+ OffsetMetadataManagerTestContext context,
+ String groupId,
+ String topic,
+ int partition,
+ Errors error
+ ) {
+ final OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName(topic)
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(partition)
+ ))
+ );
+
+ final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
expectedResponsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+ expectedResponsePartitionCollection.add(
+ new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+ .setPartitionIndex(partition)
+ .setErrorCode(error.code())
+ );
+ final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
expectedResponseTopicCollection =
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+ expectedResponseTopicCollection.add(
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+ .setName(topic)
+ .setPartitions(expectedResponsePartitionCollection)
+ );
+
+ final List<Record> expectedRecords = error == Errors.NONE &&
+ context.offsetMetadataManager.offset(groupId, topic, partition) !=
null ?
+ Collections.singletonList(
+ RecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic,
partition)
+ ) :
+ Collections.emptyList();
Review Comment:
This block is really hard to parse.
```
final List<Record> expectedRecords =
error == Errors.NONE && context.offsetMetadataManager.offset(groupId,
topic, partition) != null ?
Collections.singletonList(RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
topic, partition)) :
Collections.emptyList();
```
Would it be better like this? Otherwise, I would use a regular if statement.
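For comparison, a minimal sketch of the regular `if` variant. The types are
hypothetical stand-ins so that the snippet compiles on its own: a `String`
plays the role of the tombstone `Record`, and a `null` `committedOffset` means
that no offset is stored for the partition.

```java
import java.util.Collections;
import java.util.List;

final class ExpectedRecordsSketch {
    // Returns a single tombstone only when the delete is expected to succeed
    // and an offset actually exists; otherwise no record is expected.
    static List<String> expectedRecords(
        boolean noError,
        Long committedOffset,
        String groupId,
        String topic,
        int partition
    ) {
        if (noError && committedOffset != null) {
            return Collections.singletonList(groupId + "/" + topic + "/" + partition);
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        System.out.println(expectedRecords(true, 100L, "foo", "bar", 0));
        System.out.println(expectedRecords(true, null, "foo", "bar1", 0));
    }
}
```

The early return keeps the condition and each outcome on separate lines, which
may be easier to scan than the nested ternary.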
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1561,6 +1604,156 @@ public void
testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
() -> context.fetchAllOffsets("group", "member", 10,
Long.MAX_VALUE));
}
+ static private void testOffsetDeleteWith(
+ OffsetMetadataManagerTestContext context,
+ String groupId,
+ String topic,
+ int partition,
+ Errors error
+ ) {
+ final OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName(topic)
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(partition)
+ ))
+ );
+
+ final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
expectedResponsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+ expectedResponsePartitionCollection.add(
+ new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+ .setPartitionIndex(partition)
+ .setErrorCode(error.code())
+ );
+ final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
expectedResponseTopicCollection =
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+ expectedResponseTopicCollection.add(
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+ .setName(topic)
+ .setPartitions(expectedResponsePartitionCollection)
+ );
+
+ final List<Record> expectedRecords = error == Errors.NONE &&
+ context.offsetMetadataManager.offset(groupId, topic, partition) !=
null ?
+ Collections.singletonList(
+ RecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic,
partition)
+ ) :
+ Collections.emptyList();
+
+ CoordinatorResult<OffsetDeleteResponseData, Record> coordinatorResult
= context.deleteOffsets(
+ new OffsetDeleteRequestData()
+ .setGroupId(groupId)
+ .setTopics(requestTopicCollection)
+ );
+
+ assertEquals(new
OffsetDeleteResponseData().setTopics(expectedResponseTopicCollection),
coordinatorResult.response());
+ assertEquals(expectedRecords, coordinatorResult.records());
+ }
+
+ @Test
+ public void testGenericGroupOffsetDelete() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ GenericGroup group =
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+ "foo",
+ true
+ );
+ context.commitOffset("foo", "bar", 0, 100L, 0);
+ group.setSubscribedTopics(Optional.of(Collections.emptySet()));
+ testOffsetDeleteWith(context, "foo", "bar", 0, Errors.NONE);
+ }
+
+ @Test
+ public void testGenericGroupOffsetDeleteWithInvalidOffsets() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ GenericGroup group =
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+ "foo",
+ true
+ );
+ group.setSubscribedTopics(Optional.of(Collections.singleton("bar")));
+ context.commitOffset("foo", "bar", 0, 100L, 0);
+
+ // Delete the offset whose topic partition doesn't exist.
+ testOffsetDeleteWith(context, "foo", "bar1", 0, Errors.NONE);
+ // Delete the offset from the topic that the group is subscribed to.
+ testOffsetDeleteWith(context, "foo", "bar", 0,
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+ }
+
+ @Test
+ public void testConsumerGroupOffsetDelete() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ "foo",
+ true
+ );
+ context.commitOffset("foo", "bar", 0, 100L, 0);
+ assertFalse(group.isSubscribedToTopic("bar"));
+ testOffsetDeleteWith(context, "foo", "bar", 0, Errors.NONE);
+ }
+
+ @Test
+ public void testConsumerGroupOffsetDeleteWithInvalidOffsets() {
Review Comment:
ditto.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1561,6 +1604,156 @@ public void
testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
() -> context.fetchAllOffsets("group", "member", 10,
Long.MAX_VALUE));
}
+ static private void testOffsetDeleteWith(
+ OffsetMetadataManagerTestContext context,
+ String groupId,
+ String topic,
+ int partition,
+ Errors error
+ ) {
+ final OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName(topic)
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(partition)
+ ))
+ );
+
+ final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
expectedResponsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+ expectedResponsePartitionCollection.add(
+ new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+ .setPartitionIndex(partition)
+ .setErrorCode(error.code())
+ );
+ final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
expectedResponseTopicCollection =
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+ expectedResponseTopicCollection.add(
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+ .setName(topic)
+ .setPartitions(expectedResponsePartitionCollection)
+ );
+
+ final List<Record> expectedRecords = error == Errors.NONE &&
+ context.offsetMetadataManager.offset(groupId, topic, partition) !=
null ?
+ Collections.singletonList(
+ RecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic,
partition)
+ ) :
+ Collections.emptyList();
+
+ CoordinatorResult<OffsetDeleteResponseData, Record> coordinatorResult
= context.deleteOffsets(
+ new OffsetDeleteRequestData()
+ .setGroupId(groupId)
+ .setTopics(requestTopicCollection)
+ );
+
+ assertEquals(new
OffsetDeleteResponseData().setTopics(expectedResponseTopicCollection),
coordinatorResult.response());
+ assertEquals(expectedRecords, coordinatorResult.records());
+ }
+
+ @Test
+ public void testGenericGroupOffsetDelete() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ GenericGroup group =
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+ "foo",
+ true
+ );
+ context.commitOffset("foo", "bar", 0, 100L, 0);
+ group.setSubscribedTopics(Optional.of(Collections.emptySet()));
+ testOffsetDeleteWith(context, "foo", "bar", 0, Errors.NONE);
+ }
+
+ @Test
+ public void testGenericGroupOffsetDeleteWithInvalidOffsets() {
Review Comment:
What does `InvalidOffset` mean here?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1561,6 +1604,156 @@ public void
testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
() -> context.fetchAllOffsets("group", "member", 10,
Long.MAX_VALUE));
}
+ static private void testOffsetDeleteWith(
+ OffsetMetadataManagerTestContext context,
+ String groupId,
+ String topic,
+ int partition,
+ Errors error
+ ) {
+ final OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName(topic)
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(partition)
+ ))
+ );
+
+ final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
expectedResponsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+ expectedResponsePartitionCollection.add(
+ new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+ .setPartitionIndex(partition)
+ .setErrorCode(error.code())
+ );
+ final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
expectedResponseTopicCollection =
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+ expectedResponseTopicCollection.add(
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+ .setName(topic)
+ .setPartitions(expectedResponsePartitionCollection)
+ );
+
+ final List<Record> expectedRecords = error == Errors.NONE &&
+ context.offsetMetadataManager.offset(groupId, topic, partition) !=
null ?
+ Collections.singletonList(
+ RecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic,
partition)
+ ) :
+ Collections.emptyList();
+
+ CoordinatorResult<OffsetDeleteResponseData, Record> coordinatorResult
= context.deleteOffsets(
+ new OffsetDeleteRequestData()
+ .setGroupId(groupId)
+ .setTopics(requestTopicCollection)
+ );
+
+ assertEquals(new
OffsetDeleteResponseData().setTopics(expectedResponseTopicCollection),
coordinatorResult.response());
+ assertEquals(expectedRecords, coordinatorResult.records());
+ }
+
+ @Test
+ public void testGenericGroupOffsetDelete() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ GenericGroup group =
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+ "foo",
+ true
+ );
+ context.commitOffset("foo", "bar", 0, 100L, 0);
+ group.setSubscribedTopics(Optional.of(Collections.emptySet()));
+ testOffsetDeleteWith(context, "foo", "bar", 0, Errors.NONE);
+ }
+
+ @Test
+ public void testGenericGroupOffsetDeleteWithInvalidOffsets() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ GenericGroup group =
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+ "foo",
+ true
+ );
+ group.setSubscribedTopics(Optional.of(Collections.singleton("bar")));
+ context.commitOffset("foo", "bar", 0, 100L, 0);
+
+ // Delete the offset whose topic partition doesn't exist.
+ testOffsetDeleteWith(context, "foo", "bar1", 0, Errors.NONE);
+ // Delete the offset from the topic that the group is subscribed to.
+ testOffsetDeleteWith(context, "foo", "bar", 0,
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+ }
+
+ @Test
+ public void testConsumerGroupOffsetDelete() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ "foo",
+ true
+ );
+ context.commitOffset("foo", "bar", 0, 100L, 0);
+ assertFalse(group.isSubscribedToTopic("bar"));
+ testOffsetDeleteWith(context, "foo", "bar", 0, Errors.NONE);
+ }
+
+ @Test
+ public void testConsumerGroupOffsetDeleteWithInvalidOffsets() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ "foo",
+ true
+ );
+ MetadataImage image = new
GroupMetadataManagerTest.MetadataImageBuilder()
+ .addTopic(Uuid.randomUuid(), "foo", 1)
+ .addRacks()
+ .build();
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder("member1")
+ .setSubscribedTopicNames(Collections.singletonList("bar"))
+ .build();
+ group.computeSubscriptionMetadata(
+ null,
+ member1,
+ image.topics(),
+ image.cluster()
+ );
+ group.updateMember(member1);
+ context.commitOffset("foo", "bar", 0, 100L, 0);
+ assertTrue(group.isSubscribedToTopic("bar"));
+
+ // Delete the offset whose topic partition doesn't exist.
+ testOffsetDeleteWith(context, "foo", "bar1", 0, Errors.NONE);
+ // Delete the offset from the topic that the group is subscribed to.
+ testOffsetDeleteWith(context, "foo", "bar", 0,
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+ }
+
+ @ParameterizedTest
+ @ValueSource(classes = {GenericGroup.class, ConsumerGroup.class})
+ public void testDeleteGroupAllOffsets(Class groupClass) {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ Group group = null;
+ if (groupClass == GenericGroup.class) {
+ group = context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+ "foo",
+ true
+ );
+ } else {
+ group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ "foo",
+ true
+ );
+ }
+ context.commitOffset("foo", "bar-0", 0, 100L, 0);
+ context.commitOffset("foo", "bar-0", 1, 100L, 0);
+ context.commitOffset("foo", "bar-1", 0, 100L, 0);
+
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-0", 0),
+ RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-0", 1),
+ RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-1", 0)
+ );
+ List<Record> records = new ArrayList<>();
Review Comment:
nit: Could we put an empty line here?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1561,6 +1604,156 @@ public void
testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
() -> context.fetchAllOffsets("group", "member", 10,
Long.MAX_VALUE));
}
+ static private void testOffsetDeleteWith(
+ OffsetMetadataManagerTestContext context,
+ String groupId,
+ String topic,
+ int partition,
+ Errors error
+ ) {
+ final OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName(topic)
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(partition)
+ ))
+ );
+
+ final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
expectedResponsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+ expectedResponsePartitionCollection.add(
+ new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+ .setPartitionIndex(partition)
+ .setErrorCode(error.code())
+ );
+ final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
expectedResponseTopicCollection =
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+ expectedResponseTopicCollection.add(
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+ .setName(topic)
+ .setPartitions(expectedResponsePartitionCollection)
+ );
+
+ final List<Record> expectedRecords = error == Errors.NONE &&
+ context.offsetMetadataManager.offset(groupId, topic, partition) !=
null ?
+ Collections.singletonList(
+ RecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic,
partition)
+ ) :
+ Collections.emptyList();
+
+ CoordinatorResult<OffsetDeleteResponseData, Record> coordinatorResult
= context.deleteOffsets(
+ new OffsetDeleteRequestData()
+ .setGroupId(groupId)
+ .setTopics(requestTopicCollection)
+ );
+
+ assertEquals(new
OffsetDeleteResponseData().setTopics(expectedResponseTopicCollection),
coordinatorResult.response());
+ assertEquals(expectedRecords, coordinatorResult.records());
+ }
+
+ @Test
+ public void testGenericGroupOffsetDelete() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ GenericGroup group =
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+ "foo",
+ true
+ );
+ context.commitOffset("foo", "bar", 0, 100L, 0);
+ group.setSubscribedTopics(Optional.of(Collections.emptySet()));
+ testOffsetDeleteWith(context, "foo", "bar", 0, Errors.NONE);
+ }
+
+ @Test
+ public void testGenericGroupOffsetDeleteWithInvalidOffsets() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ GenericGroup group =
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+ "foo",
+ true
+ );
+ group.setSubscribedTopics(Optional.of(Collections.singleton("bar")));
+ context.commitOffset("foo", "bar", 0, 100L, 0);
+
+ // Delete the offset whose topic partition doesn't exist.
+ testOffsetDeleteWith(context, "foo", "bar1", 0, Errors.NONE);
+ // Delete the offset from the topic that the group is subscribed to.
+ testOffsetDeleteWith(context, "foo", "bar", 0,
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+ }
+
+ @Test
+ public void testConsumerGroupOffsetDelete() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ "foo",
+ true
+ );
+ context.commitOffset("foo", "bar", 0, 100L, 0);
+ assertFalse(group.isSubscribedToTopic("bar"));
+ testOffsetDeleteWith(context, "foo", "bar", 0, Errors.NONE);
+ }
+
+ @Test
+ public void testConsumerGroupOffsetDeleteWithInvalidOffsets() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+ "foo",
+ true
+ );
+ MetadataImage image = new
GroupMetadataManagerTest.MetadataImageBuilder()
+ .addTopic(Uuid.randomUuid(), "foo", 1)
+ .addRacks()
+ .build();
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder("member1")
+ .setSubscribedTopicNames(Collections.singletonList("bar"))
+ .build();
+ group.computeSubscriptionMetadata(
+ null,
+ member1,
+ image.topics(),
+ image.cluster()
+ );
+ group.updateMember(member1);
+ context.commitOffset("foo", "bar", 0, 100L, 0);
+ assertTrue(group.isSubscribedToTopic("bar"));
+
+ // Delete the offset whose topic partition doesn't exist.
+ testOffsetDeleteWith(context, "foo", "bar1", 0, Errors.NONE);
+ // Delete the offset from the topic that the group is subscribed to.
+ testOffsetDeleteWith(context, "foo", "bar", 0,
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+ }
+
+ @ParameterizedTest
+ @ValueSource(classes = {GenericGroup.class, ConsumerGroup.class})
Review Comment:
Could we use `GroupType` instead? Then you could use a switch based on the
enum.
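A rough sketch of what the switch could look like. The `GroupType` enum and
`createGroup` below are hypothetical stand-ins defined only for this snippet;
in the actual test the enum would be the coordinator's own type, and the
parameterization would presumably use JUnit 5's `@EnumSource` instead of
`@ValueSource(classes = ...)`.

```java
final class GroupTypeSwitchSketch {
    // Hypothetical stand-in for the coordinator's group type enum.
    enum GroupType { CONSUMER, GENERIC }

    // Hypothetical stand-in for the test context: it only reports which
    // creation path was taken for the given type.
    static String createGroup(GroupType type, String groupId) {
        switch (type) {
            case GENERIC:
                return "generic:" + groupId;
            case CONSUMER:
                return "consumer:" + groupId;
            default:
                throw new IllegalArgumentException("Unexpected group type: " + type);
        }
    }

    public static void main(String[] args) {
        // Mirrors what a parameterized test would do: one run per enum value.
        for (GroupType type : GroupType.values()) {
            System.out.println(createGroup(type, "foo"));
        }
    }
}
```

Compared with branching on `Class` objects, the enum avoids the raw `Class`
parameter and makes a newly added group type fail loudly in the `default` branch.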
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,44 @@ public HeartbeatResponseData genericGroupHeartbeat(
);
}
+ /**
+ * Handles a DeleteGroups request.
+ *
+ * @param context The request context.
+ * @param groupIds The groupIds of the groups to be deleted
+ * @return A Result containing the
DeleteGroupsResponseData.DeletableGroupResultCollection response and
+ * a list of records to update the state machine.
+ */
+ public
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> deleteGroups(
+ RequestContext context,
+ List<String> groupIds
+ ) throws ApiException {
+ final DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ final List<Record> records = new ArrayList<>();
+
+ groupIds.forEach(groupId -> {
+ try {
+ groupMetadataManager.validateGroupDelete(groupId);
+ offsetMetadataManager.deleteAllOffsets(groupId, records);
+ groupMetadataManager.deleteGroup(groupId, records);
+
+ resultCollection.add(
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(groupId)
+ );
+ } catch (ApiException exception) {
+ resultCollection.add(
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(groupId)
+ .setErrorCode(Errors.forException(exception).code())
+ );
+ }
+ });
+
+ return new CoordinatorResult<>(records, resultCollection);
Review Comment:
Good question. In my opinion, this log line is useful for the expiration
case. I am not sure it is really useful in this one.