hachikuji commented on a change in pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#discussion_r556053749
##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -112,68 +111,74 @@ class DefaultAlterIsrManager(
val brokerEpochSupplier: () => Long
) extends AlterIsrManager with Logging with KafkaMetricsGroup {
- // Used to allow only one pending ISR update per partition
- private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new
ConcurrentHashMap[TopicPartition, AlterIsrItem]()
+ // Used to allow only one pending ISR update per partition (visible for
testing)
+ private[server] val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem]
= new ConcurrentHashMap[TopicPartition, AlterIsrItem]()
// Used to allow only one in-flight request at a time
private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
- private val lastIsrPropagationMs = new AtomicLong(0)
-
override def start(): Unit = {
controllerChannelManager.start()
- scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50,
TimeUnit.MILLISECONDS)
}
override def shutdown(): Unit = {
controllerChannelManager.shutdown()
}
override def submit(alterIsrItem: AlterIsrItem): Boolean = {
- unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) ==
null
+ val enqueued = unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition,
alterIsrItem) == null
+ maybePropagateIsrChanges()
+ enqueued
}
override def clearPending(topicPartition: TopicPartition): Unit = {
unsentIsrUpdates.remove(topicPartition)
}
- private def propagateIsrChanges(): Unit = {
+ private[server] def maybePropagateIsrChanges(): Unit = {
+ // Send all pending items if there is not already a request in-flight.
if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false,
true)) {
- // Copy current unsent ISRs but don't remove from the map
+ // Copy current unsent ISRs but don't remove from the map, they get
cleared in the response handler
val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]()
unsentIsrUpdates.values().forEach(item =>
inflightAlterIsrItems.append(item))
-
- val now = time.milliseconds()
- lastIsrPropagationMs.set(now)
sendRequest(inflightAlterIsrItems.toSeq)
}
}
- private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
- val message = buildRequest(inflightAlterIsrItems)
-
- def clearInflightRequests(): Unit = {
- // Be sure to clear the in-flight flag to allow future AlterIsr requests
- if (!inflightRequest.compareAndSet(true, false)) {
- throw new IllegalStateException("AlterIsr response callback called
when no requests were in flight")
- }
+ private[server] def clearInFlightRequest(): Unit = {
+ if(!inflightRequest.compareAndSet(true, false)) {
Review comment:
nit: add a space after `if`, i.e. `if (!inflightRequest.compareAndSet(true, false)) {`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org