For #3, a better example would be in ConsumerCoordinator (around line 632).

    commitOffsetsAsync(allConsumedOffsets, new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {

FYI

On Mon, Feb 19, 2018 at 10:56 AM, Gabriel Giussi <gabrielgiu...@gmail.com> wrote:

> Hi Ted,
> my mistake was believing that committed offsets are used on the next poll,
> but that is not the case
> <https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c14018cf7d3a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1202>.
>
> > The offsets committed using this API will be used on the first fetch after
> > every rebalance and also on startup
>
> So what to do after a failed commit depends on the nature of the exception,
> I guess.
>
> - WakeupException: retry
> - Others: close consumer
>
> Thanks for your help in solving #2. I'm still wondering about #1 and #3.
>
> 2018-02-19 11:46 GMT-03:00 Ted Yu <yuzhih...@gmail.com>:
>
> > For #2, I think the assumption is that the records are processed by the
> > loop:
> >
> > https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c14018cf7d3a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java#L164
> >
> > On Mon, Feb 19, 2018 at 4:39 AM, Gabriel Giussi <gabrielgiu...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I'm trying to use MockConsumer to test my application code, but I've run
> > > into a couple of limitations and I want to know if there are workarounds
> > > or something that I'm overlooking.
> > > Note: I'm using kafka-clients v 0.11.0.2
> > >
> > > 1. Why does addRecord
> > > <https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c14018cf7d3a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java#L179>
> > > require that the consumer has assigned partitions, given that this is
> > > just simulating records being produced or existing records?
> > > 2. Why does poll
> > > <https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c14018cf7d3a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java#L132>
> > > clear the map of records? Shouldn't it be cleared only after a commit?
> > > 3. Why doesn't commitAsync
> > > <https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c14018cf7d3a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java#L198>
> > > check for an exception, and why does it always succeed?
> > >
> > > Due to items (2) and (3) I'm not able to test scenarios where the commit
> > > fails and the consumer should poll the same elements again.
> > >
> > > If someone knows about other scenarios that can't be tested with
> > > MockConsumer, please let me know.
> > >
> > > Thanks.
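
The failed-commit handling proposed in this thread (retry on WakeupException, close the consumer on anything else) can be exercised without MockConsumer by keeping the decision in a small, separately testable method. The following is only a minimal sketch of that idea, not Kafka's API: StandInWakeupException, CommitFailurePolicySketch, Action, Commit and commitWithRetry are all hypothetical names invented here, and the stand-in exception replaces the real WakeupException so that the code compiles without kafka-clients on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Kafka's WakeupException, declared here only so
// that the sketch compiles without the kafka-clients dependency.
class StandInWakeupException extends RuntimeException {
    StandInWakeupException(String message) {
        super(message);
    }
}

class CommitFailurePolicySketch {
    // What the application does after a commit attempt.
    enum Action { NONE, RETRY, CLOSE }

    // Hypothetical commit operation: returns null on success, or the
    // exception that a commit callback would have received on failure.
    interface Commit {
        Exception attempt();
    }

    // Mirrors the policy suggested in the thread: no exception means the
    // commit succeeded, a wakeup-style exception is retried, and any other
    // exception leads to closing the consumer.
    static Action decide(Exception exception) {
        if (exception == null) {
            return Action.NONE;
        }
        if (exception instanceof StandInWakeupException) {
            return Action.RETRY;
        }
        return Action.CLOSE;
    }

    // Attempts the commit up to maxAttempts times, stopping as soon as the
    // policy returns something other than RETRY. Returns every decision made.
    static List<Action> commitWithRetry(Commit commit, int maxAttempts) {
        List<Action> actions = new ArrayList<>();
        for (int i = 0; i < maxAttempts; i++) {
            Action action = decide(commit.attempt());
            actions.add(action);
            if (action != Action.RETRY) {
                break;
            }
        }
        return actions;
    }

    public static void main(String[] args) {
        // A commit that fails once with the stand-in exception, then succeeds.
        int[] calls = {0};
        Commit flaky = () -> calls[0]++ == 0
                ? new StandInWakeupException("interrupted")
                : null;
        System.out.println(commitWithRetry(flaky, 3)); // prints [RETRY, NONE]
    }
}
```

In real application code, decide would be called from the OffsetCommitCallback passed to commitAsync, and a test can drive it with whatever exception it wants, which sidesteps the fact that MockConsumer's commitAsync always reports success.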