That would require:

- Knowing the current window's id (or some other identifier) to differentiate it from other windows
- Being able to process individual messages in a window

Are those 2 things possible w/ Kafka Streams? (Java)

On Tue, Mar 21, 2017 at 7:43 PM, Hans Jespersen <h...@confluent.io> wrote:

While it's not exactly the same as the window start/stop time, you can store (in the state store) the earliest and latest timestamps of any messages in each window and use that as a good approximation for the window boundary times.

-hans

On Mar 20, 2017, at 1:00 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:

Yeah, windowing seems perfect, if only I could find out the current window's start time (so I can log the current bucket's start & end times) and process window messages individually rather than as aggregates.

It doesn't seem like I can get this metadata from ProcessorContext though, from looking over the javadocs.

On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll <mich...@confluent.io> wrote:

Ali,

what you describe is (roughly!) how Kafka Streams implements the internal state stores to support windowing.

Some users have been following a similar approach as you outlined, using the Processor API.

On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:

It would be helpful to know the 'start' and 'end' of the current window, so that if an out-of-order message arrives late and is being processed in foreach(), you'd know which window/bucket it belongs to, and could handle it accordingly.

I'm guessing that's not possible at the moment.

(My use case is: I receive a stream of messages. Messages need to be stored and sorted into 'buckets', to indicate 'sessions'.
Each time there's a gap of 30 mins or more since the last message (under a key), a new 'session' (bucket) should be started, and future messages should belong to that 'session', until the next 30+ min gap.)

On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mich...@confluent.io> wrote:

> Can windows only be used for aggregations, or can they also be used for foreach(), and such?

As of today, you can use windows only in aggregations.

> And is it possible to get metadata on the message, such as whether or not it's late, its index/position within the other messages, etc?

If you use the Processor API of Kafka Streams, you can have access to an incoming record's topic, partition, offset, etc. via the so-called ProcessorContext (which is updated for every new incoming record):

http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/Processor.html
- You can get/store a reference to the ProcessorContext from `Processor#init()`.

http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/ProcessorContext.html
- The context can then be used within `Processor#process()` when you process a new record. As I said, the context is updated behind the scenes to match the record that is currently being processed.

Best,
Michael

On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:

Can windows only be used for aggregations, or can they also be used for foreach(), and such?

And is it possible to get metadata on the message, such as whether or not it's late, its index/position within the other messages, etc?
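The session-bucketing use case described above (a new bucket whenever there is a 30+ minute gap under a key), combined with Hans's suggestion of tracking each bucket's earliest and latest timestamps as approximate window boundaries, can be sketched in plain Java. This is an illustrative sketch, not Kafka Streams API code; the class and method names are hypothetical, and it assumes records for one key arrive in event-time order (a late arrival would mean re-opening an earlier bucket).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: assign timestamped messages (for a single key) to
// session buckets separated by gaps of 30+ minutes, tracking each bucket's
// earliest/latest timestamps as approximate session boundaries.
public class SessionBucketer {
    public static final long GAP_MS = 30L * 60 * 1000; // 30-minute session gap

    public static class Bucket {
        public long earliest; // approximate session start
        public long latest;   // approximate session end
        Bucket(long ts) { earliest = ts; latest = ts; }
    }

    // Timestamps are assumed to be in event-time order for this simple sketch.
    public static List<Bucket> bucketize(long[] timestamps) {
        List<Bucket> buckets = new ArrayList<>();
        Bucket current = null;
        for (long ts : timestamps) {
            if (current == null || ts - current.latest >= GAP_MS) {
                // Gap of 30+ minutes since the last message: start a new session.
                current = new Bucket(ts);
                buckets.add(current);
            } else {
                // Same session: extend its approximate end boundary.
                current.latest = Math.max(current.latest, ts);
            }
        }
        return buckets;
    }
}
```

The earliest/latest pair is only an approximation of the true window boundaries, as noted above, but it is enough to log a bucket's start and end times and to decide which bucket a record belongs to.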
On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mich...@confluent.io> wrote:

And since you asked for a pointer, Ali:
http://docs.confluent.io/current/streams/concepts.html#windowing

On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <mich...@confluent.io> wrote:

Late-arriving and out-of-order data is only treated specially for windowed aggregations.

For stateless operations such as `KStream#foreach()` or `KStream#map()`, records are processed in the order they arrive (per partition).

-Michael

On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:

> later when message A arrives it will put that message back into the right temporal context and publish an amended result for the proper time/session window as if message B were consumed in the timestamp order before message A.

Does this apply to the aggregation Kafka Streams methods then, and not to e.g. foreach?

On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <h...@confluent.io> wrote:

Yes, stream processing and CEP are subtly different things.

Kafka Streams helps you write stateful apps and allows that state to be preserved on disk (a local state store), as well as distributed for HA or for parallel partitioned processing (via Kafka topic partitions and consumer groups), as well as in memory (as a performance enhancement).
However, a classical CEP engine with a pre-modeled state machine and pattern-matching rules is something different from stream processing.

It is of course possible to build a CEP system on top of Kafka Streams and get the best of both worlds.

-hans

On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <sabarish....@gmail.com> wrote:

Hans

What you state would work for aggregations, but not for state machines and CEP.

Regards
Sab

On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <h...@confluent.io> wrote:

The only way to make sure A is consumed first would be to delay the consumption of message B for at least 15 minutes, which would fly in the face of the principles of a true streaming platform, so the short answer to your question is "no", because that would be batch processing, not stream processing.

However, Kafka Streams does handle late-arriving data.
So if you had some analytics that compute results on a time window or a session window, then Kafka Streams will compute on the stream in real time (processing message B), and then later, when message A arrives, it will put that message back into the right temporal context and publish an amended result for the proper time/session window, as if message B were consumed in the timestamp order before message A. The end result of this flow is that you eventually get the same results you would get in a batch processing system, but with the added benefit of getting intermediary results at much lower latency.

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:

Is it possible to have Kafka Streams order messages correctly by their timestamps, even if they arrived out of order?

E.g., say Message A with a timestamp of 5:00 PM and Message B with a timestamp of 5:15 PM are sent.

Message B arrives sooner than Message A, due to network issues.
Is it possible to make sure that, across all consumers of Kafka Streams (even if they are across different servers, but have the same consumer group), Message A is consumed first, before Message B?

Thanks.
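Hans's "amended result" behavior for late-arriving data can be sketched in plain Java: each record is assigned to a tumbling window by its event timestamp rather than by arrival order, and when a late record lands in an already-computed window, that window's result is simply updated and re-emitted. This is an illustrative sketch of the idea only, not Kafka Streams API code; the class name and structure are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a count per tumbling time window that is updated
// (an "amended result") whenever a record arrives, even if that record
// belongs to an earlier window than ones already seen.
public class WindowedCounts {
    private final long windowSizeMs;
    private final Map<Long, Long> countsByWindowStart = new HashMap<>();

    public WindowedCounts(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Assign the record to its window by event timestamp, regardless of
    // arrival order, and return that window's updated count.
    public long process(long eventTimestampMs) {
        long windowStart = eventTimestampMs - (eventTimestampMs % windowSizeMs);
        return countsByWindowStart.merge(windowStart, 1L, Long::sum);
    }
}
```

In the thread's example, Message B (5:15 PM) arriving before Message A (5:00 PM) is not held back; B is counted immediately in its own window, and when A later arrives it is placed into the 5:00 PM window, whose result is amended, matching what a batch system would eventually produce.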