Hello Dmitriy,

You can "simulate" a lower-level Processor API by 1) adding the stores you
need via StreamsBuilder#addStateStore(); 2) doing a manual "through" call after
"selectKey" (the selected key will be the same key as in your original groupBy
call); and 3) then, from the repartitioned stream, adding the `transform()`
operator to do the manual windowed counting.
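
To make that concrete, here is a rough sketch against the 1.x API that is
current as of this thread; the store, topic, and class names are made-up
placeholders, and the hourly bucketing inside the transformer is just one
possible way to do the "manual windowed counting":

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class ManualWindowedCountExample {

    public static StreamsBuilder buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // 1) register the state store the transformer will use for its counts
        StoreBuilder<KeyValueStore<String, Long>> countStore =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("manual-counts"),
                Serdes.String(), Serdes.Long());
        builder.addStateStore(countStore);

        // 2) re-key, force a repartition with through(), then hook in transform()
        builder.<String, String>stream("input-topic")
            .selectKey((key, value) -> value /* derive the grouping key here */)
            .through("repartition-topic")
            .transform(WindowedCountTransformer::new, "manual-counts")
            .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        return builder;
    }

    // 3) the transformer does the windowed counting itself, e.g. by bucketing
    //    the record timestamp into an hour and keying the store on (key, bucket)
    static class WindowedCountTransformer
            implements Transformer<String, String, KeyValue<String, Long>> {

        private ProcessorContext context;
        private KeyValueStore<String, Long> store;

        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            this.context = context;
            this.store = (KeyValueStore<String, Long>) context.getStateStore("manual-counts");
        }

        public KeyValue<String, Long> transform(final String key, final String value) {
            final long hourStart = context.timestamp() - (context.timestamp() % 3600000L);
            final String windowedKey = key + "@" + hourStart;
            final Long current = store.get(windowedKey);
            final long updated = (current == null ? 0L : current) + 1L;
            store.put(windowedKey, updated);
            return KeyValue.pair(windowedKey, updated);
        }

        public KeyValue<String, Long> punctuate(final long timestamp) {
            return null; // not used; only present for the deprecated 1.x interface method
        }

        public void close() { }
    }
}

The selectKey + through pair is essentially what groupBy would generate for
you under the covers; once the data is co-partitioned on the new key, the
transformer can safely keep its counts in the local store.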

But before you really go down this route, I'd first like to validate whether
the provided `Initializer` / `Aggregator` functions really cannot meet your
"overcomplicated version of record counting". Could you elaborate a bit more
on this logic, so that maybe we can still work it out with the pure
high-level DSL?
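
For reference, the pure-DSL shape I have in mind is roughly the following
(again just a sketch with made-up topic names; your custom counting logic
would live in the Initializer / Aggregator lambdas):

import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;

public class DslWindowedCountExample {

    public static StreamsBuilder buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        builder.<String, String>stream("input-topic")
            .groupBy((key, value) -> value /* grouping fields */,
                     Serialized.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1)))
            .aggregate(
                () -> 0L,                                        // Initializer
                (groupKey, value, aggregate) -> aggregate + 1L,  // Aggregator: custom logic here
                Materialized.with(Serdes.String(), Serdes.Long()));

        return builder;
    }
}

If the counting logic needs more than what fits into an Aggregator (e.g.
access to record metadata, or an arbitrary store layout), then the
transform() route above would be the way to go.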


Guozhang


On Fri, Apr 6, 2018 at 8:49 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com>
wrote:

> Hey, good day everyone,
>
> another kafka-streams friday question.
>
> We hit a wall with the DSL implementation and would like to try the
> low-level Processor API.
>
> What we're looking for is to:
>   - repartition the incoming source stream by grouping records on some
> fields + windowing (hourly, daily, etc.),
>    - and then apply a custom Processor to the grouped data.
>
> The Processor is going to do some overcomplicated version of record
> counting and needs persistent KV state store access.
>
> The issue: neither KGroupedStream nor TimeWindowedKStream provides an API
> to hook a processor into the topology.
>
> Just to show some code:
>
> in.groupBy((key, value) -> .....)
>    .windowedBy(Hourly)
>    .transform(Processor) // Missing this one?
>
>
> What are our options to combine both? After investigating the source code,
> we were thinking we could re-implement the grouping with the low-level API,
> but that looks like overkill.
>
> Thank you.
>



-- 
-- Guozhang
