Hi team, I'm leaning the source code recently,the method logic seems wants to collect all timeout request, but it acutually just remove the first expire request of each broker, is it a deliberate design? will it lose some callbacks ? Looking forward to your reply!
Source file: kafka.common.InterBrokerSendThread.scala def removeAllTimedOut(now: Long): Collection[ClientRequest] = { val expiredRequests = new ArrayList[ClientRequest] for (requests <- unsent.values.asScala) { val requestIterator = requests.iterator var foundExpiredRequest = false while (requestIterator.hasNext && !foundExpiredRequest) { val request = requestIterator.next val elapsedMs = Math.max(0, now - request.createdTimeMs) if (elapsedMs > request.requestTimeoutMs) { expiredRequests.add(request) requestIterator.remove() foundExpiredRequest = true } } } expiredRequests } Best tiegen