You are right. We are more likely to be interested in the value when the window length has elapsed, and sometimes when the retention limit is reached. I lost my "time sense" when I read the last email!
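To make the two moments concrete, here is a minimal Java sketch for a tumbling window. It computes when a window ends (the early query point) and when it stops being retained. Following the simplification in Guozhang's example quoted below, it assumes a window expires at window start + retention. The class and method names are hypothetical, not a Kafka Streams API.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical helper (not part of Kafka Streams): query times for a tumbling
 * window. Assumes windows are aligned to the epoch and that a window expires at
 * start + retention, as in the example in this thread.
 */
final class TumblingWindowTimes {
    private TumblingWindowTimes() {}

    /** Start of the tumbling window that contains timestampMs. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Exclusive end of the window: the earliest point its value is "complete". */
    static long windowEnd(long windowStartMs, long sizeMs) {
        return windowStartMs + sizeMs;
    }

    /** Assumed expiry: the window is purged once retention has elapsed since its start. */
    static long expiry(long windowStartMs, long retentionMs) {
        return windowStartMs + retentionMs;
    }

    public static void main(String[] args) {
        long size = Duration.ofSeconds(5).toMillis();
        long retention = Duration.ofHours(1).toMillis();
        long start = windowStart(Instant.parse("2016-07-14T12:00:03Z").toEpochMilli(), size);
        System.out.println("window start:       " + Instant.ofEpochMilli(start));
        System.out.println("early query at:     " + Instant.ofEpochMilli(windowEnd(start, size)));
        System.out.println("query before expiry: " + Instant.ofEpochMilli(expiry(start, retention)));
    }
}
```

A poll loop, as suggested below, would query each window at `windowEnd`; Guozhang's alternative is to query shortly before `expiry`.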
I guess we can query the 12:00:00 window at 12:00:05 (for a 5-second window). That would be some sort of polling loop, as opposed to a callback that triggers at the right moment. Maybe a Flink-style trigger interface would help too. Eager to see how it works in practice when it is released.

Srikanth

On Thu, Jul 14, 2016 at 12:36 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Hi Srikanth,
>
> If you do not care about the intermediate results but only want to query
> the results when the window is no longer retained, you can consider just
> querying the state stores at the time that the window is about to "expire"
> (i.e. it will no longer be retained). For example, with a tumbling window
> size of 5 seconds and a retention period of, say, 1 hour, you can query the
> window that is created at time 12:00:00 at around 12:59:00, query the window
> that is created at time 12:00:05 at around 12:59:05, etc.
>
> But bear in mind that in practice you may not only be interested in
> querying the aggregated results at the time the window is "dropped",
> because the window retention period is for handling possible late-arriving
> data and hence it could be much later than the window length itself.
> For example, in your case the tumbling window length is 5 seconds, but you
> may want to maintain each window for an hour just for late data, and only
> querying a windowed result for a 5-second window from one hour ago may
> not really be useful in real-time applications. So I think in most cases
> you'd still want to get some results much earlier, say when the window
> length has expired.
>
> Guozhang
>
>
> On Thu, Jul 14, 2016 at 8:08 AM, Srikanth <srikanth...@gmail.com> wrote:
>
> > Michael,
> >
> > > This allows Kafka Streams to retain old window buckets for a period of
> > > time in order to wait for the late
> > > arrival of records whose timestamps fall within the window interval. If a
> > > record arrives
> > > after the retention period has passed, the record cannot be processed and
> > > is dropped.
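The retention rule in the excerpt above can be modeled roughly as follows. This is a simplified, hypothetical sketch, not how Kafka Streams implements its stores: it assumes "stream time" is the largest timestamp seen so far, and that a window is purged once its start falls more than the retention period behind stream time.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical model of window retention (not Kafka Streams code): per-key
 * counts for tumbling windows. Windows starting before streamTime - retention
 * are purged, and a late record that maps to such a window is dropped.
 */
final class RetainedWindowCounts {
    private final long sizeMs;
    private final long retentionMs;
    private long streamTimeMs = Long.MIN_VALUE;
    // window start -> (key -> count)
    private final TreeMap<Long, Map<String, Long>> windows = new TreeMap<>();

    RetainedWindowCounts(long sizeMs, long retentionMs) {
        this.sizeMs = sizeMs;
        this.retentionMs = retentionMs;
    }

    /** Returns true if the record was applied, false if it arrived too late and was dropped. */
    boolean add(String key, long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long oldestRetained = streamTimeMs - retentionMs;
        windows.headMap(oldestRetained).clear(); // purge windows past retention
        long start = timestampMs - Math.floorMod(timestampMs, sizeMs);
        if (start < oldestRetained) {
            return false; // its window is already gone, so the record cannot be processed
        }
        windows.computeIfAbsent(start, s -> new HashMap<>()).merge(key, 1L, Long::sum);
        return true;
    }

    long count(String key, long windowStartMs) {
        return windows.getOrDefault(windowStartMs, Collections.emptyMap()).getOrDefault(key, 0L);
    }

    public static void main(String[] args) {
        RetainedWindowCounts counts = new RetainedWindowCounts(5_000L, 60_000L);
        System.out.println(counts.add("a", 100_000L)); // applied
        System.out.println(counts.add("a", 30_000L));  // older than retention: dropped
    }
}
```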
> > > > So, how do we know if a window is being closed when until() is reached? > > Do we get a callback of some sort when this happens? So that we can query > > the final result for a window. > > > > Otherwise, we have to keep reading the store say very n secs or m > records. > > These intermediate results may or may not be useful. > > That's a lot of load but we still may miss the final computed value, > which > > is needed in most cases. > > > > Srikanth > > > > On Thu, Jul 14, 2016 at 5:04 AM, Michael Noll <mich...@confluent.io> > > wrote: > > > > > Srikanth, > > > > > > > This would be useful in place where we use a key-value store just to > > > > duplicate a KTable for get() operations. > > > > Any rough idea when this is targeted for release? > > > > > > We are aiming to add the queryable state feature into the next release > of > > > Kafka. > > > > > > > > > > Its still not clear how to use this for the case this thread was > > started > > > for. > > > > Does Kafka Stream keep windows alive forever? > > > > At some point we need to "complete" a window rt? > > > > > > Kafka Streams keeps windows alive until the so-called window retention > > > period expires. > > > > > > Excerpt from > > > > > > > > > http://docs.confluent.io/3.0.0/streams/developer-guide.html#windowing-a-stream > > > : > > > > > > [For the DSL only]: A local state store is usually needed for a > > > windowing operation > > > to store recently received records based on the window interval, > > while > > > old records > > > in the store are purged after the specified window retention > period. > > > The retention time > > > can be set via `Windows#until()`. > > > > > > Excerpt from > > > > > > > > > http://docs.confluent.io/3.0.0/streams/concepts.html#streams-concepts-windowing > > > : > > > > > > Windowing operations are available in the Kafka Streams DSL, where > > > users can > > > specify a retention period for the window. 
This allows Kafka > Streams > > to > > > retain > > > old window buckets for a period of time in order to wait for the > late > > > arrival of records > > > whose timestamps fall within the window interval. If a record > arrives > > > after the retention > > > period has passed, the record cannot be processed and is dropped. > > > > > > > > > > Either based on processing time or event time + watermark, etc. > > > > > > The time semantics are based on the timestamp extractor you have > > configured > > > for your application. The default timestamp extractor is > > > `ConsumerRecordTimestampExtractor`, which yields event-time semantics. > > If > > > you want processing-time semantics, you need to configure your > > application > > > to use the `WallclockTimestampExtractor`. > > > > > > Hope this helps, > > > Michael > > > > > > > > > > > > > > > On Wed, Jul 13, 2016 at 8:19 PM, Srikanth <srikanth...@gmail.com> > wrote: > > > > > > > Thanks. > > > > > > > > This would be useful in place where we use a key-value store just to > > > > duplicate a KTable for get() operations. > > > > Any rough idea when this is targeted for release? > > > > > > > > Its still not clear how to use this for the case this thread was > > started > > > > for. > > > > Does Kafka Stream keep windows alive forever? > > > > At some point we need to "complete" a window rt? Either based on > > > processing > > > > time or event time + watermark, etc. > > > > How can we tie internal state store query with window completion? > i.e, > > > get > > > > the final value. > > > > > > > > Srikanth > > > > > > > > On Thu, Jul 7, 2016 at 2:05 PM, Eno Thereska <eno.there...@gmail.com > > > > > > wrote: > > > > > > > > > Hi Srikanth, Clive, > > > > > > > > > > Today we just added some example code usage in the KIP after > feedback > > > > from > > > > > the community. There is code that shows how to access a WindowStore > > (in > > > > > read-only mode). 
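For readers without the KIP at hand, the general idea of a read-only window store, where values are addressed by key plus window start (the key + timestamp pair Srikanth asks about in the message quoted below) and fetched by time range, can be sketched in plain Java. The interface and class names are hypothetical in-memory stand-ins, not the API defined in KIP-67.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical read-only view: window start -> value for one key in a time range. */
interface ReadOnlyWindowView<V> {
    NavigableMap<Long, V> fetch(String key, long fromMs, long toMs);
}

/** Hypothetical in-memory windowed store; not the Kafka Streams WindowStore. */
final class InMemoryWindowStore<V> implements ReadOnlyWindowView<V> {
    private final Map<String, TreeMap<Long, V>> byKey = new HashMap<>();

    /** Writes the value for (key, window start); a later put replaces the earlier value. */
    void put(String key, long windowStartMs, V value) {
        byKey.computeIfAbsent(key, k -> new TreeMap<>()).put(windowStartMs, value);
    }

    /** All windows of key whose start lies in [fromMs, toMs], as an unmodifiable view. */
    @Override
    public NavigableMap<Long, V> fetch(String key, long fromMs, long toMs) {
        TreeMap<Long, V> windows = byKey.get(key);
        if (windows == null) {
            return Collections.emptyNavigableMap();
        }
        return Collections.unmodifiableNavigableMap(windows.subMap(fromMs, true, toMs, true));
    }

    public static void main(String[] args) {
        InMemoryWindowStore<Long> store = new InMemoryWindowStore<>();
        store.put("page-a", 0L, 3L);
        store.put("page-a", 5_000L, 7L);
        store.put("page-a", 10_000L, 1L);
        System.out.println(store.fetch("page-a", 0L, 5_000L)); // prints {0=3, 5000=7}
    }
}
```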
> > > > > > > > > > Thanks > > > > > Eno > > > > > > > > > > > > > > > > On 7 Jul 2016, at 15:57, Srikanth <srikanth...@gmail.com> wrote: > > > > > > > > > > > > Eno, > > > > > > > > > > > > I was also looking for something similar. To output aggregate > value > > > > once > > > > > > the window is "complete". > > > > > > I'm not sure getting individual update for an aggregate operator > is > > > > that > > > > > > useful. > > > > > > > > > > > > With KIP-67, will we have access to Windowed[key]( key + > timestamp) > > > and > > > > > > value? > > > > > > Does until() clear this store when time passes? > > > > > > > > > > > > Srikanth > > > > > > > > > > > > On Thu, Jun 30, 2016 at 4:27 AM, Clive Cox > > > > <clivej...@yahoo.co.uk.invalid > > > > > > > > > > > > wrote: > > > > > > > > > > > >> Hi Eno, > > > > > >> I've looked at KIP-67. It looks good but its not clear what > calls > > I > > > > > would > > > > > >> make to do what I presently need: Get access to each windowed > > store > > > at > > > > > some > > > > > >> time soon after window end time. I can then use the methods > > > specified > > > > to > > > > > >> iterate over keys and values. Can you point me to the relevant > > > > > >> method/technique for this? > > > > > >> > > > > > >> Thanks, > > > > > >> Clive > > > > > >> > > > > > >> > > > > > >> On Tuesday, 28 June 2016, 12:47, Eno Thereska < > > > > > eno.there...@gmail.com> > > > > > >> wrote: > > > > > >> > > > > > >> > > > > > >> Hi Clive, > > > > > >> > > > > > >> As promised, here is the link to the KIP that just went out > today. 
> > > > > >> Feedback welcome: > > > > > >> > > > > > >> > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams > > > > > >> < > > > > > >> > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-67:+Queryable+state+for+Kafka+Streams > > > > > >>> > > > > > >> > > > > > >> Thanks > > > > > >> Eno > > > > > >> > > > > > >>> On 27 Jun 2016, at 20:56, Eno Thereska <eno.there...@gmail.com > > > > > > wrote: > > > > > >>> > > > > > >>> Hi Clive, > > > > > >>> > > > > > >>> We are working on exposing the state store behind a KTable as > > part > > > of > > > > > >> allowing for queries to the structures currently hidden behind > the > > > > > language > > > > > >> (DSL). The KIP should be out today or tomorrow for you to have a > > > look. > > > > > You > > > > > >> can probably do what you need using the low-level processor API > > but > > > > then > > > > > >> you'd lose the benefits of the DSL and would have to maintain > your > > > own > > > > > >> structures. > > > > > >>> > > > > > >>> Thanks, > > > > > >>> Eno > > > > > >>> > > > > > >>>> On 26 Jun 2016, at 18:42, Clive Cox > > <clivej...@yahoo.co.uk.INVALID > > > > > > > > > >> wrote: > > > > > >>>> > > > > > >>>> Following on from this thread, if I want to iterate over a > > KTable > > > at > > > > > >> the end of its hopping/tumbling Time Window how can I do this at > > > > present > > > > > >> myself? Is there a way to access these structures? > > > > > >>>> If this is not possible it would seem I need to duplicate and > > > manage > > > > > >> something similar to a list of windowed KTables myself which is > > not > > > > > really > > > > > >> ideal. 
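If you do end up maintaining these structures yourself, as Clive describes, the core is a map from window start to per-key aggregates that is drained once each window's end time has passed. This is a minimal sketch with hypothetical names. It assumes processing-time semantics (no late records), and it assumes the caller invokes `closeWindowsEndingBy` periodically. A record for an already-closed window would simply open a new entry, which is why the no-late-records assumption matters.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical hand-rolled replacement for "a list of windowed KTables":
 * per-key sums grouped by tumbling window, each window emitted once and then
 * discarded after its end time has passed.
 */
final class TumblingAggregator {
    /** One finished window: its start time and final per-key sums. */
    static final class ClosedWindow {
        final long startMs;
        final Map<String, Long> sums;

        ClosedWindow(long startMs, Map<String, Long> sums) {
            this.startMs = startMs;
            this.sums = sums;
        }
    }

    private final long sizeMs;
    private final TreeMap<Long, Map<String, Long>> open = new TreeMap<>();

    TumblingAggregator(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    void add(String key, long value, long timestampMs) {
        long start = timestampMs - Math.floorMod(timestampMs, sizeMs);
        open.computeIfAbsent(start, s -> new HashMap<>()).merge(key, value, Long::sum);
    }

    /** Removes and returns, oldest first, every window whose end is <= nowMs. */
    List<ClosedWindow> closeWindowsEndingBy(long nowMs) {
        // Window [start, start + size) has ended once start + size <= now.
        Map<Long, Map<String, Long>> ended = open.headMap(nowMs - sizeMs, true);
        List<ClosedWindow> closed = new ArrayList<>();
        for (Map.Entry<Long, Map<String, Long>> e : ended.entrySet()) {
            closed.add(new ClosedWindow(e.getKey(), e.getValue()));
        }
        ended.clear(); // headMap is a view, so this also removes them from `open`
        return closed;
    }
}
```

Each returned `ClosedWindow` can then be iterated and sent to the external system exactly once.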
> > > > > >>>> Thanks for any help, > > > > > >>>> Clive > > > > > >>>> > > > > > >>>> > > > > > >>>> On Monday, 13 June 2016, 16:03, Eno Thereska < > > > > eno.there...@gmail.com> > > > > > >> wrote: > > > > > >>>> > > > > > >>>> > > > > > >>>> Hi Clive, > > > > > >>>> > > > > > >>>> For now this optimisation is not present. We're working on it > as > > > > part > > > > > >> of KIP-63. One manual work-around might be to use a simple > > Key-value > > > > > store > > > > > >> to deduplicate the final output before sending to the backend. > It > > > > could > > > > > >> have a simple policy like "output all values at 1 second > > intervals" > > > or > > > > > >> "output after 10 records have been received". > > > > > >>>> > > > > > >>>> Eno > > > > > >>>> > > > > > >>>> > > > > > >>>>> On 13 Jun 2016, at 13:36, Clive Cox > > > <clivej...@yahoo.co.uk.INVALID > > > > > > > > > > >> wrote: > > > > > >>>>> > > > > > >>>>> > > > > > >>>>> Thanks Eno for your comments and references. > > > > > >>>>> Perhaps, I can explain what I want to achieve and maybe you > can > > > > > >> suggest the correct topology? > > > > > >>>>> I want process a stream of events and do aggregation and send > > to > > > an > > > > > >> analytics backend (Influxdb), so that rather than sending 1000 > > > > > points/sec > > > > > >> to the analytics backend, I send a much lower value. I'm only > > > > > interested in > > > > > >> using the processing time of the event so in that respect there > > are > > > no > > > > > >> "late arriving" events.I was hoping I could use a Tumbling > window > > > > which > > > > > >> when its end-time had been passed I can send the consolidated > > > > > aggregation > > > > > >> for that window and then throw the Window away. > > > > > >>>>> > > > > > >>>>> It sounds like from the references you give that this is not > > > > possible > > > > > >> at present in Kafka Streams? 
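The manual work-around Eno describes above (a key-value buffer that deduplicates output and flushes on a policy such as "at 1 second intervals" or "after 10 records") might look roughly like this. Names are hypothetical. In this sketch the time-based condition is only checked when an update arrives, so a real version would also need a periodic timer to flush an idle buffer.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical dedup buffer: keeps only the latest value per key and flushes
 * after maxRecords updates or once intervalMs has passed since the last flush.
 */
final class DedupBuffer<K, V> {
    private final int maxRecords;
    private final long intervalMs;
    private final Map<K, V> latest = new LinkedHashMap<>();
    private int received = 0;
    private long lastFlushMs;

    DedupBuffer(int maxRecords, long intervalMs, long nowMs) {
        this.maxRecords = maxRecords;
        this.intervalMs = intervalMs;
        this.lastFlushMs = nowMs;
    }

    /** Records an update; returns the batch to send if a flush fired, else an empty map. */
    Map<K, V> update(K key, V value, long nowMs) {
        latest.put(key, value); // later updates for the same key overwrite earlier ones
        received++;
        if (received >= maxRecords || nowMs - lastFlushMs >= intervalMs) {
            Map<K, V> batch = new LinkedHashMap<>(latest);
            latest.clear();
            received = 0;
            lastFlushMs = nowMs;
            return batch;
        }
        return Collections.emptyMap();
    }

    public static void main(String[] args) {
        DedupBuffer<String, Long> buffer = new DedupBuffer<>(10, 1_000L, 0L);
        for (long t = 0; t < 2_500; t += 100) {
            Map<String, Long> batch = buffer.update("cpu", t, t);
            if (!batch.isEmpty()) {
                System.out.println("send to backend at " + t + ": " + batch);
            }
        }
    }
}
```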
> > > > > >>>>> > > > > > >>>>> Thanks, > > > > > >>>>> Clive > > > > > >>>>> > > > > > >>>>> On Monday, 13 June 2016, 11:32, Eno Thereska < > > > > > >> eno.there...@gmail.com> wrote: > > > > > >>>>> > > > > > >>>>> > > > > > >>>>> Hi Clive, > > > > > >>>>> > > > > > >>>>> The behaviour you are seeing is indeed correct (though not > > > > > necessarily > > > > > >> optimal in terms of performance as described in this JIRA: > > > > > >> https://issues.apache.org/jira/browse/KAFKA-3101 < > > > > > >> https://issues.apache.org/jira/browse/KAFKA-3101>) > > > > > >>>>> > > > > > >>>>> The key observation is that windows never close/complete. > There > > > > could > > > > > >> always be late arriving events that appear long after a window's > > end > > > > > >> interval and those need to be accounted for properly. In Kafka > > > Streams > > > > > that > > > > > >> means that such late arriving events continue to update the > value > > of > > > > the > > > > > >> window. As described in the above JIRA, some optimisations could > > > still > > > > > be > > > > > >> possible (e.g., batch requests as described in KIP-63 < > > > > > >> > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+downstream+caching+in+streams > > > > > >), > > > > > >> however they are not implemented yet. > > > > > >>>>> > > > > > >>>>> So your code needs to handle each update. > > > > > >>>>> > > > > > >>>>> Thanks > > > > > >>>>> Eno > > > > > >>>>> > > > > > >>>>> > > > > > >>>>> > > > > > >>>>>> On 13 Jun 2016, at 11:13, Clive Cox > > > <clivej...@yahoo.co.uk.INVALID > > > > > > > > > > >> wrote: > > > > > >>>>>> > > > > > >>>>>> Hi, > > > > > >>>>>> I would like to process a stream with a tumbling window of > > > 5secs, > > > > > >> create aggregated stats for keys and push the final aggregates > at > > > the > > > > > end > > > > > >> of each window period to a analytics backend. 
I have tried doing > > > > > something > > > > > >> like: > > > > > >>>>>> stream > > > > > >>>>>> .map > > > > > >>>>>> .reduceByKey(... > > > > > >>>>>> , TimeWindows.of("mywindow", 5000L),...) > > > > > >>>>>> .foreach { send stats > > > > > >>>>>> } > > > > > >>>>>> But I get every update to the ktable in the foreach. > > > > > >>>>>> How do I just get the final values once the TumblingWindow > is > > > > > >> complete so I can iterate over them and send to some external > > > system? > > > > > >>>>>> Thanks, > > > > > >>>>>> Clive > > > > > >>>>>> PS Using kafka_2.10-0.10.0.0 > > > > > >>>>>> > > > > > >>>>> > > > > > >>>>> > > > > > >>>> > > > > > >>>> > > > > > >>> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > *Michael G. Noll | Product Manager | Confluent | +1 > 650.453.5860Download > > > Apache Kafka and Confluent Platform: www.confluent.io/download > > > <http://www.confluent.io/download>* > > > > > > > > > -- > -- Guozhang >
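Eno's point that "your code needs to handle each update" can be addressed by making the sink idempotent. Each running value emitted by the windowed aggregation is written under (key, window start), overwriting the previous one, so once no more records arrive for a window the sink holds its final aggregate. This is a minimal sketch with an in-memory map standing in for the analytics backend. Names are hypothetical, and whether a real backend such as InfluxDB can be used this way depends on how its points are keyed.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical idempotent sink: every update is upserted by (key, window start),
 * so the stored value always reflects the latest running aggregate.
 */
final class UpsertingSink {
    private final Map<String, Long> backend = new HashMap<>(); // stand-in for the backend

    private static String pointId(String key, long windowStartMs) {
        return key + "@" + windowStartMs;
    }

    /** Called once per update emitted downstream (e.g. from a foreach). */
    void onUpdate(String key, long windowStartMs, long runningValue) {
        backend.put(pointId(key, windowStartMs), runningValue); // overwrite, never append
    }

    Long valueOf(String key, long windowStartMs) {
        return backend.get(pointId(key, windowStartMs));
    }

    int size() {
        return backend.size();
    }

    public static void main(String[] args) {
        long sizeMs = 5_000L;
        long[][] records = {{1_000L, 2L}, {3_000L, 3L}, {6_000L, 4L}}; // {timestamp, value}
        Map<Long, Long> running = new HashMap<>(); // simulated windowed reduce state
        UpsertingSink sink = new UpsertingSink();
        for (long[] r : records) {
            long start = r[0] - Math.floorMod(r[0], sizeMs);
            long sum = running.merge(start, r[1], Long::sum);
            sink.onUpdate("cpu", start, sum); // forward every intermediate update
        }
        // prints "window 0: 5, window 5000: 4"
        System.out.println("window 0: " + sink.valueOf("cpu", 0L) + ", window 5000: " + sink.valueOf("cpu", 5_000L));
    }
}
```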