It would be helpful if the metadata exposed the 'start' and 'end' of the current window, so that if an out-of-order message arrives late and is being processed in foreach(), you'd know which window / bucket it belongs to and could handle it accordingly.
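To make the idea concrete, here is a rough plain-Java sketch of that kind of bucketing, assuming buckets are split whenever 30 minutes or more pass between messages under the same key. It does not use any Kafka Streams API: `SessionSketch`, `Session`, and `assign()` are made-up names for illustration, and timestamps are assumed to be epoch milliseconds carried by each message.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch; none of these names come from Kafka Streams. */
final class SessionSketch {

    /** A session bucket with inclusive start/end timestamps (epoch millis). */
    static final class Session {
        final long start;
        final long end;

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    private final long gapMs;
    // Per key: non-overlapping sessions, ordered by start time.
    private final Map<String, TreeMap<Long, Session>> sessionsByKey = new HashMap<>();

    SessionSketch(Duration gap) {
        this.gapMs = gap.toMillis();
    }

    /** Assigns a record to a session and returns that session's bounds. Records may arrive in any order. */
    Session assign(String key, long timestampMs) {
        TreeMap<Long, Session> sessions = sessionsByKey.computeIfAbsent(key, k -> new TreeMap<>());
        long start = timestampMs;
        long end = timestampMs;

        // Closest session that starts at or before this record: join it if the gap is under the limit.
        Map.Entry<Long, Session> before = sessions.floorEntry(timestampMs);
        if (before != null && timestampMs - before.getValue().end < gapMs) {
            start = before.getValue().start;
            end = Math.max(end, before.getValue().end);
            sessions.remove(before.getKey());
        }

        // Closest session that starts after this record: a late record can bridge two sessions.
        Map.Entry<Long, Session> after = sessions.higherEntry(timestampMs);
        if (after != null && after.getValue().start - timestampMs < gapMs) {
            end = Math.max(end, after.getValue().end);
            sessions.remove(after.getKey());
        }

        Session merged = new Session(start, end);
        sessions.put(start, merged);
        return merged;
    }

    public static void main(String[] args) {
        SessionSketch tracker = new SessionSketch(Duration.ofMinutes(30));
        long min = 60_000L;
        System.out.println(tracker.assign("user-1", 0));         // first record opens a session
        System.out.println(tracker.assign("user-1", 40 * min));  // 40 minutes later: a new session
        System.out.println(tracker.assign("user-1", 10 * min));  // late record: joins the first session
    }
}
```

The returned `Session` carries exactly the 'start' and 'end' information asked for above. A late record that lands between two existing sessions merges them if it is within the gap of both; whether that is the desired behaviour is a design choice of this sketch.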
I'm guessing that kind of window metadata isn't available at the moment.

(My use case: I receive a stream of messages. Messages need to be stored and sorted into 'buckets' that indicate 'sessions'. Each time there's a gap of 30 minutes or more since the last message (under a key), a new 'session' (bucket) should be started, and future messages should belong to that 'session' until the next 30+ minute gap.)

On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mich...@confluent.io> wrote:

> > Can windows only be used for aggregations, or can they also be used for
> > foreach(), and such?
>
> As of today, you can use windows only in aggregations.
>
> > And is it possible to get metadata on the message, such as whether or
> > not it's late, its index/position within the other messages, etc?
>
> If you use the Processor API of Kafka Streams, you can have access to an
> incoming record's topic, partition, offset, etc. via the so-called
> ProcessorContext (which is updated for every new incoming record):
>
> http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/Processor.html
> - You can get/store a reference to the ProcessorContext from
>   `Processor#init()`.
>
> http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/ProcessorContext.html
> - The context can then be used within `Processor#process()` when you
>   process a new record. As I said, the context is updated behind the scenes
>   to match the record that is currently being processed.
>
> Best,
> Michael
>
> On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
> > Can windows only be used for aggregations, or can they also be used for
> > foreach(), and such?
> >
> > And is it possible to get metadata on the message, such as whether or not
> > it's late, its index/position within the other messages, etc?
> >
> > On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mich...@confluent.io> wrote:
> >
> > > And since you asked for a pointer, Ali:
> > > http://docs.confluent.io/current/streams/concepts.html#windowing
> > >
> > > On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <mich...@confluent.io> wrote:
> > >
> > > > Late-arriving and out-of-order data is only treated specially for
> > > > windowed aggregations.
> > > >
> > > > For stateless operations such as `KStream#foreach()` or
> > > > `KStream#map()`, records are processed in the order they arrive
> > > > (per partition).
> > > >
> > > > -Michael
> > > >
> > > > On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> > > >
> > > >> > later when message A arrives it will put that message back into
> > > >> > the right temporal context and publish an amended result for the
> > > >> > proper time/session window as if message B were consumed in the
> > > >> > timestamp order before message A.
> > > >>
> > > >> Does this apply to the aggregation Kafka Streams methods then, and
> > > >> not to e.g. foreach?
> > > >>
> > > >> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <h...@confluent.io> wrote:
> > > >>
> > > >> > Yes, stream processing and CEP are subtly different things.
> > > >> >
> > > >> > Kafka Streams helps you write stateful apps and allows that state
> > > >> > to be preserved on disk (a local State store) as well as
> > > >> > distributed for HA or for parallel partitioned processing (via
> > > >> > Kafka topic partitions and consumer groups) as well as in memory
> > > >> > (as a performance enhancement).
> > > >> >
> > > >> > However a classical CEP engine with a pre-modeled state machine
> > > >> > and pattern matching rules is something different from stream
> > > >> > processing.
> > > >> >
> > > >> > It is of course possible to build a CEP system on top of Kafka
> > > >> > Streams and get the best of both worlds.
> > > >> >
> > > >> > -hans
> > > >> >
> > > >> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <sabarish....@gmail.com> wrote:
> > > >> > >
> > > >> > > Hans
> > > >> > >
> > > >> > > What you state would work for aggregations, but not for state
> > > >> > > machines and CEP.
> > > >> > >
> > > >> > > Regards
> > > >> > > Sab
> > > >> > >
> > > >> > >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <h...@confluent.io> wrote:
> > > >> > >>
> > > >> > >> The only way to make sure A is consumed first would be to delay
> > > >> > >> the consumption of message B for at least 15 minutes, which
> > > >> > >> would fly in the face of the principles of a true streaming
> > > >> > >> platform, so the short answer to your question is "no" because
> > > >> > >> that would be batch processing, not stream processing.
> > > >> > >>
> > > >> > >> However, Kafka Streams does handle late-arriving data. So if you
> > > >> > >> had some analytics that computes results on a time window or a
> > > >> > >> session window, then Kafka Streams will compute on the stream in
> > > >> > >> real time (processing message B) and then later, when message A
> > > >> > >> arrives, it will put that message back into the right temporal
> > > >> > >> context and publish an amended result for the proper
> > > >> > >> time/session window as if message B were consumed in the
> > > >> > >> timestamp order before message A. The end result of this flow is
> > > >> > >> that you eventually get the same results you would get in a
> > > >> > >> batch processing system, but with the added benefit of getting
> > > >> > >> intermediate results at much lower latency.
> > > >> > >>
> > > >> > >> -hans
> > > >> > >>
> > > >> > >> /**
> > > >> > >>  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> > > >> > >>  * h...@confluent.io (650)924-2670
> > > >> > >>  */
> > > >> > >>
> > > >> > >>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> > > >> > >>>
> > > >> > >>> Is it possible to have Kafka Streams order messages correctly
> > > >> > >>> by their timestamps, even if they arrived out of order?
> > > >> > >>>
> > > >> > >>> E.g., say Message A with a timestamp of 5:00 PM and Message B
> > > >> > >>> with a timestamp of 5:15 PM are sent.
> > > >> > >>>
> > > >> > >>> Message B arrives sooner than Message A, due to network issues.
> > > >> > >>>
> > > >> > >>> Is it possible to make sure that, across all consumers of Kafka
> > > >> > >>> Streams (even if they are across different servers, but have
> > > >> > >>> the same consumer group), Message A is consumed first, before
> > > >> > >>> Message B?
> > > >> > >>>
> > > >> > >>> Thanks.