Thanks Michael,

unfortunately we currently only considering exact counting. But will take a
look to your example for sure.

On Tue, Apr 10, 2018 at 12:16 PM, Michael Noll <mich...@confluent.io> wrote:

> Also, if you want (or can tolerate) probabilistic counting, with the option
> to also do TopN in that manner, we also have an example that uses Count Min
> Sketch:
> https://github.com/confluentinc/kafka-streams-
> examples/blob/4.0.0-post/src/test/scala/io/confluent/examples/streams/
> ProbabilisticCountingScalaIntegrationTest.scala
>
> The link/branch above is the code variant for Kafka 1.0.
>
> The example implements a custom (fault-tolerant) state store backed by CMS,
> which is then used in a Transformer.  The Transformer is then plugged into
> the DSL for a mix-and-match setup of DSL and Processor API.
>
>
> On Mon, Apr 9, 2018 at 9:34 PM, Dmitriy Vsekhvalnov <
> dvsekhval...@gmail.com>
> wrote:
>
> > Thanks again,
> >
> > yeah we saw that example for sure :)
> >
> > Ok, gonna try low-level Transformer and see how it goes.
> >
> >
> > On Mon, Apr 9, 2018 at 9:17 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > For (1), no.
> > >
> > > If you want to do manual put/get you should use a Transformer and
> > > implement a custom operator.
> > >
> > > Btw: here is an example of TopN:
> > > https://github.com/confluentinc/kafka-streams-
> > > examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/
> > > TopArticlesExampleDriver.java
> > >
> > >
> > >
> > > -Matthias
> > >
> > > On 4/9/18 4:46 AM, Dmitriy Vsekhvalnov wrote:
> > > > Hi Matthias, thanks
> > > >
> > > > clarifications below:
> > > >
> > > > 1. .aggregate( () -> .. ,
> > > >                        (k, v, agg) -> {
> > > >                            //Can i access KV store here for manual
> > > put/get?
> > > >                       });
> > > >
> > > > 2. TopN is not hard, we using pretty much same approach you describe,
> > > just
> > > > with bounded priority queue.  The problematic part with 'remaining
> > > count' -
> > > > everything else not in topN records. It appeared to be quite complex
> in
> > > > streaming world (or we missed something). I'll try to illustrate,
> > > assuming
> > > > simplified event flow:
> > > >
> > > >  - acme.com: 100 hits  -> too small, not in TopN, we adding it to
> > > remaining
> > > > count
> > > >  - ... some time later....
> > > >  - acme.com: 150 hits -> still too small, adding to remaining count
> > > >
> > > > Problem: we added 250 hits to remaining, but actually we had to add
> > only
> > > > 150 hits. We have to subtract previous count and it means we need to
> > keep
> > > > them all somewhere. That's where we hope access to KV store can help.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Sat, Apr 7, 2018 at 10:11 PM, Matthias J. Sax <
> > matth...@confluent.io>
> > > > wrote:
> > > >
> > > >>>> ok, question then - is it possible to use state store with
> > > .aggregate()?
> > > >>
> > > >> Not sure what you exactly mean by this. An aggregations always uses
> a
> > > >> store; it's a stateful operation and cannot be computed without a
> > store.
> > > >>
> > > >> For TopN, if you get the hit-count as input, you can use a
> > > >> `.aggregate()` operator that uses an array or list out output --
> this
> > > >> list contains the topN and each time, aggregate() is called, you
> check
> > > >> if the new count replaces and existing count in the array/list.
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 4/6/18 10:36 AM, Dmitriy Vsekhvalnov wrote:
> > > >>> Thanks guys,
> > > >>>
> > > >>> ok, question then - is it possible to use state store with
> > > .aggregate()?
> > > >>>
> > > >>> Here are some details on counting, we basically looking for TopN +
> > > >>> Remaining calculation.
> > > >>>
> > > >>> Example:
> > > >>>
> > > >>> - incoming data: api url -> hit count
> > > >>>
> > > >>> - we want output: Top 20 urls per each domain per hour + remaining
> > > count
> > > >>> per domain (e.g. sum of all other urls hits that do not belong to
> top
> > > 10
> > > >>> per each domain per hour).
> > > >>>
> > > >>> With some grouping variations.
> > > >>>
> > > >>> Make some sense? Always open for better ideas :)
> > > >>>
> > > >>>
> > > >>>
> > > >>>
> > > >>>
> > > >>>
> > > >>>
> > > >>> On Fri, Apr 6, 2018 at 8:21 PM, Guozhang Wang <wangg...@gmail.com>
> > > >> wrote:
> > > >>>
> > > >>>> Hello Dmitriy,
> > > >>>>
> > > >>>> You can "simulate" an lower-level processor API by 1) adding the
> > > stores
> > > >> you
> > > >>>> need via the builder#addStore(); 2) do a manual "through" call
> after
> > > >>>> "selectKey" (the selected key will be the same as your original
> > > groupBy
> > > >>>> call), and then from the repartitioned stream add the
> `transform()`
> > > >>>> operator to do manual windowed counting.
> > > >>>>
> > > >>>> But before you really go into this route, I'd first like to
> validate
> > > if
> > > >> the
> > > >>>> provided `Aggregate`, `Initialize` functions really cannot meet
> your
> > > >>>> "overcomplicated
> > > >>>> version of record counting", could you elaborate a bit more on
> this
> > > >> logic
> > > >>>> so maybe we can still around it around with the pure high-level
> DSL?
> > > >>>>
> > > >>>>
> > > >>>> Guozhang
> > > >>>>
> > > >>>>
> > > >>>> On Fri, Apr 6, 2018 at 8:49 AM, Dmitriy Vsekhvalnov <
> > > >>>> dvsekhval...@gmail.com>
> > > >>>> wrote:
> > > >>>>
> > > >>>>> Hey, good day everyone,
> > > >>>>>
> > > >>>>> another kafka-streams friday question.
> > > >>>>>
> > > >>>>> We hit the wall with DSL implementation and would like to try
> > > low-level
> > > >>>>> Processor API.
> > > >>>>>
> > > >>>>> What we looking for is to:
> > > >>>>>   - repartition incoming source stream via grouping records by
> some
> > > >>>> fields
> > > >>>>> + windowed (hourly, daily, e.t.c).
> > > >>>>>    - and then apply custom Processor on grouped data.
> > > >>>>>
> > > >>>>> Processor gonna do some overcomplicated version of record
> counting
> > > and
> > > >>>> need
> > > >>>>> persistent KV state store access.
> > > >>>>>
> > > >>>>> The issue - neither KGroupedStream nor TimeWindowedKStream
> provides
> > > api
> > > >>>> to
> > > >>>>> hook processor into topology.
> > > >>>>>
> > > >>>>> Just to show some code:
> > > >>>>>
> > > >>>>> in.groupBy((key, value) -> .....)
> > > >>>>>    .windowedBy(Hourly)
> > > >>>>>    .transform(Processor) // Missing this one?
> > > >>>>>
> > > >>>>>
> > > >>>>> What our options to combine both? We were thinking that we can
> > > >>>> re-implement
> > > >>>>> grouping with low-level API after investigating source code, but
> > > looks
> > > >>>> like
> > > >>>>> overkill.
> > > >>>>>
> > > >>>>> Thank you.
> > > >>>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> --
> > > >>>> -- Guozhang
> > > >>>>
> > > >>>
> > > >>
> > > >>
> > > >
> > >
> > >
> >
>

Reply via email to