Thanks again, yeah we saw that example for sure :)
Ok, gonna try low-level Transformer and see how it goes.

On Mon, Apr 9, 2018 at 9:17 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> For (1), no.
>
> If you want to do manual put/get you should use a Transformer and
> implement a custom operator.
>
> Btw: here is an example of TopN:
> https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/TopArticlesExampleDriver.java
>
> -Matthias
>
> On 4/9/18 4:46 AM, Dmitriy Vsekhvalnov wrote:
> > Hi Matthias, thanks
> >
> > clarifications below:
> >
> > 1. .aggregate( () -> .. ,
> >                (k, v, agg) -> {
> >                    // Can I access the KV store here for manual put/get?
> >                });
> >
> > 2. TopN is not hard; we're using pretty much the same approach you describe,
> > just with a bounded priority queue. The problematic part is the 'remaining
> > count', i.e. everything else not in the topN records. It appeared to be
> > quite complex in the streaming world (or we missed something). I'll try to
> > illustrate, assuming a simplified event flow:
> >
> >  - acme.com: 100 hits -> too small, not in TopN, we add it to the remaining count
> >  - ... some time later ...
> >  - acme.com: 150 hits -> still too small, we add it to the remaining count
> >
> > Problem: we added 250 hits to remaining, but actually we should have added
> > only 150 hits. We have to subtract the previous count, which means we need
> > to keep all of them somewhere. That's where we hope access to a KV store
> > can help.
> >
> > On Sat, Apr 7, 2018 at 10:11 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >>>> ok, question then - is it possible to use state store with .aggregate()?
> >>
> >> Not sure what exactly you mean by this. An aggregation always uses a
> >> store; it's a stateful operation and cannot be computed without a store.
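The 'remaining count' problem above (acme.com at 100 hits, then at 150) disappears if you keep each URL's latest count and apply only the delta. Here is a minimal plain-Java sketch of that idea. The class and method names are hypothetical. The `HashMap` only stands in for the `KeyValueStore` a Transformer would fetch via `ProcessorContext#getStateStore(...)`; this is not the Kafka API itself. The sketch also computes the remaining count as total minus the current top N, rather than accumulating it, so URLs moving into or out of the top N need no extra bookkeeping.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical per-(domain, hour) state for "Top N + remaining count".
 * The HashMap stands in for a state store; it is NOT the Kafka Streams API.
 */
public class TopNRemaining {
    private final int n;
    // Latest cumulative hit count seen per URL (the "keep them all somewhere" part).
    private final Map<String, Long> latest = new HashMap<>();
    // Sum of the latest counts of all URLs, maintained incrementally via deltas.
    private long total = 0L;

    public TopNRemaining(int n) {
        this.n = n;
    }

    /** Applies a new cumulative count for a URL; only the delta reaches the total. */
    public void update(String url, long count) {
        long previous = latest.getOrDefault(url, 0L);
        latest.put(url, count);
        total += count - previous; // 100 then 150 adds 100 + 50, not 250
    }

    /** Current top-N URLs by latest count, highest first. */
    public List<Map.Entry<String, Long>> topN() {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(latest.entrySet());
        entries.sort(Map.Entry.<String, Long>comparingByValue().reversed());
        return entries.subList(0, Math.min(n, entries.size()));
    }

    /** Remaining = everything not in the top N, derived rather than accumulated. */
    public long remaining() {
        long top = 0L;
        for (Map.Entry<String, Long> e : topN()) {
            top += e.getValue();
        }
        return total - top;
    }

    public static void main(String[] args) {
        TopNRemaining stats = new TopNRemaining(2);
        stats.update("acme.com/a", 500);
        stats.update("acme.com/b", 400);
        stats.update("acme.com/c", 100); // too small for the top 2
        stats.update("acme.com/c", 150); // still too small
        System.out.println(stats.remaining()); // prints 150, not 250
    }
}
```

Deriving `remaining()` from the stored counts is a design choice. It costs a sort per query, but it stays correct when a URL is promoted into or demoted out of the top N.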
> >>
> >> For TopN, if you get the hit count as input, you can use a
> >> `.aggregate()` operator that uses an array or list as output; this
> >> list contains the topN, and each time aggregate() is called, you check
> >> if the new count replaces an existing count in the array/list.
> >>
> >> -Matthias
> >>
> >> On 4/6/18 10:36 AM, Dmitriy Vsekhvalnov wrote:
> >>> Thanks guys,
> >>>
> >>> ok, question then - is it possible to use state store with .aggregate()?
> >>>
> >>> Here are some details on counting: we're basically looking for a TopN +
> >>> Remaining calculation.
> >>>
> >>> Example:
> >>>
> >>> - incoming data: api url -> hit count
> >>>
> >>> - we want output: Top 20 urls per domain per hour + remaining count
> >>> per domain (i.e. the sum of hits of all other urls that do not belong to
> >>> the top 10 per domain per hour).
> >>>
> >>> With some grouping variations.
> >>>
> >>> Make some sense? Always open for better ideas :)
> >>>
> >>> On Fri, Apr 6, 2018 at 8:21 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>>> Hello Dmitriy,
> >>>>
> >>>> You can "simulate" a lower-level Processor API by 1) adding the stores
> >>>> you need via builder#addStateStore(); 2) doing a manual "through" call
> >>>> after "selectKey" (the selected key will be the same as your original
> >>>> groupBy call), and then from the repartitioned stream adding the
> >>>> `transform()` operator to do manual windowed counting.
> >>>>
> >>>> But before you really go down this route, I'd first like to validate
> >>>> whether the provided `Aggregator` and `Initializer` functions really
> >>>> cannot meet your "overcomplicated version of record counting". Could you
> >>>> elaborate a bit more on this logic, so maybe we can still work around it
> >>>> with the pure high-level DSL?
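Matthias's list-based `.aggregate()` suggestion might look roughly like the plain-Java aggregator body below. `UrlCount` and `TopNListAggregator` are hypothetical names. Wiring this into the DSL would also need a Serde for the list, which is not shown. The sketch copies the incoming list instead of mutating it. It drops the URL's stale entry, adds the new count, sorts descending and truncates to N.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/**
 * Sketch of a list-based top-N aggregator body. UrlCount is a hypothetical
 * value type; in the DSL this logic would sit inside the Aggregator lambda.
 */
public class TopNListAggregator {
    static final class UrlCount {
        final String url;
        final long count;

        UrlCount(String url, long count) {
            this.url = url;
            this.count = count;
        }
    }

    /** Returns the new top-N list; the input list is not modified. */
    static List<UrlCount> aggregate(UrlCount value, List<UrlCount> agg, int n) {
        List<UrlCount> next = new ArrayList<>(agg.size() + 1);
        for (UrlCount existing : agg) {
            if (!existing.url.equals(value.url)) { // drop this URL's stale count
                next.add(existing);
            }
        }
        next.add(value);
        next.sort(Comparator.comparingLong((UrlCount uc) -> uc.count).reversed());
        return next.size() > n ? new ArrayList<>(next.subList(0, n)) : next;
    }

    public static void main(String[] args) {
        List<UrlCount> top = new ArrayList<>(); // what the Initializer would return
        top = aggregate(new UrlCount("acme.com/a", 500), top, 2);
        top = aggregate(new UrlCount("acme.com/b", 400), top, 2);
        top = aggregate(new UrlCount("acme.com/c", 150), top, 2); // does not make the cut
        top = aggregate(new UrlCount("acme.com/b", 700), top, 2); // replaces b's old count
        for (UrlCount uc : top) {
            System.out.println(uc.url + " " + uc.count);
        }
    }
}
```

URLs pushed out of the list are forgotten. That is why a list like this cannot, on its own, produce the 'remaining count' discussed in this thread.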
> >>>>
> >>>> Guozhang
> >>>>
> >>>> On Fri, Apr 6, 2018 at 8:49 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hey, good day everyone,
> >>>>>
> >>>>> another kafka-streams Friday question.
> >>>>>
> >>>>> We hit a wall with the DSL implementation and would like to try the
> >>>>> low-level Processor API.
> >>>>>
> >>>>> What we're looking for is to:
> >>>>>  - repartition the incoming source stream by grouping records by some
> >>>>>    fields + windowing (hourly, daily, etc.).
> >>>>>  - and then apply a custom Processor to the grouped data.
> >>>>>
> >>>>> The Processor is gonna do some overcomplicated version of record counting
> >>>>> and needs persistent KV state store access.
> >>>>>
> >>>>> The issue: neither KGroupedStream nor TimeWindowedKStream provides an API
> >>>>> to hook a processor into the topology.
> >>>>>
> >>>>> Just to show some code:
> >>>>>
> >>>>> in.groupBy((key, value) -> .....)
> >>>>>   .windowedBy(Hourly)
> >>>>>   .transform(Processor) // Missing this one?
> >>>>>
> >>>>> What are our options to combine both? After investigating the source
> >>>>> code, we were thinking we could re-implement grouping with the low-level
> >>>>> API, but that looks like overkill.
> >>>>>
> >>>>> Thank you.
> >>>>
> >>>> --
> >>>> -- Guozhang
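Replacing `windowedBy(Hourly)` with the "manual windowed counting" in `transform()` that Guozhang suggests means assigning each record to its window by hand. Below is a minimal plain-Java sketch. It assumes epoch-aligned tumbling hourly windows, which is how Kafka Streams documents tumbling `TimeWindows`, and non-negative timestamps. The `HashMap` stands in for the state store. `HourlyBuckets`, `windowKey` and the `domain@windowStart` string key format are all hypothetical; a real topology would more likely use a composite key with its own Serde.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical manual windowed counting: bucket each record into an
 * epoch-aligned hourly window and count per (domain, window start).
 * The HashMap stands in for a persistent KV state store.
 */
public class HourlyBuckets {
    static final long HOUR_MS = Duration.ofHours(1).toMillis();

    /** Start of the tumbling window containing the timestamp (non-negative ms). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Composite store key: grouping field plus window start. */
    static String windowKey(String domain, long timestampMs) {
        return domain + "@" + windowStart(timestampMs, HOUR_MS);
    }

    private final Map<String, Long> counts = new HashMap<>();

    /** Adds hits to the record's hourly bucket and returns the bucket's new total. */
    long add(String domain, long timestampMs, long hits) {
        return counts.merge(windowKey(domain, timestampMs), hits, Long::sum);
    }

    public static void main(String[] args) {
        HourlyBuckets buckets = new HourlyBuckets();
        long t1 = Instant.parse("2018-04-06T08:10:00Z").toEpochMilli();
        long t2 = Instant.parse("2018-04-06T08:55:00Z").toEpochMilli();
        long t3 = Instant.parse("2018-04-06T09:05:00Z").toEpochMilli();
        buckets.add("acme.com", t1, 3);
        System.out.println(buckets.add("acme.com", t2, 4)); // same hour: 7
        System.out.println(buckets.add("acme.com", t3, 5)); // next hour: 5
    }
}
```

Expired buckets are never cleaned up in this sketch. A real transformer would also need to evict or expire old windows, for example via a punctuation.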