junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479661386



##########
File path: core/src/main/scala/kafka/server/ActionQueue.scala
##########
@@ -0,0 +1,48 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()
+
+  /**
+   * add action to this queue.
+   * @param action action
+   */
+  def add(action: () => Unit): Unit = queue.put(action)
+
+  /**
+   * picks up a action to complete.

Review comment:
       a action => an action

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -100,41 +99,22 @@ abstract class DelayedOperation(override val delayMs: Long,
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
-   * without blocking.
    *
-   * If threadA acquires the lock and performs the check for completion before completion criteria is met
-   * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
-   * yet released the lock, we need to ensure that completion is attempted again without blocking threadA
-   * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
-   * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
-   * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
-   * the operation is actually completed.
+   * There is a long story about using "lock" or "tryLock".
+   *
+   * 1) using lock - There was a lot of cases that a thread holds a group lock and then it tries to hold more group
+   * locks to complete delayed requests. Unfortunately, the scenario causes deadlock and so we had introduced the
+   * "tryLock" to avoid deadlock.
+   *
+   * 2) using tryLock -  However, the "tryLock" causes another issue that the delayed requests may be into
+   * oblivion if the thread, which should complete the delayed requests, fails to get the lock.
+   *
+   * Now, we go back to use "lock" and make sure the thread which tries to complete delayed requests does NOT hold lock.
+   * The approach is that ReplicaManager collects all actions, which are used to complete delayed requests, in a queue.
+   * KafkaApis.handle() and the expiration thread for certain delayed operations (e.g. DelayedJoin) pick up and then
+   * execute an action when no lock is held.

Review comment:
       Perhaps we could add a note at the top of DelayedOperation so that 
people are aware of the need to complete actions for new DelayedOperations in 
the future.
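A minimal sketch of the pattern the new comment describes, under a simplified model: a thread holding one group's lock never locks another group to complete its delayed work; it only enqueues that work, and a thread holding no group lock runs it later. `GroupState`, `produceUnderLock`, and `drainActions` are hypothetical names for illustration, not Kafka APIs, and `drainActions` empties the whole queue, whereas the PR's `tryCompleteAction()` runs one action per call.

```scala
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.locks.ReentrantLock

// Hypothetical stand-in for a group guarded by its own lock.
final class GroupState(val name: String) {
  val lock = new ReentrantLock()
  var completed = 0
}

// Deferred actions, in the spirit of ActionQueue in this PR.
val actions = new LinkedBlockingDeque[() => Unit]()

// Mutates `owner` under its lock. Completing work for `other` is only enqueued,
// so `other.lock` is never acquired while `owner.lock` is held (avoiding lock-order deadlocks).
def produceUnderLock(owner: GroupState, other: GroupState): Unit = {
  owner.lock.lock()
  try {
    // ... state changes on `owner` would happen here ...
    actions.put { () =>
      other.lock.lock()
      try other.completed += 1
      finally other.lock.unlock()
    }
  } finally owner.lock.unlock()
}

// Hypothetical drain loop, called where the current thread holds no group lock.
def drainActions(): Unit = {
  var action = actions.poll()
  while (action != null) {
    action()
    action = actions.poll()
  }
}

val a = new GroupState("a")
val b = new GroupState("b")
produceUnderLock(a, b)
assert(b.completed == 0) // nothing for `b` ran while a's lock was held
drainActions()
println(b.completed) // 1
```

Because completion always runs with no group lock held, it can use a blocking `lock()` again without reintroducing the deadlock that motivated `tryLock`.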

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -585,6 +591,23 @@ class ReplicaManager(val config: KafkaConfig,
                     result.info.logStartOffset, result.info.recordErrors.asJava, result.info.errorMessage)) // response status
       }
 
+      actionQueue.add {
+        () =>
+          localProduceResults.foreach {
+            case (topicPartition, result) =>
+              val requestKey = TopicPartitionOperationKey(topicPartition)
+              if (result.info.leaderHWIncremented) {

Review comment:
       It seems that we have to distinguish 3 states here: (1) records not 
appended due to an error; (2) records appended successfully and HWM advanced; 
(3) records appended successfully and HWM not advanced. In case (1), no 
purgatory needs to be checked.
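A minimal sketch of how the three states could be modeled explicitly instead of with the single `leaderHWIncremented` boolean. `AppendOutcome`, its cases, `outcomeOf`, and the purgatory names are hypothetical. Checking nothing in case (1) follows the comment above; mapping case (2) to the produce, fetch, and delete-records purgatories and case (3) to the fetch purgatory only is an assumption for illustration.

```scala
// Hypothetical model of the three outcomes described in the review comment.
sealed trait AppendOutcome
case object AppendFailed extends AppendOutcome         // (1) records not appended due to an error
case object AppendedHwmAdvanced extends AppendOutcome  // (2) appended successfully, HWM advanced
case object AppendedHwmUnchanged extends AppendOutcome // (3) appended successfully, HWM not advanced

// Illustrative purgatory names; this mapping is an assumption, not the PR's code.
def purgatoriesToCheck(outcome: AppendOutcome): List[String] = outcome match {
  case AppendFailed         => Nil
  case AppendedHwmAdvanced  => List("produce", "fetch", "deleteRecords")
  case AppendedHwmUnchanged => List("fetch")
}

// Hypothetical helper deriving the outcome from two flags.
def outcomeOf(appended: Boolean, hwmAdvanced: Boolean): AppendOutcome =
  if (!appended) AppendFailed
  else if (hwmAdvanced) AppendedHwmAdvanced
  else AppendedHwmUnchanged

println(purgatoriesToCheck(outcomeOf(appended = false, hwmAdvanced = false))) // List()
```

A sealed trait lets the compiler flag a `match` that forgets one of the three cases, which a boolean cannot do.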

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -180,6 +181,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
         case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotasRequest(request)
       }
+
+      // try to complete delayed action. In order to avoid conflicting locking, the actions to complete delayed requests
+      // are kept in a queue. We add the logic to check the ReplicaManager queue at the end of KafkaApis.handle() and the
+      // expiration thread for certain delayed operations (e.g. DelayedJoin)
+      actionQueue.tryCompleteAction()

Review comment:
       Even if we hit an exception in handleXXX(), it would still be useful to 
complete the actionQueue.
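A minimal sketch of the suggestion, assuming a simplified handler: `handleRequest`, `dispatch`, and the `errorsHandled` counter are hypothetical stand-ins for `KafkaApis.handle()`, the per-API `handleXXX()` dispatch, and its error-response path. Calling `tryCompleteAction()` from a `finally` block runs it whether or not the handler throws.

```scala
import java.util.concurrent.LinkedBlockingDeque
import scala.util.control.NonFatal

// Simplified copy of ActionQueue from this PR.
class ActionQueue {
  private val queue = new LinkedBlockingDeque[() => Unit]()
  def add(action: () => Unit): Unit = queue.put(action)
  def tryCompleteAction(): Unit = {
    val action = queue.poll()
    if (action != null) action()
  }
  def size: Int = queue.size()
}

val actionQueue = new ActionQueue
var errorsHandled = 0

// Hypothetical stand-in for KafkaApis.handle(); `dispatch` plays the role of handleXXX().
def handleRequest(dispatch: () => Unit): Unit = {
  try dispatch()
  catch {
    case NonFatal(_) => errorsHandled += 1 // stands in for sending an error response
  } finally {
    // Runs even when dispatch() threw.
    actionQueue.tryCompleteAction()
  }
}

var completed = false
actionQueue.add(() => completed = true)
handleRequest(() => throw new IllegalStateException("handler failed"))
println(completed) // true
```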

##########
File path: core/src/main/scala/kafka/server/ActionQueue.scala
##########
@@ -0,0 +1,48 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()
+
+  /**
+   * add action to this queue.
+   * @param action action
+   */
+  def add(action: () => Unit): Unit = queue.put(action)
+
+  /**
+   * picks up a action to complete.
+   */
+  def tryCompleteAction(): Unit = {
+    val action = queue.poll()
+    if (action != null) action()
+  }
+
+  /**
+   * @return number of actions kept by this queue
+   */
+  def size: Int = queue.size()

Review comment:
       Is this used?
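To make the queue's current contract concrete, here is a self-contained copy of the `ActionQueue` in this hunk with a short run: each `tryCompleteAction()` call polls and runs at most one action, in FIFO order, and `size` reports how many are still pending. The `order` buffer exists only for illustration.

```scala
import java.util.concurrent.LinkedBlockingDeque
import scala.collection.mutable.ArrayBuffer

// Copy of ActionQueue as it appears in this hunk.
class ActionQueue {
  private val queue = new LinkedBlockingDeque[() => Unit]()
  def add(action: () => Unit): Unit = queue.put(action)
  def tryCompleteAction(): Unit = {
    val action = queue.poll()
    if (action != null) action()
  }
  def size: Int = queue.size()
}

val q = new ActionQueue
val order = ArrayBuffer.empty[Int]
q.add(() => order += 1)
q.add(() => order += 2)

q.tryCompleteAction()           // runs only the first action
println((order.toList, q.size)) // (List(1),1)
q.tryCompleteAction()
q.tryCompleteAction()           // empty queue: poll() returns null, nothing runs
println((order.toList, q.size)) // (List(1, 2),0)
```

Since one call completes at most one action, a request that enqueues several actions leaves the rest for later `handle()` calls.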




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

