Firstly, thanks very much. I think what I am struggling with is how the
offsets are fetched based on the LSO and the high watermark. Also, if there are
non-transactional producers, this code will always work with offset-1 if I
want to get the last record written.

I cannot recall which admin command I can invoke to get the list of open
transactions (or whether there are any at all), so that I don’t have to keep
retrying. I also think this is very ambiguous, and the documentation doesn’t
make it any easier to understand. I suppose an improvement would be to get
the minimum offset from the open transactions and use that.
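
To make the arithmetic concrete, here is a minimal sketch of how I understand
the end offset to be derived. It is a plain-Java model, not the Kafka client
API: EndOffsetModel and its methods are hypothetical names. It assumes that a
read_committed consumer sees the LSO (the minimum of the high watermark and
the first offset of any open transaction), while a read_uncommitted consumer
sees the high watermark.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical model of the offset arithmetic; not part of the Kafka client.
final class EndOffsetModel {

    // LSO = min(high watermark, smallest first offset among open transactions).
    static long lastStableOffset(long highWatermark, Collection<Long> openTxnFirstOffsets) {
        long lso = highWatermark;
        for (long first : openTxnFirstOffsets) {
            lso = Math.min(lso, first);
        }
        return lso;
    }

    // End offset as seen by a consumer, depending on its isolation level.
    static long endOffset(boolean readCommitted, long highWatermark,
                          Collection<Long> openTxnFirstOffsets) {
        return readCommitted
                ? lastStableOffset(highWatermark, openTxnFirstOffsets)
                : highWatermark;
    }

    // Offset to seek to in order to re-read the last n offsets, clamped at 0.
    static long seekTarget(long endOffset, long n) {
        return Math.max(endOffset - n, 0L);
    }

    public static void main(String[] args) {
        List<Long> open = List.of(42L, 57L); // first offsets of two open transactions
        System.out.println(endOffset(false, 60L, open)); // high watermark
        System.out.println(endOffset(true, 60L, open));  // LSO
        System.out.println(seekTarget(60L, 1L));         // end offset - 1
    }
}
```

With no open transactions the list is empty, so both isolation levels give the
same end offset. With transactional producers, I assume the offset just below
the end offset can be a transaction marker rather than a data record, which
may be one reason a seek to endOffset - 1 returns nothing.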

Thanks,



On Sat, 5 Sep 2020 at 11:49, adobewjl <wjl_is_...@163.com> wrote:

> Hi, I just ran your code.
>
> It works and fetches the data from endOffsets - 100 in the first round of
> polling, then hangs on the second round.
>
> This can happen if records are not being sent to the topic continuously.
> You pass a poll timeout of Long.MAX_VALUE, so poll() will just wait for
> new records to arrive. There is no new data, so the poll hangs.
>
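
On the Long.MAX_VALUE point, a minimal sketch of a bounded loop, written in
plain Java so that it stands alone. BoundedPoll, drain and maxEmptyPolls are
hypothetical names, and the poller argument is only a stand-in for a call such
as consumer.poll(timeout); this is one possible approach, not the only one.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper; `poller` stands in for a call such as consumer.poll(timeout).
final class BoundedPoll {

    // Polls with a short timeout and stops after maxEmptyPolls consecutive empty batches.
    static <T> List<T> drain(Function<Duration, List<T>> poller,
                             Duration timeout, int maxEmptyPolls) {
        List<T> out = new ArrayList<>();
        int empty = 0;
        while (empty < maxEmptyPolls) {
            List<T> batch = poller.apply(timeout);
            if (batch.isEmpty()) {
                empty++;        // nothing arrived within the timeout
            } else {
                empty = 0;      // data arrived, reset the idle counter
                out.addAll(batch);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Fake source: two batches, then nothing more.
        Iterator<List<String>> source =
                List.of(List.of("r1", "r2"), List.of("r3")).iterator();
        List<String> records = drain(
                t -> source.hasNext() ? source.next() : List.<String>of(),
                Duration.ofMillis(500), 3);
        System.out.println(records);
    }
}
```

The loop returns once the source has been idle for maxEmptyPolls timeouts in a
row, instead of blocking forever on a single poll.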
> As for the second question: seek() just specifies where you want to
> consume from on the topic. After you seek somewhere, the next poll() sends
> its fetch request from the position you sought to.
>
> If you call
>
>     seek 1
>     seek 20
>
> then poll() will start from 20, because the latest seek is 20.
>
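
The "latest seek wins" behaviour can be pictured with a toy model. This is a
sketch only: PositionModel is a hypothetical class that tracks a single fetch
position, and it is not the Kafka consumer itself.

```java
// Hypothetical toy model of a consumer's fetch position; not the Kafka client.
final class PositionModel {
    private long position;

    // Each seek overwrites the position; earlier seeks are forgotten.
    void seek(long offset) {
        if (offset < 0) {
            throw new IllegalArgumentException("offset must be non-negative");
        }
        position = offset;
    }

    // Returns the offsets a poll would cover, from the current position up to
    // endOffset (exclusive), and advances the position past them.
    long[] poll(long endOffset) {
        int count = (int) Math.max(endOffset - position, 0L);
        long[] offsets = new long[count];
        for (int i = 0; i < count; i++) {
            offsets[i] = position + i;
        }
        position += count;
        return offsets;
    }

    long position() {
        return position;
    }

    public static void main(String[] args) {
        PositionModel consumer = new PositionModel();
        consumer.seek(1);
        consumer.seek(20); // replaces the earlier seek(1)
        System.out.println(java.util.Arrays.toString(consumer.poll(23)));
    }
}
```

In this model a second poll with the same end offset returns nothing, which is
the same situation as a real poll waiting for records that never arrive.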
> At 2020-09-05 02:18:53, "M. Manna" <manme...@gmail.com> wrote:
>
> >Hello,
> >
> >During some tests, I can see that consumer.endOffsets() returns a valid
> >value, but poll() does not reliably return if seek() has been called on
> >that consumer. In other words, poll hangs almost forever.
> >
> >My intention is to start consuming from the last successfully written data
> >on that TopicPartition. I am using this model because that data is written
> >by a different system and is required to send a separate command somewhere
> >else.
> >
> >Here is my code:
> >
> >// Prepare
> >        List<String> topics = new ArrayList<>();
> >        topics.add(topicString);
> >
> >        KafkaConsumer<String, String> consumer =
> >                new KafkaConsumer<>(prepareSimpleProperties());
> >        TopicPartition topicPartition = new TopicPartition(topicString, 0);
> >
> >// ASSIGN
> >        Set<TopicPartition> assignments = consumer.assignment();
> >        if (assignments == null || !assignments.isEmpty() ||
> >                !assignments.contains(topicPartition)) {
> >            consumer.assign(Collections.singleton(topicPartition));
> >        }
> >
> >// SEEK
> >        Map<TopicPartition, Long> topicPartitionOffsets =
> >                consumer.endOffsets(Collections.singleton(topicPartition));
> >        if (topicPartitionOffsets != null) {
> >            for (Map.Entry<TopicPartition, Long> data :
> >                    topicPartitionOffsets.entrySet()) {
> >                Long desiredOffset = data.getValue() - 100;
> >                consumer.seek(data.getKey(), desiredOffset < 0L ? 0L : desiredOffset);
> >            }
> >        }
> >
> >// POLL
> >        try {
> >            while (true) {
> >                ConsumerRecords<String, String> records =
> >                        consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
> >                for (TopicPartition partition : records.partitions()) {
> >                    List<ConsumerRecord<String, String>> partitionRecords =
> >                            records.records(partition);
> >                    for (ConsumerRecord<String, String> record : partitionRecords) {
> >                        System.out.println(record.offset() + ": " +
> >                                new String(record.value()));
> >                    }
> >                    long lastOffset =
> >                            partitionRecords.get(partitionRecords.size() - 1).offset();
> >                    consumer.commitSync();
> >                }
> >            }
> >        } finally {
> >            consumer.close();
> >        }
> >    }
> >
> >When I read the following from the seek() docs:
> >
> >> *If this API is invoked for the same partition more than once, the latest
> >> offset will be used on the next poll(). Note that you may lose data if this
> >> API is arbitrarily used in the middle of consumption, to reset the fetch
> >> offsets.*
> >
> >I am not sure I understand what it means by "latest offset" and "more than
> >once". What is the best way to get this offset calculation to work
> >correctly? Also, is there any documentation that explains (hopefully with
> >some visual aid) how the LSO and the smallest offset of an open transaction
> >work? A link from either Confluent or Apache Kafka is fine.
> >
> >I would really appreciate some explanation, as I believe this could be
> >explained a bit better on the official website.
> >
> >Regards,
