[ 
https://issues.apache.org/jira/browse/KAFKA-13559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

frankshi updated KAFKA-13559:
-----------------------------
    Description: 
Hi team:

We have found that the poll timeout value in the source code 
[here|https://github.com/apache/kafka/blob/2.7/core/src/main/scala/kafka/network/SocketServer.scala#L922]
 may cause the broker’s ProduceResponse to be delayed by up to 300ms.
 * Server-version: 2.13-2.7.0.
 * Client-version: confluent-kafka-python-1.5.0.

We have set the client’s configuration as follows:
{code:java}
linger.ms = 0
acks = 1
delivery.timeout.ms = 100
request.timeout.ms = 80
sasl.mechanism = "PLAIN"
security.protocol = "SASL_SSL"
......
{code}
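For reference, the same settings expressed against the JVM producer client look roughly like the sketch below (the original report uses confluent-kafka-python / librdkafka property names; the bootstrap server, topic, and SASL credentials here are placeholders, not values from the report):
{code:java}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object ProducerConfigSketch extends App {
  val props = new Properties()
  // Placeholder bootstrap server; the real cluster listens on SASL_SSL port 9093.
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker.example.com:9093")
  props.put(ProducerConfig.LINGER_MS_CONFIG, "0")
  props.put(ProducerConfig.ACKS_CONFIG, "1")
  props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "100")
  props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "80")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put("security.protocol", "SASL_SSL")
  props.put("sasl.mechanism", "PLAIN")
  // Placeholder credentials.
  props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
      "username=\"user\" password=\"secret\";")

  val producer = new KafkaProducer[String, String](props)
  producer.send(new ProducerRecord[String, String]("test-topic", "key", "value"))
  producer.close()
}
{code}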
Because we set acks = 1, the client sends ProduceRequests and receives 
ProduceResponses from the brokers. The leader broker does not need to wait for the 
ISR replicas to persist the data; it can reply to the client with a ProduceResponse 
directly. In our setup, the ping RTT between the client and the Kafka brokers is 
about 10ms, and most of the time the responses are received about 10ms after the 
ProduceRequests are sent. But sometimes the responses are received about 300ms later.

The following shows the log from the client.
{code:java}
2021-11-26 02:31:30,567  Sent partial ProduceRequest (v7, 0+16527/37366 bytes, 
CorrId 2753)
2021-11-26 02:31:30,568  Sent partial ProduceRequest (v7, 16527+16384/37366 
bytes, CorrId 2753)
2021-11-26 02:31:30,568  Sent ProduceRequest (v7, 37366 bytes @ 32911, CorrId 
2753)
2021-11-26 02:31:30,570  Sent ProduceRequest (v7, 4714 bytes @ 0, CorrId 2754)
2021-11-26 02:31:30,571  Sent ProduceRequest (v7, 1161 bytes @ 0, CorrId 2755)
2021-11-26 02:31:30,572  Sent ProduceRequest (v7, 1240 bytes @ 0, CorrId 2756)
2021-11-26 02:31:30,572  Received ProduceResponse (v7, 69 bytes, CorrId 2751, 
rtt 9.79ms)
2021-11-26 02:31:30,572  Received ProduceResponse (v7, 69 bytes, CorrId 2752, 
rtt 10.34ms)
2021-11-26 02:31:30,573  Received ProduceResponse (v7, 69 bytes, CorrId 2753, 
rtt 10.11ms)
2021-11-26 02:31:30,872  Received ProduceResponse (v7, 69 bytes, CorrId 2754, 
rtt 309.69ms)
2021-11-26 02:31:30,883  Sent ProduceRequest (v7, 1818 bytes @ 0, CorrId 2757)
2021-11-26 02:31:30,887  Sent ProduceRequest (v7, 1655 bytes @ 0, CorrId 2758)
2021-11-26 02:31:30,888  Received ProduceResponse (v7, 69 bytes, CorrId 2755, 
rtt 318.85ms)
2021-11-26 02:31:30,893  Sent partial ProduceRequest (v7, 0+16527/37562 bytes, 
CorrId 2759)
2021-11-26 02:31:30,894  Sent partial ProduceRequest (v7, 16527+16384/37562 
bytes, CorrId 2759)
2021-11-26 02:31:30,895  Sent ProduceRequest (v7, 37562 bytes @ 32911, CorrId 
2759)
2021-11-26 02:31:30,896  Sent ProduceRequest (v7, 4700 bytes @ 0, CorrId 2760)
2021-11-26 02:31:30,897  Received ProduceResponse (v7, 69 bytes, CorrId 2756, 
rtt 317.74ms)
2021-11-26 02:31:30,897  Received ProduceResponse (v7, 69 bytes, CorrId 2757, 
rtt 4.22ms)
2021-11-26 02:31:30,899  Received ProduceResponse (v7, 69 bytes, CorrId 2758, 
rtt 2.61ms){code}
 
The requests with CorrId 2753 and 2754 are sent at almost the same time, but the 
response for 2754 is delayed by ~300ms.

We checked the logs on the broker.

 
{code:java}
[2021-11-26 02:31:30,873] DEBUG Completed request:RequestHeader(apiKey=PRODUCE, 
apiVersion=7, clientId=rdkafka, correlationId=2754) – 
{acks=1,timeout=80,numPartitions=1},response: 
{responses=[\{topic=***,partition_responses=[{partition=32,error_code=0,base_offset=58625,log_append_time=-1,log_start_offset=49773}]}
],throttle_time_ms=0} from connection 
10.10.44.59:9093-10.10.0.68:31183-66;totalTime:0.852,requestQueueTime:0.128,localTime:0.427,remoteTime:0.09,throttleTime:0,responseQueueTime:0.073,sendTime:0.131,securityProtocol:SASL_SSL,principal:User:***,listener:SASL_SSL,clientInformation:ClientInformation(softwareName=confluent-kafka-python,
 softwareVersion=1.5.0-rdkafka-1.5.2) (kafka.request.logger)
{code}
 
 
It seems that the time spent on the server side is very small. So what is the 
reason for the latency spikes?

We also captured tcpdump on the server side and found that the delay comes from the 
server.

The request with CorrId=2754 was received at 10:31:30.566172 and its response was 
sent at 10:31:30.873518, so the server's processing time for this request is about 
{*}873-566=307ms{*}.

Wireshark shows the CorrId=2754 ProduceRequest's timestamp and request info:

!image-2021-12-21-14-45-22-716.png!

Wireshark shows the CorrId=2754 ProduceResponse's timestamp and response info:

!image-2021-12-21-14-44-56-689.png!

 

We checked the source code and found the problem. The broker's Processor run 
loop is as follows:

!image-5.png|width=1001,height=449!
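For readers of the text version of this ticket, the screenshot above shows the Processor run loop in kafka.network.SocketServer. A condensed paraphrase (not the exact 2.7 source; exception handling and shutdown logic are omitted):
{code:java}
// Condensed paraphrase of kafka.network.Processor#run (Kafka 2.7).
override def run(): Unit = {
  startupComplete()
  while (isRunning) {
    configureNewConnections()   // set up any new connections that have been queued up
    processNewResponses()       // register queued responses with the selector for writing
    poll()                      // selector.poll(300) when there are no new connections, else poll(0)
    processCompletedReceives()  // hand completed requests over to the request channel
    processCompletedSends()
    processDisconnected()
    closeExcessConnections()
  }
}
{code}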

Looking at the poll function, you can see that the {*}poll timeout value is 300ms{*}:
{code:java}
private def poll(): Unit = {
    val pollTimeout = if (newConnections.isEmpty) 300 else 0
    try selector.poll(pollTimeout)
    catch {
      case e @ (_: IllegalStateException | _: IOException) =>
        // The exception is not re-thrown and any completed
        // sends/receives/connections/disconnections from this poll will be processed.
        error(s"Processor $id poll failed", e)
    }
  }{code}
 
The following is the selector.poll function:

!image-6.png!
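The key detail in the screenshot is how the timeout passed in by the Processor is adjusted. The actual code (org.apache.kafka.common.network.Selector#poll) is Java; the following is a Scala paraphrase of that timeout handling, with names following the 2.7 sources as far as we can tell:
{code:java}
// Paraphrase of the timeout adjustment in Selector#poll: the selector only forces
// timeout = 0 when there are immediately-connected keys, or when the previous poll
// made read progress AND there is still data buffered from that read. Otherwise it
// blocks in select() for the full requested timeout (300 ms when called from
// Processor.poll with no new connections).
def effectiveSelectTimeout(requestedTimeoutMs: Long,
                           hasImmediatelyConnectedKeys: Boolean,
                           madeReadProgressLastCall: Boolean,
                           dataInBuffers: Boolean): Long =
  if (hasImmediatelyConnectedKeys || (madeReadProgressLastCall && dataInBuffers)) 0L
  else requestedTimeoutMs
{code}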

So, we may encounter the following situation:
 * The first run of the loop.
 ** poll -> request received -> processCompletedReceives -> request added to the request queue.

 * The second run of the loop.
 ** processNewResponses -> the response queue is empty (the IO thread is still processing the request) -> poll() -> select(timeout=0) -> {color:#172b4d}madeReadProgressLastCall = false{color}

 * The third run of the loop.
 ** processNewResponses -> the response queue is *NOT* empty -> poll() -> select(timeout=300) returns immediately, because the response data is already available and the fd has been added to the write fd set.

 * The fourth run of the loop.
 ** the response queue is empty -> poll() -> select(timeout=300) waits for 300ms or until new data arrives.

The server may receive several produce requests at once, but it can only handle 
one request at a time; only after the previous response has been sent can it 
handle the next request. While the previous request is being handled, the other 
requests are kept buffered. So, if the first response has been sent and no new 
data arrives at that moment, the buffered request may be delayed for up to 300ms 
before it is processed.

{color:#ff0000}*We suggest changing the poll timeout value from 300 to 10.*{color}
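Concretely, the change in kafka.network.Processor#poll would look like the sketch below (illustrative, against the 2.7 source; whether 10ms is the right value, or whether the constant should become configurable, is open for discussion):
{code:java}
private def poll(): Unit = {
    // was: val pollTimeout = if (newConnections.isEmpty) 300 else 0
    val pollTimeout = if (newConnections.isEmpty) 10 else 0
    try selector.poll(pollTimeout)
    catch {
      case e @ (_: IllegalStateException | _: IOException) =>
        // The exception is not re-thrown; any completed sends/receives/connections/
        // disconnections from this poll will still be processed.
        error(s"Processor $id poll failed", e)
    }
  }{code}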

*The following two figures compare the Request-Response RTT values.*

*!image-3.png!*

 RTT values when poll timeout value = 300

 

!image-2.png!

RTT values when poll timeout value = 10

 

Another question: why does the server's log show that the total time is very small?

Because the start time is set in the function processCompletedReceives shown 
below; while the request is still sitting in the buffer, the timer has not 
started yet, so totalTime does not include the time spent waiting there.

!image-7.png!
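A simplified paraphrase of the relevant part of processCompletedReceives (names are illustrative, not the exact 2.7 signatures; buildRequest is a hypothetical helper standing in for the request construction the real code does inline):
{code:java}
// startTimeNanos is captured only here, after the selector has handed over the
// completed receive, so time the request spent buffered behind an in-flight
// response is never counted in requestQueueTime or totalTime.
selector.completedReceives.forEach { receive =>
  val nowNanos = time.nanoseconds()                               // the timer effectively starts here
  val request = buildRequest(receive, startTimeNanos = nowNanos)  // hypothetical helper
  requestChannel.sendRequest(request)
}
{code}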

> The broker's  ProduceResponse may be delayed for 300ms
> ------------------------------------------------------
>
>                 Key: KAFKA-13559
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13559
>             Project: Kafka
>          Issue Type: Task
>          Components: core
>    Affects Versions: 2.7.0
>            Reporter: frankshi
>            Priority: Major
>         Attachments: image-1.png, image-2.png, 
> image-2021-12-21-14-44-56-689.png, image-2021-12-21-14-45-22-716.png, 
> image-3.png, image-5.png, image-6.png, image-7.png, image.png
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
