Hi Jason, I now understand the server logic: the server uses selector mute/unmute. Whenever a socket receives a request, the server mutes that socket until the response is returned, and then unmutes it.
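
To make the mute/unmute behaviour described above concrete, here is a minimal sketch in Java. It is a toy model, not Kafka's actual code: the class `MutedConnectionDemo`, the method `serve`, and the simulated processing delays are all hypothetical names and assumptions made for illustration. It only models the effect of muting (at most one request per connection is being handled at any time), so the responses come back in request order even though a pool of handler threads does the work.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy model of one connection served with mute/unmute (hypothetical, not Kafka's classes). */
final class MutedConnectionDemo {

    /**
     * Feeds correlation ids 1..requestCount through a pool of handler threads,
     * reading the next request only after the previous response is available.
     * Returns the order in which responses were produced.
     */
    static List<Integer> serve(int requestCount, int handlerThreads) {
        // Requests the client has already written to the socket, in send order.
        Deque<Integer> pendingOnSocket = new ArrayDeque<>();
        for (int id = 1; id <= requestCount; id++) {
            pendingOnSocket.add(id);
        }

        BlockingQueue<Integer> responseQueue = new LinkedBlockingQueue<>();
        ExecutorService handlers = Executors.newFixedThreadPool(handlerThreads);
        List<Integer> responseOrder = new ArrayList<>();
        try {
            while (!pendingOnSocket.isEmpty()) {
                // Read exactly one request; the connection is now "muted".
                int correlationId = pendingOnSocket.poll();
                handlers.execute(() -> {
                    // Earlier ids take longer, so they would finish last if run concurrently.
                    sleepQuietly(5L * (requestCount - correlationId));
                    responseQueue.add(correlationId);
                });
                // Wait for the response before reading again; this is the "unmute" point.
                responseOrder.add(responseQueue.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a response", e);
        } finally {
            handlers.shutdown();
        }
        return responseOrder;
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(serve(3, 4)); // prints [1, 2, 3]
    }
}
```

Because the loop never hands a second request to the pool until the first response has been taken, the number of handler threads does not affect the per-connection response order in this model.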

------------------ Original Message ------------------
From: <travi2...@foxmail.com>
Sent: Tuesday, Sep 27, 2016, 11:57
To: "dev"<dev@kafka.apache.org>
Subject: Re: it this a bug? - message disorder in async send mode -- 0.9.0java client sdk InFlightRequests

Hi Jason,

Can you explain the "head of line request blocking" in more detail? I am very curious, thanks!

Below is the code:

    class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
      private var responseListeners: List[(Int) => Unit] = Nil
      private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
      private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
      for(i <- 0 until numProcessors)
        responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()

The requestQueue is consumed by multiple threads, so how can it guarantee that the response order is the same as the request order?

------------------ Original Message ------------------
From: "Jason Gustafson" <ja...@confluent.io>
Sent: Tuesday, Sep 27, 2016, 2:11
To: "dev"<dev@kafka.apache.org>
Subject: Re: it this a bug? - message disorder in async send mode -- 0.9.0java client sdk InFlightRequests

Hi there,

The Kafka server implements head of line request blocking, which means that it will only handle one request at a time from a given socket. That means that the responses will always be returned in the same order as the requests were sent.

-Jason

On Sat, Sep 24, 2016 at 1:19 AM, <travi2...@foxmail.com> wrote:

> We know that in the async send mode, Kafka does not guarantee the message
> order, even for the same partition.
>
> That is, if we send 3 requests (the same topic, the same partition) to a
> Kafka server in the async mode, and the send order is 1, 2, 3 (correlation
> ids 1, 2, 3), could the Kafka server save the 3 requests in the log in the
> order 3, 2, 1, and return them to the client in the order 2, 3, 1?
>
> This happens because the Kafka server processes requests with multiple
> threads (multiple KafkaRequestHandler instances).
>
> If the above is true, the 0.9.0 Java client SDK may have a problem:
>
> In the class NetworkClient, there is a collection, inFlightRequests, that
> maintains all the in-flight requests:
>
>     private final InFlightRequests inFlightRequests;
>
>     final class InFlightRequests {
>         private final int maxInFlightRequestsPerConnection;
>         private final Map<String, Deque<ClientRequest>> requests =
>             new HashMap<String, Deque<ClientRequest>>();
>         ...
>     }
>
> It uses a Deque to maintain the in-flight requests whose responses have not
> come back yet. Whenever we send a request, we enqueue it, and when the
> response comes back, we dequeue it.
>
>     private void doSend(ClientRequest request, long now) {
>         request.setSendTimeMs(now);
>         this.inFlightRequests.add(request);
>         selector.send(request.request());
>     }
>
>     private void handleCompletedReceives(List<ClientResponse> responses, long now) {
>         for (NetworkReceive receive : this.selector.completedReceives()) {
>             String source = receive.source();
>             ClientRequest req = inFlightRequests.completeNext(source);
>             ResponseHeader header = ResponseHeader.parse(receive.payload());
>             // Always expect the response version id to be the same as the request version id
>             short apiKey = req.request().header().apiKey();
>             short apiVer = req.request().header().apiVersion();
>             Struct body = (Struct) ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
>             correlate(req.request().header(), header);
>             if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
>                 responses.add(new ClientResponse(req, now, false, body));
>         }
>     }
>
> But if the request order and the response order do not match, is the
> Deque suitable? Or should a Map be used to maintain the requests?
>
> By the way, in the code above there is a function correlate(xxx) that
> checks the match; if the ids do not match, it throws an exception.
>
>     private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
>         if (requestHeader.correlationId() != responseHeader.correlationId())
>             throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId()
>                 + ") does not match request (" + requestHeader.correlationId() + ")");
>     }
>
> But in the async mode, as mentioned above, a mismatch is normal and likely
> to happen. So is it enough to handle the problem by just throwing an
> exception?
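
The question of whether a Deque is suitable can be illustrated with a small sketch in Java. This is a simplified stand-in written for this thread, not the real `InFlightRequests` class: the class `InFlightSketch` and the method `handleResponse` are hypothetical, and requests are reduced to bare correlation ids. Assuming, as Jason's reply states, that the server returns responses on a socket in the order the requests were sent, a per-node FIFO is sufficient, and a correlation id mismatch signals a genuine error rather than normal reordering.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for per-node in-flight bookkeeping (hypothetical, not the Kafka class). */
final class InFlightSketch {
    // One FIFO of outstanding correlation ids per destination node.
    private final Map<String, Deque<Integer>> requests = new HashMap<>();

    /** Records the correlation id of a request that was just sent to the node. */
    void add(String node, int correlationId) {
        requests.computeIfAbsent(node, n -> new ArrayDeque<>()).addFirst(correlationId);
    }

    /** Removes and returns the oldest outstanding correlation id for the node. */
    int completeNext(String node) {
        Deque<Integer> queue = requests.get(node);
        if (queue == null || queue.isEmpty()) {
            throw new IllegalStateException("No in-flight request for node " + node);
        }
        return queue.pollLast();
    }

    /** Pairs a response with the oldest request and fails loudly if the ids differ. */
    int handleResponse(String node, int responseCorrelationId) {
        int expected = completeNext(node);
        if (expected != responseCorrelationId) {
            throw new IllegalStateException("Correlation id for response (" + responseCorrelationId
                + ") does not match request (" + expected + ")");
        }
        return expected;
    }

    public static void main(String[] args) {
        InFlightSketch inFlight = new InFlightSketch();
        inFlight.add("node-1", 1);
        inFlight.add("node-1", 2);
        // Responses arrive in send order, so each one matches the oldest request.
        System.out.println(inFlight.handleResponse("node-1", 1)); // prints 1
        System.out.println(inFlight.handleResponse("node-1", 2)); // prints 2
    }
}
```

Under that ordering assumption, a Map keyed by correlation id would add nothing; the exception exists to surface a broken connection or protocol bug, not to handle an expected case.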