Hi all! I need to read offsets closing to specified timestamp. As I can see this can be achieved by using SImpleConsumer API. To test things I use SimpleConsumer example provided on site: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
I use Kafka 8.2.1 When I pass -1 or -2 (latest or earliest) as time to getOffsetsBefore - it worx fine: returns 1562 and 73794 accordingly When I pass System.currentTimeMillis() as time: returns 1562 - same as earliest When I pass System.currentTimeMillis() - 10*60*1000 - it returns 1562 When I pass System.currentTimeMillis() - 200*60*1000 - it returns 0 Here is code snippet: ***************** public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception { System.out.println("a_maxReads = [" + a_maxReads + "], a_topic = [" + a_topic + "], a_partition = [" + a_partition + "], a_seedBrokers = [" + a_seedBrokers + "], a_port = [" + a_port + "]"); // find the meta data about the topic and partition we are interested in // PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition); if (metadata == null) { System.out.println("Can't find metadata for Topic and Partition. Exiting"); return; } if (metadata.leader() == null) { System.out.println("Can't find Leader for Topic and Partition. Exiting"); return; } String leadBroker = metadata.leader().host(); String clientName = "Client_" + a_topic + "_" + a_partition; SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName); long readOffset = getLastOffset( consumer, a_topic, a_partition, System.currentTimeMillis() - 10 * 60 * 1000, clientName); System.out.println("readOffset = " + readOffset); } public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { System.out.println("consumer = [" + consumer + "], topic = [" + topic + "], partition = [" + partition + "], whichTime = [" + whichTime + "], clientName = [" + clientName + "]"); TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest( requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition)); return 0; } long[] offsets = response.offsets(topic, partition); if (offsets.length == 0) { return 0; } return offsets[0]; } ************************************ How can I get more or less accurate offset values close to specified timestamp? Thanx!