Thanks. Just realised it has been in the API since 0.11.0. Thanks, Steve.
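
Steve's example below takes the replay timestamp as epoch milliseconds in `args[0]`, which is the unit `offsetsForTimes()` expects. For anyone wondering how to produce that value from a wall-clock time, a minimal sketch using `java.time` (the specific date here is just an illustration):

```java
import java.time.ZonedDateTime;
import java.time.ZoneOffset;

public class TimestampArg {
  public static void main(String[] args) {
    // Convert a wall-clock UTC time into the epoch-millisecond value
    // that offsetsForTimes() (and the example below) expects.
    ZonedDateTime replayFrom = ZonedDateTime.of(2021, 1, 23, 12, 0, 0, 0, ZoneOffset.UTC);
    long timestampMs = replayFrom.toInstant().toEpochMilli();
    System.out.println(timestampMs);  // pass this as args[0]
  }
}
```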

On Sat, 23 Jan 2021 at 12:42, Steve Howard <steve.how...@confluent.io>
wrote:

> Hi,
>
> Yes, you can use the offsetsForTimes() method.  See below for a simple
> example that should get you started...
>
> import org.apache.kafka.clients.consumer.*;
> import org.apache.kafka.common.*;
> import java.io.*;
> import java.time.Duration;
> import java.util.*;
> import java.text.*;
>
> public class SearchByTime {
>   static KafkaConsumer<String, String> c;
>
>   public static void main(String[] args) throws Exception {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("max.poll.records", "1");
>     props.put("topic", "yourtopicname");
>     props.put("group.id", UUID.randomUUID().toString());
>     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>
>     c = new KafkaConsumer<String, String>(props);
>     String topic = (String) props.get("topic");
>     c.subscribe(Collections.singletonList(topic));
>     System.out.println("subscribed to topic " + topic);
>     System.out.println(c.partitionsFor(topic));
>
>     List<TopicPartition> partitions = new ArrayList<TopicPartition>();
>     for (PartitionInfo p : c.partitionsFor(topic)) {
>       partitions.add(new TopicPartition(topic, p.partition()));
>     }
>     System.out.println(partitions);
>
>     // Build the lookup request: the same target timestamp for every partition.
>     long timestamp = Long.parseLong(args[0]);
>     Map<TopicPartition, Long> partitionOffsetsRequest = new HashMap<>(partitions.size());
>     for (TopicPartition partition : partitions) {
>       partitionOffsetsRequest.put(partition, timestamp);
>     }
>
>     // offsetsForTimes() returns, per partition, the earliest offset whose
>     // timestamp is >= the requested timestamp, or null if there is none.
>     final Map<TopicPartition, Long> result = new HashMap<>(partitions.size());
>     for (Map.Entry<TopicPartition, OffsetAndTimestamp> partitionToOffset :
>          c.offsetsForTimes(partitionOffsetsRequest).entrySet()) {
>       result.put(partitionToOffset.getKey(),
>                  (partitionToOffset.getValue() == null) ? null : partitionToOffset.getValue().offset());
>     }
>     System.out.println(result);
>
>     // An initial poll() is needed so the consumer is assigned its
>     // partitions before we can seek().
>     c.poll(Duration.ofSeconds(1));
>     for (TopicPartition part : result.keySet()) {
>       Long offset = result.get(part);
>       if (offset != null) {  // skip partitions with no message at or after the timestamp
>         c.seek(part, offset);
>       }
>     }
>
>     System.out.println("trying to get records...");
>     ConsumerRecords<String, String> records = c.poll(Duration.ofSeconds(1));
>     for (ConsumerRecord<String, String> record : records) {
>       Date date = new Date(record.timestamp());
>       DateFormat formatter = new SimpleDateFormat("HH:mm:ss.SSS");
>       formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
>       String dateFormatted = formatter.format(date);
>       System.out.println("Received message: (" + record.key() + ", " + record.value()
>           + ") at offset " + record.offset() + " at time " + dateFormatted);
>     }
>   }
> }
>
> Thanks,
>
> Steve
>
>
> On Sat, Jan 23, 2021 at 6:14 AM M. Manna <manme...@gmail.com> wrote:
>
> > Hello,
> >
> > We know that, using the KafkaConsumer API, we can replay messages from
> > certain offsets. However, we are not sure whether we can specify a
> > timestamp from which to replay messages.
> >
> > Does anyone know if this is possible?
> >
> > Regards,
> >
>