Ivan Bondarenko created KAFKA-12516: ---------------------------------------
Summary: Standartize KafkaConsumer's commitSync/commitAsync/committed Key: KAFKA-12516 URL: https://issues.apache.org/jira/browse/KAFKA-12516 Project: Kafka Issue Type: Improvement Components: consumer Affects Versions: 2.7.0 Reporter: Ivan Bondarenko Commit-related methods of KafkaConsumer are kind of asymmetrical. The main axis of asymmetry is (let's call it "MAP"): {code:java} Map<TopicPartition, OffsetAndMetadata> {code} To illustrate asymmetry let's put methods to table (skipping deprecated and those having Duration to reduce table). Before that, let's name "SAC" what is returned by code: {code:java} subscriptions.allConsumed() {code} So the table is... ||name||params||returns||is SAC used?||can we get MAP?|| |commited|{color:#de350b}Set<TP>{color}|MAP|no|via return| |commitSync|--|--|used|{color:#de350b}no{color}| |commitSync|MAP|--|no|no| |commitAsync|--|--|no|no| |commitAsync|callback|--|used|SAC via callback| |commitAsync|MAP, callback|--|no|no| What is asymmetric here... 1. *Main problem.* SAC can be retrieved only by 1 async method. So without workarounds we cannot just sync-commit (without parameters) and see what was committed. To solve this we can (multiple solutions can be applied): * Introduce new method like "getAllConsumed()" which will return SAC. * commitSync() without params can return SAC instead of void. 2. *Minor.* There is no "commited()" without params, so we need to create a Set manually. Solutions: * We can add "commited()" without params. In this case the default set will be constructed based on one of: ** SAC ** all partitions for subscribed topic ** assignment 3. *Looks like minor bug.* This peace of code is used in all commitAsync() methods, but not in all commitSync() methods, it doesn't look like on purpose: {code:java} offsets.forEach(this::updateLastSeenEpochIfNewer); {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)