Hello,

During some tests I can see that consumer.endOffsets() returns a valid value, but after seek() is called on that consumer, poll() never returns any records. In other words, poll() hangs almost forever.
My intention is to start consuming from the last successfully written data on that TopicPartition. I am using this model because that data is written by a different system and is required for sending a separate command somewhere else. Here is my code:

```java
// Prepare
List<String> topics = new ArrayList<>();
topics.add(topicString);

KafkaConsumer<String, String> consumer =
        new KafkaConsumer<>(prepareSimpleProperties());
TopicPartition topicPartition = new TopicPartition(topicString, 0);

// ASSIGN: only assign if we are not already assigned to this partition
Set<TopicPartition> assignments = consumer.assignment();
if (assignments == null || assignments.isEmpty() || !assignments.contains(topicPartition)) {
    consumer.assign(Collections.singleton(topicPartition));
}

// SEEK: start 100 records before the end offset, clamped at 0
Map<TopicPartition, Long> topicPartitionOffsets =
        consumer.endOffsets(Collections.singleton(topicPartition));
if (topicPartitionOffsets != null) {
    for (Map.Entry<TopicPartition, Long> data : topicPartitionOffsets.entrySet()) {
        Long desiredOffset = data.getValue() - 100;
        consumer.seek(data.getKey(), desiredOffset < 0L ? 0L : desiredOffset);
    }
}

// POLL
try {
    while (true) {
        ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                System.out.println(record.offset() + ": " + record.value());
            }
            consumer.commitSync();
        }
    }
} finally {
    consumer.close();
}
```

When I read the following from the seek() docs:

> If this API is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets.

I am not sure I understand what it means by "latest offset" and "more than once".
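For what it's worth, the clamping logic in my SEEK block can be factored into a small pure helper; this is just a sketch, and the names startOffset and lookBack are mine, not from the client API:

```java
// Hypothetical helper: compute the offset to seek to, lookBack records before
// the partition's end offset, clamped so it never goes negative. Note the real
// floor may be above 0 if retention has already deleted old segments;
// consumer.beginningOffsets() would give the true low watermark.
static long startOffset(long endOffset, long lookBack) {
    return Math.max(0L, endOffset - lookBack);
}
```

With an end offset of 500 and lookBack of 100 this seeks to 400; with an end offset of 50 it clamps to 0 rather than asking for a negative offset.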
What's the best way to get this offset calculation to work correctly? Also, is there documentation that explains (hopefully with some visual aid) how the LSO relates to the smallest offset of an open transaction? Any Confluent or Apache Kafka link is fine. I would really appreciate some explanation, as I believe this could be explained a bit better on the official website.

Regards,
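For context, here is roughly what my prepareSimpleProperties() looks like (broker address and group id are placeholders). My understanding so far, which I'd like confirmed: with isolation.level=read_committed, poll() only returns records up to the last stable offset (LSO), i.e. below the first offset of any still-open transaction, while the default read_uncommitted reads up to the high watermark:

```java
import java.util.Properties;

// Consumer config from the snippet above; values are placeholders.
private static Properties prepareSimpleProperties() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // placeholder
    props.put("group.id", "offset-inspector");         // placeholder
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("enable.auto.commit", "false");
    // Default; read_committed would cap poll() at the LSO instead of the high watermark.
    props.put("isolation.level", "read_uncommitted");
    return props;
}
```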