Hello Dmitriy,

You can "simulate" a lower-level Processor API by 1) adding the stores you need via `builder#addStateStore()`; 2) doing a manual `through()` call after `selectKey()` (the selected key would be the same as in your original `groupBy()` call); and 3) then, from the repartitioned stream, adding a `transform()` operator that does the windowed counting manually.
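For illustration, here is a minimal, untested sketch of that wiring against the Kafka Streams 1.1 API (current as of this thread). The topic names, the "counts-store" store, and the WindowedCountTransformer class are placeholders I made up, and the hourly bucketing inside the transformer is just one possible way to do the "manual windowed counting" (note that `through()` expects its topic to already exist):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class ManualWindowedCount {

  public static void main(String[] args) {
    StreamsBuilder builder = new StreamsBuilder();

    // 1) Register the persistent KV store the transformer will use.
    StoreBuilder<KeyValueStore<String, Long>> store =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("counts-store"),
            Serdes.String(), Serdes.Long());
    builder.addStateStore(store);

    KStream<String, String> in = builder.stream("input-topic");

    // 2) Re-key with the same logic the groupBy() used, then force a
    //    repartition with through() so the transformer sees co-partitioned data.
    //    ("repartition-topic" must be created up front.)
    in.selectKey((key, value) -> value /* same key extraction as in groupBy() */)
      .through("repartition-topic", Produced.with(Serdes.String(), Serdes.String()))
      // 3) Run the custom counting logic in a Transformer that owns the store.
      .transform(WindowedCountTransformer::new, "counts-store")
      .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

    builder.build(); // ... then create and start KafkaStreams with the usual config
  }

  // Placeholder transformer: counts records per (key, hourly bucket).
  static class WindowedCountTransformer
      implements Transformer<String, String, KeyValue<String, Long>> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
      this.context = context;
      this.store = (KeyValueStore<String, Long>) context.getStateStore("counts-store");
    }

    @Override
    public KeyValue<String, Long> transform(final String key, final String value) {
      // Bucket by the record timestamp into hourly windows.
      final long windowStart = context.timestamp() - (context.timestamp() % 3_600_000L);
      final String bucketKey = key + "@" + windowStart;
      final Long current = store.get(bucketKey);
      final long updated = (current == null ? 0L : current) + 1L;
      store.put(bucketKey, updated);
      return KeyValue.pair(bucketKey, updated);
    }

    // Deprecated no-op, still required by the 1.x Transformer interface.
    @Override
    public KeyValue<String, Long> punctuate(final long timestamp) {
      return null;
    }

    @Override
    public void close() { }
  }
}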
But before you really go down this route, I'd first like to check whether the provided `Initializer` and `Aggregator` functions really cannot express your "overcomplicated version of record counting". Could you elaborate a bit more on that logic, so that maybe we can still handle it with the pure high-level DSL? (A rough sketch of the DSL shape I have in mind is appended after the quoted message below.)

Guozhang

On Fri, Apr 6, 2018 at 8:49 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
> Hey, good day everyone,
>
> another kafka-streams Friday question.
>
> We hit the wall with the DSL implementation and would like to try the
> low-level Processor API.
>
> What we are looking for is to:
> - repartition the incoming source stream by grouping records on some fields,
>   windowed (hourly, daily, etc.);
> - and then apply a custom Processor to the grouped data.
>
> The Processor is going to do some overcomplicated version of record counting
> and needs access to a persistent KV state store.
>
> The issue: neither KGroupedStream nor TimeWindowedKStream provides an API to
> hook a Processor into the topology.
>
> Just to show some code:
>
> in.groupBy((key, value) -> .....)
>   .windowedBy(Hourly)
>   .transform(Processor) // Missing this one?
>
> What are our options to combine both? We were thinking that we could
> re-implement the grouping with the low-level API after investigating the
> source code, but that looks like overkill.
>
> Thank you.

-- 
-- Guozhang
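P.S. For completeness, this is roughly the pure-DSL shape I am referring to above; your counting logic would live in the Aggregator lambda. Again only a sketch against Kafka Streams 1.1, with made-up topic names:

import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class DslHourlyCount {

  public static void main(String[] args) {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> in = builder.stream("input-topic");

    KTable<Windowed<String>, Long> hourlyCounts = in
        // same grouping fields as in your original groupBy()
        .groupBy((key, value) -> value, Serialized.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1)))
        .aggregate(
            () -> 0L,                                  // Initializer: starting value per window
            (key, value, aggregate) -> aggregate + 1L, // Aggregator: your counting logic goes here
            Materialized.with(Serdes.String(), Serdes.Long()));

    // Flatten the windowed key into a plain string before writing downstream.
    hourlyCounts.toStream()
        .map((windowedKey, count) -> KeyValue.pair(
            windowedKey.key() + "@" + windowedKey.window().start(), count))
        .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

    builder.build(); // ... then create and start KafkaStreams with the usual config
  }
}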