The broker and consumer are version 0.11.0.0.

2017-09-03 17:38 GMT+02:00 Jeff Widman <j...@jeffwidman.com>:
> What broker version are you testing with?
>
> On Sep 3, 2017 4:14 AM, "Stig Døssing" <generalbas....@gmail.com> wrote:
>
> > Hi,
> >
> > The documentation for KafkaConsumer.commitSync(Map) states that a
> > KafkaException will be thrown if the committed offset is invalid. I can't
> > seem to provoke this behavior, so I'd like clarification on whether this is
> > something the consumer is intended to do.
> >
> > Here's the snippet I'd expect would error out (running in a loop):
> >
> > public long getEarliestOffset() {
> >     LOG.info("Current offset " + consumer.committed(new TopicPartition(TOPIC, 0)));
> >     ConsumerRecords<String, String> records = consumer.poll(2000);
> >     consumer.seekToBeginning(consumer.assignment());
> >     consumer.commitSync(Collections.singletonMap(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(4_000_000L)));
> >     return consumer.position(new TopicPartition(TOPIC, 0));
> > }
> >
> > The committed offset appears to be updated to 4.000.000 even though the
> > highest offset in that partition is ~6000. It also seems to work the other
> > way round, if I set the log retention such that offsets before 2000 are
> > deleted, I can still commit offset 0.
> >
> > I'm trying to clarify what the consumer is expected to do, because I'm
> > trying to figure out whether a consumer loop that is committing offsets
> > with commitSync(Map) for a partition where log deletion is enabled needs to
> > put a try-catch guard around the commit call in case the committed offset
> > has been deleted.
> >
> > Thanks,
> > Stig Rohde Døssing
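
On the try-catch question in the quoted message, here is a minimal sketch. It assumes, as the observations above suggest but do not confirm, that the commit itself does not reject out-of-range offsets. Under that assumption, a consumer loop could compare the offset against the partition's current bounds before calling commitSync(Map). The class and method names below are hypothetical. The bounds would presumably come from KafkaConsumer.beginningOffsets(...) and KafkaConsumer.endOffsets(...); they are passed in as plain longs here so the sketch stays self-contained.

```java
/**
 * Hypothetical helper (not part of the Kafka client): checks an offset
 * against a partition's current bounds before it is committed.
 */
public final class OffsetRange {
    private OffsetRange() {}

    /**
     * Returns true if the offset lies within [earliest, latest].
     * "latest" is taken to be the log end offset, i.e. the offset of the
     * next record to be written, so committing exactly "latest" is allowed.
     */
    public static boolean isCommittable(long offset, long earliest, long latest) {
        return offset >= earliest && offset <= latest;
    }

    /** Clamps the offset into [earliest, latest]. */
    public static long clamp(long offset, long earliest, long latest) {
        if (earliest > latest) {
            throw new IllegalArgumentException(
                "earliest (" + earliest + ") is greater than latest (" + latest + ")");
        }
        return Math.max(earliest, Math.min(offset, latest));
    }
}
```

With the numbers from the quoted message (offsets before 2000 deleted, highest offset around 6000), OffsetRange.isCommittable(4_000_000L, 2000L, 6000L) is false and OffsetRange.clamp(0L, 2000L, 6000L) yields 2000. Whether to clamp, skip the commit, or fail loudly is a design choice for the application; the bounds can also change between the check and the commit, so this narrows the window rather than closing it.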