Hi team,
   
I've been reading the source code recently. The logic of this method seems 
intended to collect all timed-out requests, but it actually removes only the 
first expired request for each broker per call. Is this a deliberate design? 
Could it cause some callbacks to be lost? Looking forward to your reply!

Source file:  kafka.common.InterBrokerSendThread.scala
def removeAllTimedOut(now: Long): Collection[ClientRequest] = {
  val expiredRequests = new ArrayList[ClientRequest]
  for (requests <- unsent.values.asScala) {
    val requestIterator = requests.iterator
    var foundExpiredRequest = false 
    while (requestIterator.hasNext && !foundExpiredRequest) {
      val request = requestIterator.next
      val elapsedMs = Math.max(0, now - request.createdTimeMs)
      if (elapsedMs > request.requestTimeoutMs) {
        expiredRequests.add(request)
        requestIterator.remove()
        foundExpiredRequest = true
      }
    }
  }
  expiredRequests
}
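
To make the question concrete, here is a minimal, self-contained sketch of the
same loop shape. It is not the Kafka code: Req, TimeoutSketch,
removeFirstTimedOutPerNode and the "broker-1" key are hypothetical stand-ins,
and the unsent map is assumed to hold one queue of requests per broker.

```scala
import java.util.{ArrayDeque, ArrayList}

// Hypothetical stand-in for ClientRequest: only the fields the loop reads.
final case class Req(id: Int, createdTimeMs: Long, requestTimeoutMs: Int)

object TimeoutSketch {
  // Same control flow as removeAllTimedOut above: for each broker's queue,
  // stop scanning as soon as one expired request has been removed.
  def removeFirstTimedOutPerNode(unsent: Map[String, ArrayDeque[Req]],
                                 now: Long): ArrayList[Req] = {
    val expired = new ArrayList[Req]
    for (requests <- unsent.values) {
      val it = requests.iterator
      var found = false
      while (it.hasNext && !found) {
        val r = it.next()
        val elapsedMs = math.max(0L, now - r.createdTimeMs)
        if (elapsedMs > r.requestTimeoutMs) {
          expired.add(r)
          it.remove()
          found = true
        }
      }
    }
    expired
  }

  // Three requests for one broker, all already expired at now = 1000.
  // Returns how many requests each of four successive calls removes.
  def demo(): List[Int] = {
    val queue = new ArrayDeque[Req]
    queue.add(Req(1, 0L, 100))
    queue.add(Req(2, 0L, 100))
    queue.add(Req(3, 0L, 100))
    val unsent = Map("broker-1" -> queue)
    List.fill(4)(removeFirstTimedOutPerNode(unsent, now = 1000L).size)
  }
}
```

In this sketch each call removes at most one expired request per broker, and
the remaining expired requests stay queued until later calls reach them. It
does not show how often the real send thread invokes the method, so it does
not settle whether any callback is actually lost.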



Best
tiegen

   
