Yeah, windowing seems perfect, if only I could find out the current window's start time (so I can log the current bucket's start & end times) and process each window's messages individually rather than as aggregates.

It doesn't seem like I can get this metadata from ProcessorContext, though, from looking over the javadocs.
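For context, here is roughly what I'm picturing with the Processor API route you mention — an untested sketch against the 0.10.2 Processor API. The store names ("bucket-start", "last-seen"), the 30-minute gap constant and the String types are just placeholders, and both stores would still need to be registered on the topology separately:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class SessionBucketProcessor implements Processor<String, String> {

    private static final long GAP_MS = 30 * 60 * 1000L; // new bucket after 30+ min of silence

    private ProcessorContext context;
    private KeyValueStore<String, Long> bucketStart;  // key -> start timestamp of its current bucket
    private KeyValueStore<String, Long> lastSeen;     // key -> timestamp of the last message seen for it

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // assumes both stores were registered on the topology under these names
        this.bucketStart = (KeyValueStore<String, Long>) context.getStateStore("bucket-start");
        this.lastSeen = (KeyValueStore<String, Long>) context.getStateStore("last-seen");
    }

    @Override
    public void process(String key, String value) {
        long ts = context.timestamp();                // timestamp of the record currently being processed
        Long previous = lastSeen.get(key);
        if (previous == null || ts - previous >= GAP_MS) {
            bucketStart.put(key, ts);                 // 30+ min gap since the last message: start a new bucket
        }
        lastSeen.put(key, ts);
        // a late, out-of-order record (earlier timestamp than 'previous') would still need special handling here
        System.out.println("key=" + key + " bucket-start=" + bucketStart.get(key) + " record-ts=" + ts);
        context.forward(key, value);                  // pass the record downstream unchanged
    }

    @Override
    public void punctuate(long timestamp) { }         // no scheduled work

    @Override
    public void close() { }                           // nothing to clean up
}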
On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll <mich...@confluent.io> wrote:

> Ali,
>
> what you describe is (roughly!) how Kafka Streams implements the internal
> state stores to support windowing.
>
> Some users have been following a similar approach as you outlined, using
> the Processor API.
>
> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
> > It would be helpful to know the 'start' and 'end' of the current window,
> > so if an out-of-order message arrives late, and is being processed in
> > foreach(), you'd know which window / bucket it belongs to, and can
> > handle it accordingly.
> >
> > I'm guessing that's not possible at the moment.
> >
> > (My use case is, I receive a stream of messages. Messages need to be
> > stored and sorted into 'buckets', to indicate 'sessions'. Each time
> > there's a gap of 30 mins or more since the last message (under a key),
> > a new 'session' (bucket) should be started, and future messages should
> > belong to that 'session', until the next 30+ min gap.)
> >
> > On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mich...@confluent.io> wrote:
> >
> > > > Can windows only be used for aggregations, or can they also be used
> > > > for foreach(), and such?
> > >
> > > As of today, you can use windows only in aggregations.
> > >
> > > > And is it possible to get metadata on the message, such as whether
> > > > or not it's late, its index/position within the other messages, etc?
> > >
> > > If you use the Processor API of Kafka Streams, you can have access to
> > > an incoming record's topic, partition, offset, etc. via the so-called
> > > ProcessorContext (which is updated for every new incoming record):
> > >
> > > http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/Processor.html
> > > - You can get/store a reference to the ProcessorContext from
> > > `Processor#init()`.
> > >
> > > http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/ProcessorContext.html
> > > - The context can then be used within `Processor#process()` when you
> > > process a new record. As I said, the context is updated behind the
> > > scenes to match the record that is currently being processed.
> > >
> > > Best,
> > > Michael
> > >
> > > On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> > >
> > > > Can windows only be used for aggregations, or can they also be used
> > > > for foreach(), and such?
> > > >
> > > > And is it possible to get metadata on the message, such as whether
> > > > or not it's late, its index/position within the other messages, etc?
> > > >
> > > > On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mich...@confluent.io> wrote:
> > > >
> > > > > And since you asked for a pointer, Ali:
> > > > > http://docs.confluent.io/current/streams/concepts.html#windowing
> > > > >
> > > > > On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <mich...@confluent.io> wrote:
> > > > >
> > > > > > Late-arriving and out-of-order data is only treated specially
> > > > > > for windowed aggregations.
> > > > > >
> > > > > > For stateless operations such as `KStream#foreach()` or
> > > > > > `KStream#map()`, records are processed in the order they arrive
> > > > > > (per partition).
> > > > > >
> > > > > > -Michael
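(Side note for anyone following along: my reading of the `Processor#init()` / `Processor#process()` access to ProcessorContext that Michael describes above works out to roughly this — an untested sketch, class name mine, against the 0.10.2 Processor API:)

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class RecordMetadataProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;   // keep the reference; it is updated behind the scenes per record
    }

    @Override
    public void process(String key, String value) {
        // these reflect the record currently being processed
        System.out.printf("topic=%s partition=%d offset=%d timestamp=%d%n",
                context.topic(), context.partition(), context.offset(), context.timestamp());
        context.forward(key, value);   // pass the record downstream unchanged
    }

    @Override
    public void punctuate(long timestamp) { }   // no scheduled work

    @Override
    public void close() { }                     // nothing to clean up
}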
> > > > > > On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> > > > > >
> > > > > >> > later when message A arrives it will put that message back into
> > > > > >> > the right temporal context and publish an amended result for the
> > > > > >> > proper time/session window as if message B were consumed in the
> > > > > >> > timestamp order before message A.
> > > > > >>
> > > > > >> Does this apply to the aggregation Kafka Streams methods then, and
> > > > > >> not to e.g. foreach?
> > > > > >>
> > > > > >> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <h...@confluent.io> wrote:
> > > > > >>
> > > > > >> > Yes, stream processing and CEP are subtly different things.
> > > > > >> >
> > > > > >> > Kafka Streams helps you write stateful apps and allows that state
> > > > > >> > to be preserved on disk (a local state store) as well as distributed
> > > > > >> > for HA or for parallel partitioned processing (via Kafka topic
> > > > > >> > partitions and consumer groups) as well as in memory (as a
> > > > > >> > performance enhancement).
> > > > > >> >
> > > > > >> > However a classical CEP engine with a pre-modeled state machine and
> > > > > >> > pattern matching rules is something different from stream processing.
> > > > > >> >
> > > > > >> > It is of course possible to build a CEP system on top of Kafka
> > > > > >> > Streams and get the best of both worlds.
> > > > > >> >
> > > > > >> > -hans
> > > > > >> >
> > > > > >> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <sabarish....@gmail.com> wrote:
> > > > > >> > >
> > > > > >> > > Hans
> > > > > >> > >
> > > > > >> > > What you state would work for aggregations, but not for state
> > > > > >> > > machines and CEP.
> > > > > >> > >
> > > > > >> > > Regards
> > > > > >> > > Sab
> > > > > >> > >
> > > > > >> > >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <h...@confluent.io> wrote:
> > > > > >> > >>
> > > > > >> > >> The only way to make sure A is consumed first would be to delay
> > > > > >> > >> the consumption of message B for at least 15 minutes, which would
> > > > > >> > >> fly in the face of the principles of a true streaming platform, so
> > > > > >> > >> the short answer to your question is "no", because that would be
> > > > > >> > >> batch processing, not stream processing.
> > > > > >> > >>
> > > > > >> > >> However, Kafka Streams does handle late-arriving data. So if you
> > > > > >> > >> had some analytics that computes results on a time window or a
> > > > > >> > >> session window, then Kafka Streams will compute on the stream in
> > > > > >> > >> real time (processing message B) and then later when message A
> > > > > >> > >> arrives it will put that message back into the right temporal
> > > > > >> > >> context and publish an amended result for the proper time/session
> > > > > >> > >> window as if message B were consumed in the timestamp order
> > > > > >> > >> before message A.
> > > > > >> > >> The end result of this flow is that you eventually get the same
> > > > > >> > >> results you would get in a batch processing system, but with the
> > > > > >> > >> added benefit of getting intermediary results at much lower latency.
> > > > > >> > >>
> > > > > >> > >> -hans
> > > > > >> > >>
> > > > > >> > >> /**
> > > > > >> > >> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> > > > > >> > >> * h...@confluent.io (650)924-2670
> > > > > >> > >> */
> > > > > >> > >>
> > > > > >> > >>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> > > > > >> > >>>
> > > > > >> > >>> Is it possible to have Kafka Streams order messages correctly by
> > > > > >> > >>> their timestamps, even if they arrived out of order?
> > > > > >> > >>>
> > > > > >> > >>> E.g., say Message A with a timestamp of 5:00 PM and Message B
> > > > > >> > >>> with a timestamp of 5:15 PM are sent.
> > > > > >> > >>>
> > > > > >> > >>> Message B arrives sooner than Message A, due to network issues.
> > > > > >> > >>>
> > > > > >> > >>> Is it possible to make sure that, across all consumers of Kafka
> > > > > >> > >>> Streams (even if they are across different servers, but have the
> > > > > >> > >>> same consumer group), Message A is consumed first, before Message B?
> > > > > >> > >>>
> > > > > >> > >>> Thanks.
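(Circling back to my use case at the top of the thread: if I can live with getting the data back as windowed aggregates rather than per message, my understanding of the DSL route Michael and Hans describe is roughly the following. Untested sketch against the 0.10.2 DSL; the "events" topic, the "session-counts" store name and the String types are placeholders, and it assumes String serdes are configured as the defaults in StreamsConfig.)

import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> events = builder.stream("events");   // placeholder topic

// Sessions per key, closed after 30 minutes of inactivity; a late-arriving record
// is merged back into the session it belongs to and an amended count is emitted.
KTable<Windowed<String>, Long> sessionCounts = events
        .groupByKey()
        .count(SessionWindows.with(TimeUnit.MINUTES.toMillis(30)), "session-counts");

// The Windowed key carries the bucket boundaries I was after:
sessionCounts.toStream().foreach((windowedKey, count) ->
        System.out.println("key=" + windowedKey.key()
                + " start=" + windowedKey.window().start()
                + " end=" + windowedKey.window().end()
                + " count=" + count));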