jeffkbkim commented on a change in pull request #9050: URL: https://github.com/apache/kafka/pull/9050#discussion_r461742057
########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -1986,101 +1965,125 @@ private[controller] class ControllerStats extends KafkaMetricsGroup { sealed trait ControllerEvent { def state: ControllerState + def preempt(): Unit } case object ControllerChange extends ControllerEvent { - override def state = ControllerState.ControllerChange + override def state: ControllerState = ControllerState.ControllerChange + override def preempt(): Unit = {} Review comment: @mumrah one of the issues was that people are able to create controller events without implementing preemption. In most cases, this is fine because most events don't have callbacks. But for events that do have callbacks, someone can forget to implement preemption and this won't be caught during compile time. This is to force an implementation of preemption which can be empty when designing a new event. ########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -2090,6 +2093,12 @@ case class ReplicaLeaderElection( callback: ElectLeadersCallback = _ => {} ) extends ControllerEvent { override def state: ControllerState = ControllerState.ManualLeaderBalance + + override def preempt(): Unit = callback( + partitionsFromAdminClientOpt.fold(Map.empty[TopicPartition, Either[ApiError, Int]]) { partitions => Review comment: @mumrah i believe the initial `Map.empty[TopicPartition, Either[ApiError, Int]]` was used to structure the return type (`ElectLeadersCallback: Map[TopicPartition, Either[ApiError, Int]] => Unit`) which cannot be achieved using `foreach`. ########## File path: core/src/main/scala/kafka/controller/ControllerEventManager.scala ########## @@ -77,7 +77,7 @@ class ControllerEventManager(controllerId: Int, private val putLock = new ReentrantLock() private val queue = new LinkedBlockingQueue[QueuedEvent] // Visible for test - private[controller] val thread = new ControllerEventThread(ControllerEventThreadName) + private[controller] var thread = new ControllerEventThread(ControllerEventThreadName) Review comment: @jsancio updated the PR. i wrapped the entire method in a lock because the return type `QueuedEvent` can be retrieved by `put(event)`. the suggested code would require another variable to be initialized, then set inside the lock, then returned at the end. I thought it would be best to prefer simplicity and readability over performance as this only occurs in the start and end of `ControllerEventManager`'s lifecycle. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org