Hi, Yes, you can use the offsetsForTimes() method. See below for a simple example that should get you started...
import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.*; import java.io.*; import java.time.Duration; import java.util.*; import java.text.*; public class searchByTime { static KafkaConsumer<String, String> c; public static void main(String args[]) throws Exception { Properties props = new Properties(); props.put("bootstrap.servers","localhost:9092"); props.put("max.poll.records",1); props.put("topic","yourtopicname"); props.put("group.id",UUID.randomUUID().toString()); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); c = new KafkaConsumer<String, String>(props); String topic = (String)props.get("topic"); c.subscribe(Collections.singletonList(topic)); System.out.println("subscribed to topic " + topic); System.out.println(c.partitionsFor(topic)); List<TopicPartition> partitions = new ArrayList<TopicPartition>(); for (PartitionInfo p: c.partitionsFor(topic)) { partitions.add(new TopicPartition(topic,p.partition())); } System.out.println(partitions); long timestamp = Long.parseLong(args[0]); Map<TopicPartition, Long> partitionOffsetsRequest = new HashMap<>(partitions.size()); for (TopicPartition partition : partitions) { partitionOffsetsRequest.put(new TopicPartition(partition.topic(), partition.partition()), timestamp); } final Map<TopicPartition, Long> result = new HashMap<>(partitions.size()); for (Map.Entry<TopicPartition, OffsetAndTimestamp> partitionToOffset : c.offsetsForTimes(partitionOffsetsRequest).entrySet()) { result.put(new TopicPartition(partitionToOffset.getKey().topic(), partitionToOffset.getKey().partition()), (partitionToOffset.getValue() == null) ? null : partitionToOffset.getValue().offset()); } System.out.println(result); ConsumerRecords<String, String> records = c.poll(Duration.ofSeconds(1)); for (TopicPartition part: result.keySet()){ long offset = result.get(part); c.seek(part,offset); } System.out.println("trying to get records..."); records = c.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> record : records) { Date date = new Date(record.timestamp()); DateFormat formatter = new SimpleDateFormat("HH:mm:ss.SSS"); formatter.setTimeZone(TimeZone.getTimeZone("UTC")); String dateFormatted = formatter.format(date); System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset() + " at time " + dateFormatted); } } } Thanks, Steve On Sat, Jan 23, 2021 at 6:14 AM M. Manna <manme...@gmail.com> wrote: > Hello, > > We know that using KafkaConsumer api we can replay messages from certain > offsets. However, we are not sure if we could specify timeStamp from which > we could replay messages again. > > Does anyone know if this is possible? > > Regards, >