junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r483276075
File path: core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala

```diff
@@ -201,8 +201,8 @@ object AbstractCoordinatorConcurrencyTest {
           }
         }
         val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
-        watchKeys ++= producerRequestKeys
         producePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
+        watchKeys ++= producerRequestKeys
```

Review comment:

@chia7712:

1. I think we still need `operation.safeTryComplete` in `DelayedOperation.tryCompleteElseWatch()`. After the `operation.tryComplete()` call, but before we add the operation to the watch list of each key, another thread could change the state so that the operation becomes completable. Since that thread doesn't see the operation registered under the key, it won't complete it. If we don't call `operation.safeTryComplete` after adding the keys to watch, we could miss the only chance to complete this operation.

2. I am not sure there is a deadlock caused by `TransactionStateManager`. I don't see `updateCacheCallback` holding any lock on `stateLock`. The following locking sequence is possible through `TransactionStateManager`:
   - thread 1: holds the read lock of `stateLock`, calls `ReplicaManager.appendRecords`, calls `tryCompleteElseWatch`, then takes the lock on `delayedOperation`.
   - thread 2: holds the lock on `delayedOperation`, calls `delayedOperation.onComplete`, calls `removeFromCacheCallback()`, then takes the read lock of `stateLock`.

   However, since both threads only hold the read lock of `stateLock`, there shouldn't be a conflict. Do you see the test fail due to a deadlock?
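The race behind point 1 can be replayed deterministically in a small sketch. It assumes hypothetical `SketchOperation` and `SketchPurgatory` classes, which are heavily simplified stand-ins and not Kafka's `DelayedOperation` or `DelayedOperationPurgatory`. The hypothetical `beforeWatch` hook plays the "other thread": it makes the operation completable and calls `checkAndComplete` after the first `tryComplete()` but before the operation is registered under its key. Expiration and the real purgatory's locking are left out.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical delayed operation: it can complete once `ready` is true.
final class SketchOperation(ready: AtomicBoolean) {
  private val completed = new AtomicBoolean(false)

  def isCompleted: Boolean = completed.get()

  // Returns true only for the call that actually completes the operation.
  def tryComplete(): Boolean = ready.get() && completed.compareAndSet(false, true)

  // Serializes completion attempts, in the spirit of safeTryComplete.
  def safeTryComplete(): Boolean = synchronized { tryComplete() }
}

// Hypothetical, heavily simplified purgatory; not Kafka's DelayedOperationPurgatory.
final class SketchPurgatory(recheckAfterWatch: Boolean) {
  private val watchers = new ConcurrentHashMap[String, ConcurrentLinkedQueue[SketchOperation]]()

  private def watchersFor(key: String): ConcurrentLinkedQueue[SketchOperation] = {
    val fresh = new ConcurrentLinkedQueue[SketchOperation]()
    val existing = watchers.putIfAbsent(key, fresh)
    if (existing == null) fresh else existing
  }

  // `beforeWatch` injects "another thread" between the first check and the registration.
  def tryCompleteElseWatch(op: SketchOperation, keys: Seq[String],
                           beforeWatch: () => Unit = () => ()): Boolean =
    if (op.safeTryComplete()) true
    else {
      beforeWatch()
      keys.foreach(key => watchersFor(key).add(op))
      // Second check: an event that fired before registration is not lost.
      recheckAfterWatch && op.safeTryComplete()
    }

  // Called by the thread that changed the state behind `key`; it only sees registered operations.
  def checkAndComplete(key: String): Int = {
    val queue = watchers.get(key)
    var completedCount = 0
    if (queue != null) {
      val it = queue.iterator()
      while (it.hasNext) {
        val op = it.next()
        if (op.safeTryComplete()) completedCount += 1
        if (op.isCompleted) it.remove()
      }
    }
    completedCount
  }
}

object RecheckDemo {
  // Replays the race and reports whether the operation ended up completed.
  def run(recheckAfterWatch: Boolean): Boolean = {
    val ready = new AtomicBoolean(false)
    val op = new SketchOperation(ready)
    val purgatory = new SketchPurgatory(recheckAfterWatch)
    val otherThreadEvent: () => Unit = () => {
      ready.set(true)                       // the state change that makes `op` completable
      purgatory.checkAndComplete("topic-0") // no watcher registered yet, so nothing completes
      ()
    }
    purgatory.tryCompleteElseWatch(op, Seq("topic-0"), otherThreadEvent)
    op.isCompleted
  }

  def main(args: Array[String]): Unit = {
    println(s"with re-check:    ${run(recheckAfterWatch = true)}")
    println(s"without re-check: ${run(recheckAfterWatch = false)}")
  }
}
```

`run(recheckAfterWatch = true)` returns `true`, while `run(recheckAfterWatch = false)` returns `false`: without the second check the operation sits in the watch list with no further event left to trigger it, which is the missed chance described in point 1.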
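The lock ordering in point 2 can be checked with a sketch that models `stateLock` as a `java.util.concurrent.locks.ReentrantReadWriteLock` and the lock on the delayed operation as a `ReentrantLock`; both are assumptions for illustration, and `LockOrderSketch` is a hypothetical name. Thread 1 takes `stateLock` and then the operation lock, while thread 2 takes them in the opposite order. Second acquisitions use `tryLock` with a timeout so a cycle shows up as a failure instead of a hang. The sketch has no writer queued on `stateLock`; a new reader can still block behind a writer already waiting on a `ReentrantReadWriteLock`, and that case is not exercised here.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.{Lock, ReentrantLock, ReentrantReadWriteLock}

object LockOrderSketch {
  // Returns true if both threads obtained their second lock despite the opposite acquisition order.
  def bothThreadsProgress(thread1UsesWriteLock: Boolean): Boolean = {
    val stateLock = new ReentrantReadWriteLock() // stand-in for TransactionStateManager's stateLock
    val opLock = new ReentrantLock()             // stand-in for the lock on the delayed operation
    val stateSide: Lock = if (thread1UsesWriteLock) stateLock.writeLock() else stateLock.readLock()
    val t1HasState = new CountDownLatch(1)
    val t2HasOp = new CountDownLatch(1)
    val t1Ok = new AtomicBoolean(false)
    val t2Ok = new AtomicBoolean(false)

    val thread1 = new Thread(() => {
      stateSide.lock() // like holding stateLock while calling appendRecords
      try {
        t1HasState.countDown()
        t2HasOp.await()
        if (opLock.tryLock(1, TimeUnit.SECONDS)) { // like tryCompleteElseWatch locking the operation
          t1Ok.set(true)
          opLock.unlock()
        }
      } finally stateSide.unlock()
    })

    val thread2 = new Thread(() => {
      opLock.lock() // like holding the operation's lock inside onComplete
      try {
        t2HasOp.countDown()
        t1HasState.await()
        val readLock = stateLock.readLock()
        if (readLock.tryLock(1, TimeUnit.SECONDS)) { // like removeFromCacheCallback taking the read lock
          t2Ok.set(true)
          readLock.unlock()
        }
      } finally opLock.unlock()
    })

    thread1.start(); thread2.start()
    thread1.join(); thread2.join()
    t1Ok.get() && t2Ok.get()
  }

  def main(args: Array[String]): Unit = {
    println(s"read/read:  ${bothThreadsProgress(thread1UsesWriteLock = false)}")
    println(s"write/read: ${bothThreadsProgress(thread1UsesWriteLock = true)}")
  }
}
```

When both sides take the read lock, thread 2 gets `stateLock` immediately, finishes, and releases the operation lock, so both threads progress and the call returns `true`. If thread 1 held the write lock instead, the two threads would wait on each other and at least one `tryLock` times out, so the call returns `false`; this contrast is why the read/read sequence above should not deadlock.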