Continued from my last mail... The code snippet I shared runs after joining the impression and notification logs. There I pick the line item and concatenate it with the date. You can also see a check against TARGETED_LI; I do not emit any data otherwise.
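The snippet formats the window end with `new SimpleDateFormat("yyyy_MM_dd")`. That formatter uses the JVM's default time zone, so two machines configured with different zones could emit different keys for the same window. This does not explain the v=651 / v=1 split in the logs quoted below, where both machines print the same key, but it is cheap to rule out. The following is a minimal stdlib sketch that assumes the date should be taken in an explicitly chosen zone. The class `LineItemKeys` and the helper `liKey` are hypothetical, not part of the original code.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class LineItemKeys {

    // Hypothetical helper: builds a key like "P:LIS:<lineItem>:<yyyy_MM_dd>:<type>",
    // mirroring String.format("P:LIS:%s:%s:I", li, date) from the snippet.
    // Unlike SimpleDateFormat's default, the zone is passed in explicitly.
    static String liKey(long lineItemId, long windowEndMs, char type, ZoneId zone) {
        DateTimeFormatter day = DateTimeFormatter.ofPattern("yyyy_MM_dd").withZone(zone);
        String date = day.format(Instant.ofEpochMilli(windowEndMs));
        return String.format("P:LIS:%d:%s:%c", lineItemId, date, type);
    }

    public static void main(String[] args) {
        long windowEnd = 1497384000000L; // 2017-06-13T20:00:00Z, an illustrative value
        // Same instant, two zones: the date component of the key differs.
        System.out.println(liKey(1236667L, windowEnd, 'I', ZoneId.of("UTC")));
        System.out.println(liKey(1236667L, windowEnd, 'I', ZoneId.of("Asia/Kolkata")));
    }
}
```

Here the same instant yields `2017_06_13` in UTC and `2017_06_14` in Asia/Kolkata. Fixing the zone in one place keeps every instance producing identical keys.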
-Sameer.

On Sat, Jun 17, 2017 at 3:55 PM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
> The example I gave was just for illustration. I have impression logs and
> notification logs. Notification logs are essentially tied to the impressions
> served, and one impression can serve multiple items.
>
> I was just trying to aggregate over a single line item. This means I always
> generate the same single key, and since the data is streaming, my counter
> should keep increasing.
>
> What I saw was that while the counter on Machine1 was at 100, on another
> machine it was at 1. That looked inconsistent to me.
>
> -Sameer.
>
> On Fri, Jun 16, 2017 at 10:47 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>> I just double-checked your example code from an earlier email. There you
>> are using:
>>
>>     stream.flatMap(...)
>>           .groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>           .reduce((value1, value2) -> value1 + value2,
>>
>> In your last email, you say that you want to count by a category that is
>> contained in your value, while your key is something different.
>>
>> If you want to count by category, you need to set the category as the key in
>> your .groupBy(). In your code snippet you don't actually change the key,
>> which seems to be the root cause of the issue you see.
>>
>> Does this help?
>>
>> -Matthias
>>
>> On 6/15/17 11:48 PM, Sameer Kumar wrote:
>>> Ok.. Let me try to explain it again.
>>>
>>> Let's say my source processor has a different key, and the value it
>>> contains has an identifier type, which basically denotes a category; I am
>>> counting per category. A specific case would be that I apply a filter and
>>> output only one category, say cat1. Now, starting from the source processor,
>>> which runs across multiple nodes, cat1 would be written on multiple nodes,
>>> and since the state store is local, the count starts from 1 on each node.
>>>
>>> Does that clarify the doubt?
>>> I thought that after a processor the data gets written to an intermediate
>>> Kafka topic, so this should be taken care of, but it still happens and I am
>>> not sure why.
>>>
>>> Processor 1
>>>   key1, value:{xyz....cat1}
>>>   key2, value:{xyz....cat2}
>>>   key3, value:{xyz....cat3}
>>>
>>> Processor 2
>>>   Machine1
>>>     key: cat1, 1
>>>
>>>   Machine2
>>>     key: cat1, 1
>>>
>>> -Sameer.
>>>
>>> On Thu, Jun 15, 2017 at 10:27 PM, Eno Thereska <eno.there...@gmail.com> wrote:
>>>> I'm not sure I fully understand this, but let me check:
>>>>
>>>> - If you start 2 instances, one instance will process half of the
>>>>   partitions and the other instance will process the other half.
>>>> - Any given key, like key 100, will only be processed on one of the
>>>>   instances, not both.
>>>>
>>>> Does this help?
>>>>
>>>> Eno
>>>>
>>>>> On 15 Jun 2017, at 07:40, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>>>>>
>>>>> Also, I am writing a single key in the output all the time. I believe
>>>>> machine2 will have to write that key too, and since the state store is
>>>>> local, it wouldn't know about the counter state on the other machine. So I
>>>>> guess this is bound to happen.
>>>>>
>>>>> -Sameer.
>>>>>
>>>>> On Thu, Jun 15, 2017 at 11:11 AM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>>>>>
>>>>>> The input topic contains 60 partitions, and data is distributed well
>>>>>> across the partitions on different keys. While consuming, I do some
>>>>>> filtering and write data for only a single key.
>>>>>>
>>>>>> The output looks something like this:
>>>>>>
>>>>>> Machine 1
>>>>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
>>>>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652
>>>>>>
>>>>>> Machine 2
>>>>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
>>>>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2
>>>>>>
>>>>>> Here is a snippet of the code:
>>>>>>
>>>>>> private KTable<String, Integer> extractLICount(KStream<Windowed<String>, AdLog> joinedImprLogs) {
>>>>>>   KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
>>>>>>     List<KeyValue<String, Integer>> l = new ArrayList<>();
>>>>>>     if (value == null) {
>>>>>>       return l;
>>>>>>     }
>>>>>>     String date = new SimpleDateFormat("yyyy_MM_dd").format(new Date(key.window().end()));
>>>>>>     // Lineitemids
>>>>>>     if (value.getAdLogType() == 3) {
>>>>>>       // log.info("Invalid data: " + value);
>>>>>>       return l;
>>>>>>     }
>>>>>>     if (value.getAdLogType() == 2) {
>>>>>>       // Click log: emit one count if the click belongs to TARGETED_LI.
>>>>>>       long lineitemid = value.getAdClickLog().getItmClmbLId();
>>>>>>       if (lineitemid == TARGETED_LI) {
>>>>>>         String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
>>>>>>         l.add(new KeyValue<>(liKey, 1));
>>>>>>       }
>>>>>>       return l;
>>>>>>     } else if (value.getAdLogType() == 1) {
>>>>>>       // Impression log: emit one count per occurrence of TARGETED_LI in a visible impression.
>>>>>>       long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>>>>>>       if (value.getAdImprLog().isVisible()) {
>>>>>>         for (int i = 0; i < lineitemids.length; i++) {
>>>>>>           long li = lineitemids[i];
>>>>>>           if (li == TARGETED_LI) {
>>>>>>             // log.info("valid impression ids= " + value.getAdImprLog().toString());
>>>>>>             String liKey = String.format("P:LIS:%s:%s:I", li, date);
>>>>>>             l.add(new KeyValue<>(liKey, 1));
>>>>>>           }
>>>>>>         }
>>>>>>       }
>>>>>>       return l;
>>>>>>     }
>>>>>>     return l;
>>>>>>   }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>>>>>     .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
>>>>>>   return liCount;
>>>>>> }
>>>>>>
>>>>>> On Wed, Jun 14, 2017 at 10:55 AM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>>>>>>
>>>>>>> The input topic contains 60 partitions, and data is distributed well
>>>>>>> across the partitions on different keys. While consuming, I do some
>>>>>>> filtering and write data for only a single key.
>>>>>>>
>>>>>>> The output looks something like this:
>>>>>>>
>>>>>>> Machine 1
>>>>>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
>>>>>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652
>>>>>>>
>>>>>>> Machine 2
>>>>>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
>>>>>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2
>>>>>>>
>>>>>>> Here is a snippet of the code:
>>>>>>>
>>>>>>> private KTable<String, Integer> extractLICount(KStream<Windowed<String>, AdLog> joinedImprLogs) {
>>>>>>>   KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
>>>>>>>     List<KeyValue<String, Integer>> l = new ArrayList<>();
>>>>>>>     if (value == null) {
>>>>>>>       return l;
>>>>>>>     }
>>>>>>>     String date = new SimpleDateFormat("yyyy_MM_dd").format(new Date(key.window().end()));
>>>>>>>     // Lineitemids
>>>>>>>     if (value.getAdLogType() == 3) {
>>>>>>>       // log.info("Invalid data: " + value);
>>>>>>>       return l;
>>>>>>>     }
>>>>>>>     if (value.getAdLogType() == 2) {
>>>>>>>       // Click log: emit one count if the click belongs to TARGETED_LI.
>>>>>>>       long lineitemid = value.getAdClickLog().getItmClmbLId();
>>>>>>>       if (lineitemid == TARGETED_LI) {
>>>>>>>         String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
>>>>>>>         l.add(new KeyValue<>(liKey, 1));
>>>>>>>       }
>>>>>>>       return l;
>>>>>>>     } else if (value.getAdLogType() == 1) {
>>>>>>>       // Impression log: emit one count per occurrence of TARGETED_LI in a visible impression.
>>>>>>>       long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>>>>>>>       if (value.getAdImprLog().isVisible()) {
>>>>>>>         for (int i = 0; i < lineitemids.length; i++) {
>>>>>>>           long li = lineitemids[i];
>>>>>>>           if (li == TARGETED_LI) {
>>>>>>>             // log.info("valid impression ids= " + value.getAdImprLog().toString());
>>>>>>>             String liKey = String.format("P:LIS:%s:%s:I", li, date);
>>>>>>>             l.add(new KeyValue<>(liKey, 1));
>>>>>>>           }
>>>>>>>         }
>>>>>>>       }
>>>>>>>       return l;
>>>>>>>     }
>>>>>>>     return l;
>>>>>>>   }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>>>>>>     .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
>>>>>>>   return liCount;
>>>>>>> }
>>>>>>>
>>>>>>> -Sameer.
>>>>>>>
>>>>>>> On Tue, Jun 13, 2017 at 10:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>
>>>>>>>> Sameer,
>>>>>>>>
>>>>>>>> if you write a single key, all your input data should be in a single
>>>>>>>> partition. As Streams scales out via partitions, you cannot make use of a
>>>>>>>> second instance, as data from one partition is never split. Thus, all
>>>>>>>> data will go to one instance while the second instance should be idle.
>>>>>>>>
>>>>>>>> So what I don't understand is why you see machine 2 output a counter
>>>>>>>> of 1; it should not output anything. Maybe you can give some more
>>>>>>>> details about your setup?
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 6/13/17 5:00 AM, Sameer Kumar wrote:
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I witnessed a strange behaviour in Kafka Streams and need help
>>>>>>>>> understanding it.
>>>>>>>>>
>>>>>>>>> I created an application for aggregating clicks per user. I want to
>>>>>>>>> process it for only 1 user (I was writing only a single key).
>>>>>>>>> When I ran the application on one machine, it was running fine. Then, to
>>>>>>>>> load-balance it, I started another node.
>>>>>>>>>
>>>>>>>>> My expectation was that the counter would stay in sync, i.e. if the output
>>>>>>>>> from one machine is (key, 100), the other machine should read this and its
>>>>>>>>> next value should be (key, 101).
>>>>>>>>> But that didn't happen. Instead, on machine 2, the counter started at 1.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> -Sameer.
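Eno and Matthias make the point that each partition, and therefore each key, is processed by exactly one instance. A toy, stdlib-only simulation can illustrate this. It contrasts two setups: counting on whatever source partitions an instance happens to own, and counting after records are re-routed by the grouping key, which is the effect of the internal repartition topic that `groupBy()` introduces. Everything here is an assumption for illustration. `partitionFor` uses `String.hashCode` instead of Kafka's murmur2-based default partitioner, `ownerOf` is a simplified partition-to-instance assignment, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LocalStoreDemo {

    // Stand-in partitioner (assumption): Kafka's default partitioner hashes the
    // serialized key with murmur2; String.hashCode keeps this sketch dependency-free.
    // What matters is that the same key always lands in the same partition.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Simplified assignment (assumption): partition p is owned by instance p % numInstances.
    // Kafka Streams uses its own assignor, but each partition still has exactly one owner.
    static int ownerOf(int partition, int numInstances) {
        return partition % numInstances;
    }

    // Each record is {sourceKey, groupKey}. Every instance keeps its own local map,
    // like a local state store. With repartition == false, a record is counted by the
    // instance owning its source partition; with repartition == true, it is first
    // re-routed by groupKey, which is what a repartition topic achieves.
    static List<Map<String, Integer>> count(List<String[]> records, int numPartitions,
                                            int numInstances, boolean repartition) {
        List<Map<String, Integer>> stores = new ArrayList<>();
        for (int i = 0; i < numInstances; i++) {
            stores.add(new HashMap<>());
        }
        for (String[] r : records) {
            String sourceKey = r[0];
            String groupKey = r[1];
            String routingKey = repartition ? groupKey : sourceKey;
            int owner = ownerOf(partitionFor(routingKey, numPartitions), numInstances);
            stores.get(owner).merge(groupKey, 1, Integer::sum);
        }
        return stores;
    }

    public static void main(String[] args) {
        // 100 records with distinct source keys, all belonging to category "cat1".
        List<String[]> records = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            records.add(new String[] {"key" + i, "cat1"});
        }
        System.out.println("counted where consumed: " + count(records, 60, 2, false));
        System.out.println("re-routed by category:  " + count(records, 60, 2, true));
    }
}
```

In the first run, `cat1` shows up in both instances' maps with partial counts, the "key: cat1, 1 on both machines" pattern described in the thread. In the second run, exactly one map holds all 100, matching Eno's description that a given key is processed on only one instance.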