Yes, that is one of the methods. It will be available in the 0.10.2 release, which is due at the beginning of February.
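
For reference, wiring the two pieces together would look roughly like the
following (an untested sketch: the store name, serdes, and the
windowed(...) parameters are placeholders, and the exact factory method
signature on trunk may differ):

final StateStoreSupplier supplier = Stores.create("agg-store")
    .withKeys(Serdes.String())
    .withValues(Serdes.Long())
    .persistent()
    // window size, retention ("until"), segments, retain duplicates
    .windowed(60 * 1000L, 2 * 60 * 60 * 1000L, 3, false)
    .enableLogging(Collections.singletonMap("retention.ms", "3600000"))
    .build();

final KTable<Windowed<String>, Long> aggregated = stream
    .groupByKey()
    .aggregate(() -> 0L,
        (key, value, agg) -> agg + 1L,
        TimeWindows.of(60 * 1000L).until(2 * 60 * 60 * 1000L),
        supplier);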
On Mon, 19 Dec 2016 at 12:17 Sachin Mittal <sjmit...@gmail.com> wrote:

> I believe you are talking about this method:
>
> public <W extends Window, T> KTable<Windowed<K>, T> aggregate(
>         final Initializer<T> initializer,
>         final Aggregator<K, V, T> aggregator,
>         final Windows<W> windows,
>         final StateStoreSupplier<WindowStore> storeSupplier)
>
> Will this API be part of the next release?
>
> I can go about using this; however, if we add some API to
> StateStoreSupplier to update the logConfig, then we can pass all the
> topic-level props directly as part of the streams config.
>
> Thanks
> Sachin
>
>
> On Mon, Dec 19, 2016 at 5:32 PM, Damian Guy <damian....@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > I think we already have a way of doing what you want. If you create a
> > custom state store, you can call the enableLogging method and pass in
> > any configuration parameters you want. For example:
> >
> > final StateStoreSupplier supplier = Stores.create("store")
> >     .withKeys(Serdes.String())
> >     .withValues(Serdes.String())
> >     .persistent()
> >     .enableLogging(Collections.singletonMap("retention.ms", "1000"))
> >     .build();
> >
> > You can then use the overloaded methods in the DSL to pass the
> > StateStoreSupplier to your aggregates (trunk only).
> >
> >
> > On Mon, 19 Dec 2016 at 10:58 Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > > Hi,
> > > I am working towards adding topic configs as part of the streams
> > > config. However, I have run into an issue.
> > > The code flow is like this:
> > >
> > > KStreamBuilder builder = new KStreamBuilder();
> > > builder.stream(...)
> > > ...
> > > KafkaStreams streams = new KafkaStreams(builder, streamsProps);
> > > streams.start();
> > >
> > > So we build the topology before building the streams instance.
> > > While building the topology it assigns the state stores, and at that
> > > time no topic config props are available.
> > >
> > > So it creates the supplier with an empty topic config.
> > >
> > > Furthermore, StateStoreSupplier only has a method to get the config,
> > > not to update it:
> > >
> > > Map<String, Object> logConfig()
> > >
> > > One way to implement this is to change the interface to allow the log
> > > config props to be updated as well. When the props become available
> > > to the streams instance, we then update the topology builder's state
> > > stores with the updated config.
> > >
> > > The other way is to change KStreamBuilder and make it accept the
> > > topic config. However, in the second approach we would be splitting
> > > the streams config into two parts.
> > >
> > > Let me know how one should proceed with this.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Thu, Dec 15, 2016 at 2:27 PM, Matthias J. Sax
> > > <matth...@confluent.io> wrote:
> > >
> > > > I agree. We have already gotten multiple requests to add an API for
> > > > specifying topic parameters for internal topics... I am pretty sure
> > > > we will add it if time permits -- feel free to contribute this new
> > > > feature!
> > > >
> > > > About changing the value of until(): that does not work, as the
> > > > changelog topic configuration would not be updated.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 12/14/16 8:22 PM, Sachin Mittal wrote:
> > > > > Hi,
> > > > > I suggest including the topic config as part of the streams
> > > > > config properties, like we do for the producer and consumer
> > > > > configs.
> > > > > The supplied topic config would be used for creating the internal
> > > > > changelog topics, along with certain additional configs which are
> > > > > applied by default.
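> > > > >
> > > > > For example (purely hypothetical -- no such API exists today),
> > > > > the streams config could accept topic-level props under a
> > > > > "topic." prefix and apply them whenever it creates an internal
> > > > > topic:
> > > > >
> > > > > Properties props = new Properties();
> > > > > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
> > > > > // hypothetical prefixed topic-level configs:
> > > > > props.put("topic.retention.ms", "3600000");
> > > > > props.put("topic.max.message.bytes", "10485760");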
> > > > >
> > > > > This way we never have to create the internal topics manually.
> > > > >
> > > > > I had one doubt regarding until().
> > > > > Say I specify one value and run my streams app.
> > > > > Now I stop the app, specify a different value, and restart the
> > > > > app.
> > > > >
> > > > > Which retention value would the old (pre-existing) windows use?
> > > > > Would it be the older value or the new value?
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > > On Thu, Dec 15, 2016 at 12:26 AM, Matthias J. Sax
> > > > > <matth...@confluent.io> wrote:
> > > > >
> > > > >> Understood. Makes sense.
> > > > >>
> > > > >> For this, you should apply the Streams configs manually when
> > > > >> creating those topics. For the retention parameter, use the
> > > > >> value you specify in the corresponding .until() method.
> > > > >>
> > > > >>
> > > > >> -Matthias
> > > > >>
> > > > >>
> > > > >> On 12/14/16 10:08 AM, Sachin Mittal wrote:
> > > > >>> I was referring to the internal changelog topics. I had to
> > > > >>> create them manually because in some cases the message size of
> > > > >>> these topics was greater than the defaults used by Kafka
> > > > >>> Streams.
> > > > >>>
> > > > >>> I think someone in this group recommended creating these topics
> > > > >>> manually. I understand that it is better to have the internal
> > > > >>> topics created by the streams app, and I will take a second
> > > > >>> look at these and see if that can be done.
> > > > >>>
> > > > >>> I just wanted to make sure which configs are applied to
> > > > >>> internal topics, in order to decide whether to avoid creating
> > > > >>> them manually.
> > > > >>>
> > > > >>> Thanks
> > > > >>> Sachin
> > > > >>>
> > > > >>>
> > > > >>> On Wed, Dec 14, 2016 at 11:08 PM, Matthias J. Sax
> > > > >>> <matth...@confluent.io> wrote:
> > > > >>>
> > > > >>>> I am wondering about "I create internal topic manually" --
> > > > >>>> which topics do you refer to in detail?
> > > > >>>>
> > > > >>>> Kafka Streams creates all kinds of internal topics with
> > > > >>>> auto-generated names. So it would be quite tricky to create
> > > > >>>> all of them manually (especially because you need to know
> > > > >>>> those names in advance).
> > > > >>>>
> > > > >>>> IIRC, if a topic does exist, Kafka Streams does not change its
> > > > >>>> configuration. Only if Kafka Streams does create a topic will
> > > > >>>> it specify certain config parameters in the topic-creation
> > > > >>>> step.
> > > > >>>>
> > > > >>>>
> > > > >>>> -Matthias
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> On 12/13/16 8:16 PM, Sachin Mittal wrote:
> > > > >>>>> Hi,
> > > > >>>>> Thanks for the explanation. This illustration makes it super
> > > > >>>>> easy to understand how until() works. Perhaps we can update
> > > > >>>>> the wiki with this illustration.
> > > > >>>>> It is basically the retention time for a past window.
> > > > >>>>> I used to think until() creates all the future windows for
> > > > >>>>> that period, and that when time passes it deletes all the
> > > > >>>>> past windows. However, until() actually retains a window for
> > > > >>>>> the specified time. This makes so much more sense.
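> > > > >>>>>
> > > > >>>>> Writing the illustration down as a sketch (for the wiki,
> > > > >>>>> maybe):
> > > > >>>>>
> > > > >>>>> // hopping windows: size 50ms, advance 25ms, retention 100ms
> > > > >>>>> TimeWindows.of(50L).advanceBy(25L).until(100L);
> > > > >>>>> // record ts=20 -> creates window [0,50) only
> > > > >>>>> // record ts=90 -> creates windows [50,100) and [75,125)
> > > > >>>>> // window [0,50) is only dropped once stream-time reaches
> > > > >>>>> // 150 (= 50 + 100), so a late record with ts < 50 can still
> > > > >>>>> // be added to it before then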
> > > > >>>>>
> > > > >>>>> I just had one pending query regarding:
> > > > >>>>>
> > > > >>>>>> windowstore.changelog.additional.retention.ms
> > > > >>>>>
> > > > >>>>> How does this relate to the retention.ms param of the topic
> > > > >>>>> config?
> > > > >>>>> I create the internal topic manually using, say,
> > > > >>>>> retention.ms=3600000.
> > > > >>>>> In the next release (post kafka_2.10-0.10.0.1), since we
> > > > >>>>> support delete for the internal changelog topics as well, I
> > > > >>>>> want it to be retained for, say, just 1 hour.
> > > > >>>>> So how does the above parameter interfere with this
> > > > >>>>> topic-level setting? Or do I now just need to set the above
> > > > >>>>> config to 3600000 and not add retention.ms=3600000 while
> > > > >>>>> creating the internal topic?
> > > > >>>>>
> > > > >>>>> Thanks
> > > > >>>>> Sachin
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> On Tue, Dec 13, 2016 at 11:27 PM, Matthias J. Sax
> > > > >>>>> <matth...@confluent.io> wrote:
> > > > >>>>>
> > > > >>>>>> First, windows are only created if there is actual data for
> > > > >>>>>> a window. So you get windows [0,50), [25,75), [50,100) only
> > > > >>>>>> if there are records falling into each window (btw: the
> > > > >>>>>> window start time is inclusive while the window end time is
> > > > >>>>>> exclusive). If you have only 2 records with, let's say,
> > > > >>>>>> ts=20 and ts=90, you will not have an open window [25,75).
> > > > >>>>>> Each window is physically created when the first record for
> > > > >>>>>> it is processed.
> > > > >>>>>>
> > > > >>>>>> If you have the above 4 windows and a record with ts=101
> > > > >>>>>> arrives, a new window [101,151) will be created. Window
> > > > >>>>>> [0,50) will not be deleted yet, because retention is 100 and
> > > > >>>>>> thus Streams guarantees that all records with ts >= 1
> > > > >>>>>> (= 101 - 100) are still processed correctly, and those
> > > > >>>>>> records would fall into window [0,50).
> > > > >>>>>>
> > > > >>>>>> Thus, window [0,50) can be dropped once time advances to
> > > > >>>>>> TS = 150, but not before that.
> > > > >>>>>>
> > > > >>>>>> -Matthias
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> On 12/13/16 12:06 AM, Sachin Mittal wrote:
> > > > >>>>>>> Hi,
> > > > >>>>>>> So is until() for the future or the past?
> > > > >>>>>>> Say I get the first record at t = 0, until is 100, and my
> > > > >>>>>>> window size is 50, advancing by 25.
> > > > >>>>>>> I understand it will create windows (0, 50), (25, 75),
> > > > >>>>>>> (50, 100).
> > > > >>>>>>> Now at t = 101 it will drop
> > > > >>>>>>> (0, 50), (25, 75), (50, 100) and create
> > > > >>>>>>> (101, 150), (125, 175), (150, 200).
> > > > >>>>>>>
> > > > >>>>>>> Please confirm if this understanding is correct. It is not
> > > > >>>>>>> clear how it will handle the overlapping windows (75, 125)
> > > > >>>>>>> and (175, 225) and so on.
> > > > >>>>>>>
> > > > >>>>>>> The case that is again not clear: say at t = 102 I get some
> > > > >>>>>>> message with timestamp 99. What happens then?
> > > > >>>>>>> Will the result be added to the previous aggregation of
> > > > >>>>>>> (50, 100) or (75, 125), like it should?
> > > > >>>>>>>
> > > > >>>>>>> Or will it recreate the old window (50, 100), aggregate the
> > > > >>>>>>> value there, and then drop it? This would result in a wrong
> > > > >>>>>>> aggregated value, as it does not consider the previously
> > > > >>>>>>> aggregated values.
> > > > >>>>>>>
> > > > >>>>>>> So this is the pressing case I am not able to understand.
> > > > >>>>>>> Maybe I am wrong in some basic understanding.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Next, for the parameter
> > > > >>>>>>>> windowstore.changelog.additional.retention.ms
> > > > >>>>>>>
> > > > >>>>>>> How does this relate to the retention.ms param of the topic
> > > > >>>>>>> config?
> > > > >>>>>>> I create the internal topic manually using, say,
> > > > >>>>>>> retention.ms=3600000.
> > > > >>>>>>> In the next release (post kafka_2.10-0.10.0.1), since we
> > > > >>>>>>> support delete for the internal changelog topics as well, I
> > > > >>>>>>> want it to be retained for, say, just 1 hour.
> > > > >>>>>>> So how does the above parameter interfere with this
> > > > >>>>>>> topic-level setting? Or do I now just need to set the above
> > > > >>>>>>> config to 3600000 and not add retention.ms=3600000 while
> > > > >>>>>>> creating the internal topic?
> > > > >>>>>>> This is just another doubt remaining here.
> > > > >>>>>>>
> > > > >>>>>>> Thanks
> > > > >>>>>>> Sachin
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax
> > > > >>>>>>> <matth...@confluent.io> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> Sachin,
> > > > >>>>>>>>
> > > > >>>>>>>> There is no reason to have an .until() AND a .retain() --
> > > > >>>>>>>> just increase the value of .until().
> > > > >>>>>>>>
> > > > >>>>>>>> If you have a window of, let's say, 1h size and you set
> > > > >>>>>>>> .until() also to 1h, you can obviously not process any
> > > > >>>>>>>> late-arriving data. If you set until() to 2h in this
> > > > >>>>>>>> example, you can process data that is up to 1h delayed.
> > > > >>>>>>>>
> > > > >>>>>>>> So basically, the retention should always be larger than
> > > > >>>>>>>> your window size.
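> > > > >>>>>>>>
> > > > >>>>>>>> To make that concrete (an illustrative sketch, values in
> > > > >>>>>>>> ms):
> > > > >>>>>>>>
> > > > >>>>>>>> // 1h windows kept for 2h: records up to 1h late still
> > > > >>>>>>>> // land in a window that is retained
> > > > >>>>>>>> TimeWindows.of(60 * 60 * 1000L).until(2 * 60 * 60 * 1000L);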
> > > > >>>>>>>>
> > > > >>>>>>>> The parameter
> > > > >>>>>>>>> windowstore.changelog.additional.retention.ms
> > > > >>>>>>>>
> > > > >>>>>>>> applies to the changelog topics that back up window state
> > > > >>>>>>>> stores. Those changelog topics are compacted. However, the
> > > > >>>>>>>> key used does encode a window ID, and thus older data can
> > > > >>>>>>>> never be cleaned up by compaction alone. Therefore, an
> > > > >>>>>>>> additional retention time is applied to those topics, too.
> > > > >>>>>>>> Thus, if an old window is not updated for this amount of
> > > > >>>>>>>> time, it will eventually get deleted, preventing the topic
> > > > >>>>>>>> from growing infinitely.
> > > > >>>>>>>>
> > > > >>>>>>>> The value will be determined by until(), i.e., whatever
> > > > >>>>>>>> you specify in .until() will be used to set this
> > > > >>>>>>>> parameter.
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> -Matthias
> > > > >>>>>>>>
> > > > >>>>>>>> On 12/12/16 1:07 AM, Sachin Mittal wrote:
> > > > >>>>>>>>> Hi,
> > > > >>>>>>>>> We are facing the exact problem described by Matthias
> > > > >>>>>>>>> above.
> > > > >>>>>>>>> We are keeping the default until, which is 1 day.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Our records' timestamp extractor reads a field which
> > > > >>>>>>>>> increases with time.
> > > > >>>>>>>>> However, for short periods we cannot guarantee that the
> > > > >>>>>>>>> timestamp always increases. So at the boundary, i.e.
> > > > >>>>>>>>> after 24 hrs, we can get records which are beyond that
> > > > >>>>>>>>> window's retention period.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Then it happens as described above, and our aggregation
> > > > >>>>>>>>> fails.
> > > > >>>>>>>>>
> > > > >>>>>>>>> So, just to sum up: when we get a record at 24h + 1 sec,
> > > > >>>>>>>>> it deletes the older window, and since the new record
> > > > >>>>>>>>> belongs to the new window, that window gets created.
> > > > >>>>>>>>> Now when we get the next record at 24h - 1 sec, since the
> > > > >>>>>>>>> older window has been dropped, it does not get aggregated
> > > > >>>>>>>>> into that bucket.
> > > > >>>>>>>>>
> > > > >>>>>>>>> I suggest we have another setting next to until(), call
> > > > >>>>>>>>> it retain(), which retains the older windows into the
> > > > >>>>>>>>> next window.
> > > > >>>>>>>>>
> > > > >>>>>>>>> I think at the stream window boundary level it should use
> > > > >>>>>>>>> the concept of a sliding window. So we could define a
> > > > >>>>>>>>> window like:
> > > > >>>>>>>>>
> > > > >>>>>>>>> TimeWindows.of("test-table", 3600 * 1000L)
> > > > >>>>>>>>>     .advanceBy(1800 * 1000L)
> > > > >>>>>>>>>     .until(7 * 24 * 3600 * 1000L)
> > > > >>>>>>>>>     .retain(900 * 1000L)
> > > > >>>>>>>>>
> > > > >>>>>>>>> So after 7 days it retains the data covered by the
> > > > >>>>>>>>> windows of the last 15 minutes, which rolls the data in
> > > > >>>>>>>>> them over to the next window. This way streams work
> > > > >>>>>>>>> continuously.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Please let us know your thoughts on this.
> > > > >>>>>>>>>
> > > > >>>>>>>>> On another side question: there is a setting
> > > > >>>>>>>>>
> > > > >>>>>>>>> windowstore.changelog.additional.retention.ms
> > > > >>>>>>>>>
> > > > >>>>>>>>> It is not clear what it does. Is this the default for
> > > > >>>>>>>>> until()?
> > > > >>>>>>>>>
> > > > >>>>>>>>> Thanks
> > > > >>>>>>>>> Sachin
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax
> > > > >>>>>>>>> <matth...@confluent.io> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Windows are created on demand, i.e., each time a new
> > > > >>>>>>>>>> record arrives and there is no window for it yet, a new
> > > > >>>>>>>>>> window will get created.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Windows accept data until their retention time (which
> > > > >>>>>>>>>> you can configure via .until()) has passed. Thus, you
> > > > >>>>>>>>>> will have many windows open in parallel.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> If you read older data, it will just be put into the
> > > > >>>>>>>>>> corresponding windows (as long as the window retention
> > > > >>>>>>>>>> time did not pass). If a window was discarded already, a
> > > > >>>>>>>>>> new window with this single (late-arriving) record will
> > > > >>>>>>>>>> get created, the computation will be triggered, you get
> > > > >>>>>>>>>> a result, and afterwards the window is deleted again (as
> > > > >>>>>>>>>> its retention time has passed already).
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> The retention time is driven by "stream-time", an
> > > > >>>>>>>>>> internally tracked time that only progresses in the
> > > > >>>>>>>>>> forward direction. It gets its value from the timestamps
> > > > >>>>>>>>>> provided by the TimestampExtractor -- thus, by default
> > > > >>>>>>>>>> it will be event-time.
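> > > > >>>>>>>>>>
> > > > >>>>>>>>>> For example, an extractor that pulls the timestamp out
> > > > >>>>>>>>>> of the record value could look like this (a sketch;
> > > > >>>>>>>>>> MyEvent and its getter are placeholders for your own
> > > > >>>>>>>>>> record type):
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> public class EventTimeExtractor implements TimestampExtractor {
> > > > >>>>>>>>>>     @Override
> > > > >>>>>>>>>>     public long extract(ConsumerRecord<Object, Object> record) {
> > > > >>>>>>>>>>         return ((MyEvent) record.value()).getEventTimeMs();
> > > > >>>>>>>>>>     }
> > > > >>>>>>>>>> }
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> > > > >>>>>>>>>>     EventTimeExtractor.class.getName());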
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> -Matthias
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> > > > >>>>>>>>>>> I've read this and still have more questions than
> > > > >>>>>>>>>>> answers. If my data skips about (timewise), what
> > > > >>>>>>>>>>> determines when a given window will start / stop
> > > > >>>>>>>>>>> accepting new data? What if I'm reading data from some
> > > > >>>>>>>>>>> time ago?
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax
> > > > >>>>>>>>>>> <matth...@confluent.io> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> Please have a look here:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> If you have further questions, just follow up :)
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> -Matthias
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> > > > >>>>>>>>>>>>> I've added the 'until()' clause to some aggregation
> > > > >>>>>>>>>>>>> steps and it's working wonders for keeping the size
> > > > >>>>>>>>>>>>> of the state store within useful boundaries... but
> > > > >>>>>>>>>>>>> I'm not 100% clear on how it works.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> What is implied by the '.until()' clause? What
> > > > >>>>>>>>>>>>> determines when to stop receiving further data -- is
> > > > >>>>>>>>>>>>> it clock time (since the window was created)? It
> > > > >>>>>>>>>>>>> seems problematic for it to refer to EventTime, as
> > > > >>>>>>>>>>>>> this may bounce all over the place. For
> > > > >>>>>>>>>>>>> non-overlapping windows a given record can only fall
> > > > >>>>>>>>>>>>> into a single aggregation period -- so when would a
> > > > >>>>>>>>>>>>> value get discarded?
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> I'm using 'groupByKey().aggregate(...,
> > > > >>>>>>>>>>>>> TimeWindows.of(60 * 1000L).until(10 * 1000L))' -- but
> > > > >>>>>>>>>>>>> what is this accomplishing?