junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r484638938



##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -221,36 +205,34 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
 
     // The cost of tryComplete() is typically proportional to the number of keys. Calling
     // tryComplete() for each key is going to be expensive if there are many keys. Instead,
-    // we do the check in the following way. Call tryComplete(). If the operation is not completed,
-    // we just add the operation to all keys. Then we call tryComplete() again. At this time, if
-    // the operation is still not completed, we are guaranteed that it won't miss any future triggering
-    // event since the operation is already on the watcher list for all keys. This does mean that
-    // if the operation is completed (by another thread) between the two tryComplete() calls, the
-    // operation is unnecessarily added for watch. However, this is a less severe issue since the
-    // expire reaper will clean it up periodically.
-
-    // At this point the only thread that can attempt this operation is this current thread
-    // Hence it is safe to tryComplete() without a lock
-    var isCompletedByMe = operation.tryComplete()
-    if (isCompletedByMe)
-      return true
-
-    var watchCreated = false
-    for(key <- watchKeys) {
-      // If the operation is already completed, stop adding it to the rest of the watcher list.
-      if (operation.isCompleted)
-        return false
-      watchForOperation(key, operation)
-
-      if (!watchCreated) {
-        watchCreated = true
-        estimatedTotalOperations.incrementAndGet()
-      }
-    }
-
-    isCompletedByMe = operation.maybeTryComplete()
-    if (isCompletedByMe)
-      return true
+    // we do the check in the following way through safeTryCompleteOrElse().

Review comment:
      I think we still want to keep the rest of the paragraph starting from "Call tryComplete().".

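      For readers without the full diff: that paragraph explains why a second tryComplete() after the watch-list insertion is sufficient. A minimal sketch of the shape safeTryCompleteOrElse() takes in this PR (the body is my reading of the diff, not the verbatim merged code; inLock is from kafka.utils.CoreUtils):

          // Runs entirely under the operation's lock: first check, then the
          // caller-supplied else-branch (watch-list insertion), then a last check.
          private[server] def safeTryCompleteOrElse(f: => Unit): Boolean = inLock(lock) {
            if (tryComplete()) true
            else {
              f // e.g. add the operation to the watcher list of every key
              // Last check: any triggering event from here on finds the operation
              // on the watcher lists, so no future event can be missed.
              tryComplete()
            }
          }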
##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -221,36 +205,34 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
 
     // The cost of tryComplete() is typically proportional to the number of keys. Calling
     // tryComplete() for each key is going to be expensive if there are many keys. Instead,
-    // we do the check in the following way. Call tryComplete(). If the operation is not completed,
-    // we just add the operation to all keys. Then we call tryComplete() again. At this time, if
-    // the operation is still not completed, we are guaranteed that it won't miss any future triggering
-    // event since the operation is already on the watcher list for all keys. This does mean that
-    // if the operation is completed (by another thread) between the two tryComplete() calls, the
-    // operation is unnecessarily added for watch. However, this is a less severe issue since the
-    // expire reaper will clean it up periodically.
-
-    // At this point the only thread that can attempt this operation is this current thread
-    // Hence it is safe to tryComplete() without a lock
-    var isCompletedByMe = operation.tryComplete()
-    if (isCompletedByMe)
-      return true
-
-    var watchCreated = false
-    for(key <- watchKeys) {
-      // If the operation is already completed, stop adding it to the rest of the watcher list.
-      if (operation.isCompleted)
-        return false
-      watchForOperation(key, operation)
-
-      if (!watchCreated) {
-        watchCreated = true
-        estimatedTotalOperations.incrementAndGet()
-      }
-    }
-
-    isCompletedByMe = operation.maybeTryComplete()
-    if (isCompletedByMe)
-      return true
+    // we do the check in the following way through safeTryCompleteOrElse().
+    //
+    // ==============[story about lock]==============
+    // We hold the operation's lock while adding the operation to watch list and doing the tryComplete() check. This is
+    // to avoid a potential deadlock between the callers to tryCompleteElseWatch() and checkAndComplete(). For example,
+    // the following deadlock can happen if the lock is only held for the final tryComplete() check,
+    // 1) thread_a holds readlock of stateLock from TransactionStateManager
+    // 2) thread_a is executing tryCompleteElseWatch
+    // 3) thread_a adds op to watch list
+    // 4) thread_b requires writelock of stateLock from TransactionStateManager (blocked by thread_a)
+    // 5) thread_c calls checkAndComplete() and holds lock of op
+    // 6) thread_c is waiting readlock of stateLock to complete op (blocked by thread_b)
+    // 7) thread_a is waiting lock of op to call safeTryComplete (blocked by thread_c)
+    //
+    // Noted that current approach can't prevent all deadlock. For example,
+    // 1) thread_a gets lock of op
+    // 2) thread_a adds op to watch list
+    // 3) thread_a calls op#tryComplete (and it tries to require lock_b)
+    // 4) thread_b holds lock_b

Review comment:
      thread_b holds lock_b => thread_b holds lock_b and calls checkAndComplete()
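      For context (a simplified sketch, not the verbatim source): checkAndComplete() ends up contending on the operation's lock because completing a watched operation goes through safeTryComplete(), which locks the operation:

          // checkAndComplete(key): look up the watchers for the key and try to
          // complete each watched operation; lookup/locking details omitted.
          def checkAndComplete(key: Any): Int = {
            val watchers = watchersForKey.get(key)
            if (watchers == null) 0
            else watchers.tryCompleteWatched() // per op: op.safeTryComplete()
          }

          // Added by this PR: tryComplete() guarded by the operation's lock.
          private[server] def safeTryComplete(): Boolean = inLock(lock)(tryComplete())

      That lock is what thread_b's checkAndComplete() call blocks on while thread_a holds it inside tryCompleteElseWatch().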

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -221,36 +205,34 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
 
     // The cost of tryComplete() is typically proportional to the number of keys. Calling
     // tryComplete() for each key is going to be expensive if there are many keys. Instead,
-    // we do the check in the following way. Call tryComplete(). If the operation is not completed,
-    // we just add the operation to all keys. Then we call tryComplete() again. At this time, if
-    // the operation is still not completed, we are guaranteed that it won't miss any future triggering
-    // event since the operation is already on the watcher list for all keys. This does mean that
-    // if the operation is completed (by another thread) between the two tryComplete() calls, the
-    // operation is unnecessarily added for watch. However, this is a less severe issue since the
-    // expire reaper will clean it up periodically.
-
-    // At this point the only thread that can attempt this operation is this current thread
-    // Hence it is safe to tryComplete() without a lock
-    var isCompletedByMe = operation.tryComplete()
-    if (isCompletedByMe)
-      return true
-
-    var watchCreated = false
-    for(key <- watchKeys) {
-      // If the operation is already completed, stop adding it to the rest of the watcher list.
-      if (operation.isCompleted)
-        return false
-      watchForOperation(key, operation)
-
-      if (!watchCreated) {
-        watchCreated = true
-        estimatedTotalOperations.incrementAndGet()
-      }
-    }
-
-    isCompletedByMe = operation.maybeTryComplete()
-    if (isCompletedByMe)
-      return true
+    // we do the check in the following way through safeTryCompleteOrElse().
+    //
+    // ==============[story about lock]==============
+    // We hold the operation's lock while adding the operation to watch list and doing the tryComplete() check. This is

Review comment:
       We hold => Through safeTryCompleteOrElse(), we hold
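      With that wording, the comment also mirrors the call site. Roughly (a sketch simplified from the diff; the expiration-queue handling that follows in the real method is omitted):

          // The watch-list insertion is passed to safeTryCompleteOrElse() as the
          // else-branch, so the insertion and the final tryComplete() both run
          // under the operation's lock.
          def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean =
            operation.safeTryCompleteOrElse {
              watchKeys.foreach(key => watchForOperation(key, operation))
              if (watchKeys.nonEmpty) estimatedTotalOperations.incrementAndGet()
            }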

##########
File path: 
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##########
@@ -112,6 +113,13 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     }.toSet
   }
 
+  /**
+   * handleTxnCommitOffsets does not complete delayed requests now so it causes error if handleTxnCompletion is executed
+   * before completing delayed request. In random mode, we use this global lock to prevent such error.
+   */
+  private var isRandomOrder = false

Review comment:
      Instead of introducing a global var, could we add a new param when constructing CommitTxnOffsetsOperation and CompleteTxnOperation?
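      A hypothetical shape for that suggestion (the constructor parameter and the supertypes shown here are illustrative, not the test's actual signatures):

          // Thread the flag through each operation at construction time
          // instead of flipping a shared var from the test harness.
          class CommitTxnOffsetsOperation(isRandomOrder: Boolean) extends CommitOffsetsOperation {
            // consult isRandomOrder locally when deciding whether
            // handleTxnCompletion must wait for the delayed commit
          }

          class CompleteTxnOperation(isRandomOrder: Boolean) extends Operation {
            // same flag, passed in rather than shared globally
          }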

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -221,36 +205,34 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
 
     // The cost of tryComplete() is typically proportional to the number of keys. Calling
     // tryComplete() for each key is going to be expensive if there are many keys. Instead,
-    // we do the check in the following way. Call tryComplete(). If the operation is not completed,
-    // we just add the operation to all keys. Then we call tryComplete() again. At this time, if
-    // the operation is still not completed, we are guaranteed that it won't miss any future triggering
-    // event since the operation is already on the watcher list for all keys. This does mean that
-    // if the operation is completed (by another thread) between the two tryComplete() calls, the
-    // operation is unnecessarily added for watch. However, this is a less severe issue since the
-    // expire reaper will clean it up periodically.
-
-    // At this point the only thread that can attempt this operation is this current thread
-    // Hence it is safe to tryComplete() without a lock
-    var isCompletedByMe = operation.tryComplete()
-    if (isCompletedByMe)
-      return true
-
-    var watchCreated = false
-    for(key <- watchKeys) {
-      // If the operation is already completed, stop adding it to the rest of the watcher list.
-      if (operation.isCompleted)
-        return false
-      watchForOperation(key, operation)
-
-      if (!watchCreated) {
-        watchCreated = true
-        estimatedTotalOperations.incrementAndGet()
-      }
-    }
-
-    isCompletedByMe = operation.maybeTryComplete()
-    if (isCompletedByMe)
-      return true
+    // we do the check in the following way through safeTryCompleteOrElse().
+    //
+    // ==============[story about lock]==============
+    // We hold the operation's lock while adding the operation to watch list and doing the tryComplete() check. This is
+    // to avoid a potential deadlock between the callers to tryCompleteElseWatch() and checkAndComplete(). For example,
+    // the following deadlock can happen if the lock is only held for the final tryComplete() check,
+    // 1) thread_a holds readlock of stateLock from TransactionStateManager
+    // 2) thread_a is executing tryCompleteElseWatch
+    // 3) thread_a adds op to watch list
+    // 4) thread_b requires writelock of stateLock from TransactionStateManager (blocked by thread_a)
+    // 5) thread_c calls checkAndComplete() and holds lock of op
+    // 6) thread_c is waiting readlock of stateLock to complete op (blocked by thread_b)
+    // 7) thread_a is waiting lock of op to call safeTryComplete (blocked by thread_c)
+    //
+    // Noted that current approach can't prevent all deadlock. For example,

Review comment:
      Noted that current approach can't prevent all deadlock. => Note that even with the current approach, deadlocks could still be introduced.

##########
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##########
@@ -221,36 +205,34 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
 
     // The cost of tryComplete() is typically proportional to the number of keys. Calling
     // tryComplete() for each key is going to be expensive if there are many keys. Instead,
-    // we do the check in the following way. Call tryComplete(). If the operation is not completed,
-    // we just add the operation to all keys. Then we call tryComplete() again. At this time, if
-    // the operation is still not completed, we are guaranteed that it won't miss any future triggering
-    // event since the operation is already on the watcher list for all keys. This does mean that
-    // if the operation is completed (by another thread) between the two tryComplete() calls, the
-    // operation is unnecessarily added for watch. However, this is a less severe issue since the
-    // expire reaper will clean it up periodically.
-
-    // At this point the only thread that can attempt this operation is this current thread
-    // Hence it is safe to tryComplete() without a lock
-    var isCompletedByMe = operation.tryComplete()
-    if (isCompletedByMe)
-      return true
-
-    var watchCreated = false
-    for(key <- watchKeys) {
-      // If the operation is already completed, stop adding it to the rest of the watcher list.
-      if (operation.isCompleted)
-        return false
-      watchForOperation(key, operation)
-
-      if (!watchCreated) {
-        watchCreated = true
-        estimatedTotalOperations.incrementAndGet()
-      }
-    }
-
-    isCompletedByMe = operation.maybeTryComplete()
-    if (isCompletedByMe)
-      return true
+    // we do the check in the following way through safeTryCompleteOrElse().
+    //
+    // ==============[story about lock]==============
+    // We hold the operation's lock while adding the operation to watch list and doing the tryComplete() check. This is
+    // to avoid a potential deadlock between the callers to tryCompleteElseWatch() and checkAndComplete(). For example,
+    // the following deadlock can happen if the lock is only held for the final tryComplete() check,
+    // 1) thread_a holds readlock of stateLock from TransactionStateManager
+    // 2) thread_a is executing tryCompleteElseWatch
+    // 3) thread_a adds op to watch list
+    // 4) thread_b requires writelock of stateLock from TransactionStateManager (blocked by thread_a)
+    // 5) thread_c calls checkAndComplete() and holds lock of op
+    // 6) thread_c is waiting readlock of stateLock to complete op (blocked by thread_b)
+    // 7) thread_a is waiting lock of op to call safeTryComplete (blocked by thread_c)
+    //
+    // Noted that current approach can't prevent all deadlock. For example,
+    // 1) thread_a gets lock of op

Review comment:
      thread_a gets lock of op => thread_a calls tryCompleteElseWatch() and gets lock of op
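      Read with both corrections applied, the residual scenario is a classic opposite-order lock acquisition. A generic Scala illustration (not Kafka code: opLock stands in for the operation's lock, lockB for lock_b). Running both threads concurrently can hang, which is exactly the hazard the comment describes:

          import java.util.concurrent.locks.ReentrantLock

          object LockOrderDemo extends App {
            val opLock = new ReentrantLock() // the operation's lock
            val lockB  = new ReentrantLock() // lock_b taken inside tryComplete()

            // thread_a: tryCompleteElseWatch() takes op's lock, then wants lock_b
            val threadA = new Thread(() => {
              opLock.lock()
              try { Thread.sleep(100); lockB.lock(); lockB.unlock() }
              finally opLock.unlock()
            })

            // thread_b: holds lock_b, then checkAndComplete() wants op's lock
            val threadB = new Thread(() => {
              lockB.lock()
              try { Thread.sleep(100); opLock.lock(); opLock.unlock() }
              finally lockB.unlock()
            })

            threadA.start(); threadB.start()
            threadA.join(); threadB.join() // may never return: the deadlock
          }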




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

