chickenchickenlove commented on code in PR #21579:
URL: https://github.com/apache/kafka/pull/21579#discussion_r3179925158


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java:
##########
@@ -899,23 +900,43 @@ private static Map<String, SortedSet<Integer>> toTasksAssignment(final List<Stre
     }
 
     /**
-     * Leaves the group when the member closes.
+     * Closes the member's participation in the group, honoring the requested {@link CloseOptions.GroupMembershipOperation}:
      *
-     * <p>
-     * This method does the following:
-     * <ol>
-     *     <li>Transitions member state to {@link MemberState#PREPARE_LEAVING}.</li>
-     *     <li>Skips the invocation of the revocation callback or lost callback.</li>
-     *     <li>Clears the current and target assignment, unsubscribes from all topics and
-     *     transitions the member state to {@link MemberState#LEAVING}.</li>
-     * </ol>
-     * States {@link MemberState#PREPARE_LEAVING} and {@link MemberState#LEAVING} cause the heartbeat request manager
-     * to send a leave group heartbeat.
-     * </p>
+     * <ul>
+     *   <li>{@code REMAIN_IN_GROUP}: clears local task assignments, unsubscribes, and completes
+     *       immediately without sending a leave group heartbeat. The broker will remove the member
+     *       via session timeout.</li>
+     *   <li>{@code DEFAULT} / {@code LEAVE_GROUP}: transitions to
+     *       {@link MemberState#PREPARE_LEAVING} → {@link MemberState#LEAVING} and sends the leave
+     *       group heartbeat.</li>
+     * </ul>
      *
-     * @return future that will complete when the heartbeat to leave the group has been sent out.
+     * @param membershipOperation the requested close behavior
+     * @return future that will complete when the close operation is done
      */
-    public CompletableFuture<Void> leaveGroupOnClose() {
+    public CompletableFuture<Void> leaveGroupOnClose(final CloseOptions.GroupMembershipOperation membershipOperation) {
+        if (CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP == membershipOperation) {
+            // Skip the leave group heartbeat. Clean up locally and complete immediately.
+            // The broker will remove the member from the group via session timeout.
+            log.info("Skipping leave group heartbeat for member {} with epoch {} because REMAIN_IN_GROUP was specified.",
+                memberId, memberEpoch);
+            if (isNotInGroup()) {
+                subscriptionState.unsubscribe();
+                notifyAssignmentChange(Collections.emptySet());
+                return CompletableFuture.completedFuture(null);
+            }
+            clearTaskAndPartitionAssignment();
+            subscriptionState.unsubscribe();
+            // UNSUBSCRIBED is only reachable from PREPARE_LEAVING, LEAVING, or FENCED.
+            // For any active state (JOINING/STABLE/RECONCILING/ACKNOWLEDGING) we must pass
+            // through PREPARE_LEAVING first; if already in PREPARE_LEAVING or LEAVING we
+            // can transition directly.
+            if (state != MemberState.PREPARE_LEAVING && state != MemberState.LEAVING) {
+                transitionTo(MemberState.PREPARE_LEAVING);
+            }
+            transitionTo(MemberState.UNSUBSCRIBED);
+            return CompletableFuture.completedFuture(null);
+        }

Review Comment:
   Even in the `REMAIN_IN_GROUP` case, I think it would be better to follow the 
same normal leaving path as the existing `private CompletableFuture<Void> 
leaveGroup(final boolean isOnClose) { ... }`.
   
   In the `ConsumerGroup` case, `leaveGroupOnClose()` stores the close 
operation in `leaveGroupOperation`, then transitions through the normal leaving 
path to the LEAVING state. At the heartbeat request generation stage, 
`ConsumerHeartbeatRequestManager#shouldSendLeaveHeartbeatNow()` looks at this 
operation and decides whether to send a leave heartbeat.
   
   In particular, it is intentionally designed to skip the leave heartbeat only 
for the dynamic member + `REMAIN_IN_GROUP` case.
   
   For `StreamsGroup` as well, rather than handling `REMAIN_IN_GROUP` here with 
an early return, I think it would be more aligned with the `ConsumerGroup` flow 
to preserve the normal leaving path and let 
`StreamsGroupHeartbeatRequestManager` decide whether to send or skip the 
heartbeat based on the operation.
   
   I think this structure would also be more natural when implementing 
client-side support for Streams static membership later.
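
   As a rough sketch of the structure described above: the membership manager
   only records the requested operation and walks the normal leaving path, and
   the heartbeat manager alone decides whether the leave heartbeat goes out.
   Every class and method name below is a hypothetical stand-in, not the actual
   Kafka code, and completing the leave right after a skipped heartbeat is an
   assumption of this sketch.

```java
import java.util.Optional;

// Hypothetical stand-ins for illustration; these are NOT the real Kafka classes.
class LeavePathSketch {

    enum GroupMembershipOperation { DEFAULT, LEAVE_GROUP, REMAIN_IN_GROUP }

    enum MemberState { STABLE, PREPARE_LEAVING, LEAVING, UNSUBSCRIBED }

    /** Records the requested operation, then walks the normal leaving path. */
    static final class MembershipManager {
        private final Optional<String> groupInstanceId; // empty means a dynamic member
        private MemberState state = MemberState.STABLE;
        private GroupMembershipOperation leaveGroupOperation = GroupMembershipOperation.DEFAULT;

        MembershipManager(Optional<String> groupInstanceId) {
            this.groupInstanceId = groupInstanceId;
        }

        void leaveGroupOnClose(GroupMembershipOperation operation) {
            // No early return: only remember the operation for the heartbeat manager.
            this.leaveGroupOperation = operation;
            state = MemberState.PREPARE_LEAVING;
            // Local cleanup (clear assignment, unsubscribe) would happen here.
            state = MemberState.LEAVING;
        }

        void onLeaveCompleted() { state = MemberState.UNSUBSCRIBED; }

        MemberState state() { return state; }
        GroupMembershipOperation leaveGroupOperation() { return leaveGroupOperation; }
        boolean isDynamicMember() { return !groupInstanceId.isPresent(); }
    }

    /** The only place that decides whether the leave heartbeat is sent. */
    static final class HeartbeatRequestManager {
        private final MembershipManager membershipManager;

        HeartbeatRequestManager(MembershipManager membershipManager) {
            this.membershipManager = membershipManager;
        }

        boolean shouldSendLeaveHeartbeat() {
            if (membershipManager.state() != MemberState.LEAVING) {
                return false;
            }
            // Skip only for a dynamic member that asked to remain in the group.
            boolean skip = membershipManager.isDynamicMember()
                && membershipManager.leaveGroupOperation() == GroupMembershipOperation.REMAIN_IN_GROUP;
            return !skip;
        }

        /** Returns true if a leave heartbeat would have been sent on this poll. */
        boolean poll() {
            if (membershipManager.state() != MemberState.LEAVING) {
                return false;
            }
            boolean sent = shouldSendLeaveHeartbeat();
            // Sent or skipped, the member finishes leaving (assumption of this sketch).
            membershipManager.onLeaveCompleted();
            return sent;
        }
    }

    public static void main(String[] args) {
        MembershipManager member = new MembershipManager(Optional.empty());
        HeartbeatRequestManager heartbeats = new HeartbeatRequestManager(member);
        member.leaveGroupOnClose(GroupMembershipOperation.REMAIN_IN_GROUP);
        System.out.println("leave heartbeat sent: " + heartbeats.poll());
        System.out.println("final state: " + member.state());
    }
}
```

   With this shape, a static member would only need a different answer from
   `shouldSendLeaveHeartbeat()`; the leaving path itself stays unchanged.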



##########
streams/src/main/java/org/apache/kafka/streams/CloseOptions.java:
##########
@@ -25,22 +25,32 @@ public class CloseOptions {
      * Enum to specify the group membership operation upon closing the Kafka Streams application.
      *
      * <ul>
-     *   <li><b>{@code LEAVE_GROUP}</b>: means the consumer leave the group.</li>
-     *   <li><b>{@code REMAIN_IN_GROUP}</b>: means the consumer will not leave the group explicitly.
-     *       Note that this option is ignored when using the streams group protocol
-     *       ({@code group.protocol=streams}); in that case, the consumer will always leave the group.</li>
+     *   <li><b>{@code DEFAULT}</b>: Applies the default behavior based on the active protocol:
+     *     <ul>
+     *       <li>For the <b>classic protocol</b>: The consumer will remain in the group.</li>
+     *       <li>For the <b>streams protocol</b> ({@code group.protocol=streams}): The consumer will leave
+     *           the group (consistent with the protocol's design for dynamic members).</li>
+     *     </ul>

Review Comment:
   Should we update this Javadoc to clarify that `CloseOptions.DEFAULT` behaves 
differently for static members and dynamic members?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java:
##########
@@ -899,23 +900,43 @@ private static Map<String, SortedSet<Integer>> toTasksAssignment(final List<Stre
     }
 
     /**
-     * Leaves the group when the member closes.
+     * Closes the member's participation in the group, honoring the requested {@link CloseOptions.GroupMembershipOperation}:
      *
-     * <p>
-     * This method does the following:
-     * <ol>
-     *     <li>Transitions member state to {@link MemberState#PREPARE_LEAVING}.</li>
-     *     <li>Skips the invocation of the revocation callback or lost callback.</li>
-     *     <li>Clears the current and target assignment, unsubscribes from all topics and
-     *     transitions the member state to {@link MemberState#LEAVING}.</li>
-     * </ol>
-     * States {@link MemberState#PREPARE_LEAVING} and {@link MemberState#LEAVING} cause the heartbeat request manager
-     * to send a leave group heartbeat.
-     * </p>
+     * <ul>
+     *   <li>{@code REMAIN_IN_GROUP}: clears local task assignments, unsubscribes, and completes
+     *       immediately without sending a leave group heartbeat. The broker will remove the member
+     *       via session timeout.</li>
+     *   <li>{@code DEFAULT} / {@code LEAVE_GROUP}: transitions to
+     *       {@link MemberState#PREPARE_LEAVING} → {@link MemberState#LEAVING} and sends the leave
+     *       group heartbeat.</li>
+     * </ul>
      *
-     * @return future that will complete when the heartbeat to leave the group has been sent out.
+     * @param membershipOperation the requested close behavior
+     * @return future that will complete when the close operation is done
      */
-    public CompletableFuture<Void> leaveGroupOnClose() {
+    public CompletableFuture<Void> leaveGroupOnClose(final CloseOptions.GroupMembershipOperation membershipOperation) {
+        if (CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP == membershipOperation) {
+            // Skip the leave group heartbeat. Clean up locally and complete immediately.
+            // The broker will remove the member from the group via session timeout.
+            log.info("Skipping leave group heartbeat for member {} with epoch {} because REMAIN_IN_GROUP was specified.",

Review Comment:
   How about logging this after the `isNotInGroup()` check?
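
   One possible reading of this suggestion, as a minimal sketch: a member that
   is not in the group has no leave heartbeat to skip, so the message is only
   emitted once that early return has been passed. The rationale is an
   assumption, and `CloseLogSketch` and its members are hypothetical stand-ins,
   not the real `StreamsMembershipManager`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in; NOT the real StreamsMembershipManager.
class CloseLogSketch {
    private final boolean notInGroup;
    // Collected instead of calling a real logger so the behavior is observable.
    final List<String> logLines = new ArrayList<>();

    CloseLogSketch(boolean notInGroup) {
        this.notInGroup = notInGroup;
    }

    void closeWithRemainInGroup() {
        if (notInGroup) {
            // Not in the group: there is no leave heartbeat to skip, so nothing is logged.
            return;
        }
        // Only a member that is still in the group reaches this line.
        logLines.add("Skipping leave group heartbeat because REMAIN_IN_GROUP was specified.");
        // Local cleanup and state transitions would follow here.
    }
}
```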



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java:
##########
@@ -899,23 +900,43 @@ private static Map<String, SortedSet<Integer>> toTasksAssignment(final List<Stre
     }
 
     /**
-     * Leaves the group when the member closes.
+     * Closes the member's participation in the group, honoring the requested {@link CloseOptions.GroupMembershipOperation}:
      *
-     * <p>
-     * This method does the following:
-     * <ol>
-     *     <li>Transitions member state to {@link MemberState#PREPARE_LEAVING}.</li>
-     *     <li>Skips the invocation of the revocation callback or lost callback.</li>
-     *     <li>Clears the current and target assignment, unsubscribes from all topics and
-     *     transitions the member state to {@link MemberState#LEAVING}.</li>
-     * </ol>
-     * States {@link MemberState#PREPARE_LEAVING} and {@link MemberState#LEAVING} cause the heartbeat request manager
-     * to send a leave group heartbeat.
-     * </p>
+     * <ul>
+     *   <li>{@code REMAIN_IN_GROUP}: clears local task assignments, unsubscribes, and completes
+     *       immediately without sending a leave group heartbeat. The broker will remove the member
+     *       via session timeout.</li>
+     *   <li>{@code DEFAULT} / {@code LEAVE_GROUP}: transitions to
+     *       {@link MemberState#PREPARE_LEAVING} → {@link MemberState#LEAVING} and sends the leave
+     *       group heartbeat.</li>
+     * </ul>
      *
-     * @return future that will complete when the heartbeat to leave the group has been sent out.
+     * @param membershipOperation the requested close behavior
+     * @return future that will complete when the close operation is done
      */
-    public CompletableFuture<Void> leaveGroupOnClose() {
+    public CompletableFuture<Void> leaveGroupOnClose(final CloseOptions.GroupMembershipOperation membershipOperation) {

Review Comment:
   `ConsumerGroup` stores the `leaveGroupOperation` here and follows the normal 
leaving path. Then, `AbstractHeartbeatRequestManager#poll()` decides whether to 
send or skip the Heartbeat request.
   
   Similarly, I think `StreamsGroup` should also add a `leaveGroupOperation` 
field here. That way, `StreamsGroup` can follow the normal leaving path as 
well, and `StreamsGroupHeartbeatRequestManager` can decide whether to send or 
skip the heartbeat request.
   
   What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

