[ https://issues.apache.org/jira/browse/KAFKA-1905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiangjie Qin reassigned KAFKA-1905:
-----------------------------------

    Assignee: Jiangjie Qin

> KafkaProducer's performance could be halved when MaxInFlightRequest is set to 1
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-1905
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1905
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jiangjie Qin
>            Assignee: Jiangjie Qin
>
> In KafkaProducer, the following logic is used in each poll():
> 1. Get the list of nodes that have a batch available for sending.
> 2. Filter the list to remove nodes that are not ready to receive a new
> request (MaxInFlightRequests is checked here).
> 3. Compose requests for the nodes in the filtered list, i.e. nodes that have
> a batch to send and are also ready to receive.
> 4. Increase InFlightRequests, send the requests, and get the responses to
> previous sends.
> 5. Handle all receives and decrease InFlightRequests.
> As a result, when MaxInFlightRequest is set to 1, InFlightRequests is checked
> before the receives are handled, so the node is still considered not ready
> even if its response has already arrived. A sequence of polls therefore ends
> up in a PollForSend - PollForReceive - PollForSend... pattern, which
> essentially halves the throughput on a fast network. Ideally we should check
> whether the node is ready after handling all the receives.
> Here are some logs that show this situation when I run
> kafka-producer-perf-test locally.
> ------ 1st poll for send, no receive ------
> [2015-01-28 13:54:06,009] INFO Nodes with data ready to send: [Node(0, jqin-ld1.linkedin.biz, 9092)] (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,009] INFO Created 1 produce requests: [ClientRequest(expectResponse=true, payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0, recordCount=15)}, request=RequestSend(header={api_key=0,api_version=0,correlation_id=1074,client_id=producer-performance}, body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=15780 cap=16384]}]}]}))] (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,009] INFO responses #: 0 (org.apache.kafka.clients.producer.internals.Sender)
> ------ 2nd poll for receive, no send ------
> [2015-01-28 13:54:06,009] INFO No ready nodes, timeout = 9223372036854775807 (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,009] INFO responses #: 1 (org.apache.kafka.clients.producer.internals.Sender)
> ------ 3rd poll for send, no receive ------
> [2015-01-28 13:54:06,010] INFO Nodes with data ready to send: [Node(0, jqin-ld1.linkedin.biz, 9092)] (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,010] INFO Created 1 produce requests: [ClientRequest(expectResponse=true, payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0, recordCount=15)}, request=RequestSend(header={api_key=0,api_version=0,correlation_id=1075,client_id=producer-performance}, body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=15780 cap=16384]}]}]}))] (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,010] INFO responses #: 0 (org.apache.kafka.clients.producer.internals.Sender)
> ------ 4th poll for receive, no send ------
> [2015-01-28 13:54:06,010] INFO No ready nodes, timeout = 9223372036854775807 (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,010] INFO responses #: 1 (org.apache.kafka.clients.producer.internals.Sender)
> ------ 5th poll for send, no receive ------
> [2015-01-28 13:54:06,011] INFO Nodes with data ready to send: [Node(0, jqin-ld1.linkedin.biz, 9092)] (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,011] INFO Created 1 produce requests: [ClientRequest(expectResponse=true, payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0, recordCount=15)}, request=RequestSend(header={api_key=0,api_version=0,correlation_id=1076,client_id=producer-performance}, body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=15780 cap=16384]}]}]}))] (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,011] INFO responses #: 0 (org.apache.kafka.clients.producer.internals.Sender)
> ------ 6th poll for receive, no send ------
> [2015-01-28 13:54:06,011] INFO No ready nodes, timeout = 9223372036854775807 (org.apache.kafka.clients.producer.internals.Sender)
> .........



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
