Also, if you want (or can tolerate) probabilistic counting, with the option
to do TopN in that manner as well, we have an example that uses Count-Min
Sketch:
https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala

The link/branch above points to the code variant for Kafka 1.0.

The example implements a custom (fault-tolerant) state store backed by CMS,
which is then used in a Transformer.  The Transformer is then plugged into
the DSL for a mix-and-match setup of DSL and Processor API.
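
For reference, a minimal sketch of that wiring pattern (a sketch only: the
topic names, the store name "counts-store", and CountingTransformer -- a
custom Transformer along the lines sketched further down this thread -- are
illustrative placeholders, not the actual CMS code from the linked example):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    final StreamsBuilder builder = new StreamsBuilder();

    // 1) Register a persistent key-value store with the builder; it is
    //    backed by a changelog topic, which makes it fault-tolerant.
    final StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("counts-store"),
            Serdes.String(), Serdes.Long());
    builder.addStateStore(storeBuilder);

    // 2) Plug the custom Transformer into the DSL via transform(), naming
    //    the store so that Streams connects it to the Transformer.
    final KStream<String, String> input = builder.stream("input-topic");
    input.transform(CountingTransformer::new, "counts-store")
         .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));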


On Mon, Apr 9, 2018 at 9:34 PM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com>
wrote:

> Thanks again,
>
> yeah we saw that example for sure :)
>
> Ok, gonna try low-level Transformer and see how it goes.
>
>
> On Mon, Apr 9, 2018 at 9:17 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > For (1), no.
> >
> > If you want to do manual put/get, you should use a Transformer and
> > implement a custom operator.
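> >
> > For example, a minimal sketch of such a custom operator (the store name
> > "counts-store" is a placeholder; the store itself must be registered via
> > builder#addStateStore() and named in the transform() call):
> >
> >     import org.apache.kafka.streams.KeyValue;
> >     import org.apache.kafka.streams.kstream.Transformer;
> >     import org.apache.kafka.streams.processor.ProcessorContext;
> >     import org.apache.kafka.streams.state.KeyValueStore;
> >
> >     public class CountingTransformer
> >         implements Transformer<String, String, KeyValue<String, Long>> {
> >
> >       private KeyValueStore<String, Long> store;
> >
> >       @SuppressWarnings("unchecked")
> >       @Override
> >       public void init(final ProcessorContext context) {
> >         // Fetch the store that was registered with the builder.
> >         store = (KeyValueStore<String, Long>) context.getStateStore("counts-store");
> >       }
> >
> >       @Override
> >       public KeyValue<String, Long> transform(final String key, final String value) {
> >         final Long previous = store.get(key);   // manual get
> >         final long updated = (previous == null ? 0L : previous) + 1L;
> >         store.put(key, updated);                // manual put
> >         return KeyValue.pair(key, updated);     // forward the updated count
> >       }
> >
> >       @Override
> >       public KeyValue<String, Long> punctuate(final long timestamp) {
> >         return null; // nothing scheduled (deprecated, but still required in 1.0)
> >       }
> >
> >       @Override
> >       public void close() {}
> >     }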
> >
> > Btw: here is an example of TopN:
> > https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/TopArticlesExampleDriver.java
> >
> >
> >
> > -Matthias
> >
> > On 4/9/18 4:46 AM, Dmitriy Vsekhvalnov wrote:
> > > Hi Matthias, thanks
> > >
> > > clarifications below:
> > >
> > > 1. .aggregate(() -> .. ,
> > >               (k, v, agg) -> {
> > >                   // Can I access the KV store here for manual put/get?
> > >               });
> > >
> > > 2. TopN is not hard; we're using pretty much the same approach you
> > > describe, just with a bounded priority queue. The problematic part is
> > > the 'remaining count' - everything else that is not in the TopN
> > > records. It appears to be quite complex in the streaming world (or we
> > > missed something). I'll try to illustrate, assuming a simplified event
> > > flow:
> > >
> > >  - acme.com: 100 hits -> too small, not in TopN, so we add it to the
> > >    remaining count
> > >  - ... some time later ...
> > >  - acme.com: 150 hits -> still too small, so again we add it to the
> > >    remaining count
> > >
> > > Problem: we have now added 250 hits to the remaining count, but we
> > > should have added only 150. We have to subtract the previous count,
> > > which means we need to keep all the previous counts somewhere. That's
> > > where we hope access to a KV store can help.
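> > >
> > > To make the bookkeeping concrete, the per-record update we have in
> > > mind looks roughly like this (a sketch: "counts" is a hypothetical KV
> > > store of previous per-URL counts, and "topN" the current top-N set):
> > >
> > >     // Replace this URL's stale contribution with the fresh one.
> > >     final Long previous = counts.get(url);   // e.g. 100
> > >     final long delta = newCount - (previous == null ? 0L : previous);
> > >     counts.put(url, newCount);               // remember 150 for next time
> > >     if (!topN.contains(url)) {
> > >       remaining += delta;                    // adds 50, not another 150
> > >     }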
> > >
> > > On Sat, Apr 7, 2018 at 10:11 PM, Matthias J. Sax <matth...@confluent.io>
> > > wrote:
> > >
> > >>>> ok, question then - is it possible to use a state store with
> > >>>> .aggregate()?
> > >>
> > >> Not sure what exactly you mean by this. An aggregation always uses a
> > >> store; it's a stateful operation and cannot be computed without a
> > >> store.
> > >>
> > >> For TopN, if you get the hit-count as input, you can use a
> > >> `.aggregate()` operator that uses an array or list as output -- this
> > >> list contains the TopN, and each time aggregate() is called, you check
> > >> whether the new count replaces an existing count in the array/list.
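> > >>
> > >> A rough sketch of that shape (just a fragment: "hitCounts" is the
> > >> input stream, and "topNSerde" is a serde for the list type, both of
> > >> which you would have to provide):
> > >>
> > >>     final int n = 20;
> > >>     final KTable<String, ArrayList<Long>> topN = hitCounts
> > >>         .groupByKey()
> > >>         .aggregate(
> > >>             ArrayList::new,                    // Initializer: empty topN
> > >>             (key, count, list) -> {
> > >>               list.add(count);                 // add the new count
> > >>               list.sort(Comparator.reverseOrder());
> > >>               if (list.size() > n) {
> > >>                 list.remove(list.size() - 1);  // evict the smallest
> > >>               }
> > >>               return list;
> > >>             },
> > >>             Materialized.with(Serdes.String(), topNSerde));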
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 4/6/18 10:36 AM, Dmitriy Vsekhvalnov wrote:
> > >>> Thanks guys,
> > >>>
> > >>> ok, question then - is it possible to use a state store with
> > >>> .aggregate()?
> > >>>
> > >>> Here are some details on the counting; we are basically looking for a
> > >>> TopN + remaining calculation.
> > >>>
> > >>> Example:
> > >>>
> > >>> - incoming data: API URL -> hit count
> > >>>
> > >>> - desired output: the top 20 URLs per domain per hour + the remaining
> > >>>   count per domain (i.e. the sum of hits for all other URLs that do
> > >>>   not belong to the top 20 for that domain in that hour).
> > >>>
> > >>> With some grouping variations.
> > >>>
> > >>> Does that make sense? Always open to better ideas :)
> > >>>
> > >>> On Fri, Apr 6, 2018 at 8:21 PM, Guozhang Wang <wangg...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hello Dmitriy,
> > >>>>
> > >>>> You can "simulate" a lower-level Processor API setup by 1) adding
> > >>>> the stores you need via builder#addStateStore(); 2) doing a manual
> > >>>> "through" call after "selectKey" (the selected key will be the same
> > >>>> as in your original groupBy call), and then, from the repartitioned
> > >>>> stream, adding the `transform()` operator to do the manual windowed
> > >>>> counting.
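> > >>>>
> > >>>> In code, the recipe looks roughly like this (a sketch: the topic and
> > >>>> store names, newKey(), and WindowedCountTransformer are placeholders):
> > >>>>
> > >>>>     builder.addStateStore(storeBuilder);        // 1) register the store
> > >>>>
> > >>>>     in.selectKey((key, value) -> newKey(value)) // same key as the groupBy
> > >>>>       .through("repartition-topic")             // 2) manual repartitioning
> > >>>>       .transform(WindowedCountTransformer::new, // 3) manual windowed counting,
> > >>>>                  "counts-store");               //    with store access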
> > >>>>
> > >>>> But before you really go down this route, I'd first like to validate
> > >>>> whether the provided `Aggregator` and `Initializer` functions really
> > >>>> cannot meet your "overcomplicated version of record counting" -
> > >>>> could you elaborate a bit more on this logic, so that maybe we can
> > >>>> still work it out with the pure high-level DSL?
> > >>>>
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>>
> > >>>> On Fri, Apr 6, 2018 at 8:49 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>>> Hey, good day everyone,
> > >>>>>
> > >>>>> Another kafka-streams Friday question.
> > >>>>>
> > >>>>> We hit a wall with the DSL implementation and would like to try the
> > >>>>> low-level Processor API.
> > >>>>>
> > >>>>> What we are looking for is to:
> > >>>>>   - repartition the incoming source stream by grouping records on
> > >>>>>     some fields + windowing (hourly, daily, etc.);
> > >>>>>   - and then apply a custom Processor on the grouped data.
> > >>>>>
> > >>>>> The Processor is going to do some overcomplicated version of record
> > >>>>> counting and needs persistent KV state store access.
> > >>>>>
> > >>>>> The issue: neither KGroupedStream nor TimeWindowedKStream provides
> > >>>>> an API to hook a Processor into the topology.
> > >>>>>
> > >>>>> Just to show some code:
> > >>>>>
> > >>>>> in.groupBy((key, value) -> .....)
> > >>>>>   .windowedBy(Hourly)
> > >>>>>   .transform(Processor) // Missing this one?
> > >>>>>
> > >>>>>
> > >>>>> What are our options to combine both? After investigating the
> > >>>>> source code, we were thinking we could re-implement the grouping
> > >>>>> with the low-level API, but that looks like overkill.
> > >>>>>
> > >>>>> Thank you.
> > >>>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>> --
> > >>>> -- Guozhang
> > >>>>
> > >>>
> > >>
> > >>
> > >
> >
> >
>
