hachikuji commented on a change in pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#discussion_r555294948
##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -116,63 +116,75 @@ class DefaultAlterIsrManager(
private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new
ConcurrentHashMap[TopicPartition, AlterIsrItem]()
// Used to allow only one in-flight request at a time
- private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
+ @volatile
+ private var inflightRequest: Boolean = false
- private val lastIsrPropagationMs = new AtomicLong(0)
+ // Protect updates of the inflight flag and prevent additional pending items
from being submitted while we are
+ // preparing a request
+ private val inflightLock: ReentrantLock = new ReentrantLock()
override def start(): Unit = {
controllerChannelManager.start()
- scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50,
TimeUnit.MILLISECONDS)
}
override def shutdown(): Unit = {
controllerChannelManager.shutdown()
}
override def submit(alterIsrItem: AlterIsrItem): Boolean = {
- unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) ==
null
+ inLock(inflightLock) {
+ if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition,
alterIsrItem) == null) {
+ maybePropagateIsrChanges()
+ true
+ } else {
+ false
+ }
+ }
}
override def clearPending(topicPartition: TopicPartition): Unit = {
unsentIsrUpdates.remove(topicPartition)
}
- private def propagateIsrChanges(): Unit = {
- if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false,
true)) {
- // Copy current unsent ISRs but don't remove from the map
+ private[server] def maybePropagateIsrChanges(): Unit = inLock(inflightLock) {
+ // Send all pending items if there is not already a request in-flight.
+ if (!inflightRequest && !unsentIsrUpdates.isEmpty) {
+ // Copy current unsent ISRs but don't remove from the map, they get
cleared in the response handler
val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]()
unsentIsrUpdates.values().forEach(item =>
inflightAlterIsrItems.append(item))
-
- val now = time.milliseconds()
- lastIsrPropagationMs.set(now)
sendRequest(inflightAlterIsrItems.toSeq)
+ inflightRequest = true
}
}
- private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
- val message = buildRequest(inflightAlterIsrItems)
-
- def clearInflightRequests(): Unit = {
- // Be sure to clear the in-flight flag to allow future AlterIsr requests
- if (!inflightRequest.compareAndSet(true, false)) {
- throw new IllegalStateException("AlterIsr response callback called
when no requests were in flight")
- }
+ private[server] def clearInFlightRequest(): Unit = inLock(inflightLock) {
+ if (!inflightRequest) {
+ warn("Attempting to clear AlterIsr in-flight flag when no apparent
request is in-flight")
}
+ inflightRequest = false
+ }
+ private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
+ val message = buildRequest(inflightAlterIsrItems)
debug(s"Sending AlterIsr to controller $message")
// We will not timeout AlterISR request, instead letting it retry
indefinitely
// until a response is received, or a new LeaderAndIsr overwrites the
existing isrState
- // which causes the inflight requests to be ignored.
+ // which causes the response for those partitions to be ignored.
controllerChannelManager.sendRequest(new AlterIsrRequest.Builder(message),
new ControllerRequestCompletionHandler {
override def onComplete(response: ClientResponse): Unit = {
- try {
- debug(s"Received AlterIsr response $response")
- val body = response.responseBody().asInstanceOf[AlterIsrResponse]
- handleAlterIsrResponse(body, message.brokerEpoch,
inflightAlterIsrItems)
- } finally {
- clearInflightRequests()
+ debug(s"Received AlterIsr response $response")
+ val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+ handleAlterIsrResponse(body, message.brokerEpoch,
inflightAlterIsrItems) match {
+ case Errors.NONE =>
+ // In the normal case, check for pending updates to send
immediately
+ clearInFlightRequest()
Review comment:
nit: shall we pull this out of the match since it is done regardless? Since we
got rid of the `try/finally`, putting it before `handleAlterIsrResponse` would
also make the code a little more resilient.
##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -116,63 +116,75 @@ class DefaultAlterIsrManager(
private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new
ConcurrentHashMap[TopicPartition, AlterIsrItem]()
// Used to allow only one in-flight request at a time
- private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
+ @volatile
+ private var inflightRequest: Boolean = false
- private val lastIsrPropagationMs = new AtomicLong(0)
+ // Protect updates of the inflight flag and prevent additional pending items
from being submitted while we are
+ // preparing a request
+ private val inflightLock: ReentrantLock = new ReentrantLock()
override def start(): Unit = {
controllerChannelManager.start()
- scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50,
TimeUnit.MILLISECONDS)
}
override def shutdown(): Unit = {
controllerChannelManager.shutdown()
}
override def submit(alterIsrItem: AlterIsrItem): Boolean = {
- unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) ==
null
+ inLock(inflightLock) {
+ if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition,
alterIsrItem) == null) {
+ maybePropagateIsrChanges()
+ true
+ } else {
+ false
+ }
+ }
}
override def clearPending(topicPartition: TopicPartition): Unit = {
unsentIsrUpdates.remove(topicPartition)
}
- private def propagateIsrChanges(): Unit = {
- if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false,
true)) {
- // Copy current unsent ISRs but don't remove from the map
+ private[server] def maybePropagateIsrChanges(): Unit = inLock(inflightLock) {
+ // Send all pending items if there is not already a request in-flight.
+ if (!inflightRequest && !unsentIsrUpdates.isEmpty) {
+ // Copy current unsent ISRs but don't remove from the map, they get
cleared in the response handler
val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]()
unsentIsrUpdates.values().forEach(item =>
inflightAlterIsrItems.append(item))
-
- val now = time.milliseconds()
- lastIsrPropagationMs.set(now)
sendRequest(inflightAlterIsrItems.toSeq)
+ inflightRequest = true
}
}
- private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
- val message = buildRequest(inflightAlterIsrItems)
-
- def clearInflightRequests(): Unit = {
- // Be sure to clear the in-flight flag to allow future AlterIsr requests
- if (!inflightRequest.compareAndSet(true, false)) {
- throw new IllegalStateException("AlterIsr response callback called
when no requests were in flight")
- }
+ private[server] def clearInFlightRequest(): Unit = inLock(inflightLock) {
+ if (!inflightRequest) {
+ warn("Attempting to clear AlterIsr in-flight flag when no apparent
request is in-flight")
}
+ inflightRequest = false
+ }
+ private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
+ val message = buildRequest(inflightAlterIsrItems)
debug(s"Sending AlterIsr to controller $message")
// We will not timeout AlterISR request, instead letting it retry
indefinitely
// until a response is received, or a new LeaderAndIsr overwrites the
existing isrState
- // which causes the inflight requests to be ignored.
+ // which causes the response for those partitions to be ignored.
controllerChannelManager.sendRequest(new AlterIsrRequest.Builder(message),
new ControllerRequestCompletionHandler {
override def onComplete(response: ClientResponse): Unit = {
- try {
- debug(s"Received AlterIsr response $response")
- val body = response.responseBody().asInstanceOf[AlterIsrResponse]
- handleAlterIsrResponse(body, message.brokerEpoch,
inflightAlterIsrItems)
- } finally {
- clearInflightRequests()
+ debug(s"Received AlterIsr response $response")
+ val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+ handleAlterIsrResponse(body, message.brokerEpoch,
inflightAlterIsrItems) match {
Review comment:
Just double-checking our locking order. When we call `submit` from
`Partition`, we first have the leader and ISR write lock and then we acquire
the inflight lock added here. Now when we call `handleAlterIsrResponse`, we may
need to reacquire the leader and ISR write lock, but that is ok, because we do
not need to hold the inflight lock when we do so. I think it might be worth adding
some comments on the locking order somewhere in this class since the use of the
leader and ISR lock is kind of hidden.
##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -116,63 +116,75 @@ class DefaultAlterIsrManager(
private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new
ConcurrentHashMap[TopicPartition, AlterIsrItem]()
// Used to allow only one in-flight request at a time
- private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
+ @volatile
Review comment:
Do we still need this? It looks like all accesses are protected with the
lock.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]