The broker and consumer are version 0.11.0.0.
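
For context, this is roughly the guard I'm wondering about; a minimal sketch,
where the recovery in the catch blocks is just an assumption for illustration,
not something the docs prescribe:

import java.util.Collections;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

public class GuardedCommit {
    // Commit a single offset and guard against the exceptions commitSync documents.
    static void commitGuarded(KafkaConsumer<String, String> consumer,
                              TopicPartition partition, long offset) {
        try {
            consumer.commitSync(
                    Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
        } catch (CommitFailedException e) {
            // Commit rejected, e.g. after a rebalance; let the caller decide how to recover.
            throw e;
        } catch (KafkaException e) {
            // The documented "invalid offset" case would presumably surface here;
            // seeking back to the beginning is only one possible recovery.
            consumer.seekToBeginning(Collections.singleton(partition));
        }
    }
}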

2017-09-03 17:38 GMT+02:00 Jeff Widman <j...@jeffwidman.com>:

> What broker version are you testing with?
>
> On Sep 3, 2017 4:14 AM, "Stig Døssing" <generalbas....@gmail.com> wrote:
>
> > Hi,
> >
> > The documentation for KafkaConsumer.commitSync(Map) states that a
> > KafkaException will be thrown if the committed offset is invalid. I can't
> > seem to provoke this behavior, so I'd like clarification on whether this is
> > something the consumer is intended to do.
> >
> > Here's the snippet I'd expect would error out (running in a loop):
> > public long getEarliestOffset() {
> >     LOG.info("Current offset " + consumer.committed(new TopicPartition(TOPIC, 0)));
> >     ConsumerRecords<String, String> records = consumer.poll(2000);
> >     consumer.seekToBeginning(consumer.assignment());
> >     consumer.commitSync(Collections.singletonMap(
> >         new TopicPartition(TOPIC, 0), new OffsetAndMetadata(4_000_000L)));
> >     return consumer.position(new TopicPartition(TOPIC, 0));
> > }
> >
> > The committed offset appears to be updated to 4,000,000 even though the
> > highest offset in that partition is ~6000. It also seems to work the other
> > way round: if I set the log retention such that offsets before 2000 are
> > deleted, I can still commit offset 0.
> >
> > I'm trying to clarify what the consumer is expected to do, so I can figure
> > out whether a consumer loop that commits offsets with commitSync(Map) for a
> > partition where log deletion is enabled needs a try-catch guard around the
> > commit call in case the committed offset has been deleted.
> >
> > Thanks,
> > Stig Rohde Døssing
> >
>
