The example I gave was just for illustration. I have impression logs and notification logs. Notification logs are essentially tied to the impressions served. A single impression can serve multiple items.
I was just trying to aggregate across a single line item. This means I am always generating the same single key, and since the data is streaming, my counter should keep increasing. What I saw was that while the counter was at 100 on Machine1, it was at 1 on another machine. That looked inconsistent to me.

-Sameer.

On Fri, Jun 16, 2017 at 10:47 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> I just double checked your example code from an earlier email. There you
> are using:
>
> stream.flatMap(...)
>       .groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>       .reduce((value1, value2) -> value1 + value2,
>
> In your last email, you say that you want to count on a category that is
> contained in your value and that your key is something different.
>
> If you want to count by category, you need to set the category as the key in
> your .groupBy() -- in your code snippet, you actually don't change the key
> (this seems to be the root cause of the issue you see).
>
> Does this help?
>
> -Matthias
>
> On 6/15/17 11:48 PM, Sameer Kumar wrote:
> > Ok.. Let me try to explain it again.
> >
> > So, let's say my source processor has a different key, and the value it
> > contains includes an identifier type, which basically denotes a category,
> > and I am counting on different categories. A specific case would be that I
> > do a filter and output only a specific category, let's say cat1. Now,
> > starting from the source processor, which is being processed across
> > multiple nodes, cat1 would be written across multiple nodes; since the
> > state store is local, it will start from 1 on each of the nodes.
> >
> > Does that clarify the doubt? I think that after a processor the data gets
> > written to an intermediate Kafka topic and this should be taken care of,
> > but this still happens; I am not sure about it.
> >
> > Processor 1
> > key1, value:{xyz....cat1}
> > key2, value:{xyz....cat2}
> > key3, value:{xyz....cat3}
> >
> > Processor 2
> > Machine1
> > key: cat1, 1
> >
> > Machine2
> > key:cat1,1
> >
> > -Sameer.
> >
> > On Thu, Jun 15, 2017 at 10:27 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> >
> >> I'm not sure if I fully understand this but let me check:
> >>
> >> - if you start 2 instances, one instance will process half of the
> >>   partitions, the other instance will process the other half
> >> - for any given key, like key 100, it will only be processed on one of
> >>   the instances, not both.
> >>
> >> Does this help?
> >>
> >> Eno
> >>
> >>> On 15 Jun 2017, at 07:40, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
> >>>
> >>> Also, I am writing a single key in the output all the time. I believe
> >>> machine2 will have to write a key and since a state store is local it
> >>> wouldn't know about the counter state on another machine. So, I guess
> >>> this will happen.
> >>>
> >>> -Sameer.
> >>>
> >>> On Thu, Jun 15, 2017 at 11:11 AM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
> >>>
> >>>> The input topic contains 60 partitions and data is distributed well
> >>>> across different partitions on different keys. While consuming, I am
> >>>> doing some filtering and writing only single key data.
> >>>>
> >>>> Output would be something of the form:
> >>>>
> >>>> Machine 1
> >>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
> >>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652
> >>>>
> >>>> Machine 2
> >>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
> >>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2
> >>>>
> >>>> I am sharing a snippet of code,
> >>>>
> >>>> private KTable<String, Integer> extractLICount(KStream<Windowed<String>, AdLog> joinedImprLogs) {
> >>>>   KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
> >>>>     List<KeyValue<String, Integer>> l = new ArrayList<>();
> >>>>     if (value == null) {
> >>>>       return l;
> >>>>     }
> >>>>     String date = new SimpleDateFormat("yyyy_MM_dd").format(new Date(key.window().end()));
> >>>>     // Lineitemids
> >>>>     if (value != null && value.getAdLogType() == 3) {
> >>>>       // log.info("Invalid data: " + value);
> >>>>       return l;
> >>>>     }
> >>>>     if (value.getAdLogType() == 2) {
> >>>>       long lineitemid = value.getAdClickLog().getItmClmbLId();
> >>>>       if (lineitemid == TARGETED_LI) {
> >>>>         String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
> >>>>         l.add(new KeyValue<>(liKey, 1));
> >>>>       }
> >>>>       return l;
> >>>>     } else if (value.getAdLogType() == 1) {
> >>>>       long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
> >>>>       if (value.getAdImprLog().isVisible()) {
> >>>>         for (int i = 0; i < lineitemids.length; i++) {
> >>>>           long li = lineitemids[i];
> >>>>           if (li == TARGETED_LI) {
> >>>>             // log.info("valid impression ids= " + value.getAdImprLog().toString());
> >>>>             String liKey = String.format("P:LIS:%s:%s:I", li, date);
> >>>>             l.add(new KeyValue<>(liKey, 1));
> >>>>           }
> >>>>         }
> >>>>       }
> >>>>       return l;
> >>>>     }
> >>>>     return l;
> >>>>   }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
> >>>>     .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
> >>>>   return liCount;
> >>>> }
> >>>>
> >>>> On Wed, Jun 14, 2017 at 10:55 AM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
> >>>>
> >>>>> The input topic contains 60 partitions and data is distributed well
> >>>>> across different partitions on different keys. While consuming, I am
> >>>>> doing some filtering and writing only single key data.
> >>>>>
> >>>>> Output would be something of the form:
> >>>>>
> >>>>> Machine 1
> >>>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
> >>>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652
> >>>>>
> >>>>> Machine 2
> >>>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
> >>>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2
> >>>>>
> >>>>> I am sharing a snippet of code,
> >>>>>
> >>>>> private KTable<String, Integer> extractLICount(KStream<Windowed<String>, AdLog> joinedImprLogs) {
> >>>>>   KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
> >>>>>     List<KeyValue<String, Integer>> l = new ArrayList<>();
> >>>>>     if (value == null) {
> >>>>>       return l;
> >>>>>     }
> >>>>>     String date = new SimpleDateFormat("yyyy_MM_dd").format(new Date(key.window().end()));
> >>>>>     // Lineitemids
> >>>>>     if (value != null && value.getAdLogType() == 3) {
> >>>>>       // log.info("Invalid data: " + value);
> >>>>>       return l;
> >>>>>     }
> >>>>>     if (value.getAdLogType() == 2) {
> >>>>>       long lineitemid = value.getAdClickLog().getItmClmbLId();
> >>>>>       if (lineitemid == TARGETED_LI) {
> >>>>>         String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
> >>>>>         l.add(new KeyValue<>(liKey, 1));
> >>>>>       }
> >>>>>       return l;
> >>>>>     } else if (value.getAdLogType() == 1) {
> >>>>>       long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
> >>>>>       if (value.getAdImprLog().isVisible()) {
> >>>>>         for (int i = 0; i < lineitemids.length; i++) {
> >>>>>           long li = lineitemids[i];
> >>>>>           if (li == TARGETED_LI) {
> >>>>>             // log.info("valid impression ids= " + value.getAdImprLog().toString());
> >>>>>             String liKey = String.format("P:LIS:%s:%s:I", li, date);
> >>>>>             l.add(new KeyValue<>(liKey, 1));
> >>>>>           }
> >>>>>         }
> >>>>>       }
> >>>>>       return l;
> >>>>>     }
> >>>>>     return l;
> >>>>>   }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
> >>>>>     .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
> >>>>>   return liCount;
> >>>>> }
> >>>>>
> >>>>> -Sameer.
> >>>>>
> >>>>> On Tue, Jun 13, 2017 at 10:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>
> >>>>>> Sameer,
> >>>>>>
> >>>>>> if you write a single key, all your input data should be in a single
> >>>>>> partition. As Streams scales out via partitions, you cannot have a
> >>>>>> second instance, as data from one partition is never split. Thus, all
> >>>>>> data will go to one instance while the second instance should be idle.
> >>>>>>
> >>>>>> So what I don't understand is why you see that machine 2 outputs a
> >>>>>> counter of 1 -- it should not output anything. Maybe you can give some
> >>>>>> more details about your setup?
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 6/13/17 5:00 AM, Sameer Kumar wrote:
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> I witnessed a strange behaviour in KafkaStreams and need help in
> >>>>>>> understanding it.
> >>>>>>>
> >>>>>>> I created an application for aggregating clicks per user; I want to
> >>>>>>> process it only for 1 user (I was writing only a single key).
> >>>>>>> When I ran the application on one machine, it was running fine. Now,
> >>>>>>> to load-balance it, I started another node.
> >>>>>>>
> >>>>>>> My expectation was that the counter should be in sync, i.e. if the
> >>>>>>> output from one machine is key, 100, then another machine should
> >>>>>>> read this and the value should be key, 101.
> >>>>>>> But that didn't happen. Instead, on machine 2, the counter started
> >>>>>>> with 1.
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> -Sameer.
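
The routing behaviour Eno describes earlier in the thread (any given key is processed on exactly one instance) can be sketched in plain Java. This is only an illustrative model under stated assumptions, not Kafka Streams code: the class KeyRoutingSketch, its methods, and the round-robin assignment of partitions to instances are hypothetical, and String.hashCode() stands in for the murmur2 hash that Kafka's default partitioner applies to the serialized key.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyRoutingSketch {

    // Stand-in for Kafka's default partitioner. The real one hashes the
    // serialized key bytes with murmur2; String.hashCode() is an assumption
    // used here only to keep the sketch self-contained.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical partition-to-instance assignment (simple round-robin).
    static int instanceFor(int partition, int numInstances) {
        return partition % numInstances;
    }

    // Counts records per key, keeping one local map per instance to mimic
    // the fact that each instance only has its own local state store.
    static Map<Integer, Map<String, Integer>> countPerInstance(
            String[] keys, int numPartitions, int numInstances) {
        Map<Integer, Map<String, Integer>> stores = new HashMap<>();
        for (String key : keys) {
            int partition = partitionFor(key, numPartitions);
            int instance = instanceFor(partition, numInstances);
            stores.computeIfAbsent(instance, i -> new HashMap<>())
                  .merge(key, 1, Integer::sum);
        }
        return stores;
    }

    public static void main(String[] args) {
        String liKey = "P:LIS:1236667:2017_06_13:I";
        String[] records = {liKey, liKey, liKey, liKey, liKey};

        // 60 partitions and 2 instances, as in the setup described above.
        Map<Integer, Map<String, Integer>> stores = countPerInstance(records, 60, 2);

        // Every record with the same key hashes to the same partition, so
        // only one instance ends up holding a counter for that key.
        System.out.println(stores);
    }
}
```

In this model, all records carrying the single key P:LIS:1236667:2017_06_13:I land in one partition, so only one of the two per-instance maps ever gets a counter for it. That matches Matthias's expectation that the second instance should stay idle once the records are partitioned by that key.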