Firstly, thanks very much. I think what I am struggling with is how the offsets are fetched based on the LSO and the high watermark. Also, if the producers are all non-transactional, this code will always work with endOffset - 1 when I want to get the last record written.
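For the non-transactional case, this is roughly what I have in mind (just a sketch: "my-topic" is a placeholder, and prepareSimpleProperties() is the same helper as in my code below):

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Read the last written record, assuming no transactional producers
// (the end offset is then the high watermark, and the last record
// sits at endOffset - 1).
TopicPartition tp = new TopicPartition("my-topic", 0);
try (KafkaConsumer<String, String> consumer =
         new KafkaConsumer<>(prepareSimpleProperties())) {
    consumer.assign(Collections.singleton(tp));
    long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
    if (endOffset > 0) {
        consumer.seek(tp, endOffset - 1); // last record, if no markers are involved
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
        records.records(tp).forEach(r ->
                System.out.println(r.offset() + ": " + r.value()));
    }
}

My understanding is that once transactional producers are involved, endOffset - 1 can land on a transaction marker, which poll() never returns to the application; that is exactly where I get stuck.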
I cannot recall which admin command I can invoke to get the list of open transactions, or whether there is one at all; with that, I would not have to keep retrying. I also think this is very ambiguous, and the documentation doesn't make it any easier to explain. I suppose an improvement would be to get the minimum offset among the open transactions and use that.
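To make that concrete, this is the kind of probe I was imagining (again only a sketch, assuming prepareSimpleProperties() returns a java.util.Properties; my understanding is that endOffsets() returns the LSO under read_committed and the high watermark under read_uncommitted):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// If the read_committed end offset (LSO) is behind the read_uncommitted
// one (high watermark), some transaction on this partition is still open.
TopicPartition tp = new TopicPartition("my-topic", 0);

Properties committedProps = prepareSimpleProperties();
committedProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
Properties uncommittedProps = prepareSimpleProperties();
uncommittedProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");

try (KafkaConsumer<String, String> readCommitted = new KafkaConsumer<>(committedProps);
     KafkaConsumer<String, String> readUncommitted = new KafkaConsumer<>(uncommittedProps)) {
    long lso = readCommitted.endOffsets(Collections.singleton(tp)).get(tp);
    long hw = readUncommitted.endOffsets(Collections.singleton(tp)).get(tp);
    if (lso < hw) {
        System.out.println("open transaction data in [" + lso + ", " + hw + ")");
    }
}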
Thanks,

On Sat, 5 Sep 2020 at 11:49, adobewjl <wjl_is_...@163.com> wrote:

> Hi, I just ran your code.
>
> It works and takes the data at endOffsets - 100 in the first round of
> poll, then hangs at the second round of poll.
>
> This can happen if no records are being sent to the topic continuously
> and you pass the poll timeout as Long.MAX_VALUE: the poll just waits
> for new records to come, and since there is no new data, the poll
> hangs.
>
> As for the second question: seek() just specifies where you want to
> consume on the topic. When you seek somewhere, the next poll simply
> requests data from where you seeked.
>
> If:
> seek 1
> seek 20
>
> then poll will start from 20, because the latest seek is 20.
>
> At 2020-09-05 02:18:53, "M. Manna" <manme...@gmail.com> wrote:
> >Hello,
> >
> >During some tests, I can see that consumer.endOffsets() returns some
> >valid value, but cannot be polled definitely if seek() is called on
> >that consumer. In other words, poll hangs almost forever.
> >
> >My intention is to start consuming from the last successfully written
> >data on that TopicPartition. I am using this model because that data
> >is written by a different system and required for sending a separate
> >command somewhere else.
> >
> >Here is my code:
> >
> >    // Prepare
> >    List<String> topics = new ArrayList<>();
> >    topics.add(topicString);
> >
> >    KafkaConsumer<String, String> consumer =
> >        new KafkaConsumer<>(prepareSimpleProperties());
> >    TopicPartition topicPartition = new TopicPartition(topicString, 0);
> >
> >    // ASSIGN
> >    Set<TopicPartition> assignments = consumer.assignment();
> >    if (assignments == null || assignments.isEmpty()
> >            || !assignments.contains(topicPartition)) {
> >        consumer.assign(Collections.singleton(topicPartition));
> >    }
> >
> >    // SEEK
> >    Map<TopicPartition, Long> topicPartitionOffsets =
> >        consumer.endOffsets(Collections.singleton(topicPartition));
> >    if (topicPartitionOffsets != null) {
> >        for (Map.Entry<TopicPartition, Long> data :
> >                topicPartitionOffsets.entrySet()) {
> >            Long desiredOffset = data.getValue() - 100;
> >            consumer.seek(data.getKey(),
> >                desiredOffset < 0L ? 0L : desiredOffset);
> >        }
> >    }
> >
> >    // POLL
> >    try {
> >        while (true) {
> >            ConsumerRecords<String, String> records =
> >                consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
> >            for (TopicPartition partition : records.partitions()) {
> >                List<ConsumerRecord<String, String>> partitionRecords =
> >                    records.records(partition);
> >                for (ConsumerRecord<String, String> record :
> >                        partitionRecords) {
> >                    System.out.println(record.offset() + ": "
> >                        + record.value());
> >                }
> >                long lastOffset =
> >                    partitionRecords.get(partitionRecords.size() - 1)
> >                        .offset();
> >                consumer.commitSync(Collections.singletonMap(partition,
> >                    new OffsetAndMetadata(lastOffset + 1)));
> >            }
> >        }
> >    } finally {
> >        consumer.close();
> >    }
> >
> >When I read the following from the seek docs:
> >
> >*If this API is invoked for the same partition more than once, the
> >latest offset will be used on the next poll(). Note that you may lose
> >data if this API is arbitrarily used in the middle of consumption, to
> >reset the fetch offsets.*
> >
> >I am not sure I understood what it means by "latest offset" and "more
> >than once". What's the best way to get this offset calculation to work
> >correctly? Also, is there documentation that explains (hopefully with
> >some visual aid) how the LSO relates to the smallest offset of an open
> >transaction? Any Confluent or Apache Kafka link is fine.
> >
> >I would really appreciate some explanation, as I believe this could be
> >explained a bit better on the official website.
> >
> >Regards,
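P.S. Based on the explanation above, I realise the hang is just poll() waiting out its effectively infinite timeout. A minimal sketch of how I would bound it instead (same consumer and assignment as in my code above; the one-second timeout is arbitrary):

// Bounded poll: an empty batch means "no new data yet" instead of
// blocking inside poll() indefinitely.
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    if (records.isEmpty()) {
        continue; // or break once we have caught up to the end offset
    }
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.offset() + ": " + record.value());
    }
    consumer.commitSync();
}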