[ 
https://issues.apache.org/jira/browse/KAFKA-17182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17922724#comment-17922724
 ] 

Lianet Magrans commented on KAFKA-17182:
----------------------------------------

Cherry-picked to 4.0 
[https://github.com/apache/kafka/commit/9c02072c4fd8159087b7960872d6ee8271a308ab]
 

> Consumer fetch sessions are evicted too quickly with AsyncKafkaConsumer
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-17182
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17182
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.8.0
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Blocker
>              Labels: consumer-threading-refactor, fetcher, 
> kip-848-client-support
>             Fix For: 4.0.0
>
>
> h1. Background
> {{Consumer}} implementations fetch data from the cluster and temporarily 
> buffer it in memory until the user next calls {{{}Consumer.poll(){}}}. When a 
> fetch request is being generated, partitions that already have buffered data 
> are not included in the fetch request.
> The {{ClassicKafkaConsumer}} performs much of its fetch logic and network I/O 
> in the application thread. On {{{}poll(){}}}, if there is any 
> locally-buffered data, the {{ClassicKafkaConsumer}} does not fetch _any_ new 
> data and simply returns the buffered data to the user from {{{}poll(){}}}.
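> As a rough illustration, here is a minimal, self-contained sketch of that
> short-circuit; the names and types are hypothetical stand-ins, not the real
> {{ClassicKafkaConsumer}} internals:
> {code:java}
> import java.util.ArrayDeque;
> import java.util.Queue;
>
> // Sketch: poll() returns buffered data without sending a new fetch.
> public class PollShortCircuitSketch {
>
>     // Records fetched from the broker but not yet returned to the user.
>     static final Queue<String> fetchBuffer = new ArrayDeque<>();
>
>     static String poll() {
>         if (fetchBuffer.isEmpty()) {
>             // Only fetch when nothing is buffered; partitions that already
>             // have buffered data are left out of the fetch request.
>             sendFetchRequest();
>         }
>         return fetchBuffer.poll();
>     }
>
>     static void sendFetchRequest() {
>         fetchBuffer.add("records from broker");
>     }
>
>     public static void main(String[] args) {
>         System.out.println(poll());
>     }
> }
> {code}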
> The {{AsyncKafkaConsumer}}, by contrast, splits its logic and network I/O 
> between two threads, which opens up a race condition during fetch. The 
> {{AsyncKafkaConsumer}} also checks for buffered data on its application 
> thread. If it finds none, it signals the background thread to create a fetch 
> request. However, the background thread may receive data from a previous 
> fetch and buffer it before the fetch request logic starts. When that 
> happens, the background thread skips the newly buffered partitions as it 
> creates the fetch request, with the unintended result that those partitions 
> are added to the fetch request's "to remove" set. This signals the broker to 
> remove those partitions from its internal fetch session cache.
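> A simplified, runnable sketch of the race follows; every name here is
> hypothetical and stands in for the real client internals:
> {code:java}
> import java.util.HashSet;
> import java.util.List;
> import java.util.Set;
>
> // Sketch: how buffered partitions end up in the "to remove" set.
> public class FetchSessionRaceSketch {
>
>     public static void main(String[] args) {
>         List<String> assigned = List.of("topic-0", "topic-1");
>         Set<String> fetchBuffer = new HashSet<>();
>
>         // 1. Application thread: the buffer is empty, so it signals the
>         //    background thread to create a fetch request.
>         boolean createFetchRequest = fetchBuffer.isEmpty();
>
>         // 2. Background thread: a response from a *previous* fetch arrives
>         //    and is buffered before the fetch-request logic runs.
>         fetchBuffer.add("topic-0");
>
>         // 3. Background thread: builds the incremental fetch request.
>         //    Buffered partitions are skipped, so the session logic treats
>         //    them as removed and adds them to the "to remove" set.
>         Set<String> toFetch = new HashSet<>();
>         Set<String> toRemove = new HashSet<>();
>         if (createFetchRequest) {
>             for (String tp : assigned) {
>                 if (fetchBuffer.contains(tp)) {
>                     toRemove.add(tp); // broker drops tp from its session
>                 } else {
>                     toFetch.add(tp);
>                 }
>             }
>         }
>
>         // Prints: fetch=[topic-1], remove=[topic-0] -- topic-0 is dropped
>         // from the broker's fetch session although it is still assigned.
>         System.out.println("fetch=" + toFetch + ", remove=" + toRemove);
>     }
> }
> {code}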
> This issue is technically possible in the {{ClassicKafkaConsumer}} too, since 
> the heartbeat thread performs network I/O in addition to the application 
> thread. However, because of the frequency at which the 
> {{{}AsyncKafkaConsumer{}}}'s background thread runs, it is ~100x more likely 
> to happen.
> h1. Options
> The core decision is: what should the background thread do if it is asked 
> to create a fetch request and discovers there is already buffered data? 
> There were multiple proposals to address this issue in the 
> {{{}AsyncKafkaConsumer{}}}. Among them:
>  # The background thread should omit buffered partitions from the fetch 
> request (the existing behavior)
>  # The background thread should skip the fetch request generation entirely if 
> there are _any_ buffered partitions
>  # The background thread should include buffered partitions in the fetch 
> request, but use a small “max bytes” value
>  # The background thread should skip fetching from the nodes that have 
> buffered partitions
> Option 3 won out. The change is for the {{AsyncKafkaConsumer}} to include 
> in the fetch request any partition with buffered data, but with a "max 
> bytes" value of 1, which causes the fetch response to return as little 
> additional data as possible for those partitions. Because the partitions 
> are still present in the request, the broker keeps them in its fetch 
> session cache, and the consumer avoids buffering too much data on the 
> client before it can be returned from {{{}poll(){}}}.
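> A hedged sketch of that change, again using hypothetical types rather than
> the actual fetch-request builder:
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Set;
>
> // Sketch: buffered partitions stay in the fetch request, but with a
> // 1-byte "max bytes" so the broker keeps them in its session cache
> // while returning as little additional data as possible.
> public class BufferedPartitionFetchSketch {
>
>     record PartitionData(long fetchOffset, int maxBytes) {}
>
>     static final int DEFAULT_MAX_BYTES = 1_048_576; // e.g. max.partition.fetch.bytes
>     static final int BUFFERED_MAX_BYTES = 1;
>
>     static Map<String, PartitionData> buildFetchData(Set<String> assigned,
>                                                      Set<String> buffered) {
>         Map<String, PartitionData> fetchData = new HashMap<>();
>         for (String tp : assigned) {
>             int maxBytes = buffered.contains(tp) ? BUFFERED_MAX_BYTES
>                                                  : DEFAULT_MAX_BYTES;
>             // Including the partition, even at 1 byte, keeps it out of
>             // the "to remove" set and in the broker's session cache.
>             fetchData.put(tp, new PartitionData(0L, maxBytes));
>         }
>         return fetchData;
>     }
>
>     public static void main(String[] args) {
>         System.out.println(buildFetchData(
>                 Set.of("topic-0", "topic-1"), Set.of("topic-0")));
>     }
> }
> {code}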
> h1. Testing
> h2. Eviction rate testing
> Here are the results of our internal stress testing:
>  * {{ClassicKafkaConsumer}}: after the initial spike during test startup, 
> the average rate settles down to ~0.14 evictions/second
>  * {{AsyncKafkaConsumer}} (without the fix): after startup, the evictions 
> still settle down, but at ~1.48 evictions/second the rate is roughly 10x 
> higher than the {{ClassicKafkaConsumer}}
>  * {{AsyncKafkaConsumer}} (with the fix): the eviction rate is now much 
> closer to the {{ClassicKafkaConsumer}}, at ~0.22 evictions/second
> h2. {{EndToEndLatency}} testing
> The bundled {{EndToEndLatency}} test runner was executed on a single 
> machine using Docker. The {{apache/kafka:latest}} Docker image was used 
> with either the {{cluster/combined/plaintext/docker-compose.yml}} or the 
> {{single-node/plaintext/docker-compose.yml}} Docker Compose configuration, 
> depending on the test. The Docker containers were recreated from scratch 
> before each test.
> A single topic was created with 30 partitions and a replication factor of 
> either 1 or 3, depending on whether the setup was single- or multi-node.
> For each of the test runs these argument values were used:
>  * Message count: 100000
>  * {{{}acks{}}}: 1
>  * Message size: 128 bytes
> A configuration file containing the single setting 
> {{group.protocol=<$group_protocol>}} was also provided to the test, where 
> {{$group_protocol}} was either {{CLASSIC}} or {{{}CONSUMER{}}}.
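> For reference, a plausible invocation is sketched below; the runner's class 
> name and argument order vary across Kafka versions, so treat the exact 
> command as an assumption rather than a verified recipe:
> {noformat}
> # Consumer config file with the single setting under test
> echo "group.protocol=CONSUMER" > /tmp/consumer.properties
>
> # broker list, topic, message count, acks, message size, properties file
> bin/kafka-run-class.sh org.apache.kafka.tools.EndToEndLatency \
>     localhost:9092 test-topic 100000 1 128 /tmp/consumer.properties
> {noformat}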
> h3. Test results
> h4. Test 1—{{{}CLASSIC{}}} group protocol, cluster size: 3 nodes, replication 
> factor: 3
> ||Metric (ms)||{{trunk}}||PR||
> |Average latency|1.4901|1.4871|
> |50th percentile|1|1|
> |99th percentile|3|3|
> |99.9th percentile|6|6|
> h4. Test 2—{{{}CONSUMER{}}} group protocol, cluster size: 3 nodes, 
> replication factor: 3
> ||Metric (ms)||{{trunk}}||PR||
> |Average latency|1.4704|1.4807|
> |50th percentile|1|1|
> |99th percentile|3|3|
> |99.9th percentile|6|7|
> h4. Test 3—{{{}CLASSIC{}}} group protocol, cluster size: 1 node, replication 
> factor: 1
> ||Metric (ms)||{{trunk}}||PR||
> |Average latency|1.0777|1.0193|
> |50th percentile|1|1|
> |99th percentile|2|2|
> |99.9th percentile|5|4|
> h4. Test 4—{{{}CONSUMER{}}} group protocol, cluster size: 1 node, replication 
> factor: 1
> ||Metric (ms)||{{trunk}}||PR||
> |Average latency|1.0937|1.0503|
> |50th percentile|1|1|
> |99th percentile|2|2|
> |99.9th percentile|4|4|
> h3. Conclusion
> These tests did not reveal any significant latency differences between the 
> current fetcher logic on {{trunk}} and the one proposed in this PR. 
> Additional test runs using larger message counts and/or larger message 
> sizes did not affect the result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
