chickenchickenlove commented on code in PR #21579:
URL: https://github.com/apache/kafka/pull/21579#discussion_r3179925158
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java:
##########
@@ -899,23 +900,43 @@ private static Map<String, SortedSet<Integer>>
toTasksAssignment(final List<Stre
}
/**
- * Leaves the group when the member closes.
+ * Closes the member's participation in the group, honoring the requested
{@link CloseOptions.GroupMembershipOperation}:
*
- * <p>
- * This method does the following:
- * <ol>
- * <li>Transitions member state to {@link
MemberState#PREPARE_LEAVING}.</li>
- * <li>Skips the invocation of the revocation callback or lost
callback.</li>
- * <li>Clears the current and target assignment, unsubscribes from all
topics and
- * transitions the member state to {@link MemberState#LEAVING}.</li>
- * </ol>
- * States {@link MemberState#PREPARE_LEAVING} and {@link
MemberState#LEAVING} cause the heartbeat request manager
- * to send a leave group heartbeat.
- * </p>
+ * <ul>
+ * <li>{@code REMAIN_IN_GROUP}: clears local task assignments,
unsubscribes, and completes
+ * immediately without sending a leave group heartbeat. The broker
will remove the member
+ * via session timeout.</li>
+ * <li>{@code DEFAULT} / {@code LEAVE_GROUP}: transitions to
+ * {@link MemberState#PREPARE_LEAVING} → {@link MemberState#LEAVING}
and sends the leave
+ * group heartbeat.</li>
+ * </ul>
*
- * @return future that will complete when the heartbeat to leave the group
has been sent out.
+ * @param membershipOperation the requested close behavior
+ * @return future that will complete when the close operation is done
*/
- public CompletableFuture<Void> leaveGroupOnClose() {
+ public CompletableFuture<Void> leaveGroupOnClose(final
CloseOptions.GroupMembershipOperation membershipOperation) {
+ if (CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP ==
membershipOperation) {
+ // Skip the leave group heartbeat. Clean up locally and complete
immediately.
+ // The broker will remove the member from the group via session
timeout.
+ log.info("Skipping leave group heartbeat for member {} with epoch
{} because REMAIN_IN_GROUP was specified.",
+ memberId, memberEpoch);
+ if (isNotInGroup()) {
+ subscriptionState.unsubscribe();
+ notifyAssignmentChange(Collections.emptySet());
+ return CompletableFuture.completedFuture(null);
+ }
+ clearTaskAndPartitionAssignment();
+ subscriptionState.unsubscribe();
+ // UNSUBSCRIBED is only reachable from PREPARE_LEAVING, LEAVING,
or FENCED.
+ // For any active state (JOINING/STABLE/RECONCILING/ACKNOWLEDGING)
we must pass
+ // through PREPARE_LEAVING first; if already in PREPARE_LEAVING or
LEAVING we
+ // can transition directly.
+ if (state != MemberState.PREPARE_LEAVING && state !=
MemberState.LEAVING) {
+ transitionTo(MemberState.PREPARE_LEAVING);
+ }
+ transitionTo(MemberState.UNSUBSCRIBED);
+ return CompletableFuture.completedFuture(null);
+ }
Review Comment:
Even in the `REMAIN_IN_GROUP` case, I think it would be better to follow the
same normal leaving path as the existing `private CompletableFuture<Void>
leaveGroup(final boolean isOnClose) { ... }`.
In the `ConsumerGroup` case, `leaveGroupOnClose()` stores the close
operation in `leaveGroupOperation`, then transitions through the normal leaving
path to the LEAVING state. At the heartbeat request generation stage,
`ConsumerHeartbeatRequestManager#shouldSendLeaveHeartbeatNow()` looks at this
operation and decides whether to send a leave heartbeat.
In particular, it is intentionally designed to skip the leave heartbeat only
for the dynamic member + `REMAIN_IN_GROUP` case.
For `StreamsGroup` as well, rather than handling `REMAIN_IN_GROUP` here with
an early return, I think it would be more aligned with the `ConsumerGroup` flow
to preserve the normal leaving path and let
`StreamsGroupHeartbeatRequestManager` decide whether to send or skip the
heartbeat based on the operation.
I think this structure would also be more natural when implementing
client-side support for Streams static membership later.
##########
streams/src/main/java/org/apache/kafka/streams/CloseOptions.java:
##########
@@ -25,22 +25,32 @@ public class CloseOptions {
* Enum to specify the group membership operation upon closing the Kafka
Streams application.
*
* <ul>
- * <li><b>{@code LEAVE_GROUP}</b>: means the consumer leave the
group.</li>
- * <li><b>{@code REMAIN_IN_GROUP}</b>: means the consumer will not leave
the group explicitly.
- * Note that this option is ignored when using the streams group
protocol
- * ({@code group.protocol=streams}); in that case, the consumer will
always leave the group.</li>
+ * <li><b>{@code DEFAULT}</b>: Applies the default behavior based on the
active protocol:
+ * <ul>
+ * <li>For the <b>classic protocol</b>: The consumer will remain in
the group.</li>
+ * <li>For the <b>streams protocol</b> ({@code
group.protocol=streams}): The consumer will leave
+ * the group (consistent with the protocol's design for dynamic
members).</li>
+ * </ul>
Review Comment:
Should we update this to clarify that CloseOptions.DEFAULT behaves
differently for static members and dynamic members?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java:
##########
@@ -899,23 +900,43 @@ private static Map<String, SortedSet<Integer>>
toTasksAssignment(final List<Stre
}
/**
- * Leaves the group when the member closes.
+ * Closes the member's participation in the group, honoring the requested
{@link CloseOptions.GroupMembershipOperation}:
*
- * <p>
- * This method does the following:
- * <ol>
- * <li>Transitions member state to {@link
MemberState#PREPARE_LEAVING}.</li>
- * <li>Skips the invocation of the revocation callback or lost
callback.</li>
- * <li>Clears the current and target assignment, unsubscribes from all
topics and
- * transitions the member state to {@link MemberState#LEAVING}.</li>
- * </ol>
- * States {@link MemberState#PREPARE_LEAVING} and {@link
MemberState#LEAVING} cause the heartbeat request manager
- * to send a leave group heartbeat.
- * </p>
+ * <ul>
+ * <li>{@code REMAIN_IN_GROUP}: clears local task assignments,
unsubscribes, and completes
+ * immediately without sending a leave group heartbeat. The broker
will remove the member
+ * via session timeout.</li>
+ * <li>{@code DEFAULT} / {@code LEAVE_GROUP}: transitions to
+ * {@link MemberState#PREPARE_LEAVING} → {@link MemberState#LEAVING}
and sends the leave
+ * group heartbeat.</li>
+ * </ul>
*
- * @return future that will complete when the heartbeat to leave the group
has been sent out.
+ * @param membershipOperation the requested close behavior
+ * @return future that will complete when the close operation is done
*/
- public CompletableFuture<Void> leaveGroupOnClose() {
+ public CompletableFuture<Void> leaveGroupOnClose(final
CloseOptions.GroupMembershipOperation membershipOperation) {
+ if (CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP ==
membershipOperation) {
+ // Skip the leave group heartbeat. Clean up locally and complete
immediately.
+ // The broker will remove the member from the group via session
timeout.
+ log.info("Skipping leave group heartbeat for member {} with epoch
{} because REMAIN_IN_GROUP was specified.",
Review Comment:
How about logging this after `isNotIngroup()`?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java:
##########
@@ -899,23 +900,43 @@ private static Map<String, SortedSet<Integer>>
toTasksAssignment(final List<Stre
}
/**
- * Leaves the group when the member closes.
+ * Closes the member's participation in the group, honoring the requested
{@link CloseOptions.GroupMembershipOperation}:
*
- * <p>
- * This method does the following:
- * <ol>
- * <li>Transitions member state to {@link
MemberState#PREPARE_LEAVING}.</li>
- * <li>Skips the invocation of the revocation callback or lost
callback.</li>
- * <li>Clears the current and target assignment, unsubscribes from all
topics and
- * transitions the member state to {@link MemberState#LEAVING}.</li>
- * </ol>
- * States {@link MemberState#PREPARE_LEAVING} and {@link
MemberState#LEAVING} cause the heartbeat request manager
- * to send a leave group heartbeat.
- * </p>
+ * <ul>
+ * <li>{@code REMAIN_IN_GROUP}: clears local task assignments,
unsubscribes, and completes
+ * immediately without sending a leave group heartbeat. The broker
will remove the member
+ * via session timeout.</li>
+ * <li>{@code DEFAULT} / {@code LEAVE_GROUP}: transitions to
+ * {@link MemberState#PREPARE_LEAVING} → {@link MemberState#LEAVING}
and sends the leave
+ * group heartbeat.</li>
+ * </ul>
*
- * @return future that will complete when the heartbeat to leave the group
has been sent out.
+ * @param membershipOperation the requested close behavior
+ * @return future that will complete when the close operation is done
*/
- public CompletableFuture<Void> leaveGroupOnClose() {
+ public CompletableFuture<Void> leaveGroupOnClose(final
CloseOptions.GroupMembershipOperation membershipOperation) {
Review Comment:
`ConsumerGroup` stores the `leaveGroupOperation` here and follows the normal
leaving path. Then, `AbstractHeartbeatRequestManager#poll()` decides whether to
send or skip the Heartbeat request.
Similarly, I think `StreamsGroup` should also add a `leaveGroupOperation`
field here. That way, `StreamsGroup` can follow the normal leaving path as
well, and `StreamsGroupHeartbeatRequestManager can` decide whether to send or
skip the Heartbeat request.
What do you think?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]