Hi,

The documentation for KafkaConsumer.commitSync(Map) states that a
KafkaException will be thrown if the committed offset is invalid. I can't
seem to provoke this behavior, so I'd like clarification on whether this is
something the consumer is intended to do.

Here's the snippet I'd expect would error out (running in a loop):
public long getEarliestOffset() {
            LOG.info("Current offset " + consumer.committed(new
TopicPartition(TOPIC, 0)));
            ConsumerRecords<String, String> records = consumer.poll(2000);
            consumer.seekToBeginning(consumer.assignment());
            consumer.commitSync(Collections.singletonMap(new
TopicPartition(TOPIC, 0), new OffsetAndMetadata(4_000_000L)));
            return consumer.position(new TopicPartition(TOPIC, 0));
        }

The committed offset appears to be updated to 4.000.000 even though the
highest offset in that partition is ~6000. It also seems to work the other
way round, if I set the log retention such that offsets before 2000 are
deleted, I can still commit offset 0.

I'm trying to clarify what the consumer is expected to do, because I'm
trying to figure out whether a consumer loop that is committing offsets
with commitSync(Map) for a partition where log deletion is enabled needs to
put a try-catch guard around the commit call in case the committed offset
has been deleted.

Thanks,
Stig Rohde Døssing

Reply via email to