[ https://issues.apache.org/jira/browse/KAFKA-17182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17922868#comment-17922868 ]
Matthias J. Sax commented on KAFKA-17182:
-----------------------------------------

For details about the introduced regression, cf. https://issues.apache.org/jira/browse/KAFKA-18686

> Consumer fetch sessions are evicted too quickly with AsyncKafkaConsumer
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-17182
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17182
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.8.0
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Blocker
>              Labels: consumer-threading-refactor, fetcher, kip-848-client-support
>             Fix For: 4.0.0
>
>
> h1. Background
> {{Consumer}} implementations fetch data from the cluster and temporarily buffer it in memory until the user next calls {{Consumer.poll()}}. When a fetch request is being generated, partitions that already have buffered data are not included in the fetch request.
> The {{ClassicKafkaConsumer}} performs much of its fetch logic and network I/O in the application thread. On {{poll()}}, if there is any locally buffered data, the {{ClassicKafkaConsumer}} does not fetch _any_ new data and simply returns the buffered data to the user from {{poll()}}.
> The {{AsyncKafkaConsumer}}, on the other hand, splits its logic and network I/O between two threads, which creates a potential race condition during fetch. The {{AsyncKafkaConsumer}} also checks for buffered data on its application thread. If it finds none, it signals the background thread to create a fetch request. However, it is possible for the background thread to receive data from a previous fetch and buffer it before the fetch request logic starts. When that occurs, the background thread skips the now-buffered partitions as it creates the new fetch request, with the unintended result that those partitions are added to the fetch request's "to remove" set. This signals the broker to remove those partitions from its internal fetch session cache.
> This issue is technically possible in the {{ClassicKafkaConsumer}} too, since its heartbeat thread performs network I/O in addition to the application thread. However, because of the frequency at which the {{AsyncKafkaConsumer}}'s background thread runs, it is ~100x more likely to happen there.
> h1. Options
> The core decision is: what should the background thread do if it is asked to create a fetch request and discovers there is buffered data? There were multiple proposals to address this issue in the {{AsyncKafkaConsumer}}. Among them:
> # The background thread should omit buffered partitions from the fetch request, as before (this is the existing behavior)
> # The background thread should skip fetch request generation entirely if there are _any_ buffered partitions
> # The background thread should include buffered partitions in the fetch request, but use a small "max bytes" value
> # The background thread should skip fetching from the nodes that have buffered partitions
> Option 3 won out. The change in {{AsyncKafkaConsumer}} is to include in the fetch request any partition with buffered data, but with a "max bytes" size of 1. This should cause the fetch response to return as little data as possible for those partitions, so the consumer does not buffer too much data on the client before it can be returned from {{poll()}}.
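> As a rough illustration of option 3, the sketch below shows how per-partition fetch parameters could be assembled so that buffered partitions stay in the request with a 1-byte cap. It is only a sketch of the idea: the class, collaborator, and constant names ({{FetchRequestSketch}}, {{FetchBuffer}}, {{SubscriptionPositions}}, {{buildFetchSpecs}}) are hypothetical and do not correspond to the actual {{FetchRequestManager}} internals.
> {code:java}
> // Illustrative sketch only -- not the actual AsyncKafkaConsumer/FetchRequestManager
> // code. The collaborator types and helper names here are hypothetical.
> import java.util.LinkedHashMap;
> import java.util.List;
> import java.util.Map;
>
> import org.apache.kafka.common.TopicPartition;
>
> class FetchRequestSketch {
>
>     // Hypothetical stand-in for the per-partition fetch parameters sent to the broker.
>     record PartitionFetchSpec(long fetchOffset, int maxBytes) { }
>
>     static final int DEFAULT_PARTITION_MAX_BYTES = 1_048_576; // e.g. max.partition.fetch.bytes
>     static final int BUFFERED_PARTITION_MAX_BYTES = 1;        // "return as little as possible"
>
>     // Hypothetical collaborators, declared here only so the sketch is self-contained.
>     interface FetchBuffer { boolean hasBufferedDataFor(TopicPartition tp); }
>     interface SubscriptionPositions { long position(TopicPartition tp); }
>
>     /**
>      * The old behavior (option 1) omitted buffered partitions entirely, which told the
>      * broker to drop them from the incremental fetch session. Option 3 keeps them in the
>      * request but caps them at 1 byte so the response stays as small as possible.
>      */
>     Map<TopicPartition, PartitionFetchSpec> buildFetchSpecs(Iterable<TopicPartition> assigned,
>                                                             FetchBuffer buffer,
>                                                             SubscriptionPositions positions) {
>         Map<TopicPartition, PartitionFetchSpec> specs = new LinkedHashMap<>();
>         for (TopicPartition tp : assigned) {
>             int maxBytes = buffer.hasBufferedDataFor(tp)
>                     ? BUFFERED_PARTITION_MAX_BYTES
>                     : DEFAULT_PARTITION_MAX_BYTES;
>             specs.put(tp, new PartitionFetchSpec(positions.position(tp), maxBytes));
>         }
>         return specs;
>     }
>
>     public static void main(String[] args) {
>         // Tiny usage example with lambda stand-ins for the collaborators.
>         FetchRequestSketch sketch = new FetchRequestSketch();
>         TopicPartition p0 = new TopicPartition("orders", 0);
>         TopicPartition p1 = new TopicPartition("orders", 1);
>         Map<TopicPartition, PartitionFetchSpec> specs = sketch.buildFetchSpecs(
>                 List.of(p0, p1),
>                 tp -> tp.equals(p0), // pretend partition 0 already has buffered data
>                 tp -> 42L);          // pretend the next fetch offset is 42 everywhere
>         specs.forEach((tp, spec) -> System.out.println(tp + " -> " + spec));
>     }
> }
> {code}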
> h1. Testing
> h2. Eviction rate testing
> Here are the results of our internal stress testing:
> * {{ClassicKafkaConsumer}}: after the initial spike during test start-up, the average rate settles down to ~0.14 evictions/second [eviction rate chart]
> * {{AsyncKafkaConsumer}} (w/o fix): after start-up, the evictions also settle down, but at ~1.48 evictions/second they are roughly 10x higher than the {{ClassicKafkaConsumer}} [eviction rate chart]
> * {{AsyncKafkaConsumer}} (w/ fix): the eviction rate is now much closer to the {{ClassicKafkaConsumer}}, at ~0.22 evictions/second [eviction rate chart]
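> For reference, the broker-side eviction rate shown in the charts above can be sampled from the broker's fetch session cache metrics over JMX. The sketch below is one way to do that; the JMX port and the exact MBean name are assumptions on my part and should be verified against the broker version in use.
> {code:java}
> // Hedged sketch: one way to observe the broker's fetch-session eviction rate.
> // Assumes the broker exposes JMX on localhost:9999; the MBean name below is an
> // assumption and may differ between broker versions.
> import javax.management.MBeanServerConnection;
> import javax.management.ObjectName;
> import javax.management.remote.JMXConnector;
> import javax.management.remote.JMXConnectorFactory;
> import javax.management.remote.JMXServiceURL;
>
> public class FetchSessionEvictionProbe {
>     public static void main(String[] args) throws Exception {
>         JMXServiceURL url =
>             new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
>         try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
>             MBeanServerConnection mbs = connector.getMBeanServerConnection();
>             // Assumed MBean for the fetch session eviction meter.
>             ObjectName evictions = new ObjectName(
>                 "kafka.server:type=FetchSessionCache,name=IncrementalFetchSessionsEvictionsPerSec");
>             // Yammer meters exposed over JMX provide rate attributes such as OneMinuteRate.
>             Object rate = mbs.getAttribute(evictions, "OneMinuteRate");
>             System.out.println("Fetch session evictions/sec (1-minute rate): " + rate);
>         }
>     }
> }
> {code}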
> h2. {{EndToEndLatency}} testing
> The bundled {{EndToEndLatency}} test runner was executed on a single machine using Docker. The {{apache/kafka:latest}} Docker image was used with either the {{cluster/combined/plaintext/docker-compose.yml}} or the {{single-node/plaintext/docker-compose.yml}} Docker Compose configuration file, depending on the test. The Docker containers were recreated from scratch before each test.
> A single topic was created with 30 partitions and a replication factor of either 1 or 3, depending on the single- or multi-node setup.
> Each test run used these argument values:
> * Message count: 100000
> * {{acks}}: 1
> * Message size: 128 bytes
> A configuration file containing the single configuration value {{group.protocol=<$group_protocol>}} was also provided to the test, where {{$group_protocol}} was either {{CLASSIC}} or {{CONSUMER}} (see the consumer sketch after the test results).
> h3. Test results
> All latency values are in milliseconds.
> h4. Test 1 ({{CLASSIC}} group protocol, cluster size: 3 nodes, replication factor: 3)
> ||Metric||{{trunk}}||PR||
> |Average latency|1.4901|1.4871|
> |50th percentile|1|1|
> |99th percentile|3|3|
> |99.9th percentile|6|6|
> h4. Test 2 ({{CONSUMER}} group protocol, cluster size: 3 nodes, replication factor: 3)
> ||Metric||{{trunk}}||PR||
> |Average latency|1.4704|1.4807|
> |50th percentile|1|1|
> |99th percentile|3|3|
> |99.9th percentile|6|7|
> h4. Test 3 ({{CLASSIC}} group protocol, cluster size: 1 node, replication factor: 1)
> ||Metric||{{trunk}}||PR||
> |Average latency|1.0777|1.0193|
> |50th percentile|1|1|
> |99th percentile|2|2|
> |99.9th percentile|5|4|
> h4. Test 4 ({{CONSUMER}} group protocol, cluster size: 1 node, replication factor: 1)
> ||Metric||{{trunk}}||PR||
> |Average latency|1.0937|1.0503|
> |50th percentile|1|1|
> |99th percentile|2|2|
> |99.9th percentile|4|4|
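> For context, the {{group.protocol}} value used in these tests is the same client configuration that selects between the two consumer implementations compared throughout this issue: {{classic}} selects the {{ClassicKafkaConsumer}} code path, and {{consumer}} selects the {{AsyncKafkaConsumer}} used with the KIP-848 group protocol. Below is a minimal consumer sketch; the bootstrap server, group id, and topic names are placeholders, not values from the test environment.
> {code:java}
> // Minimal sketch showing where group.protocol is set on the Java consumer.
> import java.time.Duration;
> import java.util.List;
> import java.util.Properties;
>
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>
> public class GroupProtocolExample {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.put(ConsumerConfig.GROUP_ID_CONFIG, "latency-test-group");
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
>
>         // "classic" -> ClassicKafkaConsumer; "consumer" -> AsyncKafkaConsumer.
>         // The "consumer" protocol requires a cluster that supports KIP-848.
>         props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
>
>         try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
>             consumer.subscribe(List.of("e2e-latency-topic"));
>             ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
>             System.out.println("Fetched " + records.count() + " records");
>         }
>     }
> }
> {code}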
> h3. Conclusion
> These tests did not reveal any significant differences between the current fetcher logic on {{trunk}} and the one proposed in this PR. Additional test runs using larger message counts and/or larger message sizes did not affect the result.


--
This message was sent by Atlassian Jira
(v8.20.10#820010)