For #3, a better example would be in ConsumerCoordinator (around line 632).

        commitOffsetsAsync(allConsumedOffsets, new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata>
offsets, Exception exception) {

FYI

On Mon, Feb 19, 2018 at 10:56 AM, Gabriel Giussi <gabrielgiu...@gmail.com>
wrote:

> Hi Ted,
> my mistake was believe that commited offsets are used on the next poll, but
> is not the case
> <https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c1
> 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> consumer/KafkaConsumer.java#L1202>
> .
>
> > The offsets committed using this API will be used on the first fetch
> after
> > every rebalance and also on startup
> >
>
> So, what to do after a failed commit depends on the nature of the exception
> I guess.
>
>    - WakeupException: retry
>    - Others: close consumer
>
> Thanks for your help to solve #2. I'm wondering about 1# and 3# yet.
>
> 2018-02-19 11:46 GMT-03:00 Ted Yu <yuzhih...@gmail.com>:
>
> > For #2, I think the assumption is that the records are processed by the
> > loop:
> >
> > https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c1
> > 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> > consumer/MockConsumer.java#L164
> >
> >
> >
> > On Mon, Feb 19, 2018 at 4:39 AM, Gabriel Giussi <gabrielgiu...@gmail.com
> >
> > wrote:
> >
> > > Hi,
> > >
> > > I'm trying to use MockConsumer to test my application code but I've
> > faced a
> > > couple of limitations and I want to know if there are workarounds or
> > > something that I'm overlooking.
> > > Note: I'm using kafka-clients v 0.11.0.2
> > >
> > >
> > >    1. Why the addRecord
> > >    <https://github.com/apache/kafka/blob/
> 73be1e1168f91ee2a9d68e1d1c75c1
> > > 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> > > consumer/MockConsumer.java#L179>
> > >    requires that the consumer has assigned partitions? Given that this
> is
> > > just
> > >    simulating records being produced or existing records.
> > >    2. Why the poll
> > >    <https://github.com/apache/kafka/blob/
> 73be1e1168f91ee2a9d68e1d1c75c1
> > > 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> > > consumer/MockConsumer.java#L132>
> > >    clear the map of records? It should not be cleared after commit?
> > >    3. Why the commitAsync
> > >    <https://github.com/apache/kafka/blob/
> 73be1e1168f91ee2a9d68e1d1c75c1
> > > 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> > > consumer/MockConsumer.java#L198>
> > >    doesn't check for an exception and always succeed?
> > >
> > > Due to items (2) and (3) I'm not be able to test scenarios where the
> > > commits fails and the consumer should poll again the same elements.
> > >
> > > If someone knows about other scenarios that can't be tested with
> > > MockConsumer, please let me know.
> > >
> > > Thanks.
> > >
> >
>

Reply via email to