Also, I realised that endOffsets - 100 was a debugging step of mine. I
originally had endOffset - 1, and that was polling forever. Hence my
previous comment on the LSO, high watermark, and open transactions.
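As a broker-free sketch of the rule being discussed: with isolation.level=read_committed, the consumer can only read up to the last stable offset (LSO), which is held back to the offset of the first still-open transaction (otherwise it equals the high watermark). The toy model below uses illustrative names (lastStableOffset, firstOpenTxnOffset) that are not part of the Kafka API:

```java
// Toy model of the LSO rule (illustrative names, not the Kafka API).
// LSO = offset of the first open transaction, or the high watermark if
// no transaction is open; read_committed consumers cannot read past it.
public class LsoSketch {
    static long lastStableOffset(long highWatermark, long firstOpenTxnOffset) {
        return firstOpenTxnOffset >= 0
                ? Math.min(firstOpenTxnOffset, highWatermark)
                : highWatermark;
    }

    public static void main(String[] args) {
        // No open transaction: LSO equals the high watermark.
        System.out.println(lastStableOffset(100, -1)); // 100
        // Open transaction starting at offset 95: LSO is held back to 95,
        // so offsets 95..99 are not yet readable under read_committed.
        System.out.println(lastStableOffset(100, 95)); // 95
    }
}
```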



On Sat, 5 Sep 2020 at 12:08, M. Manna <manme...@gmail.com> wrote:

>
> Firstly, thanks very much. I think what I am struggling with is how the
> offsets are fetched based on the LSO and high watermark. Also, if there are
> only non-transactional producers, this code will always work with
> endOffset - 1 if I want to get the last record written.
>
> I cannot recall which admin command I can invoke to get the list of open
> transactions (or whether there is one at all); that way I wouldn't have to
> keep retrying. I also think this is quite ambiguous, and the documentation
> doesn't make it any easier to understand. I suppose an improvement would be
> to get the minimum offset among open transactions and use that.
>
> Thanks,
>
>
>
> On Sat, 5 Sep 2020 at 11:49, adobewjl <wjl_is_...@163.com> wrote:
>
>> Hi, I just ran your code.
>>
>> It works and reads the data starting at endOffsets - 100 in the first
>> round of poll, then hangs on the second round of poll.
>>
>> This can happen when no records are continuously being sent to the topic.
>> You pass the poll timeout as Long.MAX_VALUE, so poll() just waits for new
>> records to arrive; since there is no new data, the poll hangs.
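For scale, Long.MAX_VALUE milliseconds is effectively "block forever"; a bounded timeout such as Duration.ofSeconds(5) would instead make poll() return an empty ConsumerRecords when nothing arrives. A quick stdlib check of the magnitude:

```java
import java.time.Duration;

public class PollTimeoutScale {
    public static void main(String[] args) {
        // The timeout the quoted code passes to poll():
        Duration forever = Duration.ofMillis(Long.MAX_VALUE);
        // About 292 million years -- in practice, "wait forever".
        System.out.println(forever.toDays()); // 106751991167
    }
}
```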
>>
>> As for the second question: seek() just specifies where you want to
>> consume from on the partition. When you seek somewhere, the next poll()
>> requests data starting from the position you last sought to.
>>
>> So if you call:
>>
>> seek 1
>> seek 20
>>
>> then poll will start from 20, because the latest seek is 20.
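That "latest seek wins" rule can be sketched without a broker: the consumer keeps one fetch position per assigned partition, and seek() simply overwrites it. The tiny model below uses a plain Map rather than the Kafka API:

```java
import java.util.HashMap;
import java.util.Map;

public class SeekSketch {
    public static void main(String[] args) {
        // One fetch position per partition; seek() overwrites it in place.
        Map<String, Long> position = new HashMap<>();

        position.put("my-topic-0", 1L);  // seek 1
        position.put("my-topic-0", 20L); // seek 20 -- replaces the earlier seek

        // The next poll() fetches only from the latest position.
        System.out.println(position.get("my-topic-0")); // 20
    }
}
```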
>>
>> At 2020-09-05 02:18:53, "M. Manna" <manme...@gmail.com> wrote:
>>
>> >Hello,
>> >
>> >During some tests, I can see that consumer.endOffsets() returns a valid
>> >value, but the consumer cannot be polled successfully if seek() is called
>> >on it. In other words, poll hangs almost forever.
>> >
>> >My intention is to start consuming from the last successfully written
>> >data on that TopicPartition. I am using this model because that data is
>> >written by a different system and is required for sending a separate
>> >command somewhere else.
>> >
>> >Here is my code:
>> >
>>
>> >> // Prepare
>> >> List<String> topics = new ArrayList<>();
>> >> topics.add(topicString);
>> >>
>> >> KafkaConsumer<String, String> consumer =
>> >>         new KafkaConsumer<>(prepareSimpleProperties());
>> >> TopicPartition topicPartition = new TopicPartition(topicString, 0);
>> >>
>> >> // ASSIGN
>> >> Set<TopicPartition> assignments = consumer.assignment();
>> >> if (assignments == null || assignments.isEmpty()
>> >>         || !assignments.contains(topicPartition)) {
>> >>     consumer.assign(Collections.singleton(topicPartition));
>> >> }
>> >>
>> >> // SEEK
>> >> Map<TopicPartition, Long> topicPartitionOffsets =
>> >>         consumer.endOffsets(Collections.singleton(topicPartition));
>> >> if (topicPartitionOffsets != null) {
>> >>     for (Map.Entry<TopicPartition, Long> data : topicPartitionOffsets.entrySet()) {
>> >>         Long desiredOffset = data.getValue() - 100;
>> >>         consumer.seek(data.getKey(), desiredOffset < 0L ? 0L : desiredOffset);
>> >>     }
>> >> }
>> >>
>> >> // POLL
>> >> try {
>> >>     while (true) {
>> >>         ConsumerRecords<String, String> records =
>> >>                 consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
>> >>         for (TopicPartition partition : records.partitions()) {
>> >>             List<ConsumerRecord<String, String>> partitionRecords =
>> >>                     records.records(partition);
>> >>             for (ConsumerRecord<String, String> record : partitionRecords) {
>> >>                 System.out.println(record.offset() + ": " + record.value());
>> >>             }
>> >>             long lastOffset =
>> >>                     partitionRecords.get(partitionRecords.size() - 1).offset();
>> >>             consumer.commitSync();
>> >>         }
>> >>     }
>> >> } finally {
>> >>     consumer.close();
>> >> }
>>
>> >
>>
>> >When I read the following from seek docs:
>>
>> >
>>
>> >*If this API is invoked for the same partition more than once, the latest
>>
>> >> offset will be used on the next poll(). Note that you may lose data if
>> this
>>
>> >> API is arbitrarily used in the middle of consumption, to reset the
>> fetch
>>
>> >> offsets.*
>>
>> >
>>
>> >
>>
>> >I am not sure I understood what it means by "Latest offset" and "more
>> than
>>
>> >once". What's the best way to get this offset calculation to work
>>
>> >correctly? Also, is there a documentation that explains (with hopefully,
>>
>> >some visual aid) how LSO and smallest offset for open transaction.
>>
>> >Confluent or Apache Kafka any link is fine.
>>
>> >
>>
>> >I would really appreciate some explanation as I believe this could be
>>
>> >explained a bit better in official website.
>>
>> >
>>
>> >Regards,
>>
>>
>
>
