jeffkbkim commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1339019021
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -90,4 +92,29 @@ void validateOffsetFetch(
int memberEpoch,
long lastCommittedOffset
) throws KafkaException;
+
+ /**
+ * Validates the OffsetDelete request.
+ */
+ void validateOffsetDelete() throws KafkaException;
+
+ /**
+ * Validates the GroupDelete request.
+ */
+ void validateGroupDelete() throws KafkaException;
Review Comment:
Should these be named `DeleteGroup` (i.e. `validateDeleteGroup`) to match the API name?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -849,6 +853,43 @@ public void validateOffsetFetch(
}
}
+ /**
+ * Validates the OffsetDelete request.
+ */
+ @Override
+ public void validateOffsetDelete() throws ApiException {
+ if (isInState(DEAD)) {
+ throw new GroupIdNotFoundException(String.format("Group %s is in
dead state.", groupId));
+ } else if (isInState(STABLE)
+ || isInState(PREPARING_REBALANCE)
+ || isInState(COMPLETING_REBALANCE)) {
+ throw Errors.NON_EMPTY_GROUP.exception();
+ }
+ }
+
+ /**
+ * Validates the GroupDelete request.
+ */
+ @Override
+ public void validateGroupDelete() throws ApiException {
+ if (isInState(DEAD)) {
+ throw new GroupIdNotFoundException(String.format("Group %s is in
dead state.", groupId));
+ } else if (isInState(STABLE)
+ || isInState(PREPARING_REBALANCE)
+ || isInState(COMPLETING_REBALANCE)) {
+ throw Errors.NON_EMPTY_GROUP.exception();
+ }
Review Comment:
In the existing implementation, we transition to DEAD when the group is empty,
so that even if the write operation fails, the group is still deleted in the next
purge cycle.
We don't need to do that here because, if the write operation fails, we revert
to the previous state and return an error, so the client knows that the
operation failed. Is this correct?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +938,213 @@ public void
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
assertEquals(expectedResponse, future.get());
}
+
+ @Test
+ public void testDeleteOffsets() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ );
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("group")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
responsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+ responsePartitionCollection.add(
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+ );
+ OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+ responseTopicCollection.add(
+ new
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+ );
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setTopics(responseTopicCollection);
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offset"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteOffsetsInvalidGroupId() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ );
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offset"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteOffsetsCoordinatorNotAvailableException() throws
Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ );
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("group")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offset"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(
+ new CoordinatorLoadInProgressException(null)
+ ));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteGroups() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ DeleteGroupsResponseData.DeletableGroupResult result1 =
+ new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1");
Review Comment:
Can we follow the same line-break style as in L1101-1102? The same applies to result2.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,80 @@ public CoordinatorResult<OffsetCommitResponseData, Record>
commitOffset(
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Handles an OffsetDelete request.
+ *
+ * @param request The OffsetDelete request.
+ *
+ * @return A Result containing the OffsetDeleteResponseData response and
+ * a list of records to update the state machine.
+ */
+ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+ OffsetDeleteRequestData request
+ ) throws ApiException {
+ final Group group = validateOffsetDelete(request);
+ final List<Record> records = new ArrayList<>();
+ final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+ final OffsetDeleteResponseData response = new
OffsetDeleteResponseData();
+ final TimelineHashMap<String, TimelineHashMap<Integer,
OffsetAndMetadata>> offsetsByTopic =
+ offsetsByGroup.get(request.groupId());
+
+ request.topics().forEach(topic -> {
+ final
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
responsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+ final TimelineHashMap<Integer, OffsetAndMetadata>
offsetsByPartition = offsetsByTopic == null ?
+ null : offsetsByTopic.get(topic.name());
+
+ topic.partitions().forEach(partition -> {
+ final OffsetDeleteResponseData.OffsetDeleteResponsePartition
responsePartition =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(partition.partitionIndex());
+ if (group.isSubscribedToTopic(topic.name())) {
+
responsePartition.setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code());
+ } else if (offsetsByPartition != null &&
offsetsByPartition.containsKey(partition.partitionIndex())) {
+ records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+ request.groupId(),
+ topic.name(),
+ partition.partitionIndex()
+ ));
+ }
+ responsePartitionCollection.add(responsePartition);
+ });
+
+ final OffsetDeleteResponseData.OffsetDeleteResponseTopic
responseTopic =
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+ .setName(topic.name())
+ .setPartitions(responsePartitionCollection);
+ responseTopicCollection.add(responseTopic);
+ });
+ response.setTopics(responseTopicCollection);
+
+ return new CoordinatorResult<>(records, response);
+ }
+
+ /**
+ * Deletes offsets as part of a DeleteGroups request.
+ * Populates the record list passed in with records to update the state
machine.
+ * Validations are done in deleteGroups method in GroupCoordinatorShard.
Review Comment:
Validations are done in `{@link
GroupCoordinatorShard#deleteGroups(RequestContext, List)}`
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -90,4 +92,29 @@ void validateOffsetFetch(
int memberEpoch,
long lastCommittedOffset
) throws KafkaException;
+
+ /**
+ * Validates the OffsetDelete request.
+ */
+ void validateOffsetDelete() throws KafkaException;
+
+ /**
+ * Validates the GroupDelete request.
+ */
+ void validateGroupDelete() throws KafkaException;
+
+ /**
+ * Returns true if the group is actively subscribed to the topic.
+ *
+ * @param topic The topic name.
+ * @return whether the group is subscribed to the topic.
Review Comment:
Whether
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +526,78 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ final
List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>>
futures =
+ new ArrayList<>(groupIds.size());
+
+ final Map<TopicPartition, List<String>> groupsByTopicPartition = new
HashMap<>();
+ groupIds.forEach(groupId -> {
+ // For backwards compatibility, we support DeleteGroups for the
empty group id.
+ if (groupId == null) {
+
futures.add(CompletableFuture.completedFuture(DeleteGroupsRequest.getErrorResultCollection(
+ Collections.singletonList(null),
+ Errors.INVALID_GROUP_ID
+ )));
+ } else {
+ final TopicPartition topicPartition =
topicPartitionFor(groupId);
+ groupsByTopicPartition
+ .computeIfAbsent(topicPartition, __ -> new ArrayList<>())
+ .add(groupId);
+ }
+ });
+
+ groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ runtime.scheduleWriteOperation(
+ "delete-group",
Review Comment:
Should this be "delete-groups" (plural), to match the DeleteGroups API name?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,44 @@ public HeartbeatResponseData genericGroupHeartbeat(
);
}
+ /**
+ * Handles a DeleteGroups request.
+ *
+ * @param context The request context.
+ * @param groupIds The groupIds of the groups to be deleted
+ * @return A Result containing the
DeleteGroupsResponseData.DeletableGroupResultCollection response and
+ * a list of records to update the state machine.
+ */
+ public
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection,
Record> deleteGroups(
+ RequestContext context,
+ List<String> groupIds
+ ) throws ApiException {
+ final DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ final List<Record> records = new ArrayList<>();
+
+ groupIds.forEach(groupId -> {
+ try {
+ groupMetadataManager.validateGroupDelete(groupId);
+ offsetMetadataManager.deleteAllOffsets(groupId, records);
+ groupMetadataManager.deleteGroup(groupId, records);
+
+ resultCollection.add(
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(groupId)
+ );
+ } catch (ApiException exception) {
+ resultCollection.add(
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(groupId)
+ .setErrorCode(Errors.forException(exception).code())
+ );
+ }
+ });
+
+ return new CoordinatorResult<>(records, resultCollection);
Review Comment:
The current implementation logs how many groups and offsets were removed.
Should we add something similar here?
```
info(s"The following groups were deleted:
${groupsEligibleForDeletion.map(_.groupId).mkString(", ")}. " +
s"A total of $offsetsRemoved offsets were removed.")
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3071,6 +3071,31 @@ private void removeCurrentMemberFromGenericGroup(
group.remove(member.memberId());
}
+ /**
+ * Handles a GroupDelete request.
+ * Populates the record list passed in with record to update the state
machine.
+ * Validations are done in deleteGroups method in GroupCoordinatorShard.
Review Comment:
Ditto on using a `{@link}` reference here.
Also, can we add a comment explaining why we don't expect an exception to be thrown here?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3071,6 +3071,31 @@ private void removeCurrentMemberFromGenericGroup(
group.remove(member.memberId());
}
+ /**
+ * Handles a GroupDelete request.
+ * Populates the record list passed in with record to update the state
machine.
+ * Validations are done in deleteGroups method in GroupCoordinatorShard.
+ *
+ * @param groupId The group id of the group to be deleted.
+ * @param records The record list to populate.
+ */
+ public void deleteGroup(
+ String groupId,
+ List<Record> records
+ ) {
+ records.addAll(group(groupId).createGroupTombstoneRecords());
+ }
+
+ /**
+ * Validates the GroupDelete request.
+ *
+ * @param groupId The group id of the group to be deleted.
Review Comment:
The id of the group to be deleted.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,80 @@ public CoordinatorResult<OffsetCommitResponseData, Record>
commitOffset(
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Handles an OffsetDelete request.
+ *
+ * @param request The OffsetDelete request.
+ *
+ * @return A Result containing the OffsetDeleteResponseData response and
+ * a list of records to update the state machine.
+ */
+ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+ OffsetDeleteRequestData request
+ ) throws ApiException {
+ final Group group = validateOffsetDelete(request);
+ final List<Record> records = new ArrayList<>();
+ final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+ final OffsetDeleteResponseData response = new
OffsetDeleteResponseData();
+ final TimelineHashMap<String, TimelineHashMap<Integer,
OffsetAndMetadata>> offsetsByTopic =
+ offsetsByGroup.get(request.groupId());
+
+ request.topics().forEach(topic -> {
+ final
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
responsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+ final TimelineHashMap<Integer, OffsetAndMetadata>
offsetsByPartition = offsetsByTopic == null ?
+ null : offsetsByTopic.get(topic.name());
+
+ topic.partitions().forEach(partition -> {
+ final OffsetDeleteResponseData.OffsetDeleteResponsePartition
responsePartition =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(partition.partitionIndex());
+ if (group.isSubscribedToTopic(topic.name())) {
Review Comment:
Can we move this check outside of the forEach block? We currently repeat it
for every partition of the topic, even though the result is the same for all of them.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -849,6 +853,43 @@ public void validateOffsetFetch(
}
}
+ /**
+ * Validates the OffsetDelete request.
+ */
+ @Override
+ public void validateOffsetDelete() throws ApiException {
+ if (isInState(DEAD)) {
+ throw new GroupIdNotFoundException(String.format("Group %s is in
dead state.", groupId));
+ } else if (isInState(STABLE)
Review Comment:
Don't we also need to check whether the stable group is using
`ConsumerProtocol.PROTOCOL_TYPE`? In the existing implementation we have:
```
case PreparingRebalance | CompletingRebalance | Stable if
group.isConsumerGroup =>
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +938,213 @@ public void
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
assertEquals(expectedResponse, future.get());
}
+
+ @Test
+ public void testDeleteOffsets() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ );
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("group")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
responsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+ responsePartitionCollection.add(
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+ );
+ OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+ responseTopicCollection.add(
+ new
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+ );
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setTopics(responseTopicCollection);
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offset"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteOffsetsInvalidGroupId() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ );
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offset"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteOffsetsCoordinatorNotAvailableException() throws
Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ );
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("group")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offset"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(
+ new CoordinatorLoadInProgressException(null)
+ ));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteGroups() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ DeleteGroupsResponseData.DeletableGroupResult result1 =
+ new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1");
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection2 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ DeleteGroupsResponseData.DeletableGroupResult result2 =
+ new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-2");
+ resultCollection2.add(result2);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection3 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
Review Comment:
Do we need this?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +938,213 @@ public void
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
assertEquals(expectedResponse, future.get());
}
+
+ @Test
+ public void testDeleteOffsets() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ );
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("group")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
responsePartitionCollection =
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+ responsePartitionCollection.add(
+ new
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+ );
+ OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
responseTopicCollection =
+ new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+ responseTopicCollection.add(
+ new
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+ );
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setTopics(responseTopicCollection);
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offset"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteOffsetsInvalidGroupId() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ );
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offset"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(response));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteOffsetsCoordinatorNotAvailableException() throws
Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection
requestTopicCollection =
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+ requestTopicCollection.add(
+ new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(
+ new
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+ ))
+ );
+ OffsetDeleteRequestData request = new
OffsetDeleteRequestData().setGroupId("group")
+ .setTopics(requestTopicCollection);
+
+ OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-offset"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(
+ new CoordinatorLoadInProgressException(null)
+ ));
+
+ CompletableFuture<OffsetDeleteResponseData> future =
service.deleteOffsets(
+ requestContext(ApiKeys.OFFSET_DELETE),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(response, future.get());
+ }
+
+ @Test
+ public void testDeleteGroups() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ DeleteGroupsResponseData.DeletableGroupResult result1 =
+ new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1");
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection2 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ DeleteGroupsResponseData.DeletableGroupResult result2 =
+ new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-2");
+ resultCollection2.add(result2);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection3 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ DeleteGroupsResponseData.DeletableGroupResult result3 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-3")
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+ resultCollection3.add(result3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.addAll(Arrays.asList(
+ new
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code()),
+ result1.duplicate(),
+ result2.duplicate(),
+ result3.duplicate()
+ ));
+
+ when(runtime.partitions()).thenReturn(Sets.newSet(
+ new TopicPartition("__consumer_offsets", 0),
+ new TopicPartition("__consumer_offsets", 1),
+ new TopicPartition("__consumer_offsets", 2)
+ ));
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-group"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-group"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenAnswer(invocation -> {
+ Thread.sleep(1000);
Review Comment:
In general, it's not good practice to use Thread.sleep in tests. Also, I
don't think this tests what we actually want to test.
We want to confirm that the final future is not completed until this
operation completes. So I propose:
1. have this thread wait
2. confirm that the future did not complete
3. unblock this thread
4. confirm that the future completes
Something like the following:
```
CountDownLatch latch = new CountDownLatch(1);
...
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("delete-group"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenAnswer(invocation -> CompletableFuture.supplyAsync(() -> {
try {
assertTrue(latch.await(5, TimeUnit.SECONDS));
} catch (InterruptedException ignored) {}
return resultCollection2;
}));
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("delete-group"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(new
CoordinatorLoadInProgressException(null)));
List<String> groupIds = Arrays.asList("group-id-1", "group-id-2",
"group-id-3", null);
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS),
groupIds, BufferSupplier.NO_CACHING);
assertFalse(future.isDone());
latch.countDown();
TestUtils.waitForCondition(future::isDone, "The future did not
complete.");
assertTrue(expectedResultCollection.containsAll(future.get()));
assertTrue(future.get().containsAll(expectedResultCollection));
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -705,9 +777,39 @@ public CompletableFuture<OffsetDeleteResponseData>
deleteOffsets(
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ if (!isGroupIdNotEmpty(request.groupId())) {
+ return CompletableFuture.completedFuture(new
OffsetDeleteResponseData()
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+ );
+ }
+
+ return runtime.scheduleWriteOperation(
+ "delete-offset",
Review Comment:
Should this be "delete-offsets" (plural), to match the OffsetDelete/DeleteOffsets API naming?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]