junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r464697840



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1240,26 +1298,30 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean): Unit = {
+    val groupsToComplete = scala.collection.mutable.Set[GroupKey]()
     group.inLock {
       if (group.is(Dead)) {
         info(s"Received notification of heartbeat expiration for member $memberId after group ${group.groupId} had already been unloaded or deleted.")
       } else if (isPending) {
         info(s"Pending member $memberId in group ${group.groupId} has been removed after session timeout expiration.")
-        removePendingMemberAndUpdateGroup(group, memberId)
+        groupsToComplete ++= removePendingMemberAndUpdateGroup(group, memberId)
       } else if (!group.has(memberId)) {
         debug(s"Member $memberId has already been removed from the group.")
       } else {
         val member = group.get(memberId)
         if (!member.hasSatisfiedHeartbeat) {
           info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
-          removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
+          groupsToComplete ++= removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
         }
       }
     }
+    groupsToComplete.foreach(joinPurgatory.checkAndComplete)

Review comment:
       This could be `completeDelayedJoinRequests(groupsToComplete)` ?
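
   A minimal sketch of what that could look like at the end of `onExpireHeartbeat`, assuming the helper name suggested here (and apparently introduced by a later hunk) is in scope in `GroupCoordinator`:

   ```scala
       // group keys are collected under the group lock, as in the diff above;
       // then, outside the lock, delegate to the shared helper instead of calling
       // joinPurgatory.checkAndComplete directly:
       completeDelayedJoinRequests(groupsToComplete)
   ```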

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -554,19 +596,27 @@ class ReplicaManager(val config: KafkaConfig,
   * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
   * the callback function will be triggered either when timeout or the required acks are satisfied;
   * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
+   *
+   * @param completeDelayedRequests true: the delayed requests may be completed inside the call with the expectation
+   *                                that no conflicting locks are held by the caller. Otherwise, the caller is expected
+   *                                to complete delayed requests for those returned partitions if completeDelayedRequests
+   *                                is false

Review comment:
       Could we change the explanation to sth like the following?
   
   This method may trigger the completeness check for delayed requests in a few 
purgatories. Occasionally, for serialization in the log, a caller may need to 
hold a lock while calling this method. To avoid deadlock, if the caller holds a 
conflicting lock while calling this method, the caller is expected to set 
completeDelayedRequests to false to avoid checking the delayed operations 
during this call. The caller will then explicitly complete those delayed 
operations based on the return value, without holding the conflicting lock. 
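
   For illustration, a sketch of how the reworded scaladoc could sit above `appendRecords`; the wording follows the suggestion above, and the surrounding placement (with the existing sentences elided) is an assumption rather than the PR's actual text:

   ```scala
     /**
      * Append messages to leader replicas of the partition, and wait for them to be
      * replicated to other replicas; ... (existing description unchanged) ...
      *
      * This method may trigger the completeness check for delayed requests in a few purgatories.
      * Occasionally, for serialization in the log, a caller may need to hold a lock while calling
      * this method. To avoid deadlock, if the caller holds a conflicting lock while calling this
      * method, the caller is expected to set completeDelayedRequests to false to avoid checking
      * the delayed operations during this call. The caller will then explicitly complete those
      * delayed operations based on the return value, without holding the conflicting lock.
      */
   ```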

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -334,6 +345,37 @@ class ReplicaManager(val config: KafkaConfig,
       brokerTopicStats.removeMetrics(topic)
   }
 
+  /**
+   * try to complete delayed requests in following purgatories.
+   * 1) delayedFetchPurgatory
+   * 2) delayedProducePurgatory
+   * 3) delayedDeleteRecordsPurgatory
+   *
+   * Noted that caller should NOT hold any group lock in order to avoid deadlock.
+   *
+   * this method is visible for testing.
+   */
+  def completeDelayedRequests(topicPartition: TopicPartition): Unit = {
+    val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition)
+    delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
+    delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
+    delayedDeleteRecordsPurgatory.checkAndComplete(topicPartitionOperationKey)
+  }
+
+  /**
+   * try to complete delayed requests in delayedFetchPurgatory.
+   *
+   * Noted that caller should NOT hold any group lock in order to avoid deadlock.

Review comment:
       group lock => conflicting lock

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -334,6 +345,37 @@ class ReplicaManager(val config: KafkaConfig,
       brokerTopicStats.removeMetrics(topic)
   }
 
+  /**
+   * try to complete delayed requests in following purgatories.
+   * 1) delayedFetchPurgatory
+   * 2) delayedProducePurgatory
+   * 3) delayedDeleteRecordsPurgatory
+   *
+   * Noted that caller should NOT hold any group lock in order to avoid deadlock.

Review comment:
       group lock => conflicting lock

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -369,6 +370,32 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
+  /**
+   * try to complete produce, fetch and delete requests if the HW of partition is incremented. Otherwise, we try to complete
+   * only delayed fetch requests.
+   *
+   * Noted that this method may hold a lot of group lock so the caller should NOT hold any group lock

Review comment:
       a lot of group lock => multiple group locks

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -369,6 +370,32 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
+  /**
+   * try to complete produce, fetch and delete requests if the HW of partition is incremented. Otherwise, we try to complete
+   * only delayed fetch requests.
+   *
+   * Noted that this method may hold a lot of group lock so the caller should NOT hold any group lock
+   * in order to avoid deadlock
+   * @param topicPartitions a map contains the partition and a flag indicting whether the HWM has been changed
+   */
+  private[group] def completeDelayedRequests(topicPartitions: Map[TopicPartition, LeaderHwChange]): Unit =
+    topicPartitions.foreach {
+      case (tp, leaderHWIncremented) => leaderHWIncremented match {
+        case LeaderHwIncremented => groupManager.replicaManager.completeDelayedRequests(tp)
+        case _ => groupManager.replicaManager.completeDelayedFetchRequests(tp)
+      }
+    }
+
+  /**
+   * complete the delayed join requests associated to input group keys.
+   *
+   * Noted that this method may hold a lot of group lock so the caller should NOT hold any group lock

Review comment:
       "this method may hold a lot of group lock" : This is actually not true. 
Unlike producer/fetch purgatory, which is keyed on partition, joinPurgatory is 
keyed on the group. So, when we complete a key, only a single group's lock will 
be held.
   
   The reason that we don't want the caller to hold a group lock is that 
DelayedJoin itself uses a lock other than the group lock for 
DelayedOperation.maybeTryComplete() and we want to avoid the deadlock between 
that lock and the group lock.
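
   To make that concrete, a rough sketch of how the revised comment could read on the helper; the method name and signature are assumptions based on the surrounding hunk, not the PR's exact code:

   ```scala
     /**
      * Complete the delayed join requests associated with the given group keys.
      *
      * joinPurgatory is keyed on the group, so completing a key holds at most a single group's
      * lock. The caller still must not hold a group lock, because DelayedJoin uses a lock other
      * than the group lock for DelayedOperation.maybeTryComplete(), and holding both could
      * deadlock.
      */
     private[group] def completeDelayedJoinRequests(groupKeys: Iterable[GroupKey]): Unit =
       groupKeys.foreach(joinPurgatory.checkAndComplete)
   ```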

##########
File path: core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
##########
@@ -33,11 +34,40 @@ import scala.math.{max, min}
  */
 private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                  group: GroupMetadata,
-                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
+                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout) {
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
-  override def onExpiration() = coordinator.onExpireJoin()
-  override def onComplete() = coordinator.onCompleteJoin(group)
+  /**
+   * The delayed requests should be completed without holding group lock so we keep those partitions and then
+   * complete them after releasing lock.
+   */
+  private[group] var partitionsToComplete: scala.collection.Map[TopicPartition, LeaderHwChange] = Map.empty
+
+  /**
+   * It controls the lock manually since GroupCoordinator#onCompleteJoin() invoked by onComplete() can't be within a
+   * group lock since the completion of the delayed request for partitions returned from GroupCoordinator#onCompleteJoin()
+   * need to be done outside of the group lock.
+   */
+  override def tryComplete(): Boolean = try group.inLock {
+    /**
+     * holds the group lock for both the "group.hasAllMembersJoined" check and the call to forceComplete()
+     */
+     */

Review comment:
       Perhaps add "but completes the delayed requests without holding the 
group lock".

##########
File path: 
core/src/test/scala/unit/kafka/coordinator/group/DelayedJoinTest.scala
##########
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
+
+import kafka.utils.MockTime
+import org.easymock.EasyMock
+import org.junit.{Assert, Test}
+
+class DelayedJoinTest {
+
+  @Test
+  def testCompleteDelayedRequests(): Unit = {

Review comment:
       Perhaps we could add a comment on what this method is intended to test?
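
   One possible wording, placed directly above the @Test annotation; the stated intent is inferred from the PR discussion rather than taken from it:

   ```scala
     /**
      * Verifies that DelayedJoin completes the collected delayed requests only after the group
      * lock has been released, so that completion cannot deadlock against a caller that holds
      * the group lock.
      */
   ```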



