[ https://issues.apache.org/jira/browse/KAFKA-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13833474#comment-13833474 ]
Jun Rao commented on KAFKA-1150:
--------------------------------

Right, the first problem (a second fetch is needed) is caused by incorrect logic for deciding whether a fetch request is satisfied. As Joel pointed out, we currently check whether a fetch request is satisfied right after messages are appended to the leader's log. At that point, the new messages are not committed yet. The check itself adds the byte size of the new messages to the size accumulated so far, and if the result exceeds minBytes, the fetch request is considered satisfied. So both the place where we trigger the check and the way we do the check are wrong.

The second problem (the second fetch returns late) is caused by the fact that we never check whether any fetch request can be satisfied after new messages are committed.

To fix these issues, we have to (1) trigger the check at the right places and (2) do the check correctly. For (2), one simple approach is to always translate the last committed offset into a file position and compute the bytes available from the fetch offset. This adds an index lookup to every check, though.

> Fetch on a replicated topic does not return as soon as possible
> ---------------------------------------------------------------
>
> Key: KAFKA-1150
> URL: https://issues.apache.org/jira/browse/KAFKA-1150
> Project: Kafka
> Issue Type: Bug
> Components: core, replication
> Affects Versions: 0.8
> Reporter: Andrey Balmin
> Assignee: Neha Narkhede
>
> I see a huge performance difference between replicated and non-replicated
> topics. On my laptop, running two brokers, I see producer-to-consumer latency
> of under 1 ms for topics with one replica. However, with two replicas the same
> latency equals the max fetch delay.
>
> Here is a simple test I just did: one producer thread in a loop sending one
> message and sleeping for 2500 ms, and one consumer thread looping on the long
> poll with a max fetch delay of 1000 ms.
>
> Here is what happens with no replication:
>
> Produced 1 key: key1 at time: 15:33:52.822
> Consumed up to 1 at time: 15:33:52.822
> Consumed up to 1 at time: 15:33:53.823
> Consumed up to 1 at time: 15:33:54.825
> Produced 2 key: key2 at time: 15:33:55.324
> Consumed up to 2 at time: 15:33:55.324
> Consumed up to 2 at time: 15:33:56.326
> Consumed up to 2 at time: 15:33:57.328
> Produced 3 key: key3 at time: 15:33:57.827
> Consumed up to 3 at time: 15:33:57.827
>
> There are no delays between a message being produced and consumed; this is
> the behavior I expected.
>
> Here is the same test, but for a topic with two replicas:
>
> Consumed up to 0 at time: 15:50:29.575
> Produced 1 key: key1 at time: 15:50:29.575
> Consumed up to 1 at time: 15:50:30.577
> Consumed up to 1 at time: 15:50:31.579
> Consumed up to 1 at time: 15:50:32.078
> Produced 2 key: key2 at time: 15:50:32.078
> Consumed up to 2 at time: 15:50:33.081
> Consumed up to 2 at time: 15:50:34.081
> Consumed up to 2 at time: 15:50:34.581
> Produced 3 key: key3 at time: 15:50:34.581
> Consumed up to 3 at time: 15:50:35.584
>
> Notice how the fetch always returns as soon as the produce request is issued,
> but without the new message, which consistently arrives ~1002 ms later.
>
> Below is the request log snippet for this part:
>
> Produced 2 key: key2 at time: 15:50:32.078
> Consumed up to 2 at time: 15:50:33.081
>
> You can see that the first FetchRequest returns at the same time as the
> replica FetchRequest, but this fetch response is *empty*: the message is not
> committed yet, so it cannot be returned. The message is committed at
> 15:50:32,079. However, the next FetchRequest (the one that does return the
> message) comes in at 15:50:32,078 but completes only at 15:50:33,081. Why is
> it waiting for the full 1000 ms instead of returning right away?
>
> [2013-11-25 15:50:32,077] TRACE Processor 1 received request : Name: ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] -> 2078 (kafka.network.RequestChannel$)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 7; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(129,1024000) from client /0:0:0:0:0:0:0:1%0:63264;totalTime:499,queueTime:0,localTime:0,remoteTime:499,sendTime:0 (kafka.request.logger)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 3463; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(129,1048576) from client /127.0.0.1:63056;totalTime:499,queueTime:1,localTime:0,remoteTime:498,sendTime:0 (kafka.request.logger)
> [2013-11-25 15:50:32,078] TRACE Processor 1 received request : Name: FetchRequest; Version: 0; CorrelationId: 8; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(129,1024000) (kafka.network.RequestChannel$)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] -> 2078 from client /0:0:0:0:0:0:0:1%0:63266;totalTime:1,queueTime:0,localTime:1,remoteTime:0,sendTime:0 (kafka.request.logger)
> [2013-11-25 15:50:32,079] TRACE Processor 0 received request : Name: FetchRequest; Version: 0; CorrelationId: 3464; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(130,1048576) (kafka.network.RequestChannel$)
> [2013-11-25 15:50:32,581] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 3464; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(130,1048576) from client /127.0.0.1:63056;totalTime:503,queueTime:1,localTime:0,remoteTime:502,sendTime:0 (kafka.request.logger)
> [2013-11-25 15:50:32,582] TRACE Processor 0 received request : Name: FetchRequest; Version: 0; CorrelationId: 3465; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(130,1048576) (kafka.network.RequestChannel$)
> [2013-11-25 15:50:33,081] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 8; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(129,1024000) from client /0:0:0:0:0:0:0:1%0:63264;totalTime:1003,queueTime:0,localTime:1,remoteTime:1001,sendTime:1 (kafka.request.logger)
> ----------
> Environment note: I first noticed this behavior on three brokers running on
> three Ubuntu EC2 instances. I then boiled it down to this simple test running
> two brokers on a Mac laptop.

--
This message was sent by Atlassian JIRA
(v6.1#6144)
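To make the two fixes in Jun Rao's comment concrete, here is a minimal Python sketch of a toy single-partition model. Everything in it is hypothetical: `PartitionLog`, `DelayedFetch`, `old_check_on_append`, `new_check` and `advance_high_watermark` are illustrative names, not Kafka's classes; the offset index is dense rather than sparse like Kafka's real one; and the 100-byte message size is made up. It contrasts the check described in the comment (run on append, adding the new, still uncommitted bytes to a running total) with the proposed one (run when the high watermark advances, recomputing committed bytes from the fetch offset via an offset-to-position lookup).

```python
"""Toy model of delayed-fetch satisfaction for one partition (hypothetical names)."""
from dataclasses import dataclass, field
from typing import List


@dataclass
class PartitionLog:
    # positions[i] is the byte position where offset i starts; positions[-1] is
    # the end of the log. Stands in for the offset index (dense here for brevity;
    # Kafka's real index is sparse).
    positions: List[int] = field(default_factory=lambda: [0])
    high_watermark: int = 0  # offsets < high_watermark are committed

    @property
    def log_end_offset(self) -> int:
        return len(self.positions) - 1

    def append(self, size: int) -> None:
        # Leader appends one message; it is NOT committed until the HW moves.
        self.positions.append(self.positions[-1] + size)

    def position_of(self, offset: int) -> int:
        # Offset -> file position translation (the per-check "index lookup").
        return self.positions[offset]

    def committed_bytes_from(self, fetch_offset: int) -> int:
        # Bytes a consumer can actually read: from the fetch offset up to the HW.
        if fetch_offset >= self.high_watermark:
            return 0
        return self.position_of(self.high_watermark) - self.position_of(fetch_offset)


@dataclass
class DelayedFetch:
    fetch_offset: int
    min_bytes: int
    bytes_accumulated: int = 0


def old_check_on_append(fetch: DelayedFetch, new_bytes: int) -> bool:
    # Behaviour described in the comment: triggered right after the leader
    # appends (before commit), adding the new messages' size to a running total.
    fetch.bytes_accumulated += new_bytes
    return fetch.bytes_accumulated >= fetch.min_bytes


def new_check(log: PartitionLog, fetch: DelayedFetch) -> bool:
    # Proposed check: recompute readable bytes from the last committed offset.
    return log.committed_bytes_from(fetch.fetch_offset) >= fetch.min_bytes


def advance_high_watermark(
    log: PartitionLog, new_hw: int, waiting: List[DelayedFetch]
) -> List[DelayedFetch]:
    # Proposed trigger: re-check waiting fetches whenever messages get committed;
    # returns the fetches that can now be answered.
    log.high_watermark = new_hw
    return [f for f in waiting if new_check(log, f)]


if __name__ == "__main__":
    # Offsets 0..128 are committed (HW = 129); message sizes are made up.
    log = PartitionLog()
    for _ in range(129):
        log.append(100)
    log.high_watermark = 129
    consumer_fetch = DelayedFetch(fetch_offset=129, min_bytes=1)

    log.append(100)  # producer writes offset 129; not committed yet
    print(old_check_on_append(consumer_fetch, 100))  # True: answered early...
    print(log.committed_bytes_from(129))             # 0: ...with an empty response
    print(new_check(log, consumer_fetch))            # False: keep waiting
    # The follower fetches from offset 130, so the leader advances the HW to 130.
    print(advance_high_watermark(log, 130, [consumer_fetch]) == [consumer_fetch])  # True
```

In this toy scenario the append-time check declares the consumer's fetch at offset 129 satisfied while zero committed bytes are readable, which mirrors the empty response in the issue. The recomputed check keeps the fetch waiting until the high watermark moves to 130 and releases it at that moment instead of after the full MaxWait. The `position_of` call inside `new_check` is where the per-check index lookup cost mentioned in the comment would show up.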