Hans,

Which class's javadocs should I look at? From my initial look at the javadocs and my discussion with Michael, it doesn't seem possible.
On Tue, Mar 21, 2017 at 10:44 PM, Hans Jespersen <h...@confluent.io> wrote:
> Yes, and yes!
>
> -hans

On Mar 21, 2017, at 7:45 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> That would require
>
> - Knowing the current window's id (or some other identifier) to differentiate it from other windows
> - Being able to process individual messages in a window
>
> Are those 2 things possible with Kafka Streams? (Java)

On Tue, Mar 21, 2017 at 7:43 PM, Hans Jespersen <h...@confluent.io> wrote:
> While it's not exactly the same as the window start/stop time, you can store (in the state store) the earliest and latest timestamps of any messages in each window and use those as a good approximation of the window boundary times.
>
> -hans

On Mar 20, 2017, at 1:00 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> Yeah, windowing seems perfect, if only I could find out the current window's start time (so I can log the current bucket's start & end times) and process window messages individually rather than as aggregates.
>
> It doesn't seem like I can get this metadata from ProcessorContext, though, from looking over the javadocs.

On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll <mich...@confluent.io> wrote:
> Ali,
>
> What you describe is (roughly!) how Kafka Streams implements the internal state stores to support windowing.
>
> Some users have been following a similar approach to the one you outlined, using the Processor API.
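A minimal sketch of the bookkeeping discussed in these messages: Ali's per-key sessions that close after a 30-minute inactivity gap (from his Mar 20 message further down), combined with Hans's suggestion to keep the earliest and latest timestamps of each bucket as an approximation of its boundaries. It is plain Java with no Kafka dependency; the class, its methods, and the per-key session counter used as a "window id" are all hypothetical. In a Processor API application the `openSessions` map would presumably be backed by a state store (e.g. looked up via `ProcessorContext#getStateStore()` in `Processor#init()`) and `process()` would be called from `Processor#process()`. The sketch assumes each key's messages arrive in timestamp order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams: assigns each message to a
// per-key session and tracks the session's earliest/latest timestamps.
// Assumes in-order arrival per key: an older message is not routed back to a
// closed session; it simply widens the currently open one.
class SessionBucketer {

    /** One session ("bucket") for a single key. */
    static final class Session {
        final String key;
        final int id;        // per-key counter, used here as the "window id"
        long earliestTs;     // approximates the session start
        long latestTs;       // approximates the session end
        int messageCount;

        Session(String key, int id, long ts) {
            this.key = key;
            this.id = id;
            this.earliestTs = ts;
            this.latestTs = ts;
            this.messageCount = 1;
        }

        @Override
        public String toString() {
            return key + "#" + id + " [" + earliestTs + ", " + latestTs + "] n=" + messageCount;
        }
    }

    private final long gapMs;
    // Stand-in for a state store: the currently open session for each key.
    private final Map<String, Session> openSessions = new HashMap<>();
    // Sessions closed because a later message arrived after a gap >= gapMs.
    private final List<Session> closedSessions = new ArrayList<>();

    SessionBucketer(long gapMs) {
        this.gapMs = gapMs;
    }

    /** Assigns one message to a session and returns that session. */
    Session process(String key, long timestampMs) {
        Session current = openSessions.get(key);
        if (current == null || timestampMs - current.latestTs >= gapMs) {
            if (current != null) {
                closedSessions.add(current); // gap reached: close the old bucket
            }
            int nextId = (current == null) ? 0 : current.id + 1;
            current = new Session(key, nextId, timestampMs);
            openSessions.put(key, current);
        } else {
            // Same session: widen its bounds and count the message.
            current.earliestTs = Math.min(current.earliestTs, timestampMs);
            current.latestTs = Math.max(current.latestTs, timestampMs);
            current.messageCount++;
        }
        return current;
    }

    List<Session> closedSessions() {
        return closedSessions;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        SessionBucketer bucketer = new SessionBucketer(30 * minute);
        long[] timestamps = {0, 10 * minute, 45 * minute}; // 35-minute gap before the last one
        for (long ts : timestamps) {
            Session s = bucketer.process("user-1", ts);
            System.out.println("message@" + ts + " -> " + s);
        }
    }
}
```

Because `process()` returns the session each message was assigned to, every message can still be handled individually, together with an identifier for its bucket and that bucket's approximate start and end times. With `gapMs` set to 30 minutes, a gap of exactly 30 minutes or more starts a new session, matching the "30 mins or more" rule in Ali's use case.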
On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> It would be helpful to know the 'start' and 'end' of the current window, so that if an out-of-order message arrives late and is being processed in foreach(), you'd know which window / bucket it belongs to and could handle it accordingly.
>
> I'm guessing that's not possible at the moment.
>
> (My use case: I receive a stream of messages. Messages need to be stored and sorted into 'buckets' that indicate 'sessions'. Each time there's a gap of 30 minutes or more since the last message (under a key), a new 'session' (bucket) should be started, and future messages should belong to that 'session' until the next 30+ minute gap.)

On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mich...@confluent.io> wrote:
>> Can windows only be used for aggregations, or can they also be used for foreach(), and such?
>
> As of today, you can use windows only in aggregations.
>
>> And is it possible to get metadata on the message, such as whether or not it's late, its index/position within the other messages, etc?
>
> If you use the Processor API of Kafka Streams, you can access an incoming record's topic, partition, offset, etc. via the so-called ProcessorContext (which is updated for every new incoming record):
>
> http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/Processor.html
> - You can get/store a reference to the ProcessorContext from `Processor#init()`.
>
> http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/ProcessorContext.html
> - The context can then be used within `Processor#process()` when you process a new record. As I said, the context is updated behind the scenes to match the record that is currently being processed.
>
> Best,
> Michael

On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> Can windows only be used for aggregations, or can they also be used for foreach(), and such?
>
> And is it possible to get metadata on the message, such as whether or not it's late, its index/position within the other messages, etc?

On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mich...@confluent.io> wrote:
> And since you asked for a pointer, Ali:
> http://docs.confluent.io/current/streams/concepts.html#windowing

On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <mich...@confluent.io> wrote:
> Late-arriving and out-of-order data is only treated specially for windowed aggregations.
>
> For stateless operations such as `KStream#foreach()` or `KStream#map()`, records are processed in the order they arrive (per partition).
>
> -Michael

On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>> later when message A arrives it will put that message back into the right temporal context and publish an amended result for the proper time/session window as if message B were consumed in the timestamp order before message A.
>
> Does this apply to the aggregation Kafka Streams methods, then, and not to e.g. foreach?

On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <h...@confluent.io> wrote:
> Yes, stream processing and CEP are subtly different things.
>
> Kafka Streams helps you write stateful apps and allows that state to be preserved on disk (a local state store), distributed for HA or for parallel partitioned processing (via Kafka topic partitions and consumer groups), and kept in memory (as a performance enhancement).
>
> However, a classical CEP engine with a pre-modeled state machine and pattern-matching rules is something different from stream processing.
>
> It is of course possible to build a CEP system on top of Kafka Streams and get the best of both worlds.
>
> -hans

On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <sabarish....@gmail.com> wrote:
> Hans,
>
> What you state would work for aggregations, but not for state machines and CEP.
>
> Regards
> Sab

On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <h...@confluent.io> wrote:
> The only way to make sure A is consumed first would be to delay the consumption of message B for at least 15 minutes, which would fly in the face of the principles of a true streaming platform. So the short answer to your question is "no", because that would be batch processing, not stream processing.
>
> However, Kafka Streams does handle late-arriving data. So if you had some analytics that compute results on a time window or a session window, then Kafka Streams will compute on the stream in real time (processing message B), and then later, when message A arrives, it will put that message back into the right temporal context and publish an amended result for the proper time/session window, as if the messages had been consumed in timestamp order (message A before message B).
> The end result of this flow is that you eventually get the same results you would get in a batch processing system, but with the added benefit of getting intermediate results at much lower latency.
>
> -hans
>
> /**
>  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>  * h...@confluent.io (650)924-2670
>  */

On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> Is it possible to have Kafka Streams order messages correctly by their timestamps, even if they arrived out of order?
>
> E.g., say Message A with a timestamp of 5:00 PM and Message B with a timestamp of 5:15 PM are sent.
>
> Message B arrives sooner than Message A, due to network issues.
>
> Is it possible to make sure that, across all consumers of Kafka Streams (even if they are across different servers, but have the same consumer group), Message A is consumed first, before Message B?
>
> Thanks.
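To make Hans's description of late-arriving data concrete, here is a small plain-Java simulation of a 15-minute tumbling-window count. It has no Kafka dependency and is a toy model, not Kafka Streams' actual implementation; the class and its methods are hypothetical. Messages A (5:00 PM) and B (5:15 PM) come from Ali's example; message C (5:05 PM) is a hypothetical extra record added so that the late arrival of A visibly amends an already-emitted result. The `arrivalOrder` list stands in for what a stateless operation such as `foreach()` sees, i.e. plain arrival order, as Michael notes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model (not Kafka Streams' implementation) of a tumbling-window count
// that accepts late records and re-emits an amended count for their window.
class LateArrivalDemo {

    static final long MINUTE = 60_000L;

    /** The (window start, count) pair emitted after each update. */
    static final class WindowResult {
        final long windowStart;
        final long count;

        WindowResult(long windowStart, long count) {
            this.windowStart = windowStart;
            this.count = count;
        }

        @Override
        public String toString() {
            long minutes = windowStart / MINUTE; // minutes since midnight
            return String.format("window %02d:%02d count=%d", minutes / 60, minutes % 60, count);
        }
    }

    private final long windowSizeMs;
    private final TreeMap<Long, Long> countsByWindowStart = new TreeMap<>();
    private final List<String> arrivalOrder = new ArrayList<>();
    private final List<WindowResult> emitted = new ArrayList<>();

    LateArrivalDemo(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Called once per record, in the order records arrive. */
    void onRecord(String name, long timestampMs) {
        // Stateless path (think foreach()): sees records in arrival order.
        arrivalOrder.add(name);
        // Windowed path: the record goes to the window its own timestamp falls in,
        // no matter when it arrived, and the updated count is emitted again.
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        long updated = countsByWindowStart.merge(windowStart, 1L, Long::sum);
        emitted.add(new WindowResult(windowStart, updated));
    }

    List<String> arrivalOrder() { return arrivalOrder; }
    List<WindowResult> emitted() { return emitted; }
    TreeMap<Long, Long> finalCounts() { return countsByWindowStart; }

    public static void main(String[] args) {
        // Minutes since midnight: A = 5:00 PM, C = 5:05 PM (hypothetical), B = 5:15 PM.
        LateArrivalDemo demo = new LateArrivalDemo(15 * MINUTE);
        demo.onRecord("B", 1035 * MINUTE);
        demo.onRecord("C", 1025 * MINUTE);
        demo.onRecord("A", 1020 * MINUTE); // late: arrives last but belongs to 17:00-17:15
        System.out.println("foreach() order: " + demo.arrivalOrder());
        demo.emitted().forEach(System.out::println);
    }
}
```

Fed in arrival order B, C, A, the model emits counts for the 17:15, 17:00, and again the 17:00 window; the last result (17:00, count 2) is the amended one. Its final counts match what the same model produces from the time-sorted input, which mirrors the "eventually the same results as batch, with lower-latency intermediate results" point above.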