Hi,

Earliest and Latest are like enum constants that denote the first and last
messages in the partition (or, more precisely, the offsets of those positions).

My understanding is that you can only fetch based on offsets, not on timestamps.
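
To illustrate why a timestamp lookup does not give message-level precision:
as far as I understand, the 0.8 broker answers getOffsetsBefore per log
segment, comparing the requested time with each segment file's last-modified
time. Below is a simplified, self-contained Java model of that behaviour. It
is a sketch, not the broker's code: the class OffsetLookupSketch, the Segment
type, and the assumed layout (a single segment starting at offset 1562, last
modified 60 minutes ago) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class OffsetLookupSketch {
    // Same sentinel values as the -1 / -2 passed in the question.
    public static final long LATEST_TIME = -1L;
    public static final long EARLIEST_TIME = -2L;

    /** Hypothetical stand-in for one log segment file on the broker. */
    public static final class Segment {
        final long baseOffset;      // offset of the first message in the segment
        final long lastModifiedMs;  // last-modified time of the segment file

        public Segment(long baseOffset, long lastModifiedMs) {
            this.baseOffset = baseOffset;
            this.lastModifiedMs = lastModifiedMs;
        }
    }

    /**
     * Returns up to maxNumOffsets offsets, newest first.
     * Segments must be ordered by ascending base offset.
     */
    public static long[] offsetsBefore(List<Segment> segments, long logEndOffset,
                                       long time, int maxNumOffsets) {
        List<Long> candidates = new ArrayList<>();
        int start = -1; // index of the newest qualifying segment, -1 if none
        if (time == LATEST_TIME) {
            candidates.add(logEndOffset);      // end of the log comes first
            start = segments.size() - 1;
        } else if (time == EARLIEST_TIME) {
            start = segments.isEmpty() ? -1 : 0;
        } else {
            // Walk back from the newest segment to the first one whose file
            // was last modified at or before the requested time.
            for (int i = segments.size() - 1; i >= 0; i--) {
                if (segments.get(i).lastModifiedMs <= time) {
                    start = i;
                    break;
                }
            }
        }
        for (int i = start; i >= 0; i--) {
            candidates.add(segments.get(i).baseOffset);
        }
        int n = Math.min(candidates.size(), Math.max(maxNumOffsets, 0));
        long[] out = new long[n];
        for (int i = 0; i < n; i++) {
            out[i] = candidates.get(i);
        }
        return out;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long minute = 60L * 1000L;
        // Assumption: one segment, base offset 1562, last modified 60 min ago.
        List<Segment> segments =
                Collections.singletonList(new Segment(1562L, now - 60 * minute));
        long logEnd = 73794L;

        System.out.println(Arrays.toString(
                offsetsBefore(segments, logEnd, LATEST_TIME, 1)));         // [73794]
        System.out.println(Arrays.toString(
                offsetsBefore(segments, logEnd, EARLIEST_TIME, 1)));       // [1562]
        System.out.println(Arrays.toString(
                offsetsBefore(segments, logEnd, now, 1)));                 // [1562]
        System.out.println(Arrays.toString(
                offsetsBefore(segments, logEnd, now - 10 * minute, 1)));   // [1562]
        System.out.println(Arrays.toString(
                offsetsBefore(segments, logEnd, now - 200 * minute, 1)));  // []
    }
}
```

With these assumed inputs the last call yields an empty array, which the
quoted getLastOffset turns into a return value of 0. In this model any
timestamp can only resolve to a segment boundary, never to an individual
message.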

regards

On Thu, Apr 16, 2015 at 7:35 AM, Alexey Borschenko <
aborsche...@elance-odesk.com> wrote:

> Hi all!
>
> I need to read offsets close to a specified timestamp.
> As far as I can see, this can be achieved using the SimpleConsumer API.
> To test things I use the SimpleConsumer example provided on the site:
>
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
>
> I use Kafka 0.8.2.1
>
> When I pass -2 or -1 (earliest or latest) as time to getOffsetsBefore, it
> works fine: returns 1562 and 73794 respectively
> When I pass System.currentTimeMillis() as time: returns 1562 - same as
> earliest
> When I pass System.currentTimeMillis() - 10*60*1000 - it returns 1562
> When I pass System.currentTimeMillis() - 200*60*1000 - it returns 0
>
> Here is the code snippet:
>
> *****************
>
> public void run(long a_maxReads, String a_topic, int a_partition,
>                 List<String> a_seedBrokers, int a_port) throws Exception {
>     System.out.println("a_maxReads = [" + a_maxReads
>             + "], a_topic = [" + a_topic
>             + "], a_partition = [" + a_partition
>             + "], a_seedBrokers = [" + a_seedBrokers
>             + "], a_port = [" + a_port + "]");
>     // find the metadata about the topic and partition we are interested in
>     PartitionMetadata metadata = findLeader(a_seedBrokers, a_port,
>             a_topic, a_partition);
>     if (metadata == null) {
>         System.out.println("Can't find metadata for Topic and "
>                 + "Partition. Exiting");
>         return;
>     }
>     if (metadata.leader() == null) {
>         System.out.println("Can't find Leader for Topic and "
>                 + "Partition. Exiting");
>         return;
>     }
>     String leadBroker = metadata.leader().host();
>     String clientName = "Client_" + a_topic + "_" + a_partition;
>
>     SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
>             100000, 64 * 1024, clientName);
>     long readOffset = getLastOffset(
>             consumer, a_topic, a_partition,
>             System.currentTimeMillis() - 10 * 60 * 1000,
>             clientName);
>     System.out.println("readOffset = " + readOffset);
> }
>
> public static long getLastOffset(SimpleConsumer consumer, String topic,
>                                  int partition, long whichTime,
>                                  String clientName) {
>     System.out.println("consumer = [" + consumer
>             + "], topic = [" + topic
>             + "], partition = [" + partition
>             + "], whichTime = [" + whichTime
>             + "], clientName = [" + clientName + "]");
>     TopicAndPartition topicAndPartition =
>             new TopicAndPartition(topic, partition);
>     Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
>             new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
>     // ask for at most one offset before whichTime
>     requestInfo.put(topicAndPartition,
>             new PartitionOffsetRequestInfo(whichTime, 1));
>     kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
>             requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
>             clientName);
>     OffsetResponse response = consumer.getOffsetsBefore(request);
>
>     if (response.hasError()) {
>         System.out.println("Error fetching data Offset Data the Broker. "
>                 + "Reason: " + response.errorCode(topic, partition));
>         return 0;
>     }
>     long[] offsets = response.offsets(topic, partition);
>     if (offsets.length == 0) {
>         return 0;
>     }
>     return offsets[0];
> }
>
> ************************************
>
> How can I get more or less accurate offset values close to a specified
> timestamp?
>
> Thanx!
>



-- 
http://khangaonkar.blogspot.com/
