You can look into KStreamReduceProcessor#process() and KStreamAggregateProcessor#process(). This should help you see the input data for each instance.
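For example, here is a minimal (untested) sketch of how you could log what each instance feeds into the aggregation without stepping through those internal classes. It uses the 0.10.2 API from your snippet below; the class name, log output, and store name are only placeholders:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class AggregationInputLogger {

  // "rekeyed" stands for the KStream<String, Integer> that your flatMap() produces.
  public static KTable<String, Integer> countWithLogging(KStream<String, Integer> rekeyed) {
    return rekeyed
        // pass-through map() that only logs (peek() would do the same on newer versions)
        .map((key, value) -> {
          System.out.println("aggregation input on this instance: key=" + key + ", value=" + value);
          return new KeyValue<String, Integer>(key, value);
        })
        .groupByKey(Serdes.String(), Serdes.Integer())
        .reduce((v1, v2) -> v1 + v2, "line-item-count-store");
  }
}

Comparing this output on both machines should show whether the same key really reaches more than one instance.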
count()/aggregate() will use AggregateProcessor, while reduce() will use ReduceProcessor.

Hope this helps.

-Matthias

On 6/20/17 10:54 PM, Sameer Kumar wrote:
> Hi Matthias,
>
> I am working on 0.10.2.1, and I do see outputs for the same keys on both machines, albeit not always and somewhat randomly.
>
> I shall try to code a simpler example, free from domain-level code, and share it with you in a while.
>
> I am willing to debug this further as well, if you could tell me the classes that I should be looking into.
>
> -Sameer.
>
> On Tue, Jun 20, 2017 at 3:51 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Hi Sameer,
>>
>> With regard to
>>
>>>>> What I saw was that while on Machine1 the counter was 100, on another machine it was at 1. I saw it as inconsistent.
>>
>> If you really see the same key on different machines, that would be incorrect. All records with the same key must be processed by the same machine. Thus, there must not be two counters, but only one.
>>
>> I am wondering what version of Kafka Streams you are using? Version 0.10.0.x does not have the auto-repartitioning feature, and this might explain what you see. In that case, you need to call #through() before doing the aggregation.
>>
>> -Matthias
>>
>> On 6/17/17 3:28 AM, Sameer Kumar wrote:
>>> Continued from my last mail...
>>>
>>> The code snippet that I shared was after joining impression and notification logs. Here I am picking the line item and concatenating it with the date. You can also see there is a check for a TARGETED_LINE_ITEM; I am not emitting the data otherwise.
>>>
>>> -Sameer.
>>>
>>> On Sat, Jun 17, 2017 at 3:55 PM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>>>
>>>> The example I gave was just for illustration. I have impression logs and notification logs. Notification logs are essentially tied to impressions served. An impression would serve multiple items.
>>>>
>>>> I was just trying to aggregate across a single line item; this means I am always generating a single key, and since the data is streaming, my counter should keep on increasing.
>>>>
>>>> What I saw was that while on Machine1 the counter was 100, on another machine it was at 1. I saw it as inconsistent.
>>>>
>>>> -Sameer.
>>>>
>>>> On Fri, Jun 16, 2017 at 10:47 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>>> I just double-checked your example code from an earlier email. There you are using:
>>>>>
>>>>> stream.flatMap(...)
>>>>>       .groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>>>>       .reduce((value1, value2) -> value1 + value2,
>>>>>
>>>>> In your last email, you say that you want to count on a category that is contained in your value, and that your key is something different.
>>>>>
>>>>> If you want to count by category, you need to set the category as the key in your .groupBy() -- in your code snippet, you actually don't change the key (this seems to be the root cause of the issue you see).
>>>>>
>>>>> Does this help?
>>>>>
>>>>> -Matthias
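To illustrate the re-keying suggested above: a minimal, untested sketch against the 0.10.2 API. extractCategory() and adLogSerde are placeholders for however you get the category out of your value and serialize it:

// Sketch only: make the category the record key before the aggregation, so that
// all records for one category are routed to the same partition and therefore
// processed by exactly one instance.
KTable<String, Long> countPerCategory(KStream<String, AdLog> events, Serde<AdLog> adLogSerde) {
  return events
      .groupBy((key, value) -> extractCategory(value),  // new key = the category
               Serdes.String(), adLogSerde)
      .count("category-count-store");
}

On 0.10.1 and newer (including 0.10.2.1) the groupBy() triggers an automatic repartition through an internal topic, so the re-keyed records are redistributed before they are counted; on 0.10.0.x you would need the #through() call mentioned above instead.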
>>>>> On 6/15/17 11:48 PM, Sameer Kumar wrote:
>>>>>> OK, let me try to explain it again.
>>>>>>
>>>>>> So, let's say my source processor has a different key, and the value it contains has an identifier type which basically denotes a category, and I am counting on the different categories. A specific case would be: I do a filter and output only a specific category, let's say cat1. Now, starting from the source processor, which runs across multiple nodes, cat1 would be written on multiple nodes, and since the state store is local, it will be starting from 1 on each of the nodes.
>>>>>>
>>>>>> Does that clarify the doubt? I think after a processor the output gets written to an intermediate Kafka topic and this should be taken care of, but it still happens; I am not sure about it.
>>>>>>
>>>>>> Processor 1
>>>>>> key1, value:{xyz....cat1}
>>>>>> key2, value:{xyz....cat2}
>>>>>> key3, value:{xyz....cat3}
>>>>>>
>>>>>> Processor 2
>>>>>> Machine1
>>>>>> key: cat1, 1
>>>>>>
>>>>>> Machine2
>>>>>> key: cat1, 1
>>>>>>
>>>>>> -Sameer.
>>>>>>
>>>>>> On Thu, Jun 15, 2017 at 10:27 PM, Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>>
>>>>>>> I'm not sure if I fully understand this, but let me check:
>>>>>>>
>>>>>>> - if you start 2 instances, one instance will process half of the partitions and the other instance will process the other half
>>>>>>> - for any given key, like key 100, it will only be processed on one of the instances, not both.
>>>>>>>
>>>>>>> Does this help?
>>>>>>>
>>>>>>> Eno
>>>>>>>
>>>>>>>> On 15 Jun 2017, at 07:40, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Also, I am writing a single key in the output all the time. I believe machine2 will have to write a key, and since the state store is local it wouldn't know about the counter state on the other machine. So I guess this will happen.
>>>>>>>>
>>>>>>>> -Sameer.
>>>>>>>>
>>>>>>>> On Thu, Jun 15, 2017 at 11:11 AM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> The input topic contains 60 partitions and the data is distributed well across the partitions on different keys. During consumption, I am doing some filtering and writing only single-key data.
>>>>>>>>>
>>>>>>>>> The output is of the form:
>>>>>>>>>
>>>>>>>>> Machine 1
>>>>>>>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
>>>>>>>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652
>>>>>>>>>
>>>>>>>>> Machine 2
>>>>>>>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
>>>>>>>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2
>>>>>>>>>
>>>>>>>>> I am sharing a snippet of the code:
>>>>>>>>>
>>>>>>>>> private KTable<String, Integer> extractLICount(KStream<Windowed<String>, AdLog> joinedImprLogs) {
>>>>>>>>>   KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
>>>>>>>>>     List<KeyValue<String, Integer>> l = new ArrayList<>();
>>>>>>>>>     if (value == null) {
>>>>>>>>>       return l;
>>>>>>>>>     }
>>>>>>>>>     String date = new SimpleDateFormat("yyyy_MM_dd").format(new Date(key.window().end()));
>>>>>>>>>     // Lineitemids
>>>>>>>>>     if (value != null && value.getAdLogType() == 3) {
>>>>>>>>>       // log.info("Invalid data: " + value);
>>>>>>>>>       return l;
>>>>>>>>>     }
>>>>>>>>>     if (value.getAdLogType() == 2) {
>>>>>>>>>       long lineitemid = value.getAdClickLog().getItmClmbLId();
>>>>>>>>>       if (lineitemid == TARGETED_LI) {
>>>>>>>>>         String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
>>>>>>>>>         l.add(new KeyValue<>(liKey, 1));
>>>>>>>>>       }
>>>>>>>>>       return l;
>>>>>>>>>     } else if (value.getAdLogType() == 1) {
>>>>>>>>>       long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>>>>>>>>>       if (value.getAdImprLog().isVisible()) {
>>>>>>>>>         for (int i = 0; i < lineitemids.length; i++) {
>>>>>>>>>           long li = lineitemids[i];
>>>>>>>>>           if (li == TARGETED_LI) {
>>>>>>>>>             // log.info("valid impression ids= " + value.getAdImprLog().toString());
>>>>>>>>>             String liKey = String.format("P:LIS:%s:%s:I", li, date);
>>>>>>>>>             l.add(new KeyValue<>(liKey, 1));
>>>>>>>>>           }
>>>>>>>>>         }
>>>>>>>>>       }
>>>>>>>>>       return l;
>>>>>>>>>     }
>>>>>>>>>     return l;
>>>>>>>>>   }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>>>>>>>>     .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
>>>>>>>>>   return liCount;
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> On Wed, Jun 14, 2017 at 10:55 AM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> The input topic contains 60 partitions and the data is distributed well across the partitions on different keys. During consumption, I am doing some filtering and writing only single-key data.
>>>>>>>>>>
>>>>>>>>>> The output is of the form:
>>>>>>>>>>
>>>>>>>>>> Machine 1
>>>>>>>>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
>>>>>>>>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652
>>>>>>>>>>
>>>>>>>>>> Machine 2
>>>>>>>>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
>>>>>>>>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2
>>>>>>>>>>
>>>>>>>>>> I am sharing a snippet of the code:
>>>>>>>>>>
>>>>>>>>>> private KTable<String, Integer> extractLICount(KStream<Windowed<String>, AdLog> joinedImprLogs) {
>>>>>>>>>>   KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
>>>>>>>>>>     List<KeyValue<String, Integer>> l = new ArrayList<>();
>>>>>>>>>>     if (value == null) {
>>>>>>>>>>       return l;
>>>>>>>>>>     }
>>>>>>>>>>     String date = new SimpleDateFormat("yyyy_MM_dd").format(new Date(key.window().end()));
>>>>>>>>>>     // Lineitemids
>>>>>>>>>>     if (value != null && value.getAdLogType() == 3) {
>>>>>>>>>>       // log.info("Invalid data: " + value);
>>>>>>>>>>       return l;
>>>>>>>>>>     }
>>>>>>>>>>     if (value.getAdLogType() == 2) {
>>>>>>>>>>       long lineitemid = value.getAdClickLog().getItmClmbLId();
>>>>>>>>>>       if (lineitemid == TARGETED_LI) {
>>>>>>>>>>         String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
>>>>>>>>>>         l.add(new KeyValue<>(liKey, 1));
>>>>>>>>>>       }
>>>>>>>>>>       return l;
>>>>>>>>>>     } else if (value.getAdLogType() == 1) {
>>>>>>>>>>       long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>>>>>>>>>>       if (value.getAdImprLog().isVisible()) {
>>>>>>>>>>         for (int i = 0; i < lineitemids.length; i++) {
>>>>>>>>>>           long li = lineitemids[i];
>>>>>>>>>>           if (li == TARGETED_LI) {
>>>>>>>>>>             // log.info("valid impression ids= " + value.getAdImprLog().toString());
>>>>>>>>>>             String liKey = String.format("P:LIS:%s:%s:I", li, date);
>>>>>>>>>>             l.add(new KeyValue<>(liKey, 1));
>>>>>>>>>>           }
>>>>>>>>>>         }
>>>>>>>>>>       }
>>>>>>>>>>       return l;
>>>>>>>>>>     }
>>>>>>>>>>     return l;
>>>>>>>>>>   }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>>>>>>>>>     .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
>>>>>>>>>>   return liCount;
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> -Sameer.
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 13, 2017 at 10:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>
>>>>>>>>>>> Sameer,
>>>>>>>>>>>
>>>>>>>>>>> If you write a single key, all your input data should be in a single partition. As Streams scales out via partitions, you cannot have a second instance, as data from one partition is never split. Thus, all data will go to one instance while the second instance should be idle.
>>>>>>>>>>>
>>>>>>>>>>> So what I don't understand is why you see machine 2 output a counter of 1 -- it should not output anything. Maybe you can give some more details about your setup?
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 6/13/17 5:00 AM, Sameer Kumar wrote:
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I witnessed some strange behaviour in Kafka Streams and need help understanding it.
>>>>>>>>>>>>
>>>>>>>>>>>> I created an application for aggregating clicks per user; I want to process it for only one user (I was writing only a single key). When I ran the application on one machine, it was running fine. Now, to load-balance it, I started another node.
>>>>>>>>>>>>
>>>>>>>>>>>> My expectation was that the counters should stay in sync, i.e. if the output from one machine is (key, 100), the other machine should read this and the next value should be (key, 101). But that didn't happen. Instead, on machine 2, the counter started at 1.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> -Sameer.
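P.S.: if you want to double-check which partition (and therefore which instance) a given key ends up on, you can mirror the default partitioning logic as in the sketch below. This is only an illustration, assuming the default partitioner and the 60 partitions mentioned in the thread; it is not something your application needs:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;

public class PartitionForKey {
  public static void main(String[] args) {
    // Default partitioning for keyed records: murmur2 hash of the serialized key,
    // made positive, modulo the number of partitions (60, as in the input topic above).
    byte[] keyBytes = Serdes.String().serializer()
        .serialize("any-topic", "P:LIS:1236667:2017_06_13:I");
    int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % 60;
    System.out.println("This key always maps to partition " + partition);
  }
}

Since the hash depends only on the key bytes, every record with that key goes to the same partition, and each partition is owned by exactly one instance at a time.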