You are right. We are more likely to be interested in value when window
expires and sometimes when retention limit is reached.
I lost my "time sense" when I read the last email!

I guess we can query a 12:00:00 window at 12:00:05(5 sec window).
That will be some sort of poll(loop) as opposed to a callback that triggers
and at the right moment.
May be a Flink style trigger interface will help too.

Eager to see how it works in practice when released.

Srikanth



On Thu, Jul 14, 2016 at 12:36 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Srikanth,
>
> In you do not care about the intermediate results but only want to query
> the results when the window is no longer retained, you can consider just
> querying the state stores at the time that the window is about to "expire"
> (i.e. it will no longer be retained). For example, with a tumbling window
> size 5 seconds, and retention period say 1 hour, you can query a window
> that is created at time 12:00:00 at around 12:59:00, and query window that
> is created at time 1200:05 at around 12:59:05, etc.
>
> But bare in mind that in practice you may not only be interested in
> querying the aggregated results at the time the window is "dropped",
> because window retention period is for handling possible late arrived data
> and hence it could be much later compared with the window length itself.
> For example in your case the tumbling window length is 5 seconds, but you
> may want to maintain each window for an hour just for late data, and only
> querying a windowed result for a 5-second window that is one hour ago may
> not really be useful in real-time applications. So I think in most cases
> you'd still want to get some results much earlier, say when the window
> length as expired.
>
> Guozhang
>
>
> On Thu, Jul 14, 2016 at 8:08 AM, Srikanth <srikanth...@gmail.com> wrote:
>
> > Michael,
> >
> >  > This allows Kafka Streams to retain old window buckets for a period of
> > time in order to wait for the late
> > arrival of records whose timestamps fall within the window interval. If a
> > record arrives
> > after the retention period has passed, the record cannot be processed and
> > is dropped.
> >
> > So, how do we know if a window is being closed when until() is reached?
> > Do we get a callback of some sort when this happens? So that we can query
> > the final result for a window.
> >
> > Otherwise, we have to keep reading the store say very n secs or m
> records.
> > These intermediate results may or may not be useful.
> > That's a lot of load but we still may miss the final computed value,
> which
> > is needed in most cases.
> >
> > Srikanth
> >
> > On Thu, Jul 14, 2016 at 5:04 AM, Michael Noll <mich...@confluent.io>
> > wrote:
> >
> > > Srikanth,
> > >
> > > > This would be useful in place where we use a key-value store just to
> > > > duplicate a KTable for get() operations.
> > > > Any rough idea when this is targeted for release?
> > >
> > > We are aiming to add the queryable state feature into the next release
> of
> > > Kafka.
> > >
> > >
> > > > Its still not clear how to use this for the case this thread was
> > started
> > > for.
> > > > Does Kafka Stream keep windows alive forever?
> > > > At some point we need to "complete" a window rt?
> > >
> > > Kafka Streams keeps windows alive until the so-called window retention
> > > period expires.
> > >
> > > Excerpt from
> > >
> > >
> >
> http://docs.confluent.io/3.0.0/streams/developer-guide.html#windowing-a-stream
> > > :
> > >
> > >     [For the DSL only]: A local state store is usually needed for a
> > > windowing operation
> > >     to store recently received records based on the window interval,
> > while
> > > old records
> > >     in the store are purged after the specified window retention
> period.
> > > The retention time
> > >     can be set via `Windows#until()`.
> > >
> > > Excerpt from
> > >
> > >
> >
> http://docs.confluent.io/3.0.0/streams/concepts.html#streams-concepts-windowing
> > > :
> > >
> > >     Windowing operations are available in the Kafka Streams DSL, where
> > > users can
> > >     specify a retention period for the window. This allows Kafka
> Streams
> > to
> > > retain
> > >     old window buckets for a period of time in order to wait for the
> late
> > > arrival of records
> > >     whose timestamps fall within the window interval. If a record
> arrives
> > > after the retention
> > >     period has passed, the record cannot be processed and is dropped.
> > >
> > >
> > > > Either based on processing time or event time + watermark, etc.
> > >
> > > The time semantics are based on the timestamp extractor you have
> > configured
> > > for your application.  The default timestamp extractor is
> > > `ConsumerRecordTimestampExtractor`, which yields event-time semantics.
> > If
> > > you want processing-time semantics, you need to configure your
> > application
> > > to use the `WallclockTimestampExtractor`.
> > >
> > > Hope this helps,
> > > Michael
> > >
> > >
> > >
> > >
> > > On Wed, Jul 13, 2016 at 8:19 PM, Srikanth <srikanth...@gmail.com>
> wrote:
> > >
> > > > Thanks.
> > > >
> > > > This would be useful in place where we use a key-value store just to
> > > > duplicate a KTable for get() operations.
> > > > Any rough idea when this is targeted for release?
> > > >
> > > > Its still not clear how to use this for the case this thread was
> > started
> > > > for.
> > > > Does Kafka Stream keep windows alive forever?
> > > > At some point we need to "complete" a window rt? Either based on
> > > processing
> > > > time or event time + watermark, etc.
> > > > How can we tie internal state store query with window completion?
> i.e,
> > > get
> > > > the final value.
> > > >
> > > > Srikanth
> > > >
> > > > On Thu, Jul 7, 2016 at 2:05 PM, Eno Thereska <eno.there...@gmail.com
> >
> > > > wrote:
> > > >
> > > > > Hi Srikanth, Clive,
> > > > >
> > > > > Today we just added some example code usage in the KIP after
> feedback
> > > > from
> > > > > the community. There is code that shows how to access a WindowStore
> > (in
> > > > > read-only mode).
> > > > >
> > > > > Thanks
> > > > > Eno
> > > > >
> > > > >
> > > > > > On 7 Jul 2016, at 15:57, Srikanth <srikanth...@gmail.com> wrote:
> > > > > >
> > > > > > Eno,
> > > > > >
> > > > > > I was also looking for something similar. To output aggregate
> value
> > > > once
> > > > > > the window is "complete".
> > > > > > I'm not sure getting individual update for an aggregate operator
> is
> > > > that
> > > > > > useful.
> > > > > >
> > > > > > With KIP-67, will we have access to Windowed[key]( key +
> timestamp)
> > > and
> > > > > > value?
> > > > > > Does until() clear this store when time passes?
> > > > > >
> > > > > > Srikanth
> > > > > >
> > > > > > On Thu, Jun 30, 2016 at 4:27 AM, Clive Cox
> > > > <clivej...@yahoo.co.uk.invalid
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > >> Hi Eno,
> > > > > >> I've looked at KIP-67. It looks good but its not clear what
> calls
> > I
> > > > > would
> > > > > >> make to do what I presently need: Get access to each windowed
> > store
> > > at
> > > > > some
> > > > > >> time soon after window end time. I can then use the methods
> > > specified
> > > > to
> > > > > >> iterate over keys and values. Can you point me to the relevant
> > > > > >> method/technique for this?
> > > > > >>
> > > > > >> Thanks,
> > > > > >> Clive
> > > > > >>
> > > > > >>
> > > > > >>    On Tuesday, 28 June 2016, 12:47, Eno Thereska <
> > > > > eno.there...@gmail.com>
> > > > > >> wrote:
> > > > > >>
> > > > > >>
> > > > > >> Hi Clive,
> > > > > >>
> > > > > >> As promised, here is the link to the KIP that just went out
> today.
> > > > > >> Feedback welcome:
> > > > > >>
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
> > > > > >> <
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67:+Queryable+state+for+Kafka+Streams
> > > > > >>>
> > > > > >>
> > > > > >> Thanks
> > > > > >> Eno
> > > > > >>
> > > > > >>> On 27 Jun 2016, at 20:56, Eno Thereska <eno.there...@gmail.com
> >
> > > > wrote:
> > > > > >>>
> > > > > >>> Hi Clive,
> > > > > >>>
> > > > > >>> We are working on exposing the state store behind a KTable as
> > part
> > > of
> > > > > >> allowing for queries to the structures currently hidden behind
> the
> > > > > language
> > > > > >> (DSL). The KIP should be out today or tomorrow for you to have a
> > > look.
> > > > > You
> > > > > >> can probably do what you need using the low-level processor API
> > but
> > > > then
> > > > > >> you'd lose the benefits of the DSL and would have to maintain
> your
> > > own
> > > > > >> structures.
> > > > > >>>
> > > > > >>> Thanks,
> > > > > >>> Eno
> > > > > >>>
> > > > > >>>> On 26 Jun 2016, at 18:42, Clive Cox
> > <clivej...@yahoo.co.uk.INVALID
> > > >
> > > > > >> wrote:
> > > > > >>>>
> > > > > >>>> Following on from this thread, if I want to iterate over a
> > KTable
> > > at
> > > > > >> the end of its hopping/tumbling Time Window how can I do this at
> > > > present
> > > > > >> myself? Is there a way to access these structures?
> > > > > >>>> If this is not possible it would seem I need to duplicate and
> > > manage
> > > > > >> something similar to a list of windowed KTables myself which is
> > not
> > > > > really
> > > > > >> ideal.
> > > > > >>>> Thanks for any help,
> > > > > >>>> Clive
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> On Monday, 13 June 2016, 16:03, Eno Thereska <
> > > > eno.there...@gmail.com>
> > > > > >> wrote:
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> Hi Clive,
> > > > > >>>>
> > > > > >>>> For now this optimisation is not present. We're working on it
> as
> > > > part
> > > > > >> of KIP-63. One manual work-around might be to use a simple
> > Key-value
> > > > > store
> > > > > >> to deduplicate the final output before sending to the backend.
> It
> > > > could
> > > > > >> have a simple policy like "output all values at 1 second
> > intervals"
> > > or
> > > > > >> "output after 10 records have been received".
> > > > > >>>>
> > > > > >>>> Eno
> > > > > >>>>
> > > > > >>>>
> > > > > >>>>> On 13 Jun 2016, at 13:36, Clive Cox
> > > <clivej...@yahoo.co.uk.INVALID
> > > > >
> > > > > >> wrote:
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> Thanks Eno for your comments and references.
> > > > > >>>>> Perhaps, I can explain what I want to achieve and maybe you
> can
> > > > > >> suggest the correct topology?
> > > > > >>>>> I want process a stream of events and do aggregation and send
> > to
> > > an
> > > > > >> analytics backend (Influxdb), so that rather than sending 1000
> > > > > points/sec
> > > > > >> to the analytics backend, I send a much lower value. I'm only
> > > > > interested in
> > > > > >> using the processing time of the event so in that respect there
> > are
> > > no
> > > > > >> "late arriving" events.I was hoping I could use a Tumbling
> window
> > > > which
> > > > > >> when its end-time had been passed I can send the consolidated
> > > > > aggregation
> > > > > >> for that window and then throw the Window away.
> > > > > >>>>>
> > > > > >>>>> It sounds like from the references you give that this is not
> > > > possible
> > > > > >> at present in Kafka Streams?
> > > > > >>>>>
> > > > > >>>>> Thanks,
> > > > > >>>>> Clive
> > > > > >>>>>
> > > > > >>>>>   On Monday, 13 June 2016, 11:32, Eno Thereska <
> > > > > >> eno.there...@gmail.com> wrote:
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> Hi Clive,
> > > > > >>>>>
> > > > > >>>>> The behaviour you are seeing is indeed correct (though not
> > > > > necessarily
> > > > > >> optimal in terms of performance as described in this JIRA:
> > > > > >> https://issues.apache.org/jira/browse/KAFKA-3101 <
> > > > > >> https://issues.apache.org/jira/browse/KAFKA-3101>)
> > > > > >>>>>
> > > > > >>>>> The key observation is that windows never close/complete.
> There
> > > > could
> > > > > >> always be late arriving events that appear long after a window's
> > end
> > > > > >> interval and those need to be accounted for properly. In Kafka
> > > Streams
> > > > > that
> > > > > >> means that such late arriving events continue to update the
> value
> > of
> > > > the
> > > > > >> window. As described in the above JIRA, some optimisations could
> > > still
> > > > > be
> > > > > >> possible (e.g., batch requests as described in KIP-63 <
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+downstream+caching+in+streams
> > > > > >),
> > > > > >> however they are not implemented yet.
> > > > > >>>>>
> > > > > >>>>> So your code needs to handle each update.
> > > > > >>>>>
> > > > > >>>>> Thanks
> > > > > >>>>> Eno
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>>> On 13 Jun 2016, at 11:13, Clive Cox
> > > <clivej...@yahoo.co.uk.INVALID
> > > > >
> > > > > >> wrote:
> > > > > >>>>>>
> > > > > >>>>>> Hi,
> > > > > >>>>>> I would like to process a stream with a tumbling window of
> > > 5secs,
> > > > > >> create aggregated stats for keys and push the final aggregates
> at
> > > the
> > > > > end
> > > > > >> of each window period to a analytics backend. I have tried doing
> > > > > something
> > > > > >> like:
> > > > > >>>>>> stream
> > > > > >>>>>>       .map
> > > > > >>>>>>       .reduceByKey(...
> > > > > >>>>>>         , TimeWindows.of("mywindow", 5000L),...)
> > > > > >>>>>>       .foreach        {            send stats
> > > > > >>>>>>         }
> > > > > >>>>>> But I get every update to the ktable in the foreach.
> > > > > >>>>>> How do I just get the final values once the TumblingWindow
> is
> > > > > >> complete so I can iterate over them and send to some external
> > > system?
> > > > > >>>>>> Thanks,
> > > > > >>>>>> Clive
> > > > > >>>>>> PS Using kafka_2.10-0.10.0.0
> > > > > >>>>>>
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > >
> > > *Michael G. Noll | Product Manager | Confluent | +1
> 650.453.5860Download
> > > Apache Kafka and Confluent Platform: www.confluent.io/download
> > > <http://www.confluent.io/download>*
> > >
> >
>
>
>
> --
> -- Guozhang
>

Reply via email to