Michal, that's an interesting idea. In an ideal world, Kafka Streams should have an optimizer that is able to do this automatically under the hood. Too bad we are not there yet.
@Garrett: did you try this out? This seems to be a question that might affect many users, and it might be worth documenting it somewhere as a recommended pattern.

-Matthias

On 5/8/17 1:43 AM, Michal Borowiecki wrote:
> Apologies,
>
> In the code snippet, of course, only the oneMinuteWindowed KTable will
> have a windowed key (KTable<Windowed<Key>, Value>); all others would be
> just KTable<Tuple2<Key, Long>, Value>.
>
> Michał
>
> On 07/05/17 16:09, Michal Borowiecki wrote:
>>
>> Hi Garrett,
>>
>> I've encountered a similar challenge in a project I'm working on (it's
>> still work in progress, so please take my suggestions with a grain of
>> salt).
>>
>> Yes, I believe KTable.groupBy lets you accomplish what you are aiming
>> for with something like the following (same snippet attached as a txt
>> file):
>>
>> // Key and Value stand for your actual key and value types
>> KTable<Windowed<Key>, Value> oneMinuteWindowed = yourStream
>>     .groupByKey()
>>     // a tumbling 1-minute window
>>     .reduce(/*your adder*/, TimeWindows.of(60 * 1000), "store1m");
>>     // your adder can be as simple as (val, agg) -> agg + val
>>     // for primitive types, or as complex as you need
>>
>> // Tuple2 for this example as defined by the javaslang library
>> KTable<Tuple2<Key, Long>, Value> fiveMinuteWindowed = oneMinuteWindowed
>>     .groupBy((windowedKey, value) -> new KeyValue<>(
>>         new Tuple2<>(windowedKey.key(),
>>             windowedKey.window().end() / 1000 / 60 / 5 * 1000 * 60 * 5),
>>         value))
>>     // the above rounds the time down to a timestamp divisible by 5 minutes
>>     .reduce(/*your adder*/, /*your subtractor*/, "store5m");
>>     // your subtractor can be as simple as (val, agg) -> agg - val
>>     // for primitive types, or as complex as you need; just make sure
>>     // you get the order right (lesson hard learnt ;) ),
>>     // subtraction is not commutative!
>>
>> KTable<Tuple2<Key, Long>, Value> fifteenMinuteWindowed = fiveMinuteWindowed
>>     .groupBy((keyPair, value) -> new KeyValue<>(
>>         new Tuple2<>(keyPair._1, keyPair._2 / 1000 / 60 / 15 * 1000 * 60 * 15),
>>         value))
>>     // the above rounds the time down to a timestamp divisible by 15 minutes
>>     .reduce(/*your adder*/, /*your subtractor*/, "store15m");
>>
>> KTable<Tuple2<Key, Long>, Value> sixtyMinuteWindowed = fifteenMinuteWindowed
>>     .groupBy((keyPair, value) -> new KeyValue<>(
>>         new Tuple2<>(keyPair._1, keyPair._2 / 1000 / 60 / 60 * 1000 * 60 * 60),
>>         value))
>>     // the above rounds the time down to a timestamp divisible by 60 minutes
>>     .reduce(/*your adder*/, /*your subtractor*/, "store60m");
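The rounding in the groupBy calls above leans on Java integer division truncating the remainder. A quick standalone check (plain Java, with a hypothetical timestamp; not part of the original snippet) shows what it computes:

    // hypothetical window end: 00:07:30, expressed in milliseconds
    long end = (7 * 60 + 30) * 1000L;                   // 450000
    long bucket = end / 1000 / 60 / 5 * 1000 * 60 * 5;  // truncating division
    // bucket == 300000, i.e. 00:05:00, rounded down to the 5-minute boundary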
>> So, step by step:
>>
>>  * You use a windowed aggregation only once; from there on you use
>>    the KTable abstraction only (which doesn't have windowed
>>    aggregations).
>>  * In each subsequent groupBy you map the key to a pair of
>>    (your-real-key, timestamp), where the timestamp is rounded down
>>    with the precision of the size of the new window.
>>  * reduce() on a KGroupedTable takes an adder and a subtractor, and it
>>    will correctly update the new aggregate by first subtracting the
>>    previous value of the upstream record before adding the new value
>>    (this way, just as you said, the downstream is aware of the
>>    statefulness of the upstream and correctly treats each record as
>>    an update); a sketch of such a pair follows at the end of the thread.
>>  * If you want to reduce message volume further, you can break these
>>    into separate KafkaStreams instances and configure the downstream
>>    ones with a higher commit.interval.ms (unfortunately, you can't
>>    have different values of this setting in different places of the
>>    same topology, I'm afraid); a configuration sketch also follows at
>>    the end of the thread.
>>  * TODO: Look into retention policies; I haven't investigated that in
>>    any detail.
>>
>> I haven't tested this exact code, so please excuse any typos.
>>
>> Also, if someone with more experience could chip in and check that I'm
>> not talking nonsense here, or point out an easier way to do this, that
>> would be great.
>>
>> I don't know if the alternative approach is possible, where you
>> convert each resulting KTable back into a stream and just do a
>> windowed aggregation somehow. That would feel more natural, but I
>> haven't figured out how to correctly window over a changelog in the
>> KStream abstraction; it feels impossible in the high-level DSL.
>>
>> Hope that helps,
>> Michal
>>
>> On 02/05/17 18:03, Garrett Barton wrote:
>>> Let's say I want to sum values over increasing window sizes of 1, 5,
>>> 15, and 60 minutes. Right now I have them running in parallel,
>>> meaning if I am producing 1k/sec records, I am consuming 4k/sec to
>>> feed each calculation. In reality I am calculating far more than a
>>> sum, and in this pattern I'm looking at something like
>>> (producing rate) * (calculations) * (windows) for a consumption rate.
>>>
>>> So I had the idea: could I feed the 1-minute window into the
>>> 5-minute, the 5 into the 15, and the 15 into the 60? Theoretically I
>>> would consume a fraction of the records, not have to scale as hugely,
>>> and be back to something like
>>> (producing rate) * (calculations) + (updates).
>>>
>>> Thinking this was an awesome idea, I went to try and implement it and
>>> got twisted around. These are windowed grouping operations that
>>> produce KTables, which means instead of a raw stream I have an update
>>> stream. To me this implies that downstream must be aware of this and
>>> consume stateful information, knowing that each record is an update
>>> and not an addition. Does the high-level API handle that construct
>>> and let me do that? For a simple sum it would have to hold each of
>>> the latest values for, say, the five 1-minute sums in a given window,
>>> to perform the 5-minute sum. Reading the docs, which are awesome, I
>>> cannot determine whether KTable.groupBy() would work over a window,
>>> and whether reduce or aggregate would thus do what I need.
>>>
>>> Any ideas?
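On the adder/subtractor point above, a minimal sketch for a plain Long sum (hypothetical; the store name "store5m" follows the snippet, and I'm assuming Kafka Streams passes the current aggregate as the first Reducer argument, as in the 0.10.x API):

    import org.apache.kafka.streams.kstream.Reducer;

    // Adder: combine the current aggregate with an arriving value.
    Reducer<Long> adder = (aggregate, value) -> aggregate + value;

    // Subtractor: remove the OLD value of an updated record from the
    // aggregate. The order matters: aggregate - oldValue is correct,
    // oldValue - aggregate is not, because subtraction is not commutative.
    Reducer<Long> subtractor = (aggregate, oldValue) -> aggregate - oldValue;

    // usage, as in the snippet above:
    // groupedTable.reduce(adder, subtractor, "store5m");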
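On the commit.interval.ms point: the setting applies per KafkaStreams instance, so a downstream instance can simply be started with a larger value. A minimal sketch (the application id and bootstrap server are hypothetical placeholders):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "five-minute-rollup"); // hypothetical
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // hypothetical
    // Commit (and, with caching enabled, forward) results every 60s
    // instead of the 30s default, shrinking the downstream update stream.
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 60 * 1000);

Note that a longer commit interval throttles the update stream only when record caching is in effect (cache.max.bytes.buffering greater than zero, available since 0.10.1).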