What broker version are you testing with?

On Sep 3, 2017 4:14 AM, "Stig Døssing" <generalbas....@gmail.com> wrote:

> Hi,
>
> The documentation for KafkaConsumer.commitSync(Map) states that a
> KafkaException will be thrown if the committed offset is invalid. I can't
> seem to provoke this behavior, so I'd like clarification on whether this is
> something the consumer is intended to do.
>
> Here's the snippet I'd expect would error out (running in a loop):
> public long getEarliestOffset() {
>             LOG.info("Current offset " + consumer.committed(new
> TopicPartition(TOPIC, 0)));
>             ConsumerRecords<String, String> records = consumer.poll(2000);
>             consumer.seekToBeginning(consumer.assignment());
>             consumer.commitSync(Collections.singletonMap(new
> TopicPartition(TOPIC, 0), new OffsetAndMetadata(4_000_000L)));
>             return consumer.position(new TopicPartition(TOPIC, 0));
>         }
>
> The committed offset appears to be updated to 4.000.000 even though the
> highest offset in that partition is ~6000. It also seems to work the other
> way round, if I set the log retention such that offsets before 2000 are
> deleted, I can still commit offset 0.
>
> I'm trying to clarify what the consumer is expected to do, because I'm
> trying to figure out whether a consumer loop that is committing offsets
> with commitSync(Map) for a partition where log deletion is enabled needs to
> put a try-catch guard around the commit call in case the committed offset
> has been deleted.
>
> Thanks,
> Stig Rohde Døssing
>

Reply via email to