Thanks guys,

OK, a question then: is it possible to use a state store with .aggregate()?

Here are some details on the counting: we are basically looking for a
TopN + Remaining calculation.

Example:

- incoming data: api url -> hit count

- desired output: top 20 urls per domain per hour + remaining count per
domain (i.e. the sum of hits for all other urls that do not make the top
list for that domain and hour).

With some grouping variations.
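
To make the example concrete, here is a rough sketch in plain Java of the
calculation for a single (domain, hour) bucket. It does not touch Kafka
Streams at all, the class and method names are made up for illustration,
and it assumes the hits per url for that bucket are already summed into a
map:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of any library.
final class TopNWithRemaining {

    /** Result for one (domain, hour) bucket. */
    static final class Result {
        final Map<String, Long> top;  // url -> hits, highest hit count first
        final long remaining;         // summed hits of all urls outside the top N

        Result(Map<String, Long> top, long remaining) {
            this.top = top;
            this.remaining = remaining;
        }
    }

    /** Splits already-summed hit counts into the top n urls and a remainder. */
    static Result compute(Map<String, Long> hitsByUrl, int n) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(hitsByUrl.entrySet());
        // Sort by hit count descending; break ties by url so the result is deterministic.
        entries.sort((a, b) -> {
            int byHits = Long.compare(b.getValue(), a.getValue());
            return byHits != 0 ? byHits : a.getKey().compareTo(b.getKey());
        });

        Map<String, Long> top = new LinkedHashMap<>();
        long remaining = 0L;
        for (int i = 0; i < entries.size(); i++) {
            Map.Entry<String, Long> e = entries.get(i);
            if (i < n) {
                top.put(e.getKey(), e.getValue());  // still inside the top N
            } else {
                remaining += e.getValue();          // everything else is summed up
            }
        }
        return new Result(top, remaining);
    }

    public static void main(String[] args) {
        Map<String, Long> hits = new LinkedHashMap<>();
        hits.put("/a", 50L);
        hits.put("/b", 30L);
        hits.put("/c", 15L);
        hits.put("/d", 5L);
        Result r = compute(hits, 2);
        System.out.println("top=" + r.top + " remaining=" + r.remaining);
    }
}
```

The open question is where the per-url counts for a bucket would live so
that something like this can run on them, hence the state store question.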

Does that make sense? Always open to better ideas :)

On Fri, Apr 6, 2018 at 8:21 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Dmitriy,
>
> You can "simulate" a lower-level Processor API by 1) adding the stores you
> need via builder#addStateStore(); 2) doing a manual "through" call after
> "selectKey" (the selected key will be the same as in your original groupBy
> call), and then adding the `transform()` operator on the repartitioned
> stream to do manual windowed counting.
>
> But before you really go down this route, I'd first like to validate
> whether the provided `Aggregator` and `Initializer` functions really cannot
> meet your "overcomplicated version of record counting". Could you elaborate
> a bit more on this logic, so that maybe we can still work around it with
> the pure high-level DSL?
>
>
> Guozhang
>
>
> On Fri, Apr 6, 2018 at 8:49 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
>
> > Hey, good day everyone,
> >
> > another kafka-streams friday question.
> >
> > We hit a wall with our DSL implementation and would like to try the
> > low-level Processor API.
> >
> > What we are looking for is to:
> >   - repartition the incoming source stream by grouping records by some
> >     fields + windowed (hourly, daily, etc.),
> >   - and then apply a custom Processor to the grouped data.
> >
> > The Processor is going to do some overcomplicated version of record
> > counting and needs persistent KV state store access.
> >
> > The issue: neither KGroupedStream nor TimeWindowedKStream provides an API
> > to hook a processor into the topology.
> >
> > Just to show some code:
> >
> > in.groupBy((key, value) -> .....)
> >    .windowedBy(Hourly)
> >    .transform(Processor) // Missing this one?
> >
> >
> > What are our options for combining both? After investigating the source
> > code, we were thinking that we could re-implement grouping with the
> > low-level API, but that looks like overkill.
> >
> > Thank you.
> >
>
>
>
> --
> -- Guozhang
>
