Hello,

During some tests, I can see that consumer.endOffsets() returns a valid
value, but the consumer can no longer be polled once seek() has been called
on it. In other words, poll() hangs almost forever.

My intention is to start consuming from the last successfully written data
on that TopicPartition. I am using this model because that data is written
by a different system and is required for sending a separate command
somewhere else.
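
My assumption here (please correct me if it is wrong) is that endOffsets()
returns the offset of the next record to be written, so the last
successfully written record sits at endOffset - 1; in my code below I simply
step back 100 records from the end. Roughly, using the consumer and
topicPartition shown below (lastWrittenOffset is just an illustrative name):

    // Assumption: endOffsets() returns the next offset to be written (the high
    // watermark under read_uncommitted), so the most recent record is one before it.
    long endOffset = consumer.endOffsets(Collections.singleton(topicPartition))
            .get(topicPartition);
    long lastWrittenOffset = endOffset - 1;   // offset of the last written record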


Here is my code:

    // Prepare
    List<String> topics = new ArrayList<>();
    topics.add(topicString);

    KafkaConsumer<String, String> consumer =
            new KafkaConsumer<>(prepareSimpleProperties());
    TopicPartition topicPartition = new TopicPartition(topicString, 0);

    // ASSIGN
    // Assign the partition manually if it is not assigned yet.
    Set<TopicPartition> assignments = consumer.assignment();
    if (assignments == null || assignments.isEmpty() ||
            !assignments.contains(topicPartition)) {
        consumer.assign(Collections.singleton(topicPartition));
    }

    // SEEK
    // Seek each returned partition to (end offset - 100), clamped at 0.
    Map<TopicPartition, Long> topicPartitionOffsets =
            consumer.endOffsets(Collections.singleton(topicPartition));
    if (topicPartitionOffsets != null) {
        for (Map.Entry<TopicPartition, Long> data : topicPartitionOffsets.entrySet()) {
            long desiredOffset = Math.max(0L, data.getValue() - 100);
            consumer.seek(data.getKey(), desiredOffset);
        }
    }

    // POLL
    try {
        while (true) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords =
                        records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(record.offset() + ": " + record.value());
                }
                // lastOffset is currently unused; the plain commitSync() below
                // commits the offsets returned by the last poll().
                long lastOffset =
                        partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync();
            }
        }
    } finally {
        consumer.close();
    }
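
To narrow this down, I am planning to add a small check like the sketch
below (my own diagnostic, not from the docs, using the same consumer and
topicPartition as above): read the position right after seek() and poll with
a bounded timeout, so a hang shows up as an empty result rather than a
blocked call.

    // Diagnostic sketch: verify the fetch position right after seek() and use a
    // bounded poll timeout so the call returns even if nothing can be fetched.
    long end = consumer.endOffsets(Collections.singleton(topicPartition))
            .get(topicPartition);
    long target = Math.max(0L, end - 100);
    consumer.seek(topicPartition, target);
    System.out.println("Position after seek: "
            + consumer.position(topicPartition));   // expected to equal target

    ConsumerRecords<String, String> probe = consumer.poll(Duration.ofSeconds(10));
    System.out.println("Records fetched: " + probe.count());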

When I read the following in the seek() docs:

> If this API is invoked for the same partition more than once, the latest
> offset will be used on the next poll(). Note that you may lose data if this
> API is arbitrarily used in the middle of consumption, to reset the fetch
> offsets.


I am not sure I understand what is meant there by "latest offset" and "more
than once". What is the best way to get this offset calculation to work
correctly? Also, is there any documentation (ideally with some visual aid)
that explains how the LSO relates to the smallest offset of an open
transaction? A Confluent or Apache Kafka link is fine.
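
My current reading, which I would like to confirm, is that only the last
seek() issued before the next poll() determines the fetch position, along
these lines (a sketch using the consumer and topicPartition from above):

    // My understanding (please correct me if wrong): when seek() is called twice
    // on the same partition before poll(), only the second call takes effect.
    consumer.seek(topicPartition, 100L);
    consumer.seek(topicPartition, 500L);   // "the latest offset" -> this one wins
    ConsumerRecords<String, String> fetched = consumer.poll(Duration.ofSeconds(5));
    // any records returned for this partition should start at offset 500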

I would really appreciate an explanation, as I believe this could be covered
a bit better on the official website.

Regards,
