Yes that is one of the methods. It will be available in the 0.10.2 release,
which is due at the beginning of February.

On Mon, 19 Dec 2016 at 12:17 Sachin Mittal <sjmit...@gmail.com> wrote:

> I believe you are talking about this method:
>
> public <W extends Window, T> KTable<Windowed<K>, T> aggregate(
>         final Initializer<T> initializer,
>         final Aggregator<K, V, T> aggregator,
>         final Windows<W> windows,
>         final StateStoreSupplier<WindowStore> storeSupplier)
>
> Will this API be part of the next release?
>
> I can go ahead and use this; however, if we add an API to StateStoreSupplier
> to update the logConfig, then we could pass all the topic-level props
> directly as part of the streams config.
>
> Thanks
> Sachin
>
>
>
> On Mon, Dec 19, 2016 at 5:32 PM, Damian Guy <damian....@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > I think we have a way of doing what you want already. If you create a
> > custom state store, you can call the enableLogging method and pass in any
> > configuration parameters you want. For example:
> >
> > final StateStoreSupplier supplier = Stores.create("store")
> >         .withKeys(Serdes.String())
> >         .withValues(Serdes.String())
> >         .persistent()
> >         .enableLogging(Collections.singletonMap("retention.ms", "1000"))
> >         .build();
> >
> > You can then use the overloaded methods in the DSL to pass in the
> > StateStoreSupplier to your aggregates (trunk only).
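> >
> > As a rough sketch of what that wiring could look like (trunk API at the
> > time of writing; the stream, types, and window size here are illustrative,
> > and "supplier" is assumed to be a windowed store supplier built along the
> > lines of the snippet above):
> >
> > KTable<Windowed<String>, Long> counts =
> >         stream.groupByKey()
> >               .aggregate(() -> 0L,                      // Initializer
> >                          (key, value, agg) -> agg + 1L, // Aggregator
> >                          TimeWindows.of(60 * 1000L),    // 1-minute windows
> >                          supplier);                     // custom store config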
> >
> >
> > On Mon, 19 Dec 2016 at 10:58 Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > > Hi,
> > > I am working towards adding topic configs as part of the streams config.
> > > However, I have run into an issue.
> > > The code flow is like this:
> > >
> > > KStreamBuilder builder = new KStreamBuilder();
> > > builder.stream(...)
> > > ...
> > > KafkaStreams streams = new KafkaStreams(builder, streamsProps);
> > > streams.start();
> > >
> > > So we can see we build the topology before building the streams instance.
> > > While building the topology, it assigns the state stores; at that time no
> > > topic config props are available, so it creates the supplier with an
> > > empty topic config.
> > >
> > > Further, StateStoreSupplier has a method only to get the config, not to
> > > update it:
> > > Map<String, Object> logConfig()
> > >
> > > One way to implement this is to change this interface so the log config
> > > props can be updated too. Then, once the props are available to the
> > > streams instance, we update the topology builder's state stores with the
> > > updated config.
> > >
> > > The other way is to change KStreamBuilder and make it pass the topic
> > > config. However, with the second approach we would be splitting the
> > > streams config into two parts. (A sketch of the first approach follows
> > > below.)
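> > >
> > > A purely hypothetical sketch of the first approach (updateLogConfig is a
> > > made-up method, not an existing API; imports elided):
> > >
> > > public interface StateStoreSupplier {
> > >     String name();
> > >     StateStore get();
> > >     Map<String, Object> logConfig();
> > >     // Hypothetical addition: let the streams instance push topic-level
> > >     // props into already-built suppliers once StreamsConfig is known.
> > >     void updateLogConfig(Map<String, Object> logConfig);
> > > }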
> > >
> > > Let me know how one should proceed with this.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > > On Thu, Dec 15, 2016 at 2:27 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >
> > > > I agree. We have already gotten multiple requests to add an API for
> > > > specifying topic parameters for internal topics... I am pretty sure we
> > > > will add it if time permits -- feel free to contribute this new feature!
> > > >
> > > > About changing the value of until(): that does not work, as the
> > > > changelog topic configuration would not be updated.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 12/14/16 8:22 PM, Sachin Mittal wrote:
> > > > > Hi,
> > > > > I suggest including the topic config as part of the streams config
> > > > > properties, like we do for the producer and consumer configs.
> > > > > The supplied topic config would be used for creating internal
> > > > > changelog topics, along with certain additional configs which are
> > > > > applied by default.
> > > > >
> > > > > This way we never have to create internal topics manually.
> > > > >
> > > > > I had one doubt regarding until().
> > > > > Say I specify one value and run my streams app.
> > > > > Now I stop the app, specify a different value, and restart the app.
> > > > >
> > > > > Which retention value would the old (pre-existing) windows use --
> > > > > the older value or the new one?
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Dec 15, 2016 at 12:26 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > > >
> > > > >> Understood. Makes sense.
> > > > >>
> > > > >> For this, you should apply the Streams configs manually when
> > > > >> creating those topics. For the retention parameter, use the value
> > > > >> you specify in the corresponding .until() method.
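> > > > >>
> > > > >> As a minimal sketch of keeping the two in sync (window size and
> > > > >> values are illustrative; note TimeWindows.of also takes a name in
> > > > >> some 0.10.x versions):
> > > > >>
> > > > >> // Changelog topic created manually with retention.ms=3600000 (1h),
> > > > >> // so .until() is set to the same 1 hour.
> > > > >> TimeWindows windows = TimeWindows.of(60 * 1000L)
> > > > >>                                  .until(3600 * 1000L);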
> > > > >>
> > > > >>
> > > > >> -Matthias
> > > > >>
> > > > >>
> > > > >> On 12/14/16 10:08 AM, Sachin Mittal wrote:
> > > > >>> I was referring to internal changelog topics. I had to create them
> > > > >>> manually because in some cases the message size for these topics was
> > > > >>> greater than the default one used by Kafka Streams.
> > > > >>>
> > > > >>> I think someone in this group recommended creating these topics
> > > > >>> manually. I understand that it is better to have the internal topics
> > > > >>> created by the streams app, and I will take a second look at these
> > > > >>> and see if that can be done.
> > > > >>>
> > > > >>> I just wanted to find out which configs are applied to internal
> > > > >>> topics, in order to decide whether to avoid creating them manually.
> > > > >>>
> > > > >>> Thanks
> > > > >>> Sachin
> > > > >>>
> > > > >>>
> > > > >>> On Wed, Dec 14, 2016 at 11:08 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > > >>>
> > > > >>>> I am wondering about "I create internal topic manually" -- which
> > > > >>>> topics do you refer to, in detail?
> > > > >>>>
> > > > >>>> Kafka Streams creates all kinds of internal topics with
> > > > >>>> auto-generated names (e.g., changelog topics are named
> > > > >>>> <application.id>-<storeName>-changelog). So it would be quite
> > > > >>>> tricky to create all of them manually (especially because you need
> > > > >>>> to know those names in advance).
> > > > >>>>
> > > > >>>> IIRC, if a topic already exists, Kafka Streams does not change its
> > > > >>>> configuration. Only if Kafka Streams creates a topic itself will it
> > > > >>>> specify certain config parameters at topic-creation time.
> > > > >>>>
> > > > >>>>
> > > > >>>> -Matthias
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> On 12/13/16 8:16 PM, Sachin Mittal wrote:
> > > > >>>>> Hi,
> > > > >>>>> Thanks for the explanation. This illustration makes it super easy
> > > > >>>>> to understand how until() works. Perhaps we can update the wiki
> > > > >>>>> with this illustration.
> > > > >>>>> It is basically the retention time for a past window.
> > > > >>>>> I used to think until() creates all the future windows for that
> > > > >>>>> period, and when that time passed it would delete all the past
> > > > >>>>> windows. However, until() actually retains a window for the
> > > > >>>>> specified time. This makes so much more sense.
> > > > >>>>>
> > > > >>>>> I just had one pending query regarding
> > > > >>>>> windowstore.changelog.additional.retention.ms:
> > > > >>>>>
> > > > >>>>> How does this relate to the retention.ms param of the topic config?
> > > > >>>>> I create the internal topic manually using, say, retention.ms=3600000.
> > > > >>>>> In the next release (post kafka_2.10-0.10.0.1), since we support
> > > > >>>>> delete for the internal changelog topic as well, I want it to be
> > > > >>>>> retained for, say, just 1 hour.
> > > > >>>>> So how does the above parameter interact with this topic-level
> > > > >>>>> setting? Or do I now just need to set the above config to 3600000
> > > > >>>>> and not add retention.ms=3600000 when creating the internal topic?
> > > > >>>>>
> > > > >>>>> Thanks
> > > > >>>>> Sachin
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> On Tue, Dec 13, 2016 at 11:27 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > > >>>>>
> > > > >>>>>> First, windows are only created if there is actual data for a
> > > > >>>>>> window. So you get windows [0, 50), [25, 75), [50, 100) only if
> > > > >>>>>> there are records falling into each window (btw: the window start
> > > > >>>>>> time is inclusive while the window end time is exclusive). If you
> > > > >>>>>> have only 2 records with, let's say, ts=20 and ts=90, you will
> > > > >>>>>> not have an open window [25,75). Each window is physically
> > > > >>>>>> created when the first record for it is processed.
> > > > >>>>>>
> > > > >>>>>> If you have the above windows and a record with ts=101 arrives,
> > > > >>>>>> a new window [101,151) will be created. Window [0,50) will not be
> > > > >>>>>> deleted yet, because retention is 100 and thus Streams guarantees
> > > > >>>>>> that all records with ts >= 1 (= 101 - 100) are still processed
> > > > >>>>>> correctly, and those records would fall into window [0,50).
> > > > >>>>>>
> > > > >>>>>> Thus, window [0,50) can be dropped once time advances to ts = 150,
> > > > >>>>>> but not before that.
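> > > > >>>>>>
> > > > >>>>>> For reference, a sketch of the window definition discussed in
> > > > >>>>>> this example (the exact TimeWindows.of signature varies across
> > > > >>>>>> 0.10.x versions; some versions also take a name):
> > > > >>>>>>
> > > > >>>>>> // 50 ms windows, advancing by 25 ms, retained for 100 ms
> > > > >>>>>> TimeWindows.of(50L).advanceBy(25L).until(100L);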
> > > > >>>>>>
> > > > >>>>>> -Matthias
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> On 12/13/16 12:06 AM, Sachin Mittal wrote:
> > > > >>>>>>> Hi,
> > > > >>>>>>> So is until() for the future or the past?
> > > > >>>>>>> Say I get the first record at t = 0, until is 100, and my window
> > > > >>>>>>> size is 50, advancing by 25.
> > > > >>>>>>> I understand it will create windows (0, 50), (25, 75), (50, 100).
> > > > >>>>>>> Now at t = 101 it will drop
> > > > >>>>>>> (0, 50), (25, 75), (50, 100) and create
> > > > >>>>>>> (101, 150), (125, 175), (150, 200)
> > > > >>>>>>>
> > > > >>>>>>> Please confirm if this understanding is correct. It is not clear
> > > > >>>>>>> how it will handle overlapping windows (75, 125) and (175, 225)
> > > > >>>>>>> and so on.
> > > > >>>>>>>
> > > > >>>>>>> The case that is still not clear: say at t = 102 I get some
> > > > >>>>>>> message with timestamp 99. What happens then?
> > > > >>>>>>> Will the result be added to the previous aggregation of (50, 100)
> > > > >>>>>>> or (75, 125), like it should be?
> > > > >>>>>>>
> > > > >>>>>>> Or will it recreate the old window (50, 100), aggregate the value
> > > > >>>>>>> there, and then drop it? That would result in a wrong aggregated
> > > > >>>>>>> value, as it does not consider the previously aggregated values.
> > > > >>>>>>>
> > > > >>>>>>> So this is the pressing case I am not able to understand. Maybe I
> > > > >>>>>>> am wrong in some basic understanding.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Next, regarding the parameter
> > > > >>>>>>> windowstore.changelog.additional.retention.ms:
> > > > >>>>>>>
> > > > >>>>>>> How does this relate to the retention.ms param of the topic config?
> > > > >>>>>>> I create the internal topic manually using, say, retention.ms=3600000.
> > > > >>>>>>> In the next release (post kafka_2.10-0.10.0.1), since we support
> > > > >>>>>>> delete for the internal changelog topic as well, I want it to be
> > > > >>>>>>> retained for, say, just 1 hour.
> > > > >>>>>>> So how does the above parameter interact with this topic-level
> > > > >>>>>>> setting? Or do I now just need to set the above config to 3600000
> > > > >>>>>>> and not add retention.ms=3600000 when creating the internal topic?
> > > > >>>>>>> This is just another doubt remaining here.
> > > > >>>>>>>
> > > > >>>>>>> Thanks
> > > > >>>>>>> Sachin
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> Sachin,
> > > > >>>>>>>>
> > > > >>>>>>>> There is no reason to have an .until() AND a .retain() -- just
> > > > >>>>>>>> increase the value of .until().
> > > > >>>>>>>>
> > > > >>>>>>>> If you have a window of, let's say, 1h size and you set .until()
> > > > >>>>>>>> also to 1h -- you can obviously not process any late-arriving
> > > > >>>>>>>> data. If you set until() to 2h in this example, you can process
> > > > >>>>>>>> data that is up to 1h delayed.
> > > > >>>>>>>>
> > > > >>>>>>>> So basically, the retention should always be larger than your
> > > > >>>>>>>> window size.
> > > > >>>>>>>>
> > > > >>>>>>>> The parameter
> > > > >>>>>>>>> windowstore.changelog.additional.retention.ms
> > > > >>>>>>>>
> > > > >>>>>>>> applies to the changelog topics that back up window state
> > > > >>>>>>>> stores. Those changelog topics are compacted. However, the key
> > > > >>>>>>>> used encodes a window ID, and thus older data can never be
> > > > >>>>>>>> cleaned up by compaction. Therefore, an additional retention
> > > > >>>>>>>> time is applied to those topics, too. Thus, if an old window is
> > > > >>>>>>>> not updated for this amount of time, it will eventually get
> > > > >>>>>>>> deleted, preventing the topic from growing infinitely.
> > > > >>>>>>>>
> > > > >>>>>>>> The value will be determined by until(), i.e., whatever you
> > > > >>>>>>>> specify in .until() will be used to set this parameter.
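> > > > >>>>>>>>
> > > > >>>>>>>> As an illustrative sketch (values made up), the changelog
> > > > >>>>>>>> retention would follow from the until() value here:
> > > > >>>>>>>>
> > > > >>>>>>>> // 1h windows retained for 2h; the windowed changelog topic's
> > > > >>>>>>>> // retention then derives from this until() value (plus the
> > > > >>>>>>>> // additional retention config above).
> > > > >>>>>>>> TimeWindows.of(3600 * 1000L).until(2 * 3600 * 1000L);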
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> -Matthias
> > > > >>>>>>>>
> > > > >>>>>>>> On 12/12/16 1:07 AM, Sachin Mittal wrote:
> > > > >>>>>>>>> Hi,
> > > > >>>>>>>>> We are facing the exact problem described by Matthias above.
> > > > >>>>>>>>> We are keeping the default until(), which is 1 day.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Our records' timestamp extractor uses a field which increases
> > > > >>>>>>>>> with time. However, for short periods we cannot guarantee that
> > > > >>>>>>>>> the timestamp always increases. So at the boundary, i.e. after
> > > > >>>>>>>>> 24 hrs, we can get records which are beyond that window's
> > > > >>>>>>>>> retention period.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Then it happens as described above, and our aggregation fails.
> > > > >>>>>>>>>
> > > > >>>>>>>>> So just to sum up: when we get a record at 24h + 1 sec, the
> > > > >>>>>>>>> older window is deleted, and since the new record belongs to
> > > > >>>>>>>>> the new window, that window gets created.
> > > > >>>>>>>>> Now when we get the next record at 24h - 1 sec, since the older
> > > > >>>>>>>>> window has been dropped, it does not get aggregated into that
> > > > >>>>>>>>> bucket.
> > > > >>>>>>>>>
> > > > >>>>>>>>> I suggest we add another setting next to until(), called
> > > > >>>>>>>>> retain(), which retains the older windows into the next window.
> > > > >>>>>>>>>
> > > > >>>>>>>>> I think at the stream window boundary level it should use the
> > > > >>>>>>>>> concept of a sliding window. So we could define a window like:
> > > > >>>>>>>>>
> > > > >>>>>>>>> TimeWindows.of("test-table", 3600 * 1000L)
> > > > >>>>>>>>>            .advanceBy(1800 * 1000L)
> > > > >>>>>>>>>            .until(7 * 24 * 3600 * 1000L)
> > > > >>>>>>>>>            .retain(900 * 1000L)
> > > > >>>>>>>>>
> > > > >>>>>>>>> So after 7 days it retains the data covered by the windows of
> > > > >>>>>>>>> the last 15 minutes, which rolls the data in them over to the
> > > > >>>>>>>>> next window. This way streams work continuously.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Please let us know your thoughts on this.
> > > > >>>>>>>>>
> > > > >>>>>>>>> As another side question on this, there is a setting:
> > > > >>>>>>>>>
> > > > >>>>>>>>> windowstore.changelog.additional.retention.ms
> > > > >>>>>>>>> It is not clear what it does. Is this the default for until()?
> > > > >>>>>>>>>
> > > > >>>>>>>>> Thanks
> > > > >>>>>>>>> Sachin
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Windows are created on demand, i.e., each time a new record
> > > > >>>>>>>>>> arrives and there is no window yet for it, a new window will
> > > > >>>>>>>>>> get created.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Windows accept data until their retention time (which you can
> > > > >>>>>>>>>> configure via .until()) has passed. Thus, you will have many
> > > > >>>>>>>>>> windows open in parallel.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> If you read older data, it will just be put into the
> > > > >>>>>>>>>> corresponding windows (as long as the window retention time
> > > > >>>>>>>>>> has not passed). If a window was already discarded, a new
> > > > >>>>>>>>>> window with this single (late-arriving) record will get
> > > > >>>>>>>>>> created, the computation will be triggered, you get a result,
> > > > >>>>>>>>>> and afterwards the window is deleted again (as its retention
> > > > >>>>>>>>>> time has already passed).
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> The retention time is driven by "stream-time", an internally
> > > > >>>>>>>>>> tracked time that only progresses in the forward direction.
> > > > >>>>>>>>>> It gets its value from the timestamps provided by the
> > > > >>>>>>>>>> TimestampExtractor -- thus, by default it will be event-time.
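> > > > >>>>>>>>>>
> > > > >>>>>>>>>> As an illustration, a minimal custom TimestampExtractor might
> > > > >>>>>>>>>> look like this (the class name is made up; the single-argument
> > > > >>>>>>>>>> interface shown is the 0.10.x form):
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> import org.apache.kafka.clients.consumer.ConsumerRecord;
> > > > >>>>>>>>>> import org.apache.kafka.streams.processor.TimestampExtractor;
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> public class MyEventTimeExtractor implements TimestampExtractor {
> > > > >>>>>>>>>>     @Override
> > > > >>>>>>>>>>     public long extract(final ConsumerRecord<Object, Object> record) {
> > > > >>>>>>>>>>         // Default behavior would use the record's embedded
> > > > >>>>>>>>>>         // timestamp; a custom extractor could instead read a
> > > > >>>>>>>>>>         // field from the deserialized value.
> > > > >>>>>>>>>>         return record.timestamp();
> > > > >>>>>>>>>>     }
> > > > >>>>>>>>>> }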
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> -Matthias
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> > > > >>>>>>>>>>> I've read this and still have more questions than answers.
> > > > >>>>>>>>>>> If my data skips about (time-wise), what determines when a
> > > > >>>>>>>>>>> given window will start / stop accepting new data? What if
> > > > >>>>>>>>>>> I'm reading data from some time ago?
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> Please have a look here:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> If you have further questions, just follow up :)
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> -Matthias
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> > > > >>>>>>>>>>>>> I've added the 'until()' clause to some aggregation steps
> > > > >>>>>>>>>>>>> and it's working wonders for keeping the size of the state
> > > > >>>>>>>>>>>>> store within useful boundaries... But I'm not 100% clear on
> > > > >>>>>>>>>>>>> how it works.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> What is implied by the '.until()' clause? What determines
> > > > >>>>>>>>>>>>> when to stop receiving further data -- is it clock time
> > > > >>>>>>>>>>>>> (since the window was created)? It seems problematic for it
> > > > >>>>>>>>>>>>> to refer to EventTime, as this may bounce all over the
> > > > >>>>>>>>>>>>> place. For non-overlapping windows a given record can only
> > > > >>>>>>>>>>>>> fall into a single aggregation period -- so when would a
> > > > >>>>>>>>>>>>> value get discarded?
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> I'm using 'groupByKey().aggregate(...,
> > > > >>>>>>>>>>>>> TimeWindows.of(60 * 1000L).until(10 * 1000L))' -- but what
> > > > >>>>>>>>>>>>> is this accomplishing?
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >>
> > > > >
> > > >
> > > >
> > >
> >
>
