Thinking about this once more (and also having a fresh memory of another
thread about KTables), I am wondering if this approach needs some extra
tuning:

As the result of the first window aggregation produces an output stream
with unbounded key space, the following (non-windowed) KTables would
grow indefinitely, if I don't miss anything.

Thus, it might be required to put a transform() that only forwards all
data 1-to-1, but additionally registers a punctuation schedule. When
punctuation is called, it would be required to send tombstone messages
downstream (or a simliar) that deletes windows that are older than the
retention time. Sound tricky to implement though... `transform()` would
need to keep track of used keys to send appropriate tombstones in an
custom state. Also. `transform` is only available for KStream and
transforming (windowed) KTable into KStream into KTable while preserving
the required semantics seems not to be straight forwards.

Any thoughts about this potential issue?


-Matthias


On 5/8/17 3:05 PM, Garrett Barton wrote:
> Michael,
>   This is slick!  I am still writing unit tests to verify it.  My code
> looks something like:
> 
> KTable<Windowed<String>, CountSumMinMaxAvgObj> oneMinuteWindowed =
> srcStream    // my val object isnt really called that, just wanted to show
> a sample set of calculations the value can do!
>     .groupByKey(Serdes.String(), Serdes.Double())
>     .aggregate(/*initializer */, /* aggregator */, TimeWindows.of(60*1000,
> 60*1000), "store1m");
> 
>    // i used an aggregate here so I could have a non-primitive value object
> that does the calculations on each aggregator, pojo has an .add(Double) in
> it.
> KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> fiveMinuteWindowed =
> oneMinuteWindowed    // I made my own Tuple2, will move window calc into it
>     .groupBy( (windowedKey, value) -> new KeyValue<>(new Tuple2<String,
> Long>(windowedKey.key(), windowedKey.window().end() /1000/60/5 *1000*60*5),
> value, keySerde, valSerde)
> 
>         // the above rounds time down to a timestamp divisible by 5 minutes
> 
>     .reduce(/*your adder*/, /*your subtractor*/, "store5m");
> 
>         // where your subtractor can be as simple as (val, agg) -> agg - val
> for primitive types or as complex as you need,
> 
>         // just make sure you get the order right (lesson hard learnt ;) ),
> subtraction is not commutative!
> 
>         // again my val object has an .add(Obj) and a .sub() to handle
> this, so nice!
> 
> 
> KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> fifteenMinuteWindowed =
> fiveMinuteWindowed
> 
>     .groupBy( (keyPair, value) -> new KeyValue<>(new Tuple2(keyPair._1,
> keyPair._2 /1000/60/15 *1000*60*15), value, keySerde, valSerde)
> 
>         // the above rounds time down to a timestamp divisible by 15 minutes
> 
>     .reduce(/*your adder*/, /*your subtractor*/, "store15m");
> 
> 
> KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> sixtyMinuteWindowed =
> fifteeenMinuteWindowed
> 
>     .groupBy( (keyPair, value) -> new KeyValue<>(new Tuple2(keyPairair._1,
> pair._2 /1000/60/60 *1000*60*60), value, keySerde, valSerde)
> 
>         // the above rounds time down to a timestamp divisible by 60 minutes
> 
>     .reduce(/*your adder*/, /*your subtractor*/, "store60m");
> 
> 
> Notes thus far:
>   Doesn't look like I need to start the 5min with a windowed KTable return
> object, it starts with the regular KTable<Tuple2<String,Long>> in this case.
>   I thinking about using windowedKey.window().start() instead of end() as I
> believe that is more consistent with what the windows themselves put out.
> They go into the stores bound by their start time I believe.
>   Serdes gets nuts as well as the Generic typing on some of these classes
> (yea you KeyValueMapper), makes for long code!  I had to specify them
> everywhere since the key/val's changed.
> 
> 
> I didn't get enough time to mess with it today, I will wrap up the unit
> tests and run it to see how it performs against my real data as well
> tomorrow.  I expect a huge reduction in resources (both streams and kafka
> storage) by moving to this.
> Thank you!
> 
> 
> 
> On Mon, May 8, 2017 at 5:26 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Michal,
>>
>> that's an interesting idea. In an ideal world, Kafka Streams should have
>> an optimizer that is able to to this automatically under the hood. Too
>> bad we are not there yet.
>>
>> @Garret: did you try this out?
>>
>> This seems to be a question that might affect many users, and it might
>> we worth to document it somewhere as a recommended pattern.
>>
>>
>> -Matthias
>>
>>
>> On 5/8/17 1:43 AM, Michal Borowiecki wrote:
>>> Apologies,
>>>
>>> In the code snippet of course only oneMinuteWindowed KTable will have a
>>> Windowed key (KTable<Windowed<Key>, Value>), all others would be just
>>> KTable<Tuple2<Key, Long>, Value>.
>>>
>>> MichaƂ
>>>
>>> On 07/05/17 16:09, Michal Borowiecki wrote:
>>>>
>>>> Hi Garrett,
>>>>
>>>> I've encountered a similar challenge in a project I'm working on (it's
>>>> still work in progress, so please take my suggestions with a grain of
>>>> salt).
>>>>
>>>> Yes, I believe KTable.groupBy lets you accomplish what you are aiming
>>>> for with something like the following (same snippet attached as txt
>> file):
>>>>
>>>>
>>>> KTable<Windowed<Key>, Value> oneMinuteWindowed = yourStream    //
>>>> where Key and Value stand for your actual key and value types
>>>>
>>>>     .groupByKey()
>>>>
>>>>     .reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000),
>> "store1m");
>>>>
>>>>         //where your adder can be as simple as (val, agg) -> agg + val
>>>>
>>>>         //for primitive types or as complex as you need
>>>>
>>>>
>>>> KTable<Windowed<Tuple2<Key, Long>>, Value> fiveMinuteWindowed =
>>>> oneMinuteWindowed    // Tuple2 for this example as defined by
>>>> javaslang library
>>>>
>>>>     .groupBy( (windowedKey, value) -> new KeyValue<>(new
>>>> Tuple2<>(windowedKey.key(), windowedKey.window().end() /1000/60/5
>>>> *1000*60*5), value)
>>>>
>>>>         // the above rounds time down to a timestamp divisible by 5
>>>> minutes
>>>>
>>>>     .reduce(/*your adder*/, /*your subtractor*/, "store5m");
>>>>
>>>>         // where your subtractor can be as simple as (val, agg) -> agg
>>>> - valfor primitive types or as complex as you need,
>>>>
>>>>         // just make sure you get the order right (lesson hard learnt
>>>> ;) ), subtraction is not commutative!
>>>>
>>>>
>>>> KTable<Windowed<Tuple2<Key, Long>>, Value> fifteenMinuteWindowed =
>>>> fiveMinuteWindowed
>>>>
>>>>     .groupBy( (keyPair, value) -> new KeyValue<>(new
>>>> Tuple2(keyPair._1, keyPair._2/1000/60/15 *1000*60*15), value)
>>>>
>>>>         // the above rounds time down to a timestamp divisible by 15
>>>> minutes
>>>>
>>>>     .reduce(/*your adder*/, /*your subtractor*/, "store15m");
>>>>
>>>>
>>>> KTable<Windowed<Tuple2<Key, Long>>, Value> sixtyMinuteWindowed =
>>>> fifteeenMinuteWindowed
>>>>
>>>>     .groupBy( (keyPair, value) -> new KeyValue<>(new
>>>> Tuple2(keyPairair._1, pair._2 /1000/60/60 *1000*60*60), value)
>>>>
>>>>         // the above rounds time down to a timestamp divisible by 5
>>>> minutes
>>>>
>>>>     .reduce(/*your adder*/, /*your subtractor*/, "store60m");
>>>>
>>>>
>>>> So, step by step:
>>>>
>>>>   * You use a windowed aggregation only once, from there on you use
>>>>     the KTable abstraction only (which doesn't have windowed
>>>>     aggregations).
>>>>   * In each subsequent groupBy you map the key to a pair of
>>>>     (your-real-key, timestamp) where the timestamp is rounded down
>>>>     with the precision of the size of the new window.
>>>>   * reduce() on a KGroupedTable takes an adder and a subtractor and it
>>>>     will correctly update the new aggregate by first subtracting the
>>>>     previous value of the upstream record before adding the new value
>>>>     (this way, just as you said, the downstream is aware of the
>>>>     statefulness of the upstream and correctly treats each record as
>>>>     an update)
>>>>   * If you want to reduce message volume further, you can break these
>>>>     into separate KafkaStreams instances and configure downstream ones
>>>>     with a higher commit.interval.ms (unfortunately you can't have
>>>>     different values of this setting in different places of the same
>>>>     topology I'm afraid)
>>>>   * TODO: Look into retention policies, I haven't investigated that in
>>>>     any detail.
>>>>
>>>> I haven't tested this exact code, so please excuse any typos.
>>>>
>>>> Also, if someone with more experience could chip in and check if I'm
>>>> not talking nonsense here, or if there's an easier way to this, that
>>>> would be great.
>>>>
>>>>
>>>> I don't know if the alternative approach is possible, where you
>>>> convert each resulting KTable back into a stream and just do a
>>>> windowed aggregation somehow. That would feel more natural, but I
>>>> haven't figured out how to correctly window over a changelog in the
>>>> KStream abstraction, feels impossible in the high-level DSL.
>>>>
>>>> Hope that helps,
>>>> Michal
>>>>
>>>> On 02/05/17 18:03, Garrett Barton wrote:
>>>>> Lets say I want to sum values over increasing window sizes of 1,5,15,60
>>>>> minutes.  Right now I have them running in parallel, meaning if I am
>>>>> producing 1k/sec records I am consuming 4k/sec to feed each
>> calculation.
>>>>> In reality I am calculating far more than sum, and in this pattern I'm
>>>>> looking at something like (producing rate)*(calculations)*(windows)
>> for a
>>>>> consumption rate.
>>>>>
>>>>>  So I had the idea, could I feed the 1 minute window into the 5
>> minute, and
>>>>> 5 into 15, and 15 into 60.  Theoretically I would consume a fraction
>> of the
>>>>> records, not have to scale as huge and be back to something like
>> (producing
>>>>> rate)*(calculations)+(updates).
>>>>>
>>>>>   Thinking this is an awesome idea I went to try and implement it and
>> got
>>>>> twisted around.  These are windowed grouping operations that produce
>>>>> KTables, which means instead of a raw stream I have an update stream.
>> To
>>>>> me this implies that downstream must be aware of this and consume
>> stateful
>>>>> information, knowing that each record is an update and not an in
>> addition
>>>>> to.  Does the high level api handle that construct and let me do
>> that?  For
>>>>> a simple sum it would have to hold each of the latest values for say
>> the 5
>>>>> 1 minute sum's in a given window, to perform the 5 minute sum.
>> Reading the
>>>>> docs which are awesome, I cannot determine if the KTable.groupby()
>> would
>>>>> work over a window, and would reduce or aggregate thus do what I need?
>>>>>
>>>>> Any ideas?
>>>>>
>>>>
>>>> --
>>>> Signature
>>>> <http://www.openbet.com/>    Michal Borowiecki
>>>> Senior Software Engineer L4
>>>>      T:      +44 208 742 1600
>>>>
>>>>
>>>>      +44 203 249 8448
>>>>
>>>>
>>>>
>>>>      E:      michal.borowie...@openbet.com
>>>>      W:      www.openbet.com <http://www.openbet.com/>
>>>>
>>>>
>>>>      OpenBet Ltd
>>>>
>>>>      Chiswick Park Building 9
>>>>
>>>>      566 Chiswick High Rd
>>>>
>>>>      London
>>>>
>>>>      W4 5XT
>>>>
>>>>      UK
>>>>
>>>>
>>>> <https://www.openbet.com/email_promo>
>>>>
>>>> This message is confidential and intended only for the addressee. If
>>>> you have received this message in error, please immediately notify the
>>>> postmas...@openbet.com <mailto:postmas...@openbet.com> and delete it
>>>> from your system as well as any copies. The content of e-mails as well
>>>> as traffic data may be monitored by OpenBet for employment and
>>>> security purposes. To protect the environment please do not print this
>>>> e-mail unless necessary. OpenBet Ltd. Registered Office: Chiswick Park
>>>> Building 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A
>>>> company registered in England and Wales. Registered no. 3134634. VAT
>>>> no. GB927523612
>>>>
>>>
>>> --
>>> Signature
>>> <http://www.openbet.com/>     Michal Borowiecki
>>> Senior Software Engineer L4
>>>       T:      +44 208 742 1600
>>>
>>>
>>>       +44 203 249 8448
>>>
>>>
>>>
>>>       E:      michal.borowie...@openbet.com
>>>       W:      www.openbet.com <http://www.openbet.com/>
>>>
>>>
>>>       OpenBet Ltd
>>>
>>>       Chiswick Park Building 9
>>>
>>>       566 Chiswick High Rd
>>>
>>>       London
>>>
>>>       W4 5XT
>>>
>>>       UK
>>>
>>>
>>> <https://www.openbet.com/email_promo>
>>>
>>> This message is confidential and intended only for the addressee. If you
>>> have received this message in error, please immediately notify the
>>> postmas...@openbet.com <mailto:postmas...@openbet.com> and delete it
>>> from your system as well as any copies. The content of e-mails as well
>>> as traffic data may be monitored by OpenBet for employment and security
>>> purposes. To protect the environment please do not print this e-mail
>>> unless necessary. OpenBet Ltd. Registered Office: Chiswick Park Building
>>> 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A company
>>> registered in England and Wales. Registered no. 3134634. VAT no.
>>> GB927523612
>>>
>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to