dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1344185641
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -827,4 +835,28 @@ public void shutdown() {
private static boolean isGroupIdNotEmpty(String groupId) {
return groupId != null && !groupId.isEmpty();
}
+
+ /**
+ * Handles the exception in the scheduleWriteOperation.
+ * @return The Errors instance associated with the given exception.
+ */
+ private Errors getErrorsForException(Throwable exception) {
Review Comment:
nit: If we keep it, the method could be static and we usually don't prefix
methods with `get`. `normalizeException` may be an alternative name.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +278,51 @@ public HeartbeatResponseData genericGroupHeartbeat(
);
}
+ /**
+ * Handles a DeleteGroups request.
+ *
+ * @param context The request context.
+ * @param groupIds The groupIds of the groups to be deleted
+ * @return A Result containing the
DeleteGroupsResponseData.DeletableGroupResultCollection response and
+ * a list of records to update the state machine.
+ */
+ public
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> deleteGroups(
+ RequestContext context,
+ List<String> groupIds
+ ) throws ApiException {
+ final DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection =
+ new
DeleteGroupsResponseData.DeletableGroupResultCollection(groupIds.size());
+ final List<Record> records = new ArrayList<>();
+ final AtomicInteger numDeletedOffsets = new AtomicInteger();
+ final List<String> deletedGroups = new ArrayList<>();
+
+ groupIds.forEach(groupId -> {
+ try {
+ groupMetadataManager.validateDeleteGroup(groupId);
+
numDeletedOffsets.addAndGet(offsetMetadataManager.deleteAllOffsets(groupId,
records));
+ groupMetadataManager.deleteGroup(groupId, records);
+ deletedGroups.add(groupId);
+
+ resultCollection.add(
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(groupId)
+ );
+ } catch (ApiException exception) {
+ resultCollection.add(
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(groupId)
+ .setErrorCode(Errors.forException(exception).code())
+ );
+ }
+ });
+
+ log.info("The following groups were deleted: {}. A total of {} offsets
were removed",
+ String.join(", ", deletedGroups),
+ numDeletedOffsets
+ );
Review Comment:
nit: `... removed.`.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,11 +119,126 @@ public void testCommitOffset() {
assertEquals(result, coordinator.commitOffset(context, request));
}
+ @Test
+ public void testDeleteGroups() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager
+ );
+
+ RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection = new
DeleteGroupsResponseData.DeletableGroupResultCollection();
+ List<Record> expectedRecords = new ArrayList<>();
+ for (String groupId : groupIds) {
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId));
+ expectedRecords.addAll(Arrays.asList(
+ RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord(groupId)
+ ));
+ }
+
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> expectedResult = new CoordinatorResult<>(
+ expectedRecords,
+ expectedResultCollection
+ );
+
+
doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());
+ when(offsetMetadataManager.deleteAllOffsets(anyString(),
anyList())).thenAnswer(invocation -> {
+ String groupId = invocation.getArgument(0);
+ List<Record> records = invocation.getArgument(1);
+ records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
"topic-name", 0));
+ return 1;
+ });
+ doAnswer(invocation -> {
+ String groupId = invocation.getArgument(0);
+ List<Record> records = invocation.getArgument(1);
+
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+ return null;
+ }).when(groupMetadataManager).deleteGroup(anyString(), anyList());
+
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> coordinatorResult =
+ coordinator.deleteGroups(context, groupIds);
+
+ for (String groupId : groupIds) {
+ verify(groupMetadataManager,
times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId));
+ verify(groupMetadataManager,
times(1)).deleteGroup(ArgumentMatchers.eq(groupId), anyList());
+ verify(offsetMetadataManager,
times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList());
+ }
+ assertEquals(expectedResult, coordinatorResult);
+ }
+
+ @Test
+ public void testDeleteGroupsInvalidGroupId() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager
+ );
+
+ RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2",
"group-id-3");
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new
DeleteGroupsResponseData.DeletableGroupResultCollection(Arrays.asList(
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-1"),
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-2")
+ .setErrorCode(Errors.INVALID_GROUP_ID.code()),
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-3")
+ ).iterator());
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1",
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"),
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id-3",
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord("group-id-3")
+ );
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> expectedResult = new CoordinatorResult<>(
+ expectedRecords,
+ expectedResultCollection
+ );
+
+ doThrow(Errors.INVALID_GROUP_ID.exception())
Review Comment:
ditto.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,11 +119,126 @@ public void testCommitOffset() {
assertEquals(result, coordinator.commitOffset(context, request));
}
+ @Test
+ public void testDeleteGroups() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager
+ );
+
+ RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection = new
DeleteGroupsResponseData.DeletableGroupResultCollection();
+ List<Record> expectedRecords = new ArrayList<>();
+ for (String groupId : groupIds) {
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId));
+ expectedRecords.addAll(Arrays.asList(
+ RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord(groupId)
+ ));
+ }
+
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> expectedResult = new CoordinatorResult<>(
+ expectedRecords,
+ expectedResultCollection
+ );
+
+
doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());
+ when(offsetMetadataManager.deleteAllOffsets(anyString(),
anyList())).thenAnswer(invocation -> {
+ String groupId = invocation.getArgument(0);
+ List<Record> records = invocation.getArgument(1);
+ records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
"topic-name", 0));
+ return 1;
+ });
+ doAnswer(invocation -> {
+ String groupId = invocation.getArgument(0);
+ List<Record> records = invocation.getArgument(1);
+
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+ return null;
+ }).when(groupMetadataManager).deleteGroup(anyString(), anyList());
+
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> coordinatorResult =
+ coordinator.deleteGroups(context, groupIds);
+
+ for (String groupId : groupIds) {
+ verify(groupMetadataManager,
times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId));
+ verify(groupMetadataManager,
times(1)).deleteGroup(ArgumentMatchers.eq(groupId), anyList());
+ verify(offsetMetadataManager,
times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList());
+ }
+ assertEquals(expectedResult, coordinatorResult);
+ }
+
+ @Test
+ public void testDeleteGroupsInvalidGroupId() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager
+ );
+
+ RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2",
"group-id-3");
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new
DeleteGroupsResponseData.DeletableGroupResultCollection(Arrays.asList(
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-1"),
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-2")
+ .setErrorCode(Errors.INVALID_GROUP_ID.code()),
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-3")
+ ).iterator());
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1",
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"),
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id-3",
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord("group-id-3")
+ );
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> expectedResult = new CoordinatorResult<>(
+ expectedRecords,
+ expectedResultCollection
+ );
+
+ doThrow(Errors.INVALID_GROUP_ID.exception())
+
.when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.eq("group-id-2"));
+ doAnswer(invocation -> {
+ String groupId = invocation.getArgument(0);
+ List<Record> records = invocation.getArgument(1);
+ records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
"topic-name", 0));
+ return null;
+ }).when(offsetMetadataManager).deleteAllOffsets(anyString(),
anyList());
+ doAnswer(invocation -> {
Review Comment:
ditto.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -259,30 +262,11 @@ public
CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartb
"consumer-group-heartbeat",
topicPartitionFor(request.groupId()),
coordinator -> coordinator.consumerGroupHeartbeat(context, request)
- ).exceptionally(exception -> {
- if (exception instanceof UnknownTopicOrPartitionException ||
- exception instanceof NotEnoughReplicasException) {
- return new ConsumerGroupHeartbeatResponseData()
- .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
- }
-
- if (exception instanceof NotLeaderOrFollowerException ||
- exception instanceof KafkaStorageException) {
- return new ConsumerGroupHeartbeatResponseData()
- .setErrorCode(Errors.NOT_COORDINATOR.code());
- }
-
- if (exception instanceof RecordTooLargeException ||
- exception instanceof RecordBatchTooLargeException ||
- exception instanceof InvalidFetchSizeException) {
- return new ConsumerGroupHeartbeatResponseData()
- .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code());
- }
-
- return new ConsumerGroupHeartbeatResponseData()
- .setErrorCode(Errors.forException(exception).code())
- .setErrorMessage(exception.getMessage());
- });
+ ).exceptionally(exception ->
+ new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(getErrorsForException(exception).code())
Review Comment:
I think that we should be careful with this. The change is not 100%
equivalent to the previous implementation here because the error message is now
set for all errors whereas it was only set for a subset before. While I agree
that we could do better, I would suggest tackling this in a separate PR.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +278,51 @@ public HeartbeatResponseData genericGroupHeartbeat(
);
}
+ /**
+ * Handles a DeleteGroups request.
+ *
+ * @param context The request context.
+ * @param groupIds The groupIds of the groups to be deleted
+ * @return A Result containing the
DeleteGroupsResponseData.DeletableGroupResultCollection response and
+ * a list of records to update the state machine.
+ */
+ public
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> deleteGroups(
+ RequestContext context,
+ List<String> groupIds
+ ) throws ApiException {
+ final DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection =
+ new
DeleteGroupsResponseData.DeletableGroupResultCollection(groupIds.size());
+ final List<Record> records = new ArrayList<>();
+ final AtomicInteger numDeletedOffsets = new AtomicInteger();
Review Comment:
Why do we need an AtomicInteger here?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,87 @@ public CoordinatorResult<OffsetCommitResponseData, Record>
commitOffset(
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Handles an OffsetDelete request.
+ *
+ * @param request The OffsetDelete request.
+ *
+ * @return A Result containing the OffsetDeleteResponseData response and
+ * a list of records to update the state machine.
+ */
+ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+ OffsetDeleteRequestData request
+ ) throws ApiException {
+ final Group group = validateOffsetDelete(request);
+ final List<Record> records = new ArrayList<>();
+ final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+ final OffsetDeleteResponseData response = new
OffsetDeleteResponseData();
+ final TimelineHashMap<String, TimelineHashMap<Integer,
OffsetAndMetadata>> offsetsByTopic =
+ offsetsByGroup.get(request.groupId());
+
+ request.topics().forEach(topic -> {
+ final
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
responsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+ final TimelineHashMap<Integer, OffsetAndMetadata>
offsetsByPartition = offsetsByTopic == null ?
+ null : offsetsByTopic.get(topic.name());
+
+ if (group.isSubscribedToTopic(topic.name())) {
+ topic.partitions().forEach(partition ->
+ responsePartitionCollection.add(new
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+ .setPartitionIndex(partition.partitionIndex())
+ .setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code())
+ )
+ );
+ } else {
+ topic.partitions().forEach(partition -> {
+ if (offsetsByPartition != null &&
offsetsByPartition.containsKey(partition.partitionIndex())) {
+ responsePartitionCollection.add(new
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+ .setPartitionIndex(partition.partitionIndex())
+ );
+
records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+ request.groupId(),
+ topic.name(),
+ partition.partitionIndex()
+ ));
+ }
+ });
+ }
+
+ final OffsetDeleteResponseData.OffsetDeleteResponseTopic
responseTopic =
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+ .setName(topic.name())
+ .setPartitions(responsePartitionCollection);
+ responseTopicCollection.add(responseTopic);
+ });
+ response.setTopics(responseTopicCollection);
+
+ return new CoordinatorResult<>(records, response);
+ }
+
+ /**
+ * Deletes offsets as part of a DeleteGroups request.
+ * Populates the record list passed in with records to update the state
machine.
+ * Validations are done in {@link
GroupCoordinatorShard#deleteGroups(RequestContext, List)}
+ *
+ * @param groupId The ID of the given group.
+ * @param records The record list to populate.
+ */
+ public void deleteAllOffsets(
+ String groupId,
+ List<Record> records
+ ) {
+ TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>
offsetsByTopic = offsetsByGroup.get(groupId);
+
+ if (offsetsByTopic != null) {
Review Comment:
If you look at the usage in
[`GroupCoordinatorShard.java`](https://github.com/apache/kafka/pull/14408/files#diff-d6369ef583dce1f7570cf396d7a4762c679fd2af323e1e1f93c9b665258373a0),
all offsets are removed before deleting the group.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,11 +119,126 @@ public void testCommitOffset() {
assertEquals(result, coordinator.commitOffset(context, request));
}
+ @Test
+ public void testDeleteGroups() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager
+ );
+
+ RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection = new
DeleteGroupsResponseData.DeletableGroupResultCollection();
+ List<Record> expectedRecords = new ArrayList<>();
+ for (String groupId : groupIds) {
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId));
+ expectedRecords.addAll(Arrays.asList(
+ RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord(groupId)
+ ));
+ }
+
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> expectedResult = new CoordinatorResult<>(
+ expectedRecords,
+ expectedResultCollection
+ );
+
+
doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());
+ when(offsetMetadataManager.deleteAllOffsets(anyString(),
anyList())).thenAnswer(invocation -> {
+ String groupId = invocation.getArgument(0);
+ List<Record> records = invocation.getArgument(1);
+ records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
"topic-name", 0));
+ return 1;
+ });
+ doAnswer(invocation -> {
+ String groupId = invocation.getArgument(0);
+ List<Record> records = invocation.getArgument(1);
+
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+ return null;
+ }).when(groupMetadataManager).deleteGroup(anyString(), anyList());
+
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> coordinatorResult =
+ coordinator.deleteGroups(context, groupIds);
+
+ for (String groupId : groupIds) {
+ verify(groupMetadataManager,
times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId));
+ verify(groupMetadataManager,
times(1)).deleteGroup(ArgumentMatchers.eq(groupId), anyList());
+ verify(offsetMetadataManager,
times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList());
+ }
+ assertEquals(expectedResult, coordinatorResult);
+ }
+
+ @Test
+ public void testDeleteGroupsInvalidGroupId() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager
+ );
+
+ RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2",
"group-id-3");
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new
DeleteGroupsResponseData.DeletableGroupResultCollection(Arrays.asList(
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-1"),
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-2")
+ .setErrorCode(Errors.INVALID_GROUP_ID.code()),
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-3")
+ ).iterator());
+ List<Record> expectedRecords = Arrays.asList(
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1",
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"),
+ RecordHelpers.newOffsetCommitTombstoneRecord("group-id-3",
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord("group-id-3")
+ );
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> expectedResult = new CoordinatorResult<>(
+ expectedRecords,
+ expectedResultCollection
+ );
+
+ doThrow(Errors.INVALID_GROUP_ID.exception())
+
.when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.eq("group-id-2"));
+ doAnswer(invocation -> {
Review Comment:
ditto.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,11 +119,126 @@ public void testCommitOffset() {
assertEquals(result, coordinator.commitOffset(context, request));
}
+ @Test
+ public void testDeleteGroups() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager
+ );
+
+ RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection = new
DeleteGroupsResponseData.DeletableGroupResultCollection();
+ List<Record> expectedRecords = new ArrayList<>();
+ for (String groupId : groupIds) {
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId));
+ expectedRecords.addAll(Arrays.asList(
+ RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord(groupId)
+ ));
+ }
+
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> expectedResult = new CoordinatorResult<>(
+ expectedRecords,
+ expectedResultCollection
+ );
+
+
doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());
+ when(offsetMetadataManager.deleteAllOffsets(anyString(),
anyList())).thenAnswer(invocation -> {
+ String groupId = invocation.getArgument(0);
+ List<Record> records = invocation.getArgument(1);
+ records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
"topic-name", 0));
+ return 1;
+ });
+ doAnswer(invocation -> {
Review Comment:
ditto.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,11 +119,126 @@ public void testCommitOffset() {
assertEquals(result, coordinator.commitOffset(context, request));
}
+ @Test
+ public void testDeleteGroups() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager
+ );
+
+ RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection = new
DeleteGroupsResponseData.DeletableGroupResultCollection();
+ List<Record> expectedRecords = new ArrayList<>();
+ for (String groupId : groupIds) {
+ expectedResultCollection.add(new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId));
+ expectedRecords.addAll(Arrays.asList(
+ RecordHelpers.newOffsetCommitTombstoneRecord(groupId,
"topic-name", 0),
+ RecordHelpers.newGroupMetadataTombstoneRecord(groupId)
+ ));
+ }
+
+
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> expectedResult = new CoordinatorResult<>(
+ expectedRecords,
+ expectedResultCollection
+ );
+
+
doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());
Review Comment:
I agree. I mentioned this a few times as well.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +349,94 @@ public CoordinatorResult<OffsetCommitResponseData, Record>
commitOffset(
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Handles an OffsetDelete request.
+ *
+ * @param request The OffsetDelete request.
+ *
+ * @return A Result containing the OffsetDeleteResponseData response and
+ * a list of records to update the state machine.
+ */
+ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+ OffsetDeleteRequestData request
+ ) throws ApiException {
+ final Group group = validateOffsetDelete(request);
+ final List<Record> records = new ArrayList<>();
+ final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
Review Comment:
I guess that they don't hurt, do they?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]