hi, Json
I know the server logic:  the server use  selector mute/unmute ,  whenever the 
socket receive a request,  it will
mute until the response return, it become unmute.






------------------ ???????? ------------------
??????: "????????";<travi2...@foxmail.com>;
????????: 2016??9??27??(??????) ????11:57
??????: "dev"<dev@kafka.apache.org>; 

????: ?????? it this a bug? - message disorder in async send mode -- 0.9.0java 
client sdk InFlightRequests



hi, Jason
can you explain the "head of line request blocking" in more detail?   I am very 
curious, thanks! 


below is the code:


class RequestChannel(val numProcessors: Int, val queueSize: Int) extends 
KafkaMetricsGroup {
  private var responseListeners: List[(Int) => Unit] = Nil
  private val requestQueue = new 
ArrayBlockingQueue[RequestChannel.Request](queueSize)
  private val responseQueues = new 
Array[BlockingQueue[RequestChannel.Response]](numProcessors)
  for(i <- 0 until numProcessors)
    responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
the requestQueue is consumed by multiple threads,  so how it can guarantee the 
response order the same as the request order? 


------------------ ???????? ------------------
??????: "Jason Gustafson";<ja...@confluent.io>;
????????: 2016??9??27??(??????) ????2:11
??????: "dev"<dev@kafka.apache.org>; 

????: Re: it this a bug? - message disorder in async send mode -- 0.9.0java 
client sdk InFlightRequests



Hi there,

The Kafka server implements head of line request blocking, which means that
it will only handle one request a time from a given socket. That means that
the responses will always be returned in the same order as the requests
were sent.

-Jason

On Sat, Sep 24, 2016 at 1:19 AM, ???????? <travi2...@foxmail.com> wrote:

> We know that in the async send mode, kafka do not guarantee the message
> order even for  the same partition.
>
>
> That is, if we send 3 request  ( the same topic, the same partition)  to a
> kafka server in the async mode,
> the send order is 1, 2, 3 (correlation id is 1, 2, 3),  while the kafka
> server maybe save the 3 request in the log by the order 3, 2, 1,  and
> return to the client by the order 2, 3, 1??
>
>
> This happens because Kafka server processes requests with multi
> threads(multi KafkaRequestHandler).
>
>
> If the above is true,  below in the 0.9.0 java client idk maybe has
> problem:
>
>
> In the class NetworkClient,  there is a collection inFlightRequests to
> maintain all the in flight request:
>
>
> private final InFlightRequests inFlightRequests;
> final class InFlightRequests {
>
>     private final int maxInFlightRequestsPerConnection;
>     private final Map<String, Deque<ClientRequest>> requests = new
> HashMap<String, Deque<ClientRequest>>();    ...}
> It use a Deque to maintain the in flight requests whose response has not
> come back.
> Whenever we send a request, we will enqueue the request,  and when the
> response come back, we will dequeue the request.
> private void doSend(ClientRequest request, long now) {
>     request.setSendTimeMs(now);
>     this.inFlightRequests.add(request);
>     selector.send(request.request());
> }private void handleCompletedReceives(List<ClientResponse> responses,
> long now) {
>     for (NetworkReceive receive : this.selector.completedReceives()) {
>         String source = receive.source();
>         ClientRequest req = inFlightRequests.completeNext(source);
>         ResponseHeader header = ResponseHeader.parse(receive.payload());
>         // Always expect the response version id to be the same as the
> request version id
>         short apiKey = req.request().header().apiKey();
>         short apiVer = req.request().header().apiVersion();
>         Struct body = (Struct) ProtoUtils.responseSchema(apiKey,
> apiVer).read(receive.payload());
>         correlate(req.request().header(), header);
>         if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
>             responses.add(new ClientResponse(req, now, false, body));
>     }
> }
> but if the request order and the response order does not match,  is it the
> Deque suitable?  or it should be use a Map to maintain the request?
> By the way, in the above,  there is a function correlate(xxx) to check the
> match, if not match,  it will throw a exception.private void
> correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
>     if (requestHeader.correlationId() != responseHeader.correlationId())
>         throw new IllegalStateException("Correlation id for response (" +
> responseHeader.correlationId()
>                 + ") does not match request (" +
> requestHeader.correlationId() + ")");
> }
> But in the async mode,  as mentioned above,  the mismatch is normal, and
> likely happen.
> So here is it enough to process the problem by just throwing an exception ?

Reply via email to