[ 
https://issues.apache.org/jira/browse/KAFKA-14189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17602037#comment-17602037
 ] 

Guozhang Wang commented on KAFKA-14189:
---------------------------------------

Hi [~aglicacha] [~vongosling]

The main motivation for using two separate connections (sockets) for the 
coordinator and the partition leader is to keep coordination-related requests 
such as join/sync from being blocked by fetch requests (which can be long 
polling; while one is outstanding we cannot send other requests on the same 
socket). Reusing the connection may cause issues, e.g. a heartbeat request not 
being processed in time because a fetch request is already parked on the 
broker side.
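
For reference, here is a minimal sketch of how the two connection ids come to 
differ, assuming only the "Integer.MAX_VALUE - coordinator id" scheme stated in 
the issue description. The class and method names are hypothetical and are not 
the actual client code:

```java
// Illustrative sketch only: names are hypothetical, not Kafka's real classes.
final class CoordinatorConnectionId {

    // Connection id used for the coordinator, per the scheme described in the issue.
    static int coordinatorConnectionId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    // Connection id used for fetching from the partition leader: the plain broker id.
    static int leaderConnectionId(int brokerId) {
        return brokerId;
    }

    public static void main(String[] args) {
        int brokerId = 2; // broker id taken from the quoted log ("id: 2")
        System.out.println("leader connection id:      " + leaderConnectionId(brokerId));
        System.out.println("coordinator connection id: " + coordinatorConnectionId(brokerId));
    }
}
```

For broker 2 the coordinator id works out to 2147483645, which matches the node 
that OFFSET_COMMIT is sent to in the quoted log, while FETCH goes to node 2. 
Because the ids differ, the client treats them as two distinct nodes and opens 
two sockets to the same broker.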

> Improve connection limit and reuse of coordinator and leader in KafkaConsumer
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-14189
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14189
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.9.0.0
>            Reporter: Junyang Liu
>            Priority: Major
>
> The connection id of the connection with the coordinator in KafkaConsumer is 
> Integer.MAX_VALUE - coordinator id, which is different from the connection id 
> of the partition leader. So the connection cannot be reused when the 
> coordinator and the leader are on the same broker, which means we need two 
> separate connections to the same broker. Consider this case: a consumer has 
> connected to the coordinator and finished Join and Sync, and wants to send 
> FETCH to the leader on the same broker. But the connection count has reached 
> its limit, so the consumer will be in the group but cannot consume messages.
> Partial logs:
> {code:java}
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Added 
> READ_UNCOMMITTED fetch request for partition topic-test-4 at offset 9 to node 
> <ip>:9092 (id: 2 rack: 2) 
> (org.apache.kafka.clients.consumer.internals.Fetcher) 
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Built full fetch 
> (sessionId=INVALID, epoch=INITIAL) for node 2 with 1 partition(s). 
> (org.apache.kafka.clients.FetchSessionHandler) 
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Sending 
> READ_UNCOMMITTED FullFetchRequest(topic-test-4) to broker <ip>:9092 (id: 2 
> rack: 2) (org.apache.kafka.clients.consumer.internals.Fetcher)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Initiating 
> connection to node <ip>:9092 (id: 2 rack: 2) using address /<ip> 
> (org.apache.kafka.clients.NetworkClient) 
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Using older server 
> API v3 to send OFFSET_COMMIT 
> {group_id=group-test,generation_id=134,member_id=consumer-11-2e2b16eb-516c-496c-8aa4-c6e990b43598,retention_time=-1,topics=[{topic=topic-test,partitions=[{partition=3,offset=0,metadata=},{partition=4,offset=9,metadata=},{partition=5,offset=13,metadata=}]}]}
>  with correlation id 242 to node 2147483645 
> (org.apache.kafka.clients.NetworkClient)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Created socket with 
> SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2 
> (org.apache.kafka.common.network.Selector)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Completed 
> connection to node 2. Fetching API versions. 
> (org.apache.kafka.clients.NetworkClient)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Initiating API 
> versions fetch from node 2. (org.apache.kafka.clients.NetworkClient)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Subscribed to 
> topic(s): topic-test (org.apache.kafka.clients.consumer.KafkaConsumer)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Connection with 
> /<ip> disconnected (org.apache.kafka.common.network.Selector)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Node 2 
> disconnected. (org.apache.kafka.clients.NetworkClient) 
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Cancelled request 
> with header RequestHeader(apiKey=FETCH, apiVersion=10, clientId=consumer-11, 
> correlationId=241) due to node 2 being disconnected 
> (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Error sending fetch 
> request (sessionId=INVALID, epoch=INITIAL) to node 2: 
> org.apache.kafka.common.errors.DisconnectException. 
> (org.apache.kafka.clients.FetchSessionHandler){code}
> The connection to the coordinator, the rebalance, and offset fetching have 
> finished. When preparing the connection to the leader for fetching, the 
> connection limit has been reached, so after the TCP connection is 
> established, the broker disconnects the client.  
>  
> The root cause of this issue is that consuming relies on a combination of 
> multiple connections (connections with the coordinator and the leader on the 
> same broker) that is not atomic, which may lead to a "half connected" state. 
> I think we can make some improvements:
>  # reuse the connection for the coordinator and the leader on the same 
> broker, to avoid such a "half connection" in KafkaConsumer when the 
> connection count limit is reached
>  # make the connection limit more flexible, e.g. allow extra related 
> connections for a consumer after the connection count limit has been reached 
> if it is already connected to the broker



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
