Michal,
This is slick! I am still writing unit tests to verify it. My code looks
something like:
KTable<Windowed<String>, CountSumMinMaxAvgObj> oneMinuteWindowed =
    srcStream
        // my value object isn't really called that; I just wanted to show
        // a sample set of the calculations the value can do!
        .groupByKey(Serdes.String(), Serdes.Double())
        .aggregate(/* initializer */, /* aggregator */,
                   TimeWindows.of(60*1000, 60*1000), "store1m");
// I used an aggregate here so I could have a non-primitive value object
// that does the calculations on each aggregator call; the POJO has an
// .add(Double) in it.
KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> fiveMinuteWindowed =
    oneMinuteWindowed
        // I made my own Tuple2; will move the window calculation into it
        .groupBy((windowedKey, value) -> new KeyValue<>(
                     new Tuple2<String, Long>(windowedKey.key(),
                         windowedKey.window().end() /1000/60/5 *1000*60*5),
                     value),
                 keySerde, valSerde)
        // the above rounds time down to a timestamp divisible by 5 minutes
        .reduce(/*your adder*/, /*your subtractor*/, "store5m");
// where your subtractor can be as simple as (val, agg) -> agg - val
// for primitive types or as complex as you need,
// just make sure you get the order right (lesson hard learnt ;) ),
// subtraction is not commutative!
// Again, my value object has an .add(Obj) and a .sub() to handle this,
// so nice!
KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> fifteenMinuteWindowed =
    fiveMinuteWindowed
        .groupBy((keyPair, value) -> new KeyValue<>(
                     new Tuple2<String, Long>(keyPair._1,
                         keyPair._2 /1000/60/15 *1000*60*15),
                     value),
                 keySerde, valSerde)
        // the above rounds time down to a timestamp divisible by 15 minutes
        .reduce(/*your adder*/, /*your subtractor*/, "store15m");
KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> sixtyMinuteWindowed =
    fifteenMinuteWindowed
        .groupBy((keyPair, value) -> new KeyValue<>(
                     new Tuple2<String, Long>(keyPair._1,
                         keyPair._2 /1000/60/60 *1000*60*60),
                     value),
                 keySerde, valSerde)
        // the above rounds time down to a timestamp divisible by 60 minutes
        .reduce(/*your adder*/, /*your subtractor*/, "store60m");
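The value object mentioned in the comments above is not shown in the thread. A minimal sketch of what such a POJO might look like follows; the class name CountSum and all of its methods are hypothetical. Only count and sum are tracked here because both can be reversed exactly by a subtractor; a min or max cannot be recomputed from the aggregate alone once a value has been subtracted, so those would need extra handling.

```java
/** Hypothetical stand-in for the value object; not the class used in the thread. */
final class CountSum {
    private long count;
    private double sum;

    /** Folds one raw measurement into the aggregate (the 1-minute aggregator step). */
    CountSum add(double value) {
        count++;
        sum += value;
        return this;
    }

    /** Merges another aggregate into this one (the adder for the coarser windows). */
    CountSum add(CountSum other) {
        count += other.count;
        sum += other.sum;
        return this;
    }

    /** Removes a previously merged aggregate (the subtractor for the coarser windows). */
    CountSum sub(CountSum other) {
        count -= other.count;
        sum -= other.sum;
        return this;
    }

    long count() { return count; }

    double sum() { return sum; }

    /** Average of all values currently in the aggregate; 0.0 when it is empty. */
    double avg() { return count == 0 ? 0.0 : sum / count; }
}
```

With an object like this, the adder would call add(other) on the running aggregate and the subtractor would call sub(other) on it.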
Notes thus far:
Doesn't look like I need to start the 5min with a windowed KTable return
object; it starts with the regular KTable<Tuple2<String,Long>> in this case.
I am thinking about using windowedKey.window().start() instead of end(), as
I believe that is more consistent with what the windows themselves put out.
They go into the stores bound by their start time, I believe.
Serdes get nuts, as does the generic typing on some of these classes
(yeah, you, KeyValueMapper), which makes for long code! I had to specify
them everywhere since the keys/values changed.
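The start() versus end() question above can be checked with plain arithmetic. The sketch below assumes that a tumbling window's end equals its start plus its size; the class and method names are made up for illustration.

```java
/** Illustrative helpers for the rounding used in the groupBy key mappers. */
final class WindowBuckets {
    static final long MINUTE_MS = 60_000L;

    /** Rounds a non-negative timestamp down to a multiple of bucketMs. */
    static long roundDown(long timestampMs, long bucketMs) {
        return timestampMs / bucketMs * bucketMs;
    }

    /** Bucket chosen when the key mapper rounds the upstream window's start. */
    static long bucketByStart(long windowStartMs, long bucketMs) {
        return roundDown(windowStartMs, bucketMs);
    }

    /** Bucket chosen when the key mapper rounds the upstream window's end (assumed start + size). */
    static long bucketByEnd(long windowStartMs, long windowSizeMs, long bucketMs) {
        return roundDown(windowStartMs + windowSizeMs, bucketMs);
    }
}
```

For the one-minute window starting at minute 4, bucketByStart gives the bucket at minute 0, while bucketByEnd gives the bucket at minute 5, because the window's end falls exactly on the next five-minute boundary. Under that assumption, rounding end() shifts each five-minute bucket back by one minute relative to rounding start().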
I didn't get enough time to mess with it today. I will wrap up the unit
tests and run it to see how it performs against my real data as well
tomorrow. I expect a huge reduction in resources (both streams and Kafka
storage) by moving to this.
Thank you!
On Mon, May 8, 2017 at 5:26 PM, Matthias J. Sax <matth...@confluent.io>
wrote:
Michal,
that's an interesting idea. In an ideal world, Kafka Streams should have
an optimizer that is able to do this automatically under the hood. Too
bad we are not there yet.
@Garrett: did you try this out?
This seems to be a question that might affect many users, and it might
be worth documenting it somewhere as a recommended pattern.
-Matthias
On 5/8/17 1:43 AM, Michal Borowiecki wrote:
Apologies,
In the code snippet of course only the oneMinuteWindowed KTable will have
a Windowed key (KTable<Windowed<Key>, Value>); all others would be just
KTable<Tuple2<Key, Long>, Value>.
Michał
On 07/05/17 16:09, Michal Borowiecki wrote:
Hi Garrett,
I've encountered a similar challenge in a project I'm working on (it's
still work in progress, so please take my suggestions with a grain of
salt).
Yes, I believe KTable.groupBy lets you accomplish what you are aiming
for with something like the following (same snippet attached as txt
file):
KTable<Windowed<Key>, Value> oneMinuteWindowed =
    yourStream // where Key and Value stand for your actual key and value types
        .groupByKey()
        .reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000), "store1m");
// where your adder can be as simple as (val, agg) -> agg + val
// for primitive types or as complex as you need
KTable<Windowed<Tuple2<Key, Long>>, Value> fiveMinuteWindowed =
    oneMinuteWindowed // Tuple2 for this example as defined by the javaslang library
        .groupBy((windowedKey, value) -> new KeyValue<>(
            new Tuple2<>(windowedKey.key(),
                windowedKey.window().end() /1000/60/5 *1000*60*5),
            value))
        // the above rounds time down to a timestamp divisible by 5 minutes
        .reduce(/*your adder*/, /*your subtractor*/, "store5m");
// where your subtractor can be as simple as (val, agg) -> agg - val
// for primitive types or as complex as you need,
// just make sure you get the order right (lesson hard learnt ;) ),
// subtraction is not commutative!
KTable<Windowed<Tuple2<Key, Long>>, Value> fifteenMinuteWindowed =
    fiveMinuteWindowed
        .groupBy((keyPair, value) -> new KeyValue<>(
            new Tuple2<>(keyPair._1, keyPair._2 /1000/60/15 *1000*60*15),
            value))
        // the above rounds time down to a timestamp divisible by 15 minutes
        .reduce(/*your adder*/, /*your subtractor*/, "store15m");
KTable<Windowed<Tuple2<Key, Long>>, Value> sixtyMinuteWindowed =
    fifteenMinuteWindowed
        .groupBy((keyPair, value) -> new KeyValue<>(
            new Tuple2<>(keyPair._1, keyPair._2 /1000/60/60 *1000*60*60),
            value))
        // the above rounds time down to a timestamp divisible by 60 minutes
        .reduce(/*your adder*/, /*your subtractor*/, "store60m");
So, step by step:
* You use a windowed aggregation only once; from there on you use
  the KTable abstraction only (which doesn't have windowed
  aggregations).
* In each subsequent groupBy you map the key to a pair of
  (your-real-key, timestamp) where the timestamp is rounded down
  with the precision of the size of the new window.
* reduce() on a KGroupedTable takes an adder and a subtractor and it
  will correctly update the new aggregate by first subtracting the
  previous value of the upstream record before adding the new value
  (this way, just as you said, the downstream is aware of the
  statefulness of the upstream and correctly treats each record as
  an update).
* If you want to reduce message volume further, you can break these
  into separate KafkaStreams instances and configure downstream ones
  with a higher commit.interval.ms (unfortunately you can't have
  different values of this setting in different places of the same
  topology, I'm afraid).
* TODO: Look into retention policies; I haven't investigated that in
  any detail.
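The subtract-then-add behaviour described in the list above can be modelled in plain Java. This is only a sketch of the semantics, not the Kafka Streams implementation; the class TableSumModel and its methods are invented for illustration, and it assumes each upstream key always maps to the same downstream group, as it does with the rounded-timestamp keys.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of a table-to-table sum with an adder and a subtractor. */
final class TableSumModel {
    // Latest value seen for each upstream key (e.g. one 1-minute window).
    private final Map<String, Long> upstream = new HashMap<>();
    // Running sum for each downstream group (e.g. one 5-minute bucket).
    private final Map<String, Long> aggregate = new HashMap<>();

    /** Applies an upstream update: retract the old value if there is one, then add the new one. */
    void update(String upstreamKey, String group, long newValue) {
        Long oldValue = upstream.put(upstreamKey, newValue);
        long agg = aggregate.getOrDefault(group, 0L);
        if (oldValue != null) {
            agg = agg - oldValue; // subtractor: (val, agg) -> agg - val
        }
        agg = agg + newValue;     // adder: (val, agg) -> agg + val
        aggregate.put(group, agg);
    }

    /** Current sum for a downstream group, or 0 if nothing has been added yet. */
    long sum(String group) {
        return aggregate.getOrDefault(group, 0L);
    }
}
```

If the first one-minute window reports 10, the second reports 7, and the first is then updated to 12, the group's sum goes 10, 17, 19. Without the subtractor step the last update would be double-counted and give 29.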
I haven't tested this exact code, so please excuse any typos.
Also, if someone with more experience could chip in and check if I'm
not talking nonsense here, or if there's an easier way to do this, that
would be great.
I don't know if the alternative approach is possible, where you
convert each resulting KTable back into a stream and just do a
windowed aggregation somehow. That would feel more natural, but I
haven't figured out how to correctly window over a changelog in the
KStream abstraction, feels impossible in the high-level DSL.
Hope that helps,
Michal
On 02/05/17 18:03, Garrett Barton wrote:
Let's say I want to sum values over increasing window sizes of 1, 5, 15
and 60 minutes. Right now I have them running in parallel, meaning if I
am producing 1k/sec records I am consuming 4k/sec to feed each
calculation. In reality I am calculating far more than sum, and in this
pattern I'm looking at something like
(producing rate)*(calculations)*(windows) for a consumption rate.
So I had the idea: could I feed the 1 minute window into the 5 minute,
and 5 into 15, and 15 into 60? Theoretically I would consume a fraction
of the records, not have to scale as huge, and be back to something like
(producing rate)*(calculations)+(updates).
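The two consumption-rate estimates above can be written out as a small calculation. This is a sketch only: the class and method names are made up, and the update rate of the chained layout is left as a parameter because the thread gives no figure for it.

```java
/** Back-of-the-envelope consumption rates for the two layouts discussed above. */
final class ConsumptionEstimate {
    /** Every window size reads the raw stream: rate * calculations * windows. */
    static long parallel(long producedPerSec, int calculations, int windows) {
        return producedPerSec * calculations * windows;
    }

    /** Only the smallest window reads the raw stream; larger windows read upstream updates. */
    static long chained(long producedPerSec, int calculations, long updatesPerSec) {
        return producedPerSec * calculations + updatesPerSec;
    }
}
```

With 1,000 records/sec, one calculation and four windows, parallel(1000, 1, 4) gives the 4,000 records/sec mentioned above.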
Thinking this is an awesome idea, I went to try and implement it and got
twisted around. These are windowed grouping operations that produce
KTables, which means instead of a raw stream I have an update stream. To
me this implies that downstream must be aware of this and consume
stateful information, knowing that each record is an update and not an
addition. Does the high level API handle that construct and let me do
that? For a simple sum it would have to hold each of the latest values
for, say, the five 1-minute sums in a given window, to perform the
5-minute sum. Reading the docs, which are awesome, I cannot determine
whether KTable.groupBy() would work over a window, and whether reduce or
aggregate would then do what I need.
Any ideas?
--
Signature
<http://www.openbet.com/> Michal Borowiecki
Senior Software Engineer L4
T: +44 208 742 1600
+44 203 249 8448
E: michal.borowie...@openbet.com
W: www.openbet.com <http://www.openbet.com/>
OpenBet Ltd
Chiswick Park Building 9
566 Chiswick High Rd
London
W4 5XT
UK
<https://www.openbet.com/email_promo>
This message is confidential and intended only for the addressee. If
you have received this message in error, please immediately
notify the
postmas...@openbet.com <mailto:postmas...@openbet.com> and delete it
from your system as well as any copies. The content of e-mails as
well
as traffic data may be monitored by OpenBet for employment and
security purposes. To protect the environment please do not print
this
e-mail unless necessary. OpenBet Ltd. Registered Office: Chiswick
Park
Building 9, 566 Chiswick High Road, London, W4 5XT, United
Kingdom. A
company registered in England and Wales. Registered no. 3134634. VAT
no. GB927523612