Thanks Michael, unfortunately we are currently only considering exact counting. But we will take a look at your example for sure.
On Tue, Apr 10, 2018 at 12:16 PM, Michael Noll <mich...@confluent.io> wrote:

> Also, if you want (or can tolerate) probabilistic counting, with the option
> to also do TopN in that manner, we also have an example that uses Count Min
> Sketch:
> https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala
>
> The link/branch above is the code variant for Kafka 1.0.
>
> The example implements a custom (fault-tolerant) state store backed by CMS,
> which is then used in a Transformer. The Transformer is then plugged into
> the DSL for a mix-and-match setup of DSL and Processor API.
>
>
> On Mon, Apr 9, 2018 at 9:34 PM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
>
> > Thanks again,
> >
> > yeah, we saw that example for sure :)
> >
> > Ok, gonna try the low-level Transformer and see how it goes.
> >
> >
> > On Mon, Apr 9, 2018 at 9:17 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> > > For (1), no.
> > >
> > > If you want to do manual put/get you should use a Transformer and
> > > implement a custom operator.
> > >
> > > Btw: here is an example of TopN:
> > > https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/TopArticlesExampleDriver.java
> > >
> > >
> > > -Matthias
> > >
> > > On 4/9/18 4:46 AM, Dmitriy Vsekhvalnov wrote:
> > > > Hi Matthias, thanks.
> > > >
> > > > Clarifications below:
> > > >
> > > > 1. .aggregate( () -> .. ,
> > > >               (k, v, agg) -> {
> > > >                 // Can I access the KV store here for manual put/get?
> > > >               });
> > > >
> > > > 2. TopN is not hard; we're using pretty much the same approach you
> > > > describe, just with a bounded priority queue. The problematic part is
> > > > the "remaining count" - everything else not in the TopN records. It
> > > > appeared to be quite complex in the streaming world (or we missed
> > > > something).
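For readers who want the flavor of the probabilistic counting Michael mentions: the Count-Min Sketch data structure behind his linked example can be illustrated in a few lines of plain Java. This is only a minimal illustrative sketch of the idea, not the Confluent example's CMS-backed state store; the class name, dimensions, and seeds are all made up here. The key property is that estimates are never below the true count, and over-counts shrink as the table grows.

```java
import java.util.Random;

// Minimal Count-Min Sketch, illustrative only. Each of `depth` rows hashes a
// key into one of `width` counters; the estimate is the minimum over rows,
// which is an upper bound on the true count.
public class CountMinSketch {
    private final long[][] table; // one row of counters per hash function
    private final int[] seeds;    // per-row hash seed
    private final int width;

    public CountMinSketch(int depth, int width) {
        this.table = new long[depth][width];
        this.width = width;
        this.seeds = new int[depth];
        Random rnd = new Random(42); // fixed seed for reproducibility
        for (int i = 0; i < depth; i++) seeds[i] = rnd.nextInt();
    }

    private int bucket(int row, String key) {
        // Cheap per-row hash: XOR with a row-specific seed.
        return Math.floorMod(key.hashCode() ^ seeds[row], width);
    }

    public void add(String key, long count) {
        for (int row = 0; row < table.length; row++)
            table[row][bucket(row, key)] += count;
    }

    // Minimum over all rows: never less than the true count.
    public long estimate(String key) {
        long min = Long.MAX_VALUE;
        for (int row = 0; row < table.length; row++)
            min = Math.min(min, table[row][bucket(row, key)]);
        return min;
    }

    public static void main(String[] args) {
        CountMinSketch cms = new CountMinSketch(4, 256);
        cms.add("acme.com", 100);
        cms.add("acme.com", 50);
        cms.add("other.com", 7);
        System.out.println(cms.estimate("acme.com")); // at least 150 (over-count possible on collisions)
    }
}
```

As Dmitriy notes above, this trades exactness for constant memory, so it does not fit the exact-counting requirement in this thread.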
> > > > I'll try to illustrate, assuming a simplified event flow:
> > > >
> > > > - acme.com: 100 hits -> too small, not in TopN, we add it to the
> > > >   remaining count
> > > > - ... some time later ...
> > > > - acme.com: 150 hits -> still too small, added to the remaining count
> > > >
> > > > Problem: we added 250 hits to remaining, but actually we should have
> > > > added only 150 hits. We have to subtract the previous count, which
> > > > means we need to keep all the counts somewhere. That's where we hope
> > > > access to the KV store can help.
> > > >
> > > > On Sat, Apr 7, 2018 at 10:11 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > >
> > > >>>> ok, question then - is it possible to use a state store with .aggregate()?
> > > >>
> > > >> Not sure what exactly you mean by this. An aggregation always uses a
> > > >> store; it's a stateful operation and cannot be computed without a store.
> > > >>
> > > >> For TopN, if you get the hit count as input, you can use an
> > > >> `.aggregate()` operator that uses an array or list as output -- this
> > > >> list contains the TopN, and each time aggregate() is called, you check
> > > >> if the new count replaces an existing count in the array/list.
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 4/6/18 10:36 AM, Dmitriy Vsekhvalnov wrote:
> > > >>> Thanks guys,
> > > >>>
> > > >>> ok, question then - is it possible to use a state store with .aggregate()?
> > > >>>
> > > >>> Here are some details on the counting: we're basically looking for
> > > >>> TopN + remaining calculation.
> > > >>>
> > > >>> Example:
> > > >>>
> > > >>> - incoming data: api url -> hit count
> > > >>>
> > > >>> - we want output: top 20 urls per each domain per hour + remaining
> > > >>> count per domain (e.g. the sum of hits for all other urls that do
> > > >>> not belong to the top 20 per each domain per hour).
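The "subtract the previous count" correction Dmitriy describes above can be sketched in plain Java. A `HashMap` stands in here for the persistent KV store a Transformer would hold; the class and method names are illustrative, not from the thread.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the remaining-count correction: when an updated total arrives
// for a url that is still outside the TopN, add only the delta versus the
// previously seen total, not the whole new total.
public class RemainingCount {
    private final Map<String, Long> lastSeen = new HashMap<>(); // url -> last total seen
    private long remaining = 0;

    // Called for each (url, totalHits) update that did not make the TopN.
    public void update(String url, long totalHits) {
        long previous = lastSeen.getOrDefault(url, 0L);
        remaining += totalHits - previous; // the delta, not the full count
        lastSeen.put(url, totalHits);
    }

    public long remaining() { return remaining; }

    public static void main(String[] args) {
        RemainingCount rc = new RemainingCount();
        rc.update("acme.com", 100); // first observation: +100
        rc.update("acme.com", 150); // later observation: +50, not +150
        System.out.println(rc.remaining()); // 150, not 250
    }
}
```

This is exactly why the per-url previous counts have to live in a store: without them the second update would over-count by 100, as in the acme.com example above.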
> > > >>> With some grouping variations.
> > > >>>
> > > >>> Does that make some sense? Always open to better ideas :)
> > > >>>
> > > >>> On Fri, Apr 6, 2018 at 8:21 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >>>
> > > >>>> Hello Dmitriy,
> > > >>>>
> > > >>>> You can "simulate" a lower-level processor API by 1) adding the
> > > >>>> stores you need via builder#addStore(); 2) doing a manual "through"
> > > >>>> call after "selectKey" (the selected key will be the same as in
> > > >>>> your original groupBy call), and then from the repartitioned stream
> > > >>>> adding the `transform()` operator to do manual windowed counting.
> > > >>>>
> > > >>>> But before you really go down this route, I'd first like to
> > > >>>> validate whether the provided `Aggregate`, `Initialize` functions
> > > >>>> really cannot meet your "overcomplicated version of record
> > > >>>> counting". Could you elaborate a bit more on this logic, so maybe
> > > >>>> we can still work it out with the pure high-level DSL?
> > > >>>>
> > > >>>>
> > > >>>> Guozhang
> > > >>>>
> > > >>>> On Fri, Apr 6, 2018 at 8:49 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
> > > >>>>
> > > >>>>> Hey, good day everyone,
> > > >>>>>
> > > >>>>> another kafka-streams friday question.
> > > >>>>>
> > > >>>>> We hit the wall with the DSL implementation and would like to try
> > > >>>>> the low-level Processor API.
> > > >>>>>
> > > >>>>> What we're looking for is to:
> > > >>>>> - repartition the incoming source stream by grouping records on
> > > >>>>>   some fields + windowed (hourly, daily, etc.),
> > > >>>>> - and then apply a custom Processor to the grouped data.
> > > >>>>>
> > > >>>>> The Processor is going to do some overcomplicated version of
> > > >>>>> record counting and needs persistent KV state store access.
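The bounded TopN aggregate Matthias suggests above (a list inside the `.aggregate()` value, where each new count may replace an existing entry) can be sketched in plain Java. This is a standalone illustration, not the Confluent TopArticles example; for brevity it scans for the minimum instead of keeping a priority queue, and the serde wiring an actual aggregate value would need is omitted.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch of a bounded TopN aggregate value: keep at most n (url, count)
// members; a newcomer evicts the current minimum only if its count is larger.
public class TopN {
    private final int n;
    private final Map<String, Long> members = new HashMap<>(); // url -> count

    public TopN(int n) { this.n = n; }

    // Offer an updated total count for a url; returns true if the url is in
    // the TopN afterwards. (An existing member is simply updated in place.)
    public boolean offer(String url, long count) {
        if (members.containsKey(url) || members.size() < n) {
            members.put(url, count);
            return true;
        }
        Map.Entry<String, Long> min =
            Collections.min(members.entrySet(), Map.Entry.comparingByValue());
        if (count > min.getValue()) {
            members.remove(min.getKey()); // evict the smallest member
            members.put(url, count);
            return true;
        }
        return false; // too small: this is where "remaining count" would accumulate
    }

    public Map<String, Long> members() { return members; }

    public static void main(String[] args) {
        TopN top2 = new TopN(2);
        top2.offer("a.com", 5);
        top2.offer("b.com", 3);
        top2.offer("c.com", 4);  // evicts b.com (3)
        top2.offer("b.com", 10); // comes back, evicts c.com (4)
        System.out.println(top2.members()); // a.com=5, b.com=10
    }
}
```

The `offer()` return value is also where the remaining-count problem from earlier in the thread surfaces: every `false` (and every eviction) is an update the remaining count would have to absorb.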
> > > >>>>> The issue: neither KGroupedStream nor TimeWindowedKStream
> > > >>>>> provides an api to hook a processor into the topology.
> > > >>>>>
> > > >>>>> Just to show some code:
> > > >>>>>
> > > >>>>> in.groupBy((key, value) -> .....)
> > > >>>>>   .windowedBy(Hourly)
> > > >>>>>   .transform(Processor) // Missing this one?
> > > >>>>>
> > > >>>>> What are our options to combine both? We were thinking that we
> > > >>>>> could re-implement the grouping with the low-level API after
> > > >>>>> investigating the source code, but that looks like overkill.
> > > >>>>>
> > > >>>>> Thank you.
> > > >>>>
> > > >>>>
> > > >>>> --
> > > >>>> -- Guozhang
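For reference, Guozhang's suggested workaround (addStateStore + selectKey + through + transform) could look roughly like the sketch below. The topic names, the store name, and `domainOf()` are made-up placeholders, serde configuration is omitted, and windowing is simplified to a plain KV store; a real hourly window would use a windowed store or key by (domain, window start). Note that on Kafka 1.0 the `Transformer` interface also required a (deprecated) `punctuate()` override.

```java
// Sketch only: "hits", "hits-repartitioned", "counts-store" and domainOf()
// are illustrative names, not from the thread.
StreamsBuilder builder = new StreamsBuilder();

// 1) Register the store with the builder so transform() can attach to it.
builder.addStateStore(Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("counts-store"),
    Serdes.String(), Serdes.Long()));

KStream<String, Long> in = builder.stream("hits");

in.selectKey((url, hits) -> domainOf(url))   // 2) same key the groupBy would pick
  .through("hits-repartitioned")             //    manual repartitioning step
  .transform(() -> new Transformer<String, Long, KeyValue<String, Long>>() {
      private KeyValueStore<String, Long> store;

      @Override
      @SuppressWarnings("unchecked")
      public void init(ProcessorContext context) {
          store = (KeyValueStore<String, Long>) context.getStateStore("counts-store");
      }

      @Override
      public KeyValue<String, Long> transform(String domain, Long hits) {
          // 3) manual put/get against the persistent store, as discussed above
          Long prev = store.get(domain);
          long total = (prev == null ? 0L : prev) + hits;
          store.put(domain, total);
          return KeyValue.pair(domain, total);
      }

      @Override
      public void close() {}
  }, "counts-store");                        // attach the store to this transformer
```

Because `transform()` runs on the already-repartitioned stream, all updates for one domain land in the same store partition, which is what makes the manual TopN-plus-remaining bookkeeping from this thread feasible.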