tombentley commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r644571665



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -279,30 +279,33 @@ class KafkaApis(val requestChannel: RequestChannel,
         new 
StopReplicaResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
     } else {
       val partitionStates = stopReplicaRequest.partitionStates().asScala
-      val (result, error) = replicaManager.stopReplicas(
-        request.context.correlationId,
-        stopReplicaRequest.controllerId,
-        stopReplicaRequest.controllerEpoch,
-        stopReplicaRequest.brokerEpoch,
-        partitionStates)
-      // Clear the coordinator caches in case we were the leader. In the case 
of a reassignment, we
-      // cannot rely on the LeaderAndIsr API for this since it is only sent to 
active replicas.
-      result.forKeyValue { (topicPartition, error) =>
-        if (error == Errors.NONE) {
-          if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
-              && partitionStates(topicPartition).deletePartition) {
-            groupCoordinator.onResignation(topicPartition.partition)
-          } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
-                     && partitionStates(topicPartition).deletePartition) {
+      def onStopReplicas(error: Errors, partitions: Map[TopicPartition, 
Errors]): Unit = {
+        // Clear the coordinator caches in case we were the leader. In the 
case of a reassignment, we
+        // cannot rely on the LeaderAndIsr API for this since it is only sent 
to active replicas.
+        partitions.forKeyValue { (topicPartition, partitionError) =>
+          if (partitionError == Errors.NONE) {
             val partitionState = partitionStates(topicPartition)
             val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-                Some(partitionState.leaderEpoch)
+              Some(partitionState.leaderEpoch)
             else
               None
-            txnCoordinator.onResignation(topicPartition.partition, 
coordinatorEpoch = leaderEpoch)
+            if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
+              && partitionState.deletePartition) {
+              groupCoordinator.onResignation(topicPartition.partition, 
leaderEpoch)
+            } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
+              && partitionState.deletePartition) {
+              txnCoordinator.onResignation(topicPartition.partition, 
coordinatorEpoch = leaderEpoch)
+            }
           }
         }
       }
+      val (result, error) = replicaManager.stopReplicas(
+        request.context.correlationId,
+        stopReplicaRequest.controllerId,
+        stopReplicaRequest.controllerEpoch,
+        stopReplicaRequest.brokerEpoch,
+        partitionStates,
+        onStopReplicas)

Review comment:
       @hachikuji, as @guozhangwang pointed out, the scheduler really _is_ FIFO 
(it uses a sequence number internally to guarantee that order is maintained). 
So, assuming his theory about the racing I/O threads is correct (and I think it 
is, although I've never observed this problem), his solution of holding the 
lock for `handleStopReplicaRequest` would work. 
   
   The current version of the PR doesn't make assumptions about how any 
reordering can happen (i.e. whether it is caused by the inconsistent locking or 
by anything else). So I don't think you're missing anything; you've just solved 
the problem differently.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to