Also, I realised that endOffsets - 100 was a debugging step for me. I originally had endOffset - 1, and that was polling forever. Hence my previous comment on the LSO, high watermark, and open transactions.
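[One detail that may explain the "polls forever at endOffset - 1" symptom: under isolation.level=read_committed, endOffsets() returns the last stable offset (LSO) rather than the high watermark, and transaction markers occupy offsets without ever being delivered to the consumer, so the record at endOffset - 1 may never arrive. A minimal configuration sketch — the bootstrap address and group id below are placeholders, not values from this thread:]

```java
import java.util.Properties;

public class ReadCommittedProps {
    // Build consumer properties that only read committed transactional data.
    // "localhost:9092" and "my-group" are placeholder values.
    public static Properties build() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("group.id", "my-group");
        p.put("key.deserializer",
              "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer",
              "org.apache.kafka.common.serialization.StringDeserializer");
        // With read_committed, endOffsets() reports the LSO, i.e. the
        // consumer never reads past the first still-open transaction.
        p.put("isolation.level", "read_committed");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("isolation.level"));
    }
}
```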
On Sat, 5 Sep 2020 at 12:08, M. Manna <manme...@gmail.com> wrote:

> Firstly, thanks very much. I think what I am struggling with is how the
> offsets are fetched based on the LSO and high watermark. Also, if there
> are non-transactional producers, this code will always work with
> offset - 1 if I want to get the last record written.
>
> I cannot recall which admin command I can invoke to get the list of open
> transactions (or whether there is one at all); that way I wouldn't have
> to keep retrying. I also think this is very ambiguous, and the
> documentation doesn't make it any easier to understand. I suppose an
> improvement would be to get the minimum offset among the open
> transactions and use that.
>
> Thanks,
>
> On Sat, 5 Sep 2020 at 11:49, adobewjl <wjl_is_...@163.com> wrote:
>
>> Hi, I just ran your code.
>>
>> It works and fetches the data at endOffsets - 100 in the first round of
>> poll, then hangs on the second round of poll.
>>
>> This can happen when records are not continuously being sent to the
>> topic. You pass the poll timeout as Long.MAX_VALUE, so poll just waits
>> for new records to arrive; since there is no new data, the poll hangs.
>>
>> On the second question: seek() just specifies where you want to consume
>> on the topic. Once you seek somewhere, the next poll simply requests
>> data from wherever you seeked. If you call:
>>
>> seek 1
>> seek 20
>>
>> then poll will start from 20, because the latest seek is 20.
>>
>> At 2020-09-05 02:18:53, "M. Manna" <manme...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> During some tests, I can see that consumer.endOffsets() returns a
>>> valid value, but once seek() is called on that consumer, polling
>>> never completes. In other words, poll hangs almost forever.
>>> My intention is to start consuming from the last successfully written
>>> data on that TopicPartition. I am using this model because that data
>>> is written by a different system and is required for sending a
>>> separate command somewhere else.
>>>
>>> Here is my code:
>>>
>>> // Prepare
>>> List<String> topics = new ArrayList<>();
>>> topics.add(topicString);
>>>
>>> KafkaConsumer<String, String> consumer =
>>>         new KafkaConsumer<>(prepareSimpleProperties());
>>> TopicPartition topicPartition = new TopicPartition(topicString, 0);
>>>
>>> // ASSIGN
>>> Set<TopicPartition> assignments = consumer.assignment();
>>> if (assignments == null || assignments.isEmpty()
>>>         || !assignments.contains(topicPartition)) {
>>>     consumer.assign(Collections.singleton(topicPartition));
>>> }
>>>
>>> // SEEK
>>> Map<TopicPartition, Long> topicPartitionOffsets =
>>>         consumer.endOffsets(Collections.singleton(topicPartition));
>>> if (topicPartitionOffsets != null) {
>>>     for (Map.Entry<TopicPartition, Long> data :
>>>             topicPartitionOffsets.entrySet()) {
>>>         Long desiredOffset = data.getValue() - 100;
>>>         consumer.seek(data.getKey(),
>>>                 desiredOffset < 0L ? 0L : desiredOffset);
>>>     }
>>> }
>>>
>>> // POLL
>>> try {
>>>     while (true) {
>>>         ConsumerRecords<String, String> records =
>>>                 consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
>>>         for (TopicPartition partition : records.partitions()) {
>>>             List<ConsumerRecord<String, String>> partitionRecords =
>>>                     records.records(partition);
>>>             for (ConsumerRecord<String, String> record : partitionRecords) {
>>>                 System.out.println(record.offset() + ": " + record.value());
>>>             }
>>>             long lastOffset =
>>>                     partitionRecords.get(partitionRecords.size() - 1).offset();
>>>             consumer.commitSync();
>>>         }
>>>     }
>>> } finally {
>>>     consumer.close();
>>> }
>>>
>>> When I read the following from the seek() docs:
>>>
>>> "If this API is invoked for the same partition more than once, the
>>> latest offset will be used on the next poll(). Note that you may lose
>>> data if this API is arbitrarily used in the middle of consumption, to
>>> reset the fetch offsets."
>>>
>>> I am not sure I understand what is meant by "latest offset" and "more
>>> than once". What's the best way to get this offset calculation to work
>>> correctly? Also, is there documentation that explains (hopefully with
>>> some visual aid) how the LSO relates to the smallest offset of an open
>>> transaction? Any Confluent or Apache Kafka link is fine.
>>>
>>> I would really appreciate some explanation, as I believe this could be
>>> explained a bit better on the official website.
>>>
>>> Regards,
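[Pulling the thread's advice together: the hang comes from poll(Duration.ofMillis(Long.MAX_VALUE)) waiting forever once the partition tail is reached. A bounded poll timeout, plus a stop condition against the end offset snapshotted before the loop, avoids it. The sketch below is not the poster's exact code — it assumes a single assigned partition and a reachable broker, and readTail's signature is invented for illustration:]

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ReadLastRecords {
    // Read (roughly) the last 100 records of partition 0 and return,
    // instead of blocking forever on an empty partition tail.
    static void readTail(KafkaConsumer<String, String> consumer, String topic) {
        TopicPartition tp = new TopicPartition(topic, 0);
        consumer.assign(Collections.singleton(tp));

        // Snapshot the end offset once, before polling. Under
        // read_committed this is the LSO, not the high watermark.
        Map<TopicPartition, Long> ends =
                consumer.endOffsets(Collections.singleton(tp));
        long end = ends.get(tp);

        // Clamp at zero so a short partition doesn't produce a
        // negative seek target.
        consumer.seek(tp, Math.max(0L, end - 100));

        // Bounded timeout: an empty poll returns after one second
        // instead of hanging until new data arrives.
        while (consumer.position(tp) < end) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
        }
    }
}
```

[Note that calling seek() twice on the same partition before the next poll() simply means the later seek wins, as adobewjl describes above; no test is attached here because the sketch requires a running broker.]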