Thinking about this once more (and with a fresh memory of another thread about KTables), I am wondering if this approach needs some extra tuning:
Since the first window aggregation produces an output stream with an unbounded key space, the following (non-windowed) KTables would grow indefinitely, if I am not missing anything. Thus, it might be required to put in a `transform()` that forwards all data 1-to-1, but additionally registers a punctuation schedule. When punctuation is called, it would need to send tombstone messages (or something similar) downstream to delete windows that are older than the retention time. Sounds tricky to implement though... `transform()` would need to keep track of the used keys in a custom state in order to send the appropriate tombstones. Also, `transform()` is only available for KStream, and converting a (windowed) KTable into a KStream and back into a KTable while preserving the required semantics does not seem straightforward. Any thoughts about this potential issue?

-Matthias

On 5/8/17 3:05 PM, Garrett Barton wrote:
> Michal,
> This is slick! I am still writing unit tests to verify it. My code
> looks something like:
>
> KTable<Windowed<String>, CountSumMinMaxAvgObj> oneMinuteWindowed = srcStream
>     // my val object isn't really called that, just wanted to show
>     // a sample set of calculations the value can do!
>     .groupByKey(Serdes.String(), Serdes.Double())
>     .aggregate(/*initializer*/, /*aggregator*/,
>             TimeWindows.of(60*1000, 60*1000), "store1m");
>
> // I used an aggregate here so I could have a non-primitive value object
> // that does the calculations in each aggregator; the pojo has an
> // .add(Double) in it.
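To make the punctuation idea described at the top of this message concrete: a pass-through `transform()` would need to remember, per key, the newest window end it has forwarded, and on each punctuation emit tombstones for entries older than the retention time. Below is a minimal sketch of just that bookkeeping, in plain Java with no Kafka dependency; the class and method names are invented for illustration, not an existing API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical bookkeeping for a tombstone-emitting transform():
// remember each forwarded (key -> window end) and, on punctuation,
// return the keys whose windows fell out of retention so tombstones
// (key -> null) can be sent downstream.
public class TombstoneTracker {
    private final long retentionMs;
    private final Map<String, Long> windowEndByKey = new HashMap<>();

    public TombstoneTracker(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Called for every record forwarded 1-to-1; keeps the newest window end.
    public void record(String key, long windowEndMs) {
        windowEndByKey.merge(key, windowEndMs, Math::max);
    }

    // Called from the punctuation schedule: collect and forget expired keys,
    // so each tombstone is emitted exactly once.
    public List<String> expiredKeys(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = windowEndByKey.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() < nowMs - retentionMs) {
                expired.add(e.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

In a real topology this map would have to live in a state store so it survives restarts and rebalances, which is exactly the "custom state" complication mentioned above.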
> KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> fiveMinuteWindowed =
>     oneMinuteWindowed // I made my own Tuple2, will move window calc into it
>     .groupBy( (windowedKey, value) -> new KeyValue<>(
>             new Tuple2<String, Long>(windowedKey.key(),
>                     windowedKey.window().end() /1000/60/5 *1000*60*5),
>             value),
>         keySerde, valSerde)
>     // the above rounds time down to a timestamp divisible by 5 minutes
>     .reduce(/*your adder*/, /*your subtractor*/, "store5m");
>
> // where your subtractor can be as simple as (val, agg) -> agg - val
> // for primitive types or as complex as you need,
> // just make sure you get the order right (lesson hard learnt ;) ),
> // subtraction is not commutative!
> // again my val object has an .add(Obj) and a .sub() to handle this, so nice!
>
> KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> fifteenMinuteWindowed =
>     fiveMinuteWindowed
>     .groupBy( (keyPair, value) -> new KeyValue<>(
>             new Tuple2<>(keyPair._1, keyPair._2 /1000/60/15 *1000*60*15),
>             value),
>         keySerde, valSerde)
>     // the above rounds time down to a timestamp divisible by 15 minutes
>     .reduce(/*your adder*/, /*your subtractor*/, "store15m");
>
> KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> sixtyMinuteWindowed =
>     fifteenMinuteWindowed
>     .groupBy( (keyPair, value) -> new KeyValue<>(
>             new Tuple2<>(keyPair._1, keyPair._2 /1000/60/60 *1000*60*60),
>             value),
>         keySerde, valSerde)
>     // the above rounds time down to a timestamp divisible by 60 minutes
>     .reduce(/*your adder*/, /*your subtractor*/, "store60m");
>
> Notes thus far:
> Doesn't look like I need to start the 5-minute table from a windowed KTable
> return object; it starts with the regular KTable<Tuple2<String, Long>> in
> this case.
> I'm thinking about using windowedKey.window().start() instead of end(), as I
> believe that is more consistent with what the windows themselves put out.
> They go into the stores keyed by their start time, I believe.
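The integer-division trick in the snippets above is worth spelling out: dividing an epoch-millis timestamp by the bucket size and multiplying back rounds it down to the bucket boundary. A self-contained sketch (the class and method names are mine, for illustration only):

```java
public class WindowRounding {

    // Round an epoch-millis timestamp down to the nearest multiple of
    // `minutes`. Equivalent to the chained integer divisions in the thread,
    // e.g. ts /1000/60/5 *1000*60*5 for 5-minute buckets: integer division
    // truncates, so multiplying back lands on the bucket boundary.
    public static long roundDownToMinutes(long epochMillis, long minutes) {
        long bucketMillis = minutes * 60 * 1000;
        return epochMillis / bucketMillis * bucketMillis;
    }

    public static void main(String[] args) {
        long ts = 1_000_000L; // 16 min 40 s after the epoch
        System.out.println(roundDownToMinutes(ts, 5));  // 900000, i.e. 15:00
        System.out.println(roundDownToMinutes(ts, 15)); // 900000
        System.out.println(roundDownToMinutes(ts, 60)); // 0
    }
}
```

Note that the cascade relies on each window size being a multiple of the previous one (1 -> 5 -> 15 -> 60 minutes), so every coarser bucket is exactly a union of finer ones.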
> Serdes get nuts, as does the generic typing on some of these classes
> (yeah, you, KeyValueMapper); it makes for long code! I had to specify them
> everywhere since the key/vals changed.
>
> I didn't get enough time to mess with it today; I will wrap up the unit
> tests and run it against my real data tomorrow to see how it performs.
> I expect a huge reduction in resources (both streams and kafka storage)
> by moving to this.
> Thank you!
>
> On Mon, May 8, 2017 at 5:26 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Michal,
>>
>> that's an interesting idea. In an ideal world, Kafka Streams should have
>> an optimizer that is able to do this automatically under the hood. Too
>> bad we are not there yet.
>>
>> @Garrett: did you try this out?
>>
>> This seems to be a question that might affect many users, and it might
>> be worth documenting it somewhere as a recommended pattern.
>>
>>
>> -Matthias
>>
>>
>> On 5/8/17 1:43 AM, Michal Borowiecki wrote:
>>> Apologies,
>>>
>>> In the code snippet, of course only the oneMinuteWindowed KTable will
>>> have a Windowed key (KTable<Windowed<Key>, Value>); all others would be
>>> just KTable<Tuple2<Key, Long>, Value>.
>>>
>>> Michał
>>>
>>> On 07/05/17 16:09, Michal Borowiecki wrote:
>>>>
>>>> Hi Garrett,
>>>>
>>>> I've encountered a similar challenge in a project I'm working on (it's
>>>> still work in progress, so please take my suggestions with a grain of
>>>> salt).
>>>> Yes, I believe KTable.groupBy lets you accomplish what you are aiming
>>>> for, with something like the following (same snippet attached as a txt
>>>> file):
>>>>
>>>> KTable<Windowed<Key>, Value> oneMinuteWindowed = yourStream
>>>>     // where Key and Value stand for your actual key and value types
>>>>     .groupByKey()
>>>>     .reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000), "store1m");
>>>>     // where your adder can be as simple as (val, agg) -> agg + val
>>>>     // for primitive types or as complex as you need
>>>>
>>>> KTable<Windowed<Tuple2<Key, Long>>, Value> fiveMinuteWindowed =
>>>>     oneMinuteWindowed // Tuple2 for this example as defined by the
>>>>                       // javaslang library
>>>>     .groupBy( (windowedKey, value) -> new KeyValue<>(
>>>>             new Tuple2<>(windowedKey.key(),
>>>>                     windowedKey.window().end() /1000/60/5 *1000*60*5),
>>>>             value))
>>>>     // the above rounds time down to a timestamp divisible by 5 minutes
>>>>     .reduce(/*your adder*/, /*your subtractor*/, "store5m");
>>>>     // where your subtractor can be as simple as (val, agg) -> agg - val
>>>>     // for primitive types or as complex as you need,
>>>>     // just make sure you get the order right (lesson hard learnt ;) ),
>>>>     // subtraction is not commutative!
>>>> KTable<Windowed<Tuple2<Key, Long>>, Value> fifteenMinuteWindowed =
>>>>     fiveMinuteWindowed
>>>>     .groupBy( (keyPair, value) -> new KeyValue<>(
>>>>             new Tuple2<>(keyPair._1, keyPair._2 /1000/60/15 *1000*60*15),
>>>>             value))
>>>>     // the above rounds time down to a timestamp divisible by 15 minutes
>>>>     .reduce(/*your adder*/, /*your subtractor*/, "store15m");
>>>>
>>>> KTable<Windowed<Tuple2<Key, Long>>, Value> sixtyMinuteWindowed =
>>>>     fifteenMinuteWindowed
>>>>     .groupBy( (keyPair, value) -> new KeyValue<>(
>>>>             new Tuple2<>(keyPair._1, keyPair._2 /1000/60/60 *1000*60*60),
>>>>             value))
>>>>     // the above rounds time down to a timestamp divisible by 60 minutes
>>>>     .reduce(/*your adder*/, /*your subtractor*/, "store60m");
>>>>
>>>> So, step by step:
>>>>
>>>> * You use a windowed aggregation only once; from there on you use
>>>>   the KTable abstraction only (which doesn't have windowed aggregations).
>>>> * In each subsequent groupBy you map the key to a pair of
>>>>   (your-real-key, timestamp), where the timestamp is rounded down
>>>>   with the precision of the size of the new window.
>>>> * reduce() on a KGroupedTable takes an adder and a subtractor, and it
>>>>   will correctly update the new aggregate by first subtracting the
>>>>   previous value of the upstream record before adding the new value
>>>>   (this way, just as you said, the downstream is aware of the
>>>>   statefulness of the upstream and correctly treats each record as
>>>>   an update).
>>>> * If you want to reduce message volume further, you can break these
>>>>   into separate KafkaStreams instances and configure downstream ones
>>>>   with a higher commit.interval.ms (unfortunately you can't have
>>>>   different values of this setting in different places of the same
>>>>   topology, I'm afraid).
>>>> * TODO: Look into retention policies; I haven't investigated that in
>>>>   any detail.
>>>>
>>>> I haven't tested this exact code, so please excuse any typos.
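The adder/subtractor mechanics in the third bullet above can be illustrated without Kafka: keep the last value seen per upstream record, subtract it from the downstream aggregate, then add the fresh value. A toy model (all names are mine, not a Kafka API), assuming a sum aggregate:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of KGroupedTable.reduce(adder, subtractor): each upstream
// update first retracts its previous contribution, then applies the new
// one, so every input record is treated as an update, not an addition.
public class CascadedSum {
    // downstream aggregate per coarse bucket, e.g. (key, 5-min timestamp)
    private final Map<String, Double> aggregates = new HashMap<>();
    // last value seen per upstream record, e.g. (key, 1-min window)
    private final Map<String, Double> lastValue = new HashMap<>();

    public void update(String upstreamKey, String bucketKey, double newValue) {
        Double previous = lastValue.put(upstreamKey, newValue);
        double agg = aggregates.getOrDefault(bucketKey, 0.0);
        if (previous != null) {
            agg -= previous; // subtractor: retract the stale contribution
        }
        agg += newValue;     // adder: apply the fresh value
        aggregates.put(bucketKey, agg);
    }

    public double get(String bucketKey) {
        return aggregates.getOrDefault(bucketKey, 0.0);
    }
}
```

Two updates for the same 1-minute window leave the 5-minute sum reflecting only the latest value, which is why getting the subtract-before-add order right matters.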
>>>> Also, if someone with more experience could chip in and check that I'm
>>>> not talking nonsense here, or point out an easier way to do this, that
>>>> would be great.
>>>>
>>>> I don't know if the alternative approach is possible, where you
>>>> convert each resulting KTable back into a stream and just do a
>>>> windowed aggregation somehow. That would feel more natural, but I
>>>> haven't figured out how to correctly window over a changelog in the
>>>> KStream abstraction; it feels impossible in the high-level DSL.
>>>>
>>>> Hope that helps,
>>>> Michal
>>>>
>>>> On 02/05/17 18:03, Garrett Barton wrote:
>>>>> Let's say I want to sum values over increasing window sizes of 1, 5,
>>>>> 15, and 60 minutes. Right now I have them running in parallel, meaning
>>>>> if I am producing 1k/sec records I am consuming 4k/sec to feed each
>>>>> calculation. In reality I am calculating far more than sums, and in
>>>>> this pattern I'm looking at something like
>>>>> (producing rate)*(calculations)*(windows) for a consumption rate.
>>>>>
>>>>> So I had the idea: could I feed the 1-minute window into the 5-minute,
>>>>> and 5 into 15, and 15 into 60? Theoretically I would consume a
>>>>> fraction of the records, not have to scale as hugely, and be back to
>>>>> something like (producing rate)*(calculations)+(updates).
>>>>>
>>>>> Thinking this was an awesome idea, I went to try and implement it and
>>>>> got twisted around. These are windowed grouping operations that
>>>>> produce KTables, which means instead of a raw stream I have an update
>>>>> stream. To me this implies that downstream must be aware of this and
>>>>> consume stateful information, knowing that each record is an update
>>>>> and not an addition. Does the high-level API handle that construct and
>>>>> let me do that? For a simple sum it would have to hold each of the
>>>>> latest values for, say, the five 1-minute sums in a given window, to
>>>>> perform the 5-minute sum.
>>>>> Reading the docs, which are awesome, I cannot determine whether
>>>>> KTable.groupBy() would work over a window, and whether reduce or
>>>>> aggregate would thus do what I need.
>>>>>
>>>>> Any ideas?
>>>>
>>>> --
>>>> Michal Borowiecki
>>>> Senior Software Engineer L4
>>>> T: +44 208 742 1600 / +44 203 249 8448
>>>> E: michal.borowie...@openbet.com
>>>> W: www.openbet.com <http://www.openbet.com/>
>>>>
>>>> OpenBet Ltd, Chiswick Park Building 9, 566 Chiswick High Rd, London,
>>>> W4 5XT, UK
>>>>
>>>> This message is confidential and intended only for the addressee. If
>>>> you have received this message in error, please immediately notify
>>>> postmas...@openbet.com <mailto:postmas...@openbet.com> and delete it
>>>> from your system as well as any copies. The content of e-mails as well
>>>> as traffic data may be monitored by OpenBet for employment and
>>>> security purposes. To protect the environment please do not print this
>>>> e-mail unless necessary. OpenBet Ltd. Registered Office: Chiswick Park
>>>> Building 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A
>>>> company registered in England and Wales. Registered no. 3134634. VAT
>>>> no. GB927523612