Thanks again,

yeah we saw that example for sure :)

Ok, gonna try low-level Transformer and see how it goes.
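
For reference, here is a minimal sketch in plain Java of the 'remaining count' bookkeeping from the quoted message below. The class name RemainingCounter and its methods are hypothetical, and the HashMap is only a stand-in for the persistent KV store a Transformer would have access to; nothing here is Kafka Streams API.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper: tracks the sum of hit counts for URLs outside the TopN.
 * The HashMap is an assumption standing in for a persistent key-value store.
 */
class RemainingCounter {
    // Last cumulative count seen per URL, so a newer count can replace it.
    private final Map<String, Long> lastCount = new HashMap<>();
    private long remaining = 0L;

    /** Records the latest cumulative count for a URL that is not in the TopN. */
    long update(String url, long newCount) {
        Long previous = lastCount.put(url, newCount);
        // Add only the delta, not the full new count.
        remaining += newCount - (previous == null ? 0L : previous);
        return remaining;
    }

    /** Drops a URL from the remaining sum, e.g. when it is promoted into the TopN. */
    long remove(String url) {
        Long previous = lastCount.remove(url);
        if (previous != null) {
            remaining -= previous;
        }
        return remaining;
    }

    long remaining() {
        return remaining;
    }
}
```

With the events from the example, update("acme.com", 100) followed by update("acme.com", 150) leaves the remaining count at 150 rather than 250.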


On Mon, Apr 9, 2018 at 9:17 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> For (1), no.
>
> If you want to do manual put/get you should use a Transformer and
> implement a custom operator.
>
> Btw: here is an example of TopN:
> https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/TopArticlesExampleDriver.java
>
>
>
> -Matthias
>
> On 4/9/18 4:46 AM, Dmitriy Vsekhvalnov wrote:
> > Hi Matthias, thanks
> >
> > clarifications below:
> >
> > 1. .aggregate( () -> .. ,
> >                (k, v, agg) -> {
> >                    // Can I access the KV store here for manual put/get?
> >                });
> >
> > 2. TopN is not hard; we are using pretty much the same approach you
> > describe, just with a bounded priority queue. The problematic part is the
> > 'remaining count': everything else that is not in the TopN records. It
> > turned out to be quite complex in the streaming world (or we missed
> > something). I'll try to illustrate, assuming a simplified event flow:
> >
> >  - acme.com: 100 hits -> too small, not in TopN, so we add it to the
> >    remaining count
> >  - ... some time later ...
> >  - acme.com: 150 hits -> still too small, so we add it to the remaining
> >    count
> >
> > Problem: we added 250 hits to remaining, but we should have added only
> > 150 hits. We have to subtract the previous count, which means we need to
> > keep them all somewhere. That's where we hope access to a KV store can
> > help.
> >
> > On Sat, Apr 7, 2018 at 10:11 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >>>> ok, question then - is it possible to use state store with .aggregate()?
> >>
> >> Not sure what exactly you mean by this. An aggregation always uses a
> >> store; it's a stateful operation and cannot be computed without a store.
> >>
> >> For TopN, if you get the hit-count as input, you can use an
> >> `.aggregate()` operator that uses an array or list as output -- this
> >> list contains the topN, and each time aggregate() is called, you check
> >> whether the new count replaces an existing count in the array/list.
> >>
> >>
> >> -Matthias
> >>
> >> On 4/6/18 10:36 AM, Dmitriy Vsekhvalnov wrote:
> >>> Thanks guys,
> >>>
> >>> ok, question then - is it possible to use state store with .aggregate()?
> >>>
> >>> Here are some details on counting; we are basically looking for a TopN +
> >>> Remaining calculation.
> >>>
> >>> Example:
> >>>
> >>> - incoming data: api url -> hit count
> >>>
> >>> - we want output: Top 20 urls per domain per hour + remaining count
> >>> per domain (e.g. sum of all other url hits that do not belong to the top
> >>> 10 per domain per hour).
> >>>
> >>> With some grouping variations.
> >>>
> >>> Make some sense? Always open for better ideas :)
> >>>
> >>> On Fri, Apr 6, 2018 at 8:21 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>>> Hello Dmitriy,
> >>>>
> >>>> You can "simulate" a lower-level Processor API by 1) adding the stores
> >>>> you need via builder#addStore(); 2) doing a manual "through" call after
> >>>> "selectKey" (the selected key will be the same as in your original
> >>>> groupBy call), and then adding the `transform()` operator on the
> >>>> repartitioned stream to do manual windowed counting.
> >>>>
> >>>> But before you really go down this route, I'd first like to validate
> >>>> whether the provided `Aggregator` and `Initializer` functions really
> >>>> cannot meet your "overcomplicated version of record counting". Could
> >>>> you elaborate a bit more on this logic, so maybe we can still work
> >>>> around it with the pure high-level DSL?
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>> On Fri, Apr 6, 2018 at 8:49 AM, Dmitriy Vsekhvalnov <
> >>>> dvsekhval...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hey, good day everyone,
> >>>>>
> >>>>> another kafka-streams friday question.
> >>>>>
> >>>>> We hit the wall with the DSL implementation and would like to try the
> >>>>> low-level Processor API.
> >>>>>
> >>>>> What we are looking for is to:
> >>>>>   - repartition the incoming source stream by grouping records by some
> >>>>>     fields + windowed (hourly, daily, etc.),
> >>>>>   - and then apply a custom Processor on the grouped data.
> >>>>>
> >>>>> The Processor is going to do some overcomplicated version of record
> >>>>> counting and needs persistent KV state store access.
> >>>>>
> >>>>> The issue: neither KGroupedStream nor TimeWindowedKStream provides an
> >>>>> API to hook a processor into the topology.
> >>>>>
> >>>>> Just to show some code:
> >>>>>
> >>>>> in.groupBy((key, value) -> .....)
> >>>>>    .windowedBy(Hourly)
> >>>>>    .transform(Processor) // Missing this one?
> >>>>>
> >>>>>
> >>>>> What are our options to combine both? We were thinking that we could
> >>>>> re-implement grouping with the low-level API after investigating the
> >>>>> source code, but that looks like overkill.
> >>>>>
> >>>>> Thank you.
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>
> >>
> >>
> >
>
>
