I believe you are talking about this method:

public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                              final Aggregator<K, V, T> aggregator,
                                                              final Windows<W> windows,
                                                              final StateStoreSupplier<WindowStore> storeSupplier)

Will this API be part of the next release?

I can go ahead and use this. However, if we add an API to StateStoreSupplier to update the logConfig, then we can pass all the topic-level props directly as part of the streams config.

Thanks
Sachin

On Mon, Dec 19, 2016 at 5:32 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi Sachin,
>
> I think we have a way of doing what you want already. If you create a
> custom state store you can call the enableLogging method and pass in any
> configuration parameters you want. For example:
>
> final StateStoreSupplier supplier = Stores.create("store")
>         .withKeys(Serdes.String())
>         .withValues(Serdes.String())
>         .persistent()
>         .enableLogging(Collections.singletonMap("retention.ms", "1000"))
>         .build();
>
> You can then use the overloaded methods in the DSL to pass in the
> StateStoreSupplier to your aggregates (trunk only)
>
>
> On Mon, 19 Dec 2016 at 10:58 Sachin Mittal <sjmit...@gmail.com> wrote:
>
> > Hi,
> > I am working towards adding topic configs as part of streams config.
> > However I have run into an issue:
> > Code flow is like this
> >
> > KStreamBuilder builder = new KStreamBuilder();
> > builder.stream(...)
> > ...
> > KafkaStreams streams = new KafkaStreams(builder, streamsProps);
> > streams.start();
> >
> > So we can see we build the topology before building the streams.
> > While building the topology it assigns the state store.
> > At that time no topic config props are available.
> >
> > So it creates the supplier with empty topic config.
> >
> > Further, StateStoreSupplier has a method just to get the config and not to
> > update it.
> > Map<String, Object> logConfig()
> >
> > One way to implement this is to change this interface to be able to update
> > the log config props too.
> > And when the props are available to streams we update the topology builder's
> > state stores too with the updated config.
> >
> > Other way is to change the KStreamBuilder and make it pass the topic
> > config.
> > However in second approach we would be splitting the streams config into
> > two parts.
> >
> > Let me know how should one proceed with this.
> >
> > Thanks
> > Sachin
> >
> >
> > On Thu, Dec 15, 2016 at 2:27 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > I agree. We already got multiple requests to add an API for specifying
> > > topic parameters for internal topic... I am pretty sure we will add it
> > > if time permits -- feel free to contribute this new feature!
> > >
> > > About changing the value of until: that does not work, as the changelog
> > > topic configuration would not be updated.
> > >
> > >
> > > -Matthias
> > >
> > > On 12/14/16 8:22 PM, Sachin Mittal wrote:
> > > > Hi,
> > > > I suggest to include topic config as well as part of streams config
> > > > properties like we do for producer and consumer configs.
> > > > The topic config supplied would be used for creating internal changelog
> > > > topics along with certain additional configs which are applied by
> > > > default.
> > > >
> > > > This way we don't have to ever create internal topics manually.
> > > >
> > > > I had one doubt regarding until.
> > > > Say I specify one value and run my streams app.
> > > > Now I stop the app, specify a different value and restart the app.
> > > >
> > > > Which value for retain would the old (pre-existing) windows use? Would it
> > > > be the older value or the new value?
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > > On Thu, Dec 15, 2016 at 12:26 AM, Matthias J. Sax <matth...@confluent.io>
> > > > wrote:
> > > >
> > > >> Understood. Makes sense.
> > > >>
> > > >> For this, you should apply Streams configs manually when creating those
> > > >> topics. For retention parameter, use the value you specify in
> > > >> corresponding .until() method for it.
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >>
> > > >> On 12/14/16 10:08 AM, Sachin Mittal wrote:
> > > >>> I was referring to internal changelog topics. I had to create them manually
> > > >>> because in some cases the message size of these topics was greater than the
> > > >>> default used by Kafka Streams.
> > > >>>
> > > >>> I think someone in this group recommended to create these topics manually. I
> > > >>> understand that it is better to have internal topics created by the streams
> > > >>> app and I will take a second look at these and see if that can be done.
> > > >>>
> > > >>> I just wanted to make sure which configs are applied to internal topics
> > > >>> in order to decide whether I can avoid creating them manually.
> > > >>>
> > > >>> Thanks
> > > >>> Sachin
> > > >>>
> > > >>>
> > > >>> On Wed, Dec 14, 2016 at 11:08 PM, Matthias J. Sax <matth...@confluent.io>
> > > >>> wrote:
> > > >>>
> > > >>>> I am wondering about "I create internal topic manually" -- which topics
> > > >>>> do you refer to in detail?
> > > >>>>
> > > >>>> Kafka Streams creates all kinds of internal topics with auto-generated
> > > >>>> names. So it would be quite tricky to create all of them manually
> > > >>>> (especially because you need to know those names in advance).
> > > >>>>
> > > >>>> IIRC, if a topic does exist, Kafka Streams does not change its
> > > >>>> configuration. Only if Kafka Streams does create a topic, it will
> > > >>>> specify certain config parameters on topic create step.
> > > >>>>
> > > >>>>
> > > >>>> -Matthias
> > > >>>>
> > > >>>>
> > > >>>> On 12/13/16 8:16 PM, Sachin Mittal wrote:
> > > >>>>> Hi,
> > > >>>>> Thanks for the explanation. This illustration makes it super easy to
> > > >>>>> understand how until works. Perhaps we can update the wiki with this
> > > >>>>> illustration.
> > > >>>>> It is basically the retention time for a past window.
> > > >>>>> I used to think until creates all the future windows for that period and
> > > >>>>> when time passes that it used to delete all the past windows. However
> > > >>>>> actually until retains a window for specified time. This makes so much more
> > > >>>>> sense.
> > > >>>>>
> > > >>>>> I just had one pending query regarding:
> > > >>>>>
> > > >>>>>> windowstore.changelog.additional.retention.ms
> > > >>>>>
> > > >>>>> How does this relate to retention.ms param of topic config?
> > > >>>>> I create internal topic manually using say retention.ms=3600000.
> > > >>>>> In next release (post kafka_2.10-0.10.0.1) since we support delete of
> > > >>>>> internal changelog topic as well and I want it to be retained for say just
> > > >>>>> 1 hour.
> > > >>>>> So how does that above parameter interfere with this topic level setting?
> > > >>>>> Or now I just need to set above config as 3600000 and not add
> > > >>>>> retention.ms=3600000
> > > >>>>> while creating internal topic.
> > > >>>>>
> > > >>>>> Thanks
> > > >>>>> Sachin
> > > >>>>>
> > > >>>>>
> > > >>>>> On Tue, Dec 13, 2016 at 11:27 PM, Matthias J. Sax <matth...@confluent.io>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>>> First, windows are only created if there is actual data for a window. So
> > > >>>>>> you get windows [0, 50), [25, 75), [50, 100) only if there are records
> > > >>>>>> falling into each window (btw: window start-time is inclusive while
> > > >>>>>> window end time is exclusive). If you have only 2 records with let's say
> > > >>>>>> ts=20 and ts=90 you will not have an open window [25,75). Each window is
> > > >>>>>> physically created when the first record for it is processed.
> > > >>>>>>
> > > >>>>>> If you have above 4 windows and a record with ts=101 arrives, a new
> > > >>>>>> window [101,151) will be created.
> > > >>>>>> Window [0,50) will not be deleted yet,
> > > >>>>>> because retention is 100 and thus Streams guarantees that all records
> > > >>>>>> with ts >= 1 (= 101 - 100) are still processed correctly and those
> > > >>>>>> records would fall into window [0,50).
> > > >>>>>>
> > > >>>>>> Thus, window [0,50) can be dropped, if time advanced to TS = 150, but
> > > >>>>>> not before that.
> > > >>>>>>
> > > >>>>>> -Matthias
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On 12/13/16 12:06 AM, Sachin Mittal wrote:
> > > >>>>>>> Hi,
> > > >>>>>>> So is until for future or past?
> > > >>>>>>> Say I get first record at t = 0 and until is 100 and my window size is 50
> > > >>>>>>> advance by 25.
> > > >>>>>>> I understand it will create windows (0, 50), (25, 75), (50, 100)
> > > >>>>>>> Now at t = 101 it will drop
> > > >>>>>>> (0, 50), (25, 75), (50, 100) and create
> > > >>>>>>> (101, 150), (125, 175), (150, 200)
> > > >>>>>>>
> > > >>>>>>> Please confirm if this understanding is correct. It is not clear how it
> > > >>>>>>> will handle overlapping windows (75, 125) and (175, 225) and so on?
> > > >>>>>>>
> > > >>>>>>> The case that is again not clear is that at say t = 102 I get some message
> > > >>>>>>> with timestamp 99. What happens then?
> > > >>>>>>> Will the result be added to previous aggregation of (50, 100) or (75, 125),
> > > >>>>>>> like it should?
> > > >>>>>>>
> > > >>>>>>> Or will it recreate the old window (50, 100) and aggregate the value there
> > > >>>>>>> and then drop it? This would result in a wrong aggregated value, as it does
> > > >>>>>>> not consider the previous aggregated values.
> > > >>>>>>>
> > > >>>>>>> So this is the pressing case I am not able to understand. Maybe I am wrong
> > > >>>>>>> at some basic understanding.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> Next for
> > > >>>>>>> The parameter
> > > >>>>>>>> windowstore.changelog.additional.retention.ms
> > > >>>>>>>
> > > >>>>>>> How does this relate to retention.ms param of topic config?
> > > >>>>>>> I create internal topic manually using say retention.ms=3600000.
> > > >>>>>>> In next release (post kafka_2.10-0.10.0.1) since we support delete of
> > > >>>>>>> internal changelog topic as well and I want it to be retained for say just
> > > >>>>>>> 1 hour.
> > > >>>>>>> So how does that above parameter interfere with this topic level setting?
> > > >>>>>>> Or now I just need to set above config as 3600000 and not add
> > > >>>>>>> retention.ms=3600000
> > > >>>>>>> while creating internal topic.
> > > >>>>>>> This is just another doubt remaining here.
> > > >>>>>>>
> > > >>>>>>> Thanks
> > > >>>>>>> Sachin
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <matth...@confluent.io>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Sachin,
> > > >>>>>>>>
> > > >>>>>>>> There is no reason to have an .until() AND a .retain() -- just increase
> > > >>>>>>>> the value of .until()
> > > >>>>>>>>
> > > >>>>>>>> If you have a window of let's say 1h size and you set .until() also to
> > > >>>>>>>> 1h -- you can obviously not process any late arriving data. If you set
> > > >>>>>>>> until() to 2h in this example, you can process data that is up to 1h
> > > >>>>>>>> delayed.
> > > >>>>>>>>
> > > >>>>>>>> So basically, the retention should always be larger than your window size.
> > > >>>>>>>>
> > > >>>>>>>> The parameter
> > > >>>>>>>>> windowstore.changelog.additional.retention.ms
> > > >>>>>>>>
> > > >>>>>>>> is applied to changelog topics that back up window state stores. Those
> > > >>>>>>>> changelog topics are compacted.
> > > >>>>>>>> However, the used key does encode a
> > > >>>>>>>> window ID and thus older data can never be cleaned up by compaction.
> > > >>>>>>>> Therefore, an additional retention time is applied to those topics, too.
> > > >>>>>>>> Thus, if an old window is not updated for this amount of time, it will
> > > >>>>>>>> get deleted eventually, preventing this topic from growing infinitely.
> > > >>>>>>>>
> > > >>>>>>>> The value will be determined by until(), i.e., whatever you specify in
> > > >>>>>>>> .until() will be used to set this parameter.
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> -Matthias
> > > >>>>>>>>
> > > >>>>>>>> On 12/12/16 1:07 AM, Sachin Mittal wrote:
> > > >>>>>>>>> Hi,
> > > >>>>>>>>> We are facing the exact problem as described by Matthias above.
> > > >>>>>>>>> We are keeping default until which is 1 day.
> > > >>>>>>>>>
> > > >>>>>>>>> Our record's timestamp extractor has a field which increases with time.
> > > >>>>>>>>> However for a short time we cannot guarantee the timestamp always
> > > >>>>>>>>> increases. So at the boundary, i.e. after 24 hrs, we can get records which
> > > >>>>>>>>> are beyond that window's retention period.
> > > >>>>>>>>>
> > > >>>>>>>>> Then it happens like it is mentioned above and our aggregation fails.
> > > >>>>>>>>>
> > > >>>>>>>>> So just to sum up when we get record
> > > >>>>>>>>> 24h + 1 sec (it deletes older window and since the new record belongs to
> > > >>>>>>>>> the new window it gets created)
> > > >>>>>>>>> Now when we get next record of 24 hrs - 1 sec since older window is dropped
> > > >>>>>>>>> it does not get aggregated in that bucket.
> > > >>>>>>>>>
> > > >>>>>>>>> I suggest we have another setting next to until called retain which retains
> > > >>>>>>>>> the older windows into next window.
> > > >>>>>>>>>
> > > >>>>>>>>> I think at stream window boundary level it should use a concept of sliding
> > > >>>>>>>>> window. So we can define window like
> > > >>>>>>>>>
> > > >>>>>>>>> TimeWindows.of("test-table", 3600 * 1000L).advanceBy(1800 * 1000L).until(7 * 24 * 3600 * 1000L).retain(900 * 1000L)
> > > >>>>>>>>>
> > > >>>>>>>>> So after 7 days it retains the data covered by windows in last 15 minutes
> > > >>>>>>>>> which rolls over the data in them to next window. This way streams work
> > > >>>>>>>>> continuously.
> > > >>>>>>>>>
> > > >>>>>>>>> Please let us know your thoughts on this.
> > > >>>>>>>>>
> > > >>>>>>>>> On another side question on this there is a setting:
> > > >>>>>>>>>
> > > >>>>>>>>> windowstore.changelog.additional.retention.ms
> > > >>>>>>>>> It is not clear what it does. Is this the default for until?
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks
> > > >>>>>>>>> Sachin
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <matth...@confluent.io>
> > > >>>>>>>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Windows are created on demand, i.e., each time a new record arrives and
> > > >>>>>>>>>> there is no window yet for it, a new window will get created.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Windows are accepting data until their retention time (that you can
> > > >>>>>>>>>> configure via .until()) passed. Thus, you will have many windows being
> > > >>>>>>>>>> open in parallel.
> > > >>>>>>>>>>
> > > >>>>>>>>>> If you read older data, they will just be put into the corresponding
> > > >>>>>>>>>> windows (as long as window retention time did not pass).
> > > >>>>>>>>>> If a window was
> > > >>>>>>>>>> discarded already, a new window with this single (later arriving) record
> > > >>>>>>>>>> will get created, the computation will be triggered, you get a result,
> > > >>>>>>>>>> and afterwards the window is deleted again (as its retention time
> > > >>>>>>>>>> passed already).
> > > >>>>>>>>>>
> > > >>>>>>>>>> The retention time is driven by "stream-time", an internally tracked time
> > > >>>>>>>>>> that only progresses in forward direction. It gets its value from the
> > > >>>>>>>>>> timestamps provided by TimestampExtractor -- thus, per default it will
> > > >>>>>>>>>> be event-time.
> > > >>>>>>>>>>
> > > >>>>>>>>>> -Matthias
> > > >>>>>>>>>>
> > > >>>>>>>>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> > > >>>>>>>>>>> I've read this and still have more questions than answers. If my data
> > > >>>>>>>>>>> skips about (timewise) what determines when a given window will start /
> > > >>>>>>>>>>> stop accepting new data? What if Im reading data from some time ago?
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J.
> > > >>>>>>>>>>> Sax <matth...@confluent.io>
> > > >>>>>>>>>>> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Please have a look here:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> If you have further questions, just follow up :)
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> -Matthias
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> > > >>>>>>>>>>>>> Ive added the 'until()' clause to some aggregation steps and it's working
> > > >>>>>>>>>>>>> wonders for keeping the size of the state store in useful boundaries... But
> > > >>>>>>>>>>>>> Im not 100% clear on how it works.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> What is implied by the '.until()' clause? What determines when to stop
> > > >>>>>>>>>>>>> receiving further data - is it clock time (since the window was created)?
> > > >>>>>>>>>>>>> It seems problematic for it to refer to EventTime as this may bounce all
> > > >>>>>>>>>>>>> over the place. For non-overlapping windows a given record can only fall
> > > >>>>>>>>>>>>> into a single aggregation period - so when would a value get discarded?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Im using 'groupByKey().aggregate(..., TimeWindows.of(60 * 1000L).until(10 *
> > > >>>>>>>>>>>>> 1000L))' - but what is this accomplishing?
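
For anyone reading the archive later, here is a minimal Java sketch of the window arithmetic from the example quoted above (window size 50, advance 25, retention 100). It does not use the Kafka Streams API: the class and method names (HoppingWindowSketch, windowsFor, guaranteedLowerBound) are hypothetical, and it assumes window start times are multiples of the advance interval counted from 0. The retention rule is only the one stated in the quoted reply (records with ts >= stream-time - retention are still processed), not a statement about the exact point at which a store drops a window.

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindowSketch {

    /** Half-open window [start, end): start is inclusive, end is exclusive. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /**
     * Returns every window that contains ts, assuming (hypothetically) that
     * window starts are multiples of 'advance' and that ts is non-negative.
     */
    static List<Window> windowsFor(long ts, long size, long advance) {
        List<Window> result = new ArrayList<>();
        // Earliest aligned start whose window still covers ts.
        long start = Math.max(0, ts - size + advance) / advance * advance;
        for (; start <= ts; start += advance) {
            result.add(new Window(start, start + size));
        }
        return result;
    }

    /** Oldest timestamp still covered at the given stream-time, per the rule quoted above. */
    static long guaranteedLowerBound(long streamTime, long retention) {
        return streamTime - retention;
    }

    public static void main(String[] args) {
        // Only two records arrive, so no window [25, 75) is ever created.
        System.out.println(windowsFor(20, 50, 25)); // [[0, 50)]
        System.out.println(windowsFor(90, 50, 25)); // [[50, 100), [75, 125)]
        // Stream-time 101 with retention 100: records with ts >= 1 are still covered.
        System.out.println(guaranteedLowerBound(101, 100)); // 1
    }
}
```

The two records at ts=20 and ts=90 only ever touch [0, 50), [50, 100) and [75, 125), which is why no window [25, 75) shows up: windows exist only once a record falls into them.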