ableegoldman commented on code in PR #17614:
URL: https://github.com/apache/kafka/pull/17614#discussion_r1972542173


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -275,6 +284,24 @@ public int memberEpoch() {
         return memberEpoch;
     }
 
+    /**
+     * @return the operation the consumer will perform on leaving the group.
+     *
+     * @see CloseOptions.GroupMembershipOperation
+     */
+    public CloseOptions.GroupMembershipOperation leaveGroupOperation() {
+        return leaveGroupOperation;
+    }
+
+    /**
+     * Sets the operation on consumer group membership that the consumer will perform when closing.
+     * The {@link AbstractMembershipManager#leaveGroupOperation} should remain {@code GroupMembershipOperation.DEFAULT}
+     * until the consumer is closed.
+     *
+     * @param operation the operation to be performed on close
+     */
+    public abstract void leaveGroupOperationOnClose(CloseOptions.GroupMembershipOperation operation);

Review Comment:
   nit: call this `#leaveGroupOperationOnClose`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java:
##########
@@ -469,8 +498,14 @@ public int joinGroupEpoch() {
      */
     @Override
     public int leaveGroupEpoch() {
+        if (CloseOptions.GroupMembershipOperation.LEAVE_GROUP.equals(leaveGroupOperation)) {
+            return ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        } else if (CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP.equals(leaveGroupOperation)) {
+            return ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
+        }

Review Comment:
   why do we need to add this? IIUC the static member vs dynamic member leave 
group epoch should still only be based on whether the group instance id is set 
-- I assume if we do skip the leave group then we just wouldn't call this 
method to begin with (though I'm not super familiar with the "leave group 
epoch", just know it's part of the new rebalancing protocol)
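
   To make the question concrete, here is a minimal sketch of the behavior I would expect, where the epoch depends only on whether a group instance id is set. The class name is hypothetical and the two constants are local stand-ins (I'm assuming -1 and -2 mirror the values in `ConsumerGroupHeartbeatRequest`), so treat it as an illustration rather than the actual implementation:

```java
import java.util.Optional;

// Hypothetical stand-in for the membership manager; not the real Kafka class.
class LeaveEpochSketch {
    // Assumed to mirror ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH.
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
    // Assumed to mirror ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH.
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    // The epoch is chosen from the membership type alone; the close-time
    // operation would only decide whether this method is called at all.
    static int leaveGroupEpoch(Optional<String> groupInstanceId) {
        return groupInstanceId.isPresent()
            ? LEAVE_GROUP_STATIC_MEMBER_EPOCH
            : LEAVE_GROUP_MEMBER_EPOCH;
    }
}
```

   With this shape, a `REMAIN_IN_GROUP` close would simply never ask for a leave epoch.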



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -1151,31 +1153,28 @@ protected void handlePollTimeoutExpiry() {
             "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
             "returned in poll() with max.poll.records.");
 
-        maybeLeaveGroup("consumer poll timeout has expired.");
+        maybeLeaveGroup(DEFAULT, "consumer poll timeout has expired.");
     }
 
     /**
-     * Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this member is using static membership or is already
-     * not part of the group (ie does not have a valid member id, is in the UNJOINED state, or the coordinator is unknown).
+     * Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this member is using static membership
+     * with the default consumer group membership operation, or is already not part of the group (i.e., does not have a
+     * valid member ID, is in the UNJOINED state, or the coordinator is unknown).
      *
+     * @param membershipOperation the operation on consumer group membership that the consumer will perform when closing
      * @param leaveReason the reason to leave the group for logging
      * @throws KafkaException if the rebalance callback throws exception
      */
-    public synchronized RequestFuture<Void> maybeLeaveGroup(String leaveReason) {
+    public synchronized RequestFuture<Void> maybeLeaveGroup(CloseOptions.GroupMembershipOperation membershipOperation, String leaveReason) {
         RequestFuture<Void> future = null;
 
-        // Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker,
-        // consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup,
-        // and the membership expiration is only controlled by session timeout.
-        if (isDynamicMember() && !coordinatorUnknown() &&
-            state != MemberState.UNJOINED && generation.hasMemberId()) {
-            // this is a minimal effort attempt to leave the group. we do not
-            // attempt any resending if the request fails or times out.
+        if (rebalanceConfig.leaveGroupOnClose && shouldSendLeaveGroupRequest(membershipOperation)) {
             log.info("Member {} sending LeaveGroup request to coordinator {} due to {}",
                 generation.memberId, coordinator, leaveReason);
+
             LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
                 rebalanceConfig.groupId,
-                Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason)))
+                List.of(new MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason)))

Review Comment:
   why this change?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -427,7 +428,7 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
             // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
             // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
             if (this.log != null) {
-                close(Duration.ZERO, true);
+                close(Duration.ZERO, CloseOptions.GroupMembershipOperation.DEFAULT, true);

Review Comment:
   I guess this now has the same effect since we won't send the LeaveGroup 
unless `!coordinatorUnknown() && state != MemberState.UNJOINED && 
generation.hasMemberId()`, but I'm wondering if we should just go ahead and 
hardcode the `REMAIN_IN_GROUP` option here in case we ever decide to change the 
semantics of `DEFAULT`.
   
   If we didn't even finish constructing the Kafka consumer, there's no way it 
would have joined the group.
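
   A rough sketch of why hardcoding is the safer choice. Everything here is illustrative: the class and method names are hypothetical, and the `DEFAULT` branch encodes my reading of the Javadoc in this PR (dynamic members leave, static members stay), not a confirmed contract:

```java
// Hypothetical model of the leave-on-close decision; not the actual coordinator code.
class LeaveOnCloseSketch {
    // Mirrors the constant names of CloseOptions.GroupMembershipOperation.
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    static boolean shouldSendLeaveGroup(GroupMembershipOperation operation,
                                        boolean isDynamicMember,
                                        boolean hasJoinedGroup) {
        // A member that never joined has nothing to leave.
        if (!hasJoinedGroup) {
            return false;
        }
        switch (operation) {
            case LEAVE_GROUP:
                return true;
            case REMAIN_IN_GROUP:
                return false;
            default:
                // Assumed DEFAULT semantics: only dynamic members leave.
                return isDynamicMember;
        }
    }
}
```

   `REMAIN_IN_GROUP` returns `false` no matter what the `default` branch does, so the constructor-failure path would be unaffected by any later change to `DEFAULT`.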



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1357,7 +1367,11 @@ private void close(Duration timeout, boolean swallowException) {
         swallow(log, Level.ERROR, "Failed invoking asynchronous commit callbacks while closing consumer",

Review Comment:
   GitHub won't let me leave a comment on this line, but look at the call 
directly above here -- wondering whether it makes more sense to integrate the 
`membershipOperation` into the `#leaveGroupOnClose` method?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -275,6 +284,24 @@ public int memberEpoch() {
         return memberEpoch;
     }
 
+    /**
+     * @return the operation the consumer will perform on leaving the group.
+     *
+     * @see CloseOptions.GroupMembershipOperation
+     */
+    public CloseOptions.GroupMembershipOperation leaveGroupOperation() {
+        return leaveGroupOperation;
+    }
+
+    /**
+     * Sets the operation on consumer group membership that the consumer will perform when closing.
+     * The {@link AbstractMembershipManager#leaveGroupOperation} should remain {@code GroupMembershipOperation.DEFAULT}
+     * until the consumer is closed.
+     *
+     * @param operation the operation to be performed on close
+     */
+    public abstract void leaveGroupOperationOnClose(CloseOptions.GroupMembershipOperation operation);

Review Comment:
   also: does this need to be abstract? maybe I missed one but it looks like 
each implementation does the same thing
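
   Something along these lines is what I have in mind, assuming every subclass only assigns the field (I may have missed one that does more). The class names are hypothetical and the enum is a local stand-in for `CloseOptions.GroupMembershipOperation`:

```java
// Hypothetical base class; a sketch of a concrete setter, not the actual manager.
abstract class AbstractManagerSketch {
    // Local stand-in for CloseOptions.GroupMembershipOperation.
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    // Stays DEFAULT until the consumer is closed.
    private GroupMembershipOperation leaveGroupOperation = GroupMembershipOperation.DEFAULT;

    public GroupMembershipOperation leaveGroupOperation() {
        return leaveGroupOperation;
    }

    // Concrete in the base class, so subclasses do not repeat the same assignment.
    public void leaveGroupOperationOnClose(GroupMembershipOperation operation) {
        this.leaveGroupOperation = operation;
    }
}

// Hypothetical subclass: inherits the setter without overriding it.
class ConsumerManagerSketch extends AbstractManagerSketch {
}
```

   A subclass that really needs different behavior could still override the method.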



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java:
##########
@@ -198,7 +199,7 @@ public void requestRejoin() {
     }
 
     public void maybeLeaveGroup(String leaveReason) {
-        coordinator.maybeLeaveGroup(leaveReason);
+        coordinator.maybeLeaveGroup(CloseOptions.GroupMembershipOperation.DEFAULT, leaveReason);

Review Comment:
   I'm very unfamiliar with Connect, but just based on the comment above where 
this method is called, it seems like this should probably be `LEAVE_GROUP`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -301,12 +303,15 @@ public void close(final Duration timeout) {
      * the network thread the time to close down cleanly.
      *
      * @param timeout Upper bound of time to wait for the network thread to close its resources
+     * @param membershipOperation the operation on consumer group membership that the consumer will perform when closing
      */
-    private void closeInternal(final Duration timeout) {
+    private void closeInternal(final Duration timeout, CloseOptions.GroupMembershipOperation membershipOperation) {
         long timeoutMs = timeout.toMillis();
         log.trace("Signaling the consumer network thread to close in {}ms", timeoutMs);
         running = false;
         closeTimeout = timeout;
+        requestManagers.consumerMembershipManager.ifPresent(membershipManager ->

Review Comment:
   not saying this is wrong, just not very familiar with this part of the code 
-- why are we adding a leave group here that wasn't there before?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java:
##########
@@ -265,7 +266,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
             // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
             if (this.log != null) {
-                close(Duration.ZERO, true);
+                close(Duration.ZERO, CloseOptions.GroupMembershipOperation.DEFAULT, true);

Review Comment:
   ditto here: hardcode to `REMAIN_IN_GROUP`?


