Michal,

that's an interesting idea. In an ideal world, Kafka Streams should have
an optimizer that is able to do this automatically under the hood. Too
bad we are not there yet.

@Garret: did you try this out?

This seems to be a question that might affect many users, and it might
be worth documenting it somewhere as a recommended pattern.


-Matthias


On 5/8/17 1:43 AM, Michal Borowiecki wrote:
> Apologies,
> 
> In the code snippet, of course, only the oneMinuteWindowed KTable will have a
> Windowed key (KTable<Windowed<Key>, Value>); all the others would just be
> KTable<Tuple2<Key, Long>, Value>.
> 
> Michał
> 
> On 07/05/17 16:09, Michal Borowiecki wrote:
>>
>> Hi Garrett,
>>
>> I've encountered a similar challenge in a project I'm working on (it's
>> still work in progress, so please take my suggestions with a grain of
>> salt).
>>
>> Yes, I believe KTable.groupBy lets you accomplish what you are aiming
>> for with something like the following (same snippet attached as txt file):
>>
>>
>> import org.apache.kafka.streams.KeyValue;
>> import org.apache.kafka.streams.kstream.KTable;
>> import org.apache.kafka.streams.kstream.TimeWindows;
>> import org.apache.kafka.streams.kstream.Windowed;
>> import javaslang.Tuple2; // Tuple2 as defined by the javaslang library
>>
>> // Key and Value stand for your actual key and value types.
>> KTable<Windowed<Key>, Value> oneMinuteWindowed = yourStream
>>     .groupByKey()
>>     // your adder can be as simple as (agg, val) -> agg + val
>>     // for primitive types, or as complex as you need
>>     .reduce(/*your adder*/, TimeWindows.of(60 * 1000L), "store1m");
>>
>> // From here on the keys are (your-real-key, bucket-start-timestamp) pairs,
>> // not Windowed keys.
>> KTable<Tuple2<Key, Long>, Value> fiveMinuteWindowed = oneMinuteWindowed
>>     // round the 1-minute window's start (not its end, which would push the
>>     // last minute of each bucket into the next one) down to a multiple of
>>     // 5 minutes; a Tuple2 key needs a Serde for repartitioning, so pass one
>>     // (e.g. via the groupBy overload taking Serdes) if your defaults can't
>>     // handle it
>>     .groupBy((windowedKey, value) -> new KeyValue<>(
>>         new Tuple2<>(windowedKey.key(),
>>             windowedKey.window().start() / (5 * 60 * 1000L) * (5 * 60 * 1000L)),
>>         value))
>>     // your subtractor can be as simple as (agg, val) -> agg - val for
>>     // primitive types, or as complex as you need; just make sure you get
>>     // the order right (lesson hard learnt ;) ): the first argument is the
>>     // aggregate, and subtraction is not commutative!
>>     .reduce(/*your adder*/, /*your subtractor*/, "store5m");
>>
>> KTable<Tuple2<Key, Long>, Value> fifteenMinuteWindowed = fiveMinuteWindowed
>>     // round the 5-minute bucket start down to a multiple of 15 minutes
>>     .groupBy((keyPair, value) -> new KeyValue<>(
>>         new Tuple2<>(keyPair._1, keyPair._2 / (15 * 60 * 1000L) * (15 * 60 * 1000L)),
>>         value))
>>     .reduce(/*your adder*/, /*your subtractor*/, "store15m");
>>
>> KTable<Tuple2<Key, Long>, Value> sixtyMinuteWindowed = fifteenMinuteWindowed
>>     // round the 15-minute bucket start down to a multiple of 60 minutes
>>     .groupBy((keyPair, value) -> new KeyValue<>(
>>         new Tuple2<>(keyPair._1, keyPair._2 / (60 * 60 * 1000L) * (60 * 60 * 1000L)),
>>         value))
>>     .reduce(/*your adder*/, /*your subtractor*/, "store60m");
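The bucket arithmetic in the snippet above can be sanity-checked in plain Java with no Kafka dependency. This is a minimal sketch: `BucketRounding` and `floorTo` are hypothetical names used only for illustration, and `floorTo(ts, size)` matches the `ts / size * size` rounding for non-negative timestamps. It also shows why rounding a 1-minute window's end rather than its start puts the last minute of each 5-minute bucket into the next bucket.

```java
// Hypothetical helper for illustration only; not part of Kafka Streams.
final class BucketRounding {
    static final long ONE_MINUTE = 60 * 1000L;
    static final long FIVE_MINUTES = 5 * ONE_MINUTE;
    static final long FIFTEEN_MINUTES = 15 * ONE_MINUTE;
    static final long SIXTY_MINUTES = 60 * ONE_MINUTE;

    // Floors an epoch-millis timestamp to the start of its bucket.
    // Same result as timestampMs / bucketSizeMs * bucketSizeMs when timestampMs >= 0.
    static long floorTo(long timestampMs, long bucketSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, bucketSizeMs);
    }

    public static void main(String[] args) {
        // The five 1-minute windows [0m,1m) ... [4m,5m) all belong to the 0m bucket.
        for (long start = 0; start < FIVE_MINUTES; start += ONE_MINUTE) {
            long end = start + ONE_MINUTE;
            System.out.printf("window [%d, %d): by start -> %d, by end -> %d%n",
                start, end, floorTo(start, FIVE_MINUTES), floorTo(end, FIVE_MINUTES));
        }
    }
}
```

Because 5 divides 15 and 15 divides 60, rounding an already-rounded bucket start again gives the same result as rounding the raw timestamp, which is what lets each stage build on the previous one.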
>>
>>
>> So, step by step:
>>
>>   * You use a windowed aggregation only once, from there on you use
>>     the KTable abstraction only (which doesn't have windowed
>>     aggregations).
>>   * In each subsequent groupBy you map the key to a pair of
>>     (your-real-key, timestamp), where the timestamp is rounded down
>>     to a multiple of the new window size.
>>   * reduce() on a KGroupedTable takes an adder and a subtractor and it
>>     will correctly update the new aggregate by first subtracting the
>>     previous value of the upstream record before adding the new value
>>     (this way, just as you said, the downstream is aware of the
>>     statefulness of the upstream and correctly treats each record as
>>     an update)
>>   * If you want to reduce message volume further, you can split these
>>     into separate KafkaStreams instances and configure the downstream
>>     ones with a higher commit.interval.ms (unfortunately, you can't have
>>     different values of this setting in different parts of the same
>>     topology).
>>   * TODO: Look into retention policies, I haven't investigated that in
>>     any detail.
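To see why the subtractor's argument order matters, here is a simplified, in-memory model of the update semantics described in the third point above. It is a sketch under stated assumptions, not Kafka's implementation: `UpdateAwareReduce` and `update` are hypothetical names, the regrouping depends only on the upstream key, and the latest upstream values are kept in a local map so the old value is available (in Kafka Streams the upstream KTable supplies it). The sketch follows the convention of the Kafka `Reducer` examples, where the first argument is the current aggregate.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical, simplified model of KGroupedTable.reduce(adder, subtractor);
// not Kafka's implementation. K = upstream key, G = downstream key, V = value.
final class UpdateAwareReduce<K, G, V> {
    private final Function<K, G> regroup;       // e.g. 1-minute window start -> 5-minute bucket
    private final BinaryOperator<V> adder;      // (aggregate, newValue) -> updated aggregate
    private final BinaryOperator<V> subtractor; // (aggregate, oldValue) -> updated aggregate
    private final Map<K, V> latestUpstream = new HashMap<>(); // latest value per upstream key
    private final Map<G, V> aggregates = new HashMap<>();     // current aggregate per bucket

    UpdateAwareReduce(Function<K, G> regroup, BinaryOperator<V> adder, BinaryOperator<V> subtractor) {
        this.regroup = regroup;
        this.adder = adder;
        this.subtractor = subtractor;
    }

    // Treats (key, newValue) as an update of key, not as an additional record.
    V update(K key, V newValue) {
        G group = regroup.apply(key);
        V oldValue = latestUpstream.put(key, newValue);
        V agg = aggregates.get(group);
        if (oldValue != null && agg != null) {
            agg = subtractor.apply(agg, oldValue); // remove the stale contribution first
        }
        agg = (agg == null) ? newValue : adder.apply(agg, newValue); // then add the new one
        aggregates.put(group, agg);
        return agg;
    }

    public static void main(String[] args) {
        Function<Long, Long> toFiveMinuteBucket = start -> start / 300_000L * 300_000L;
        UpdateAwareReduce<Long, Long, Long> sum = new UpdateAwareReduce<Long, Long, Long>(
            toFiveMinuteBucket, (agg, v) -> agg + v, (agg, v) -> agg - v);
        System.out.println(sum.update(0L, 3L));      // window [0m,1m) sums to 3      -> 3
        System.out.println(sum.update(60_000L, 4L)); // window [1m,2m) sums to 4      -> 7
        System.out.println(sum.update(0L, 5L));      // [0m,1m) is revised from 3 to 5 -> 9
    }
}
```

With the parameters swapped, as in `(val, agg) -> agg - val`, the same revision computes the old value minus the aggregate, and the 5-minute sum in this model comes out as 1 instead of 9.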
>>
>> I haven't tested this exact code, so please excuse any typos.
>>
>> Also, if someone with more experience could chip in and check that I'm
>> not talking nonsense here, or whether there's an easier way to do this,
>> that would be great.
>>
>>
>> I don't know if the alternative approach is possible, where you
>> convert each resulting KTable back into a stream and just do a
>> windowed aggregation somehow. That would feel more natural, but I
>> haven't figured out how to correctly window over a changelog in the
>> KStream abstraction; it feels impossible in the high-level DSL.
>>
>> Hope that helps,
>> Michal
>>
>> On 02/05/17 18:03, Garrett Barton wrote:
>>> Let's say I want to sum values over increasing window sizes of 1, 5, 15
>>> and 60 minutes. Right now I have them running in parallel, meaning if I
>>> am producing 1k records/sec I am consuming 4k records/sec to feed each
>>> calculation. In reality I am calculating far more than a sum, and in this
>>> pattern I'm looking at something like (producing rate)*(calculations)*(windows)
>>> for a consumption rate.
>>>
>>> So I had the idea: could I feed the 1-minute window into the 5-minute one,
>>> 5 into 15, and 15 into 60? Theoretically I would consume a fraction of the
>>> records, not have to scale as much, and be back to something like (producing
>>> rate)*(calculations)+(updates).
>>>
>>> Thinking this was an awesome idea, I went to try to implement it and got
>>> twisted around. These are windowed grouping operations that produce
>>> KTables, which means that instead of a raw stream I have an update stream.
>>> To me this implies that downstream must be aware of this and consume
>>> stateful information, knowing that each record is an update and not an
>>> addition. Does the high-level API handle that construct and let me do that?
>>> For a simple sum it would have to hold the latest value of each of, say, the
>>> five 1-minute sums in a given window to compute the 5-minute sum. Reading the
>>> docs, which are awesome, I cannot determine whether KTable.groupBy() would
>>> work over a window, and whether reduce or aggregate would then do what I need.
>>>
>>> Any ideas?
>>>
>>
>> -- 
>> Signature
>> <http://www.openbet.com/>    Michal Borowiecki
>> Senior Software Engineer L4
>>      T:      +44 208 742 1600
>>
>>      
>>      +44 203 249 8448
>>
>>      
>>       
>>      E:      michal.borowie...@openbet.com
>>      W:      www.openbet.com <http://www.openbet.com/>
>>
>>      
>>      OpenBet Ltd
>>
>>      Chiswick Park Building 9
>>
>>      566 Chiswick High Rd
>>
>>      London
>>
>>      W4 5XT
>>
>>      UK
>>
>>      
>> <https://www.openbet.com/email_promo>
>>
>> This message is confidential and intended only for the addressee. If
>> you have received this message in error, please immediately notify the
>> postmas...@openbet.com <mailto:postmas...@openbet.com> and delete it
>> from your system as well as any copies. The content of e-mails as well
>> as traffic data may be monitored by OpenBet for employment and
>> security purposes. To protect the environment please do not print this
>> e-mail unless necessary. OpenBet Ltd. Registered Office: Chiswick Park
>> Building 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A
>> company registered in England and Wales. Registered no. 3134634. VAT
>> no. GB927523612
>>
> 
> 
