Thanks. Just realised that it has been in the API since 0.11.0.

Thanks Steve.

On Sat, 23 Jan 2021 at 12:42, Steve Howard <steve.how...@confluent.io> wrote:
> Hi,
>
> Yes, you can use the offsetsForTimes() method. See below for a simple
> example that should get you started...
>
> import org.apache.kafka.clients.consumer.*;
> import org.apache.kafka.common.config.ConfigException;
> import org.apache.kafka.common.*;
> import java.io.*;
> import java.time.Duration;
> import java.util.*;
> import java.text.*;
>
> public class searchByTime {
>   static KafkaConsumer<String, String> c;
>
>   public static void main(String args[]) throws Exception {
>     Properties props = new Properties();
>     props.put("bootstrap.servers","localhost:9092");
>     props.put("max.poll.records",1);
>     props.put("topic","yourtopicname");
>     props.put("group.id",UUID.randomUUID().toString());
>     props.put("key.deserializer",
>       "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("value.deserializer",
>       "org.apache.kafka.common.serialization.StringDeserializer");
>
>     c = new KafkaConsumer<String, String>(props);
>     String topic = (String)props.get("topic");
>     c.subscribe(Collections.singletonList(topic));
>     System.out.println("subscribed to topic " + topic);
>     System.out.println(c.partitionsFor(topic));
>     List<TopicPartition> partitions = new ArrayList<TopicPartition>();
>     for (PartitionInfo p: c.partitionsFor(topic)) {
>       partitions.add(new TopicPartition(topic,p.partition()));
>     }
>     System.out.println(partitions);
>
>     // timestamp to search for, in milliseconds since the epoch
>     long timestamp = Long.parseLong(args[0]);
>     Map<TopicPartition, Long> partitionOffsetsRequest =
>       new HashMap<>(partitions.size());
>     for (TopicPartition partition : partitions) {
>       partitionOffsetsRequest.put(
>         new TopicPartition(partition.topic(), partition.partition()),
>         timestamp);
>     }
>
>     final Map<TopicPartition, Long> result = new HashMap<>(partitions.size());
>
>     for (Map.Entry<TopicPartition, OffsetAndTimestamp> partitionToOffset :
>          c.offsetsForTimes(partitionOffsetsRequest).entrySet()) {
>       result.put(
>         new TopicPartition(partitionToOffset.getKey().topic(),
>                            partitionToOffset.getKey().partition()),
>         (partitionToOffset.getValue() == null)
>           ? null : partitionToOffset.getValue().offset());
>     }
>
>     System.out.println(result);
>     // poll once so the partitions are assigned before calling seek()
>     ConsumerRecords<String, String> records = c.poll(Duration.ofSeconds(1));
>     for (Map.Entry<TopicPartition, Long> entry : result.entrySet()) {
>       // the offset is null when a partition has no message at or after
>       // the timestamp; skip it rather than unboxing null
>       if (entry.getValue() != null) {
>         c.seek(entry.getKey(), entry.getValue());
>       }
>     }
>
>     System.out.println("trying to get records...");
>     records = c.poll(Duration.ofSeconds(1));
>     for (ConsumerRecord<String, String> record : records) {
>       Date date = new Date(record.timestamp());
>       DateFormat formatter = new SimpleDateFormat("HH:mm:ss.SSS");
>       formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
>       String dateFormatted = formatter.format(date);
>       System.out.println("Received message: (" + record.key() + ", " +
>         record.value() + ") at offset " + record.offset() + " at time " +
>         dateFormatted);
>     }
>   }
> }
>
> Thanks,
>
> Steve
>
>
> On Sat, Jan 23, 2021 at 6:14 AM M. Manna <manme...@gmail.com> wrote:
>
> > Hello,
> >
> > We know that using KafkaConsumer api we can replay messages from certain
> > offsets. However, we are not sure if we could specify timeStamp from which
> > we could replay messages again.
> >
> > Does anyone know if this is possible?
> >
> > Regards,
> >
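
The example quoted above reads the timestamp from args[0] with Long.parseLong(), and offsetsForTimes() expects timestamps in milliseconds since the Unix epoch. A minimal sketch for producing that number from a readable UTC time, using only java.time, might look like the following. The class name EpochMillis and the helper toEpochMillis() are hypothetical and not part of the Kafka API or of the example above.

```java
import java.time.Instant;

public class EpochMillis {
    // Hypothetical helper: parse an ISO-8601 UTC instant such as
    // "2021-01-23T12:42:00Z" into milliseconds since the Unix epoch.
    static long toEpochMillis(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    public static void main(String[] args) {
        // Fall back to a sample instant when no argument is given.
        String when = args.length > 0 ? args[0] : "2021-01-23T12:42:00Z";
        System.out.println(toEpochMillis(when));
    }
}
```

The printed value can then be passed as the first argument to the quoted program. For the sample instant it should be 1611405720000.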