chirag-wadhwa5 commented on code in PR #19478:
URL: https://github.com/apache/kafka/pull/19478#discussion_r2046439950


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -704,30 +710,102 @@ public DeleteShareGroupOffsetsResultHolder shareGroupDeleteOffsetsRequest(
                 groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(
                     groupId,
                     requestData,
-                    errorTopicResponseList
+                    errorTopicResponseList,
+                    records
                 );
 
             if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
-                return new DeleteShareGroupOffsetsResultHolder(Errors.NONE.code(), null, errorTopicResponseList);
+                return new CoordinatorResult<>(
+                    records,
+                    new DeleteShareGroupOffsetsResultHolder(Errors.NONE.code(), null, errorTopicResponseList)
+                );
             }
 
             DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData()
                 .setGroupId(requestData.groupId())
                 .setTopics(deleteShareGroupStateRequestTopicsData);
 
-            return new DeleteShareGroupOffsetsResultHolder(
-                Errors.NONE.code(),
-                null,
-                errorTopicResponseList,
-                DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
+            return new CoordinatorResult<>(
+                records,
+                new DeleteShareGroupOffsetsResultHolder(
+                    Errors.NONE.code(),
+                    null,
+                    errorTopicResponseList,
+                    DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
+                )
             );
 
         } catch (GroupIdNotFoundException exception) {
             log.error("groupId {} not found", groupId, exception);
-            return new DeleteShareGroupOffsetsResultHolder(Errors.GROUP_ID_NOT_FOUND.code(), exception.getMessage());
+            return new CoordinatorResult<>(
+                records,
+                new DeleteShareGroupOffsetsResultHolder(Errors.GROUP_ID_NOT_FOUND.code(), exception.getMessage())
+            );
         } catch (GroupNotEmptyException exception) {
             log.error("Provided group {} is not empty", groupId);
-            return new DeleteShareGroupOffsetsResultHolder(Errors.NON_EMPTY_GROUP.code(), exception.getMessage());
+            return new CoordinatorResult<>(
+                records,
+                new DeleteShareGroupOffsetsResultHolder(Errors.NON_EMPTY_GROUP.code(), exception.getMessage())
+            );
+        }
+    }
+
+    /**
+     * Completes the share group offset deletion by creating a ShareGroupStatePartitionMetadataRecord removing the
+     * deleted topics from deletingTopics set. Returns the final response for DeleteShareGroupOffsetsRequest
+     *
+     * @param groupId - The group ID
+     * @param topics - The set of topics which were deleted successfully by the persister
+     * @return the final response {@link DeleteShareGroupOffsetsResponseData} for the DeleteShareGroupOffsetsRequest
+     */
+    public CoordinatorResult<DeleteShareGroupOffsetsResponseData, CoordinatorRecord> completeDeleteShareGroupOffsets(
+        String groupId,
+        Map<Uuid, String> topics,
+        List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList
+    ) {
+        List<CoordinatorRecord> records = new ArrayList<>();
+        try {

Review Comment:
   Thanks for the review. I was just trying to be completely safe here, but I
think there could be a situation where, say, a group was empty and a delete
share group offsets request was sent for an initialized topic. But before the
final ShareGroupStatePartitionMetadata record was written, another member
joined the group, so this time the group wouldn't be empty. While thinking
about it, I realized this requires some discussion. The reason a group must be
empty before offsets can be deleted is that we don't want any active member to
get confused about which offset to start consuming from. If the topics are part
of the deletingTopics list, then consumers will never read from those topics. I
think removing these checks is the correct thing to do, because in the scenario
described above the deletion would ultimately fail after a new member joins,
which should not be the case. Even if a new member joins, it will never be able
to consume from a topic in the deletingTopics list, so the deletion should
proceed without erroring out. I will update the PR with this change. Thanks
again!
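
   To make the scenario concrete, here is a rough sketch of the two-step flow
as I understand it. This is only a toy model: `ShareGroupSketch` and all of its
methods are hypothetical names for illustration, not the real group coordinator
classes, and it ignores records, the persister and error handling.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical toy model of a share group; not the actual Kafka classes.
class ShareGroupSketch {
    private final Set<String> members = new HashSet<>();
    private final Set<String> initializedTopics = new HashSet<>();
    private final Set<String> deletingTopics = new HashSet<>();

    void initializeTopic(String topic) {
        initializedTopics.add(topic);
    }

    void join(String memberId) {
        members.add(memberId);
    }

    // Step 1: the request is accepted only while the group is empty.
    // Initialized topics move into deletingTopics.
    void startOffsetDeletion(Set<String> topics) {
        if (!members.isEmpty()) {
            throw new IllegalStateException("group is not empty");
        }
        for (String topic : topics) {
            if (initializedTopics.remove(topic)) {
                deletingTopics.add(topic);
            }
        }
    }

    // Topics a member may read from; anything in deletingTopics is excluded.
    Set<String> consumableTopics() {
        return new HashSet<>(initializedTopics);
    }

    // Step 2: runs after the persister has deleted the state.
    // Deliberately no emptiness check: a member that joined in between
    // could never have consumed from these topics anyway.
    void completeOffsetDeletion(Set<String> deletedTopics) {
        deletingTopics.removeAll(deletedTopics);
    }

    Set<String> deletingTopics() {
        return new HashSet<>(deletingTopics);
    }
}
```

   With this shape, a member joining between step 1 and step 2 does not make
step 2 fail, and the member never sees the topic being deleted.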


