Also, if you want (or can tolerate) probabilistic counting, with the option to do TopN in that manner as well, we have an example that uses Count-Min Sketch (CMS): https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala
The link/branch above is the code variant for Kafka 1.0. The example implements a custom (fault-tolerant) state store backed by CMS, which is then used in a Transformer. The Transformer is then plugged into the DSL for a mix-and-match setup of DSL and Processor API.

On Mon, Apr 9, 2018 at 9:34 PM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
> Thanks again,
>
> yeah, we saw that example for sure :)
>
> Ok, gonna try the low-level Transformer and see how it goes.
>
> On Mon, Apr 9, 2018 at 9:17 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> > For (1), no.
> >
> > If you want to do manual put/get you should use a Transformer and
> > implement a custom operator.
> >
> > Btw: here is an example of TopN:
> > https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/TopArticlesExampleDriver.java
> >
> > -Matthias
> >
> > On 4/9/18 4:46 AM, Dmitriy Vsekhvalnov wrote:
> > > Hi Matthias, thanks
> > >
> > > clarifications below:
> > >
> > > 1. .aggregate( () -> .. ,
> > >                (k, v, agg) -> {
> > >                    // Can I access the KV store here for manual put/get?
> > >                });
> > >
> > > 2. TopN is not hard, we're using pretty much the same approach you describe,
> > > just with a bounded priority queue. The problematic part is the 'remaining
> > > count' - everything else that is not in the topN records. It appears to be
> > > quite complex in the streaming world (or we missed something). I'll try to
> > > illustrate, assuming a simplified event flow:
> > >
> > > - acme.com: 100 hits -> too small, not in TopN, we add it to the remaining count
> > > - ... some time later ...
> > > - acme.com: 150 hits -> still too small, add to the remaining count
> > >
> > > Problem: we added 250 hits to the remaining count, but actually we should
> > > have added only 150 hits. We have to subtract the previous count, which means
> > > we need to keep all the counts somewhere. That's where we hope access to a KV
> > > store can help.
> > >
> > > On Sat, Apr 7, 2018 at 10:11 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >
> > >>>> ok, question then - is it possible to use a state store with .aggregate()?
> > >>
> > >> Not sure what exactly you mean by this. An aggregation always uses a
> > >> store; it's a stateful operation and cannot be computed without a store.
> > >>
> > >> For TopN, if you get the hit-count as input, you can use an
> > >> `.aggregate()` operator that uses an array or list as output -- this
> > >> list contains the topN, and each time aggregate() is called, you check
> > >> if the new count replaces an existing count in the array/list.
> > >>
> > >> -Matthias
> > >>
> > >> On 4/6/18 10:36 AM, Dmitriy Vsekhvalnov wrote:
> > >>> Thanks guys,
> > >>>
> > >>> ok, question then - is it possible to use a state store with .aggregate()?
> > >>>
> > >>> Here are some details on counting: we are basically looking for a TopN +
> > >>> remaining calculation.
> > >>>
> > >>> Example:
> > >>>
> > >>> - incoming data: api url -> hit count
> > >>>
> > >>> - we want output: Top 20 urls per domain per hour + remaining count per
> > >>> domain (e.g. the sum of all other url hits that do not belong to the top 10
> > >>> per domain per hour).
> > >>>
> > >>> With some grouping variations.
> > >>>
> > >>> Make some sense?
> > >>> Always open for better ideas :)
> > >>>
> > >>> On Fri, Apr 6, 2018 at 8:21 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>
> > >>>> Hello Dmitriy,
> > >>>>
> > >>>> You can "simulate" a lower-level Processor API by 1) adding the stores you
> > >>>> need via builder#addStateStore(); 2) doing a manual "through" call after
> > >>>> "selectKey" (the selected key will be the same as in your original groupBy
> > >>>> call), and then, from the repartitioned stream, adding the `transform()`
> > >>>> operator to do manual windowed counting.
> > >>>>
> > >>>> But before you really go down this route, I'd first like to validate whether
> > >>>> the provided `Aggregate`, `Initialize` functions really cannot meet your
> > >>>> "overcomplicated version of record counting". Could you elaborate a bit more
> > >>>> on this logic, so maybe we can still work around it with the pure high-level DSL?
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>> On Fri, Apr 6, 2018 at 8:49 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
> > >>>>
> > >>>>> Hey, good day everyone,
> > >>>>>
> > >>>>> another kafka-streams Friday question.
> > >>>>>
> > >>>>> We hit the wall with the DSL implementation and would like to try the
> > >>>>> low-level Processor API.
> > >>>>>
> > >>>>> What we are looking for is to:
> > >>>>> - repartition the incoming source stream by grouping records by some fields
> > >>>>>   + windowed (hourly, daily, etc.),
> > >>>>> - and then apply a custom Processor on the grouped data.
> > >>>>>
> > >>>>> The Processor is going to do some overcomplicated version of record counting
> > >>>>> and needs persistent KV state store access.
> > >>>>>
> > >>>>> The issue: neither KGroupedStream nor TimeWindowedKStream provides an API to
> > >>>>> hook a Processor into the topology.
> > >>>>>
> > >>>>> Just to show some code:
> > >>>>>
> > >>>>> in.groupBy((key, value) -> .....)
> > >>>>>   .windowedBy(Hourly)
> > >>>>>   .transform(Processor)  // Missing this one?
> > >>>>>
> > >>>>> What are our options to combine both? We were thinking that we could
> > >>>>> re-implement the grouping with the low-level API after investigating the
> > >>>>> source code, but that looks like overkill.
> > >>>>>
> > >>>>> Thank you.
> > >>>>
> > >>>> --
> > >>>> -- Guozhang
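
To make Guozhang's "simulate it" recipe above concrete: register the store yourself, re-key, force a repartition, then plug in transform(). The wiring looks roughly like the sketch below. This is a minimal illustration against the Kafka Streams 2.x Java API, not code from the linked examples; the topic names (api-hits, api-hits-repartitioned, url-hit-deltas), the store name (url-counts) and the WindowedCountTransformer class (sketched right after this) are made up for the example.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class ManualWindowedCountingTopology {

  public static StreamsBuilder build() {
    final StreamsBuilder builder = new StreamsBuilder();

    // 1) Register the store the Transformer will use. Persistent stores added this
    //    way get a changelog topic by default, so the store is fault-tolerant just
    //    like the stores the DSL manages for you.
    final StoreBuilder<KeyValueStore<String, Long>> countStore =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("url-counts"),  // example store name
            Serdes.String(),
            Serdes.Long());
    builder.addStateStore(countStore);

    // Input: url -> latest hit count for that url (example topic and types).
    final KStream<String, Long> hits =
        builder.stream("api-hits", Consumed.with(Serdes.String(), Serdes.Long()));

    hits
        // 2) Re-key with whatever fields the groupBy() would have used (identity here),
        //    then force a repartition so all records for one key hit the same task and
        //    local store. through() is the 1.x/2.x way; newer releases offer repartition().
        .selectKey((url, count) -> url)
        .through("api-hits-repartitioned", Produced.with(Serdes.String(), Serdes.Long()))
        // 3) Plug the custom operator into the DSL, handing it the store by name.
        .transform(WindowedCountTransformer::new, "url-counts")
        // 4) Output is keyed by "windowStart@url"; downstream you can group by domain
        //    and sum the deltas to get per-domain totals and the "remaining" bucket.
        .to("url-hit-deltas", Produced.with(Serdes.String(), Serdes.Long()));

    return builder;
  }
}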
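
The custom operator itself could look roughly like the following, again only a sketch under the same assumed names (store url-counts, hourly buckets derived from the record timestamp, output keyed by "windowStart@url"). It demonstrates the point Dmitriy raises above: the store remembers the previously seen count per key, so an updated count contributes only its delta downstream instead of being added on top of what was already counted.

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

/**
 * Sketch of a manual windowed-counting Transformer (assumed names and key layout,
 * not the code from the linked examples). The store keeps the last count seen per
 * (window, url) so that an updated count replaces the previous one instead of
 * being added on top of it.
 */
public class WindowedCountTransformer
    implements Transformer<String, Long, KeyValue<String, Long>> {

  private static final long WINDOW_SIZE_MS = 60 * 60 * 1000L;  // hourly buckets

  private ProcessorContext context;
  private KeyValueStore<String, Long> counts;

  @Override
  @SuppressWarnings("unchecked")
  public void init(final ProcessorContext context) {
    this.context = context;
    // Must match the name registered via addStateStore() and passed to transform().
    this.counts = (KeyValueStore<String, Long>) context.getStateStore("url-counts");
  }

  @Override
  public KeyValue<String, Long> transform(final String url, final Long hitCount) {
    // Manual hourly windowing: bucket by the record's timestamp.
    final long ts = context.timestamp();
    final long windowStart = ts - (ts % WINDOW_SIZE_MS);
    final String storeKey = windowStart + "@" + url;

    final Long previous = counts.get(storeKey);  // null the first time we see this key
    final long delta = hitCount - (previous == null ? 0L : previous);
    counts.put(storeKey, hitCount);

    // Forward only the delta, so a downstream sum (e.g. the per-domain "remaining"
    // bucket) never double-counts a url whose count was merely updated.
    return KeyValue.pair(storeKey, delta);
  }

  @Override
  public void close() {}
}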
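
Finally, the bookkeeping itself with the Kafka machinery stripped away. This plain-Java class is not from the thread or the linked examples; it just shows why every key's latest count has to be kept somewhere: the top N is taken with a bounded priority queue (a min-heap capped at N entries), and one simple way to get the "remaining" bucket is a running total, adjusted by deltas, minus the top-N sum.

import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.stream.Collectors;

/**
 * Plain-Java sketch of "top N urls + remaining hits" with delta updates.
 * Every url's latest count is kept (here a HashMap, in the streams app the KV
 * store), because an updated count must first subtract the previously seen value.
 */
public class TopNWithRemaining {

  private final int n;
  private final Map<String, Long> counts = new HashMap<>();
  private long total = 0L;  // running sum of all latest counts

  public TopNWithRemaining(final int n) {
    this.n = n;
  }

  /** Record the latest count for a url; only the delta goes into the total. */
  public void update(final String url, final long newCount) {
    final long previous = counts.getOrDefault(url, 0L);
    total += newCount - previous;  // the "subtract the previous count" step
    counts.put(url, newCount);
  }

  /** Top N urls by count, via a bounded priority queue (min-heap of at most n entries). */
  public List<Map.Entry<String, Long>> topN() {
    final PriorityQueue<Map.Entry<String, Long>> heap =
        new PriorityQueue<>(Comparator.comparingLong(Map.Entry<String, Long>::getValue));
    for (final Map.Entry<String, Long> e : counts.entrySet()) {
      heap.offer(e);
      if (heap.size() > n) {
        heap.poll();  // drop the smallest so the heap never exceeds n entries
      }
    }
    return heap.stream()
        .sorted(Comparator.comparingLong(Map.Entry<String, Long>::getValue).reversed())
        .collect(Collectors.toList());
  }

  /** Hits that fall outside the top N (the "remaining" bucket). */
  public long remaining() {
    final long topSum = topN().stream().mapToLong(Map.Entry::getValue).sum();
    return total - topSum;
  }

  public static void main(final String[] args) {
    final TopNWithRemaining stats = new TopNWithRemaining(2);
    stats.update("acme.com/a", 100);
    stats.update("acme.com/b", 300);
    stats.update("acme.com/c", 500);
    stats.update("acme.com/a", 150);  // an update: replaces 100, does not add another 150
    System.out.println("top 2: " + stats.topN());       // [acme.com/c=500, acme.com/b=300]
    System.out.println("remaining: " + stats.remaining());  // 150, not 250
  }
}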