That would require

- Knowing the current window's id (or some other identifier) to
differentiate it from other windows

- Being able to process individual messages in a window

Are those two things possible with Kafka Streams? (Java)
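One way to approximate both bullets is to derive your own session id per key (e.g. the timestamp of the session's first message) inside a Processor backed by a state store. The `SessionTracker` class below is a hypothetical plain-Java illustration of just the bucketing rule, not a Kafka Streams API; in a real topology the two maps would be persistent state stores, and this simple version assumes per-key timestamps arrive in order (a late message would need a lookup against stored session boundaries instead).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of per-key session bucketing with a 30-minute gap rule. */
public class SessionTracker {
    private static final long GAP_MS = 30L * 60L * 1000L;

    // key -> start timestamp of that key's current session (used as the "session id")
    private final Map<String, Long> sessionStart = new HashMap<>();
    // key -> timestamp of the last message seen for that key
    private final Map<String, Long> lastSeen = new HashMap<>();

    /** Returns the session id (session start time) this message belongs to,
     *  opening a new session when the gap since the last message is >= 30 min. */
    public long assign(String key, long timestampMs) {
        Long last = lastSeen.get(key);
        if (last == null || timestampMs - last >= GAP_MS) {
            sessionStart.put(key, timestampMs);  // new session starts at this message
        }
        lastSeen.put(key, timestampMs);
        return sessionStart.get(key);
    }
}
```

Since the session id is just the first message's timestamp, it doubles as the bucket's start time for logging.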

On Tue, Mar 21, 2017 at 7:43 PM, Hans Jespersen <h...@confluent.io> wrote:

> While it's not exactly the same as the window start/stop time, you can
> store (in the state store) the earliest and latest timestamps of any
> messages in each window and use that as a good approximation for the
> window boundary times.
>
> -hans
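Hans's suggestion could look roughly like the following, reduced to plain Java for illustration (the `WindowBounds` class and its names are hypothetical; in Kafka Streams the map would live in a state store keyed by window, updated from `Processor#process()`):

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: track the earliest and latest message timestamps
 *  seen under each window key, as an approximation of window boundaries. */
public class WindowBounds {
    // window key -> { earliest timestamp, latest timestamp }
    private final Map<String, long[]> bounds = new HashMap<>();

    /** Update the stored bounds with one message's timestamp. */
    public void record(String windowKey, long timestampMs) {
        long[] b = bounds.get(windowKey);
        if (b == null) {
            bounds.put(windowKey, new long[] { timestampMs, timestampMs });
        } else {
            b[0] = Math.min(b[0], timestampMs);
            b[1] = Math.max(b[1], timestampMs);
        }
    }

    public long earliest(String windowKey) { return bounds.get(windowKey)[0]; }

    public long latest(String windowKey) { return bounds.get(windowKey)[1]; }
}
```

Because `record()` takes the min/max, a late-arriving message simply widens the stored bounds.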
>
> > On Mar 20, 2017, at 1:00 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> >
> > Yeah, windowing seems perfect, if only I could find out the current
> > window's start time (so I can log the current bucket's start & end times)
> > and process window messages individually rather than as aggregates.
> >
> > It doesn't seem like I can get this metadata from ProcessorContext
> > though, from looking over the javadocs
> >
> >> On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll <mich...@confluent.io>
> >> wrote:
> >>
> >> Ali,
> >>
> >> what you describe is (roughly!) how Kafka Streams implements the
> >> internal state stores to support windowing.
> >>
> >> Some users have been following a similar approach as you outlined, using
> >> the Processor API.
> >>
> >>
> >>
> >>> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <ali.rac...@gmail.com>
> >>> wrote:
> >>>
> >>> It would be helpful to know the 'start' and 'end' of the current
> >>> window, so if an out of order message arrives late, and is being
> >>> processed in foreach(), you'd know which window / bucket it belongs
> >>> to, and can handle it accordingly.
> >>>
> >>> I'm guessing that's not possible at the moment.
> >>>
> >>> (My use case is, I receive a stream of messages. Messages need to be
> >>> stored and sorted into 'buckets', to indicate 'sessions'. Each time
> >>> there's a gap of 30 mins or more since the last message (under a key),
> >>> a new 'session' (bucket) should be started, and future messages should
> >>> belong to that 'session', until the next 30+ min gap).
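Incidentally, the gap-based 'sessions' described above are essentially what the DSL's `SessionWindows` (added in Kafka 0.10.2 via KIP-94) model, and the `Windowed` key exposes each session's boundaries. A rough sketch; the topic name is hypothetical and signatures are per the 0.10.2-era API, so check them against your version:

```java
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;

// Rough sketch: 30-minute-gap sessions per key via the DSL's session windows.
public class SessionExample {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> events = builder.stream("events");  // topic name is hypothetical

        // One session per key, closed after a 30-minute gap with no messages.
        KTable<Windowed<String>, Long> sessionCounts = events
            .groupByKey()
            .count(SessionWindows.with(TimeUnit.MINUTES.toMillis(30)), "session-store");

        // The Windowed key carries the session's window, i.e. its boundaries.
        sessionCounts.toStream().foreach((windowedKey, count) ->
            System.out.println(windowedKey.key() + " @ " + windowedKey.window().start()
                + "-" + windowedKey.window().end() + " -> " + count));
    }
}
```

This gives aggregated per-session results rather than per-message processing, which is why the Processor API comes up for the latter.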
> >>>
> >>> On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mich...@confluent.io>
> >>> wrote:
> >>>
> >>>>> Can windows only be used for aggregations, or can they also be used
> >>>>> for foreach(), and such?
> >>>>
> >>>> As of today, you can use windows only in aggregations.
> >>>>
> >>>>> And is it possible to get metadata on the message, such as whether
> >>>>> or not it's late, its index/position within the other messages, etc?
> >>>>
> >>>> If you use the Processor API of Kafka Streams, you can have access
> >>>> to an incoming record's topic, partition, offset, etc. via the
> >>>> so-called ProcessorContext (which is updated for every new incoming
> >>>> record):
> >>>>
> >>>> http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/Processor.html
> >>>> - You can get/store a reference to the ProcessorContext from
> >>>> `Processor#init()`.
> >>>>
> >>>> http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/ProcessorContext.html
> >>>> - The context can then be used within `Processor#process()` when you
> >>>> process a new record.  As I said, the context is updated behind the
> >>>> scenes to match the record that is currently being processed.
> >>>>
> >>>>
> >>>> Best,
> >>>> Michael
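Michael's init()/process() description above can be sketched as a minimal Processor. The class name is mine; the `Processor`/`ProcessorContext` methods shown (topic, partition, offset, timestamp) are from the 0.10.x-era API, so treat the exact signatures as approximate:

```java
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical example: log each record's metadata via the ProcessorContext.
public class MetadataLoggingProcessor implements Processor<String, String> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;  // keep a reference; it is updated per record
    }

    @Override
    public void process(String key, String value) {
        // The context now reflects the record currently being processed.
        System.out.printf("topic=%s partition=%d offset=%d timestamp=%d%n",
                context.topic(), context.partition(),
                context.offset(), context.timestamp());
    }

    @Override
    public void punctuate(long timestamp) { }  // unused in this sketch (0.10.x API)

    @Override
    public void close() { }
}
```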
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <ali.rac...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Can windows only be used for aggregations, or can they also be used
> >>>>> for foreach(), and such?
> >>>>>
> >>>>> And is it possible to get metadata on the message, such as whether
> >>>>> or not it's late, its index/position within the other messages, etc?
> >>>>>
> >>>>> On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mich...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> And since you asked for a pointer, Ali:
> >>>>>> http://docs.confluent.io/current/streams/concepts.html#windowing
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <mich...@confluent.io>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Late-arriving and out-of-order data is only treated specially for
> >>>>>>> windowed aggregations.
> >>>>>>>
> >>>>>>> For stateless operations such as `KStream#foreach()` or
> >>>>>>> `KStream#map()`, records are processed in the order they arrive
> >>>>>>> (per partition).
> >>>>>>>
> >>>>>>> -Michael
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <ali.rac...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>>> later when message A arrives it will put that message back into
> >>>>>>>>> the right temporal context and publish an amended result for the
> >>>>>>>>> proper time/session window as if message A were consumed in the
> >>>>>>>>> timestamp order before message B.
> >>>>>>>>
> >>>>>>>> Does this apply to the aggregation Kafka Streams methods then, and
> >>>>>>>> not to e.g. foreach?
> >>>>>>>>
> >>>>>>>> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <h...@confluent.io>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Yes, stream processing and CEP are subtly different things.
> >>>>>>>>>
> >>>>>>>>> Kafka Streams helps you write stateful apps and allows that state
> >>>>>>>>> to be preserved on disk (a local state store) as well as
> >>>>>>>>> distributed for HA or for parallel partitioned processing (via
> >>>>>>>>> Kafka topic partitions and consumer groups) as well as in memory
> >>>>>>>>> (as a performance enhancement).
> >>>>>>>>>
> >>>>>>>>> However, a classical CEP engine with a pre-modeled state machine
> >>>>>>>>> and pattern matching rules is something different from stream
> >>>>>>>>> processing.
> >>>>>>>>>
> >>>>>>>>> It is of course possible to build a CEP system on top of Kafka
> >>>>>>>>> Streams and get the best of both worlds.
> >>>>>>>>>
> >>>>>>>>> -hans
> >>>>>>>>>
> >>>>>>>>>> On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <sabarish....@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hans
> >>>>>>>>>>
> >>>>>>>>>> What you state would work for aggregations, but not for state
> >>>>>>>>>> machines and CEP.
> >>>>>>>>>>
> >>>>>>>>>> Regards
> >>>>>>>>>> Sab
> >>>>>>>>>>
> >>>>>>>>>>> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <h...@confluent.io>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> The only way to make sure A is consumed first would be to delay
> >>>>>>>>>>> the consumption of message B for at least 15 minutes, which
> >>>>>>>>>>> would fly in the face of the principles of a true streaming
> >>>>>>>>>>> platform, so the short answer to your question is "no" because
> >>>>>>>>>>> that would be batch processing, not stream processing.
> >>>>>>>>>>>
> >>>>>>>>>>> However, Kafka Streams does handle late arriving data. So if
> >>>>>>>>>>> you had some analytics that computes results on a time window
> >>>>>>>>>>> or a session window, then Kafka Streams will compute on the
> >>>>>>>>>>> stream in real time (processing message B) and then later, when
> >>>>>>>>>>> message A arrives, it will put that message back into the right
> >>>>>>>>>>> temporal context and publish an amended result for the proper
> >>>>>>>>>>> time/session window, as if message A were consumed in the
> >>>>>>>>>>> timestamp order before message B. The end result of this flow
> >>>>>>>>>>> is that you eventually get the same results you would get in a
> >>>>>>>>>>> batch processing system, but with the added benefit of getting
> >>>>>>>>>>> intermediary results at much lower latency.
> >>>>>>>>>>>
> >>>>>>>>>>> -hans
> >>>>>>>>>>>
> >>>>>>>>>>> /**
> >>>>>>>>>>> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> >>>>>>>>>>> * h...@confluent.io (650)924-2670
> >>>>>>>>>>> */
> >>>>>>>>>>>
> >>>>>>>>>>>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <ali.rac...@gmail.com>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Is it possible to have Kafka Streams order messages correctly
> >>>>>>>>>>>> by their timestamps, even if they arrived out of order?
> >>>>>>>>>>>>
> >>>>>>>>>>>> E.g., say Message A with a timestamp of 5:00 PM and Message B
> >>>>>>>>>>>> with a timestamp of 5:15 PM, are sent.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Message B arrives sooner than Message A, due to network
> >>>>>>>>>>>> issues.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Is it possible to make sure that, across all consumers of
> >>>>>>>>>>>> Kafka Streams (even if they are across different servers, but
> >>>>>>>>>>>> have the same consumer group), Message A is consumed first,
> >>>>>>>>>>>> before Message B?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks.
