Hi, I just ran your code.

It works and fetches the data from endOffsets - 100 in the first round of poll(),
then hangs at the second round of poll().


This can happen when no new records are being continuously sent to the topic.
Since you pass Long.MAX_VALUE as the poll timeout, poll() will just wait for
new records to arrive, and because there is no new data, the poll hangs.
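
If the goal is just to drain the records that already exist (from endOffsets - 100
up to the end offset you captured) and then stop, one option is a bounded poll
timeout plus an explicit exit condition. Here is a rough sketch; the method name
and the stopping rule are only for illustration, and it assumes the consumer is
already assigned and seeked as in your snippet:

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    // 'endOffset' is the value returned by consumer.endOffsets() before seeking.
    static void drainToEndOffset(KafkaConsumer<String, String> consumer,
                                 TopicPartition topicPartition,
                                 long endOffset) {
        while (consumer.position(topicPartition) < endOffset) {
            // Bounded timeout: poll() returns an empty batch after 1 second
            // instead of blocking until new records arrive.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
        }
    }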


As for the second question: seek() just specifies where you want to start
consuming on the partition. Once you seek to a position, the next poll() simply
sends a fetch request starting from the offset you sought to.


If you call:
seek(1)
seek(20)


then poll() will start from offset 20, because the latest seek is 20.
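
A tiny sketch of that behaviour (offsets 1 and 20 are made-up values; it assumes
'consumer' and 'topicPartition' are set up as in your code, with the same imports
as above):

    static void latestSeekWins(KafkaConsumer<String, String> consumer,
                               TopicPartition topicPartition) {
        consumer.seek(topicPartition, 1L);   // first seek
        consumer.seek(topicPartition, 20L);  // overrides the first seek

        // The fetch starts at offset 20; the first record returned (if any)
        // has offset >= 20, not 1.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
    }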








At 2020-09-05 02:18:53, "M. Manna" <manme...@gmail.com> wrote:
>Hello,
>
>During some tests, I can see that consumer.endOffsets() returns some valid
>value, but the partition cannot be polled if seek() is called on that
>consumer. In other words, poll() hangs almost forever.
>
>My intention is to start consuming from the last successfully written data
>on that TopicPartition. I am using this model because that data is written
>by a different system and is required for sending a separate command somewhere
>else.
>
>
>Here is my code
>
>
> // Prepare
>         List<String> topics = new ArrayList<>();
>         topics.add(topicString);
>
>         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prepareSimpleProperties());
>         TopicPartition topicPartition = new TopicPartition(topicString, 0);
>
>         // ASSIGN
>         Set<TopicPartition> assignments = consumer.assignment();
>         if (assignments == null || !assignments.isEmpty() || !assignments.contains(topicPartition)) {
>             consumer.assign(Collections.singleton(topicPartition));
>         }
>
>         // SEEK
>         Map<TopicPartition, Long> topicPartitionOffsets = consumer.endOffsets(Collections.singleton(topicPartition));
>         if (topicPartitionOffsets != null) {
>             for (Map.Entry<TopicPartition, Long> data : topicPartitionOffsets.entrySet()) {
>                 Long desiredOffset = data.getValue() - 100;
>                 consumer.seek(data.getKey(), desiredOffset < 0L ? 0L : desiredOffset);
>             }
>         }
>
>         // POLL
>         try {
>             while (true) {
>                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
>                 for (TopicPartition partition : records.partitions()) {
>                     List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
>                     for (ConsumerRecord<String, String> record : partitionRecords) {
>                         System.out.println(record.offset() + ": " + new String(record.value()));
>                     }
>                     long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
>                     consumer.commitSync();
>                 }
>             }
>         } finally {
>             consumer.close();
>         }
>     }
>
>When I read the following from seek docs:
>
>*If this API is invoked for the same partition more than once, the latest
>> offset will be used on the next poll(). Note that you may lose data if this
>> API is arbitrarily used in the middle of consumption, to reset the fetch
>> offsets.*
>
>
>I am not sure I understood what it means by "Latest offset" and "more than
>once". What's the best way to get this offset calculation to work
>correctly? Also, is there documentation that explains (hopefully with
>some visual aid) how the LSO and the smallest offset of an open transaction
>relate? Any Confluent or Apache Kafka link is fine.
>
>I would really appreciate some explanation, as I believe this could be
>explained a bit better on the official website.
>
>Regards,
