Yes and no :)

1) If you use the index to find the "end offset" of your scan ("consume
all messages with a smaller offset"), you would compare the offset of
each message with this "end offset". That is equivalent to consuming the
topic from the beginning and comparing each record's timestamp directly
(if it's larger, you can stop the scan). What I am trying to say is that
the index doesn't help you build a more efficient solution :)

The index is only useful if you want to use the offset to *start
reading* from.


2) If you stop reading at the "end offset", you do not account for
late-arriving records. Both the time index and the manual linear
scan would stop at the first record with a timestamp larger than your
"end timestamp". However, if you have late-arriving data, you would miss
those records. And since you want to sort records in timestamp order, you
obviously have late data (otherwise, there would not be any need to
reorder records). Thus, you need to keep scanning for "some more data"
to also find late records. How much more you scan is something
you need to define yourself. For example, you could say: I read until
I see a message with timestamp "end timestamp + 5 minutes", because I
know data is at most 5 minutes late.
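
To make point 2 concrete, here is a minimal sketch in plain Java. It does
not use the Kafka consumer API: `TimedRecord` and `LatenessScan` are
hypothetical names made up for illustration, and the "log" is just an
in-memory list in offset order. It only shows the stopping rule, i.e. keep
scanning past the first record with a larger timestamp until a record is
more than the allowed lateness beyond the "end timestamp".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a consumed record; only offset and timestamp matter here.
final class TimedRecord {
    final long offset;
    final long timestampMs;

    TimedRecord(long offset, long timestampMs) {
        this.offset = offset;
        this.timestampMs = timestampMs;
    }
}

final class LatenessScan {
    // Scans the log in offset order and collects every record whose timestamp is
    // <= endTimestampMs. The scan stops only when it sees a record whose timestamp
    // is larger than endTimestampMs + maxLatenessMs (assumption: no record is
    // later than maxLatenessMs).
    static List<TimedRecord> scan(Iterable<TimedRecord> log,
                                  long endTimestampMs,
                                  long maxLatenessMs) {
        List<TimedRecord> result = new ArrayList<>();
        for (TimedRecord r : log) {
            if (r.timestampMs > endTimestampMs + maxLatenessMs) {
                break; // far enough past the end timestamp: stop scanning
            }
            if (r.timestampMs <= endTimestampMs) {
                result.add(r); // in range, possibly a late arrival
            }
        }
        return result;
    }
}
```

Note that the result is in offset order, not timestamp order, so the
reordering from point 3 is still needed afterwards.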


3) Your original problem was to sort data by timestamp -- thus, if you
scan your data, you also need to buffer data in main memory, reorder
out-of-order records, and write back to the output topic. Thus, you
still need a lot of custom code (but I agree, it might be less than if
you use Kafka Streams).
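
As a rough idea of the buffering part, here is a minimal in-memory sketch
in plain Java. `ReorderBuffer` is a hypothetical class name, values are
plain strings, and "writing to the output topic" is replaced by returning
a list. It assumes non-negative timestamps, and that a record is "too
late" when its timestamp is more than `maxLatenessMs` behind the largest
timestamp seen so far. It does not spill to disk, so state is limited by
main memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class ReorderBuffer {
    private final long maxLatenessMs;
    // Buffered values, sorted by timestamp; ties keep arrival order.
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();
    // Largest timestamp seen so far minus maxLatenessMs; older records are dropped.
    private long watermarkMs = Long.MIN_VALUE;

    ReorderBuffer(long maxLatenessMs) {
        this.maxLatenessMs = maxLatenessMs;
    }

    // Buffers one record and returns the values that have "dropped out" of the
    // window, in timestamp order. A record older than the watermark is discarded.
    List<String> add(long timestampMs, String value) {
        List<String> ready = new ArrayList<>();
        if (timestampMs < watermarkMs) {
            return ready; // too late: drop instead of inserting
        }
        buffer.computeIfAbsent(timestampMs, k -> new ArrayList<>()).add(value);
        watermarkMs = Math.max(watermarkMs, timestampMs - maxLatenessMs);
        while (!buffer.isEmpty() && buffer.firstKey() < watermarkMs) {
            ready.addAll(buffer.pollFirstEntry().getValue());
        }
        return ready;
    }

    // Drains everything still buffered, in timestamp order (e.g. at shutdown).
    List<String> flush() {
        List<String> rest = new ArrayList<>();
        while (!buffer.isEmpty()) {
            rest.addAll(buffer.pollFirstEntry().getValue());
        }
        return rest;
    }
}
```

In a real application the returned values would be produced to the output
topic, and you would still have to decide how to handle restarts and state
larger than memory.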



-Matthias

On 11/24/17 2:26 PM, Ray Ruvinskiy wrote:
> I see, thanks. I suppose I was trying to avoid so much custom logic, which is 
> why I initially was looking at the time-based index functionality. Based on 
> what you’ve said so far, I take it using the time-based index to find an 
> offset corresponding to a timestamp and then consuming all messages with a 
> smaller offset is not a viable solution?
> 
> Ray
> 
> On 2017-11-22, 12:12 AM, "Matthias J. Sax" <matth...@confluent.io> wrote:
> 
>     Using Kafka Streams, it seems reasonable to implement this using
>     low-level Processor API with a custom state store.
>     
>     Thus, you use the `StateStore` interface to implement your state store.
>     This allows you to spill to disk if you need to handle state larger
>     than main memory.
>     
>     If you want to browse some state store examples, you can check out the
>     `RocksDBStore` class, which implements Kafka Streams' default `StateStore`.
>     
>     Within your custom `Processor` you can access the state accordingly to
>     maintain the window etc.
>     
>     It's quite a special use case, and thus there is not much out-of-the-box
>     support. You can check out some basic examples here:
>     https://github.com/confluentinc/kafka-streams-examples
>     
>     One example implements a custom state store (but only in-memory):
>     https://github.com/confluentinc/kafka-streams-examples/blob/3.3.0-post/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala
>     
>     Hope this helps.
>     
>     
>     -Matthias
>     
>     On 11/21/17 5:53 PM, Ray Ruvinskiy wrote:
>     > Thanks for your reply! I am quite inexperienced when it comes to Kafka
>     > and Kafka Streams and so would appreciate a little more guidance. How
>     > would one keep messages within a sliding window sorted by timestamp?
>     > Would the sort operation be done all in memory? I would be dealing
>     > potentially with hundreds of thousands of messages per partition within
>     > every 5 minute interval and so was looking for solutions that were not
>     > necessarily limited by the amount of RAM.
>     > 
>     > Ray
>     > 
>     > On 2017-11-21, 5:57 PM, "Matthias J. Sax" <matth...@confluent.io> wrote:
>     > 
>     >     This is possible, but I think you don't need the time-based index
>     >     for it :)
>     >     
>     >     You will just buffer up all messages for a 5 minute sliding window
>     >     and maintain all messages sorted by timestamp in this window. Each
>     >     time the window "moves" you write the oldest records that "drop out"
>     >     of the window to the topic. If you get a record with an older
>     >     timestamp than allowed, you don't insert it into the window but drop
>     >     it.
>     >     
>     >     The timestamp index is useful if you want to seek to a specific
>     >     offset based on timestamp. But I don't think you need this for your
>     >     use case.
>     >     
>     >     
>     >     
>     >     -Matthias
>     >     
>     >     On 11/21/17 1:39 PM, Ray Ruvinskiy wrote:
>     >     > I’ve been reading
>     >     > https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
>     >     > and trying to determine whether I can use the time-based index as an
>     >     > efficient way to sort a stream of messages into timestamp
>     >     > (CreateTime) order.
>     >     > 
>     >     > I am dealing with a number of sources emitting messages that are
>     >     > then processed in a distributed fashion and written to a Kafka
>     >     > topic. During this processing, the original order of the messages
>     >     > is not strictly maintained. Each message has an embedded timestamp.
>     >     > I’d like to be able to sort these messages back into timestamp
>     >     > order, allowing for a certain lateness interval, before processing
>     >     > them further. For example, supposing the lateness interval is 5
>     >     > minutes, at time T I’d like to consume from the topic all messages
>     >     > with timestamp up to (T - 5 minutes), in timestamp order. The
>     >     > assumption is that a message should be no more than 5 minutes late;
>     >     > if it is more than 5 minutes late, it can be discarded. Is this
>     >     > something that can be done with the time-based index?
>     >     > 
>     >     > Thanks,
>     >     > 
>     >     > Ray
>     >     > 
>     >     
>     >     
>     > 
>     
>     
> 
