Hi,

Yes, you can use the offsetsForTimes() method.  See below for a simple
example that should get you started; it takes the timestamp to search for
(in epoch milliseconds) as its first command-line argument.

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.*;
import java.io.*;
import java.time.Duration;
import java.util.*;
import java.text.*;

/**
 * Minimal example of replaying a Kafka topic from a point in time.
 *
 * <p>For every partition of the topic it asks the broker, via
 * {@code offsetsForTimes()}, for the offset matching the requested
 * timestamp, seeks each partition to that offset, and prints whatever a
 * single one-second poll returns.
 *
 * <p>Usage: {@code java searchByTime <timestamp>} where the timestamp is in
 * the same unit as record timestamps (epoch milliseconds).
 */
public class searchByTime {
  /** The consumer used by {@link #main}; closed before main returns. */
  static KafkaConsumer<String, String> c;

  /**
   * Looks up the offsets for the timestamp given in {@code args[0]}, seeks
   * to them and prints the records returned by one poll.
   *
   * @param args {@code args[0]} is the timestamp to search for
   * @throws NumberFormatException if {@code args[0]} is not a valid long
   * @throws Exception on any Kafka client failure
   */
  public static void main(String args[]) throws Exception {
    // Validate input before opening any connection to the cluster.
    if (args.length < 1) {
      System.err.println("usage: searchByTime <timestamp-in-epoch-millis>");
      return;
    }
    long timestamp = Long.parseLong(args[0]);

    // The topic is not a consumer setting, so it is kept out of props.
    String topic = "yourtopicname";

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("max.poll.records", 1);
    props.put("group.id", UUID.randomUUID().toString());
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    c = new KafkaConsumer<String, String>(props);
    try {
      List<PartitionInfo> partitionInfos = c.partitionsFor(topic);
      System.out.println(partitionInfos);
      if (partitionInfos == null || partitionInfos.isEmpty()) {
        System.out.println("no partitions found for topic " + topic);
        return;
      }

      List<TopicPartition> partitions =
          new ArrayList<TopicPartition>(partitionInfos.size());
      for (PartitionInfo p : partitionInfos) {
        partitions.add(new TopicPartition(topic, p.partition()));
      }
      System.out.println(partitions);

      // Assign the partitions explicitly instead of subscribing: seek() is
      // only legal on partitions this consumer currently owns, and with
      // subscribe() ownership depends on a group join that a single short
      // poll is not guaranteed to complete.
      c.assign(partitions);
      System.out.println("assigned partitions of topic " + topic);

      Map<TopicPartition, Long> partitionOffsetsRequest =
          new HashMap<>(partitions.size());
      for (TopicPartition partition : partitions) {
        partitionOffsetsRequest.put(partition, timestamp);
      }

      // A partition maps to null when the lookup found no offset for it.
      final Map<TopicPartition, Long> result =
          new HashMap<>(partitions.size());
      for (Map.Entry<TopicPartition, OffsetAndTimestamp> partitionToOffset :
          c.offsetsForTimes(partitionOffsetsRequest).entrySet()) {
        OffsetAndTimestamp found = partitionToOffset.getValue();
        result.put(partitionToOffset.getKey(),
            (found == null) ? null : found.offset());
      }
      System.out.println(result);

      for (Map.Entry<TopicPartition, Long> entry : result.entrySet()) {
        TopicPartition part = entry.getKey();
        Long offset = entry.getValue();
        if (offset == null) {
          // Nothing to replay here; park at the end rather than unboxing
          // a null offset.
          System.out.println("no offset found for timestamp " + timestamp
              + " in " + part);
          c.seekToEnd(Collections.singletonList(part));
          continue;
        }
        c.seek(part, offset);
      }

      System.out.println("trying to get records...");
      ConsumerRecords<String, String> records =
          c.poll(Duration.ofSeconds(1));

      // One formatter for all records instead of one per record.
      DateFormat formatter = new SimpleDateFormat("HH:mm:ss.SSS");
      formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
      for (ConsumerRecord<String, String> record : records) {
        String dateFormatted = formatter.format(new Date(record.timestamp()));
        System.out.println("Received message: (" + record.key() + ", "
            + record.value() + ") at offset " + record.offset()
            + " at time " + dateFormatted);
      }
    } finally {
      // Release the consumer's network resources.
      c.close();
    }
  }
}

Thanks,

Steve


On Sat, Jan 23, 2021 at 6:14 AM M. Manna <manme...@gmail.com> wrote:

> Hello,
>
> We know that, using the KafkaConsumer API, we can replay messages from
> certain offsets. However, we are not sure whether we can specify a
> timestamp from which to replay messages.
>
> Does anyone know if this is possible?
>
> Regards,
>

Reply via email to