[ https://issues.apache.org/jira/browse/KAFKA-1905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jiangjie Qin reassigned KAFKA-1905:
-----------------------------------

    Assignee: Jiangjie Qin

> KafkaProducer's performance could be halved when MaxInFlightRequest is set to 1
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-1905
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1905
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jiangjie Qin
>            Assignee: Jiangjie Qin
>
> In KafkaProducer, the following logic is used in each poll():
> 1. Get a list of nodes that have a batch available for sending.
> 2. Filter the list to remove any node that is not ready to receive a new
> request (MaxInFlightRequests is checked here).
> 3. Compose the requests for the nodes in the filtered list, i.e. the nodes
> that have a batch to send and are also ready to receive.
> 4. Increase InFlightRequests, send the requests, and get the responses of
> the previous send.
> 5. Handle all receives and decrease InFlightRequests.
> In this case, when MaxInFlightRequest is set to 1, InFlightRequests is
> checked before the receives are handled, so a node is still considered not
> ready even if its response has already arrived. A sequence of polls
> therefore falls into the PollForSend - PollForReceive - PollForSend...
> pattern, which essentially halves the throughput on a fast network. Ideally
> we should check whether a node is ready after we have handled all the
> receives.
> Here are some logs that show this situation when I run
> kafka-producer-perf-test locally.
> -----1st poll for send, no receive------
> [2015-01-28 13:54:06,009] INFO Nodes with data ready to send: [Node(0,
> jqin-ld1.linkedin.biz, 9092)]
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,009] INFO Created 1 produce requests:
> [ClientRequest(expectResponse=true,
> payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0,
> recordCount=15)},
> request=RequestSend(header={api_key=0,api_version=0,correlation_id=1074,client_id=producer-performance},
> body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0
> lim=15780 cap=16384]}]}]}))]
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,009] INFO responses #: 0
> (org.apache.kafka.clients.producer.internals.Sender)
> ------ 2nd poll for receive, no send------
> [2015-01-28 13:54:06,009] INFO No ready nodes, timeout = 9223372036854775807
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,009] INFO responses #: 1
> (org.apache.kafka.clients.producer.internals.Sender)
> ------ 3rd poll for send, no receive------
> [2015-01-28 13:54:06,010] INFO Nodes with data ready to send: [Node(0,
> jqin-ld1.linkedin.biz, 9092)]
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,010] INFO Created 1 produce requests:
> [ClientRequest(expectResponse=true,
> payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0,
> recordCount=15)},
> request=RequestSend(header={api_key=0,api_version=0,correlation_id=1075,client_id=producer-performance},
> body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0
> lim=15780 cap=16384]}]}]}))]
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,010] INFO responses #: 0
> (org.apache.kafka.clients.producer.internals.Sender)
> ---- 4th poll for receive, no send----
> [2015-01-28 13:54:06,010] INFO No ready nodes, timeout = 9223372036854775807
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,010] INFO responses #: 1
> (org.apache.kafka.clients.producer.internals.Sender)
> ---- 5th poll for send, no receive----
> [2015-01-28 13:54:06,011] INFO Nodes with data ready to send: [Node(0,
> jqin-ld1.linkedin.biz, 9092)]
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,011] INFO Created 1 produce requests:
> [ClientRequest(expectResponse=true,
> payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0,
> recordCount=15)},
> request=RequestSend(header={api_key=0,api_version=0,correlation_id=1076,client_id=producer-performance},
> body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0
> lim=15780 cap=16384]}]}]}))]
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,011] INFO responses #: 0
> (org.apache.kafka.clients.producer.internals.Sender)
> ---- 6th poll for receive, no send-----
> [2015-01-28 13:54:06,011] INFO No ready nodes, timeout = 9223372036854775807
> (org.apache.kafka.clients.producer.internals.Sender)
> .........

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
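
The alternating send/receive pattern reported in this issue can be reproduced with a small simulation. This is only a minimal sketch, not the actual Sender or NetworkClient code: PollOrderSketch, pollsNeeded and the checkReadyAfterReceives flag are hypothetical names chosen for illustration. It assumes a single node, a limit of one in-flight request, and a network fast enough that every response is available by the next poll().

```java
// Hypothetical simulation of the poll() ordering described in KAFKA-1905.
// It is not Kafka code; all names here are made up for illustration.
class PollOrderSketch {
    // Assumption: at most one unacknowledged request per node.
    static final int MAX_IN_FLIGHT = 1;

    // Returns how many poll() calls are needed until all batches are acknowledged.
    static int pollsNeeded(int batches, boolean checkReadyAfterReceives) {
        int inFlight = 0;        // requests sent but not yet acknowledged
        int arrived = 0;         // responses that have arrived but are not handled yet
        int remaining = batches; // batches still waiting to be sent
        int acked = 0;           // batches whose response has been handled
        int polls = 0;

        while (acked < batches) {
            polls++;
            int sentThisPoll = 0;

            if (checkReadyAfterReceives) {
                // Proposed order: handle receives first, then check readiness.
                acked += arrived;
                inFlight -= arrived;
                arrived = 0;
            }

            // Readiness check: the node accepts a request only below the limit.
            if (remaining > 0 && inFlight < MAX_IN_FLIGHT) {
                remaining--;
                inFlight++;
                sentThisPoll = 1;
            }

            if (!checkReadyAfterReceives) {
                // Reported order: receives are handled after the readiness check.
                acked += arrived;
                inFlight -= arrived;
                arrived = 0;
            }

            // Fast-network assumption: the response to a request sent in this
            // poll is available at the start of the next poll.
            arrived += sentThisPoll;
        }
        return polls;
    }

    public static void main(String[] args) {
        System.out.println(pollsNeeded(1000, false)); // prints 2000
        System.out.println(pollsNeeded(1000, true));  // prints 1001
    }
}
```

Under these assumptions, checking readiness before handling the receives needs 2N polls for N batches, while checking afterwards needs N + 1, which matches the halving described in the issue.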