KGroupedStream and TimeWindowedKStream are only logical representations
at DSL level. They don't really "do" anything.

Thus, you can mimic them as follows:

builder.addStore(...)
in.selectKey().through(...).transform(..., "storeName").

selectKey() set's the new key for the grouping and the through() does
the actual repartitioning. Note, that you need to create the topic you
specify in through() manually and with the desired number of partitions
before you start your application.

The transform() is doing the windowing and aggregation in a single
operator. If you want to do windowing, you might want to use the
WindowedStore implementation from the DSL.


Hope this helps.


-Matthias

On 4/6/18 8:49 AM, Dmitriy Vsekhvalnov wrote:
> Hey, good day everyone,
> 
> another kafka-streams friday question.
> 
> We hit the wall with DSL implementation and would like to try low-level
> Processor API.
> 
> What we looking for is to:
>   - repartition incoming source stream via grouping records by some fields
> + windowed (hourly, daily, e.t.c).
>    - and then apply custom Processor on grouped data.
> 
> Processor gonna do some overcomplicated version of record counting and need
> persistent KV state store access.
> 
> The issue - neither KGroupedStream nor TimeWindowedKStream provides api to
> hook processor into topology.
> 
> Just to show some code:
> 
> in.groupBy((key, value) -> .....)
>    .windowedBy(Hourly)
>    .transform(Processor) // Missing this one?
> 
> 
> What our options to combine both? We were thinking that we can re-implement
> grouping with low-level API after investigating source code, but looks like
> overkill.
> 
> Thank you.
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to