> Can windows only be used for aggregations, or can they also be used for 
> foreach(),
and such?

As of today, you can use windows only in aggregations.

> And is it possible to get metadata on the message, such as whether or not its
late, its index/position within the other messages, etc?

If you use the Processor API of Kafka Streams, you can have access to an
incoming record's topic, partition, offset, etc. via the so-called
ProcessorContext (which is updated for every new incoming record):

http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/Processor.html
- You can get/store a reference to the ProcessorContext from
`Processor#init()`.

http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/ProcessorContext.html
- The context can then be used within `Processor#process()` when you
process a new record.  As I said, the context is updated behind the scenes
to match the record that is currently being processed.


Best,
Michael




On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:

> Can windows only be used for aggregations, or can they also be used for
> foreach(), and such?
>
> And is it possible to get metadata on the message, such as whether or not
> its late, its index/position within the other messages, etc?
>
> On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mich...@confluent.io>
> wrote:
>
> > And since you asked for a pointer, Ali:
> > http://docs.confluent.io/current/streams/concepts.html#windowing
> >
> >
> > On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <mich...@confluent.io>
> > wrote:
> >
> > > Late-arriving and out-of-order data is only treated specially for
> > windowed
> > > aggregations.
> > >
> > > For stateless operations such as `KStream#foreach()` or
> `KStream#map()`,
> > > records are processed in the order they arrive (per partition).
> > >
> > > -Michael
> > >
> > >
> > >
> > >
> > > On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <ali.rac...@gmail.com>
> > wrote:
> > >
> > >> > later when message A arrives it will put that message back into
> > >> > the right temporal context and publish an amended result for the
> > proper
> > >> > time/session window as if message B were consumed in the timestamp
> > order
> > >> > before message A.
> > >>
> > >> Does this apply to the aggregation Kafka stream methods then, and not
> to
> > >> e.g foreach?
> > >>
> > >> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <h...@confluent.io>
> > >> wrote:
> > >>
> > >> > Yes stream processing and CEP are subtlety different things.
> > >> >
> > >> > Kafka Streams helps you write stateful apps and allows that state to
> > be
> > >> > preserved on disk (a local State store) as well as distributed for
> HA
> > or
> > >> > for parallel partitioned processing (via Kafka topic partitions and
> > >> > consumer groups) as well as in memory (as a performance
> enhancement).
> > >> >
> > >> > However a classical CEP engine with a pre-modeled state machine and
> > >> > pattern matching rules is something different from stream
> processing.
> > >> >
> > >> > It is on course possible to build a CEP system on top on Kafka
> Streams
> > >> and
> > >> > get the best of both worlds.
> > >> >
> > >> > -hans
> > >> >
> > >> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
> > >> > sabarish....@gmail.com> wrote:
> > >> > >
> > >> > > Hans
> > >> > >
> > >> > > What you state would work for aggregations, but not for state
> > machines
> > >> > and
> > >> > > CEP.
> > >> > >
> > >> > > Regards
> > >> > > Sab
> > >> > >
> > >> > >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <h...@confluent.io>
> > >> wrote:
> > >> > >>
> > >> > >> The only way to make sure A is consumed first would be to delay
> the
> > >> > >> consumption of message B for at least 15 minutes which would fly
> in
> > >> the
> > >> > >> face of the principals of a true streaming platform so the short
> > >> answer
> > >> > to
> > >> > >> your question is "no" because that would be batch processing not
> > >> stream
> > >> > >> processing.
> > >> > >>
> > >> > >> However, Kafka Streams does handle late arriving data. So if you
> > had
> > >> > some
> > >> > >> analytics that computes results on a time window or a session
> > window
> > >> > then
> > >> > >> Kafka streams will compute on the stream in real time (processing
> > >> > message
> > >> > >> B) and then later when message A arrives it will put that message
> > >> back
> > >> > into
> > >> > >> the right temporal context and publish an amended result for the
> > >> proper
> > >> > >> time/session window as if message B were consumed in the
> timestamp
> > >> order
> > >> > >> before message A. The end result of this flow is that you
> > eventually
> > >> get
> > >> > >> the same results you would get in a batch processing system but
> > with
> > >> the
> > >> > >> added benefit of getting intermediary result at much lower
> latency.
> > >> > >>
> > >> > >> -hans
> > >> > >>
> > >> > >> /**
> > >> > >> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> > >> > >> * h...@confluent.io (650)924-2670
> > >> > >> */
> > >> > >>
> > >> > >>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <
> > ali.rac...@gmail.com>
> > >> > wrote:
> > >> > >>>
> > >> > >>> Is it possible to have Kafka Streams order messages correctly by
> > >> their
> > >> > >>> timestamps, even if they arrived out of order?
> > >> > >>>
> > >> > >>> E.g, say Message A with a timestamp of 5:00 PM and Message B
> with
> > a
> > >> > >>> timestamp of 5:15 PM, are sent.
> > >> > >>>
> > >> > >>> Message B arrives sooner than Message A, due to network issues.
> > >> > >>>
> > >> > >>> Is it possible to make sure that, across all consumers of Kafka
> > >> Streams
> > >> > >>> (even if they are across different servers, but have the same
> > >> consumer
> > >> > >>> group), Message A is consumed first, before Message B?
> > >> > >>>
> > >> > >>> Thanks.
> > >> > >>>
> > >> > >>
> > >> >
> > >>
> > >
> > >
> > >
> >
>

Reply via email to