Hi Matthias,
I am working on 10.2.1 and I do see the same output on the same keys, albeit not always and a bit randomly. I shall try to code a simpler example, free from domain-level code, and share it with you in a while. I am willing to debug this further as well, if you could tell me the classes that I should be looking into.

-Sameer.

On Tue, Jun 20, 2017 at 3:51 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> Hi Sameer,
>
> With regard to
>
>>>> What I saw was that while on Machine1 the counter was 100, on another
>>>> machine it was at 1. I saw it as inconsistent.
>
> If you really see the same key on different machines, that would be
> incorrect. All records with the same key must be processed by the same
> machine. Thus, there must not be two counters, but only one.
>
> I am wondering what version of Kafka Streams you are using? Version
> 0.10.0.x does not have the auto-repartitioning feature, and this might
> explain what you see. For this case, you need to call #through() before
> doing the aggregation.
>
> -Matthias
>
> On 6/17/17 3:28 AM, Sameer Kumar wrote:
>> Continued from my last mail...
>>
>> The code snippet that I shared was after joining impression and
>> notification logs. Here I am picking the line item and concatenating it
>> with the date. You can also see there is a check for a TARGETED_LINE_ITEM;
>> I am not emitting the data otherwise.
>>
>> -Sameer.
>>
>> On Sat, Jun 17, 2017 at 3:55 PM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>>
>>> The example I gave was just for illustration. I have impression logs and
>>> notification logs. Notification logs are essentially tied to impressions
>>> served. An impression would serve multiple items.
>>>
>>> I was just trying to aggregate across a single line item; this means I am
>>> always generating a single key all the time, and since the data is
>>> streaming, my counter should keep on increasing.
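[Matthias's point above -- that all records with the same key must be processed by the same machine -- follows from how Kafka routes records: a partition is chosen from the key's hash, and each partition is owned by exactly one Streams instance. Below is a minimal plain-Java sketch of that routing rule. It simulates the default hash partitioner and a per-partition local store; it is not Kafka code, and the key string is just borrowed from the logs quoted later in this thread.]

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionRouting {

    // Simplified stand-in for Kafka's default partitioner:
    // the same key always hashes to the same partition.
    static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 60; // as in the input topic described in this thread
        // One local "state store" per partition, as in Kafka Streams.
        Map<Integer, Map<String, Integer>> storePerPartition = new HashMap<>();

        String key = "P:LIS:1236667:2017_06_13:I";
        for (int i = 0; i < 100; i++) {
            int p = partitionFor(key, numPartitions);
            storePerPartition.computeIfAbsent(p, x -> new HashMap<>())
                             .merge(key, 1, Integer::sum);
        }

        // A single key only ever touches one partition, so exactly one
        // instance holds the one counter.
        System.out.println(storePerPartition.size());                              // 1
        System.out.println(storePerPartition.values().iterator().next().get(key)); // 100
    }
}
```

[Under this model, seeing independent counters for the same key on two machines means records for that key reached two instances -- which is exactly the missing-repartitioning symptom Matthias describes.]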
>>> What I saw was that while on Machine1 the counter was 100, on another
>>> machine it was at 1. I saw it as inconsistent.
>>>
>>> -Sameer.
>>>
>>> On Fri, Jun 16, 2017 at 10:47 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>>> I just double-checked your example code from an email before. There you
>>>> are using:
>>>>
>>>> stream.flatMap(...)
>>>>       .groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>>>       .reduce((value1, value2) -> value1 + value2,
>>>>
>>>> In your last email, you say that you want to count on a category that is
>>>> contained in your value, and your key is something different.
>>>>
>>>> If you want to count by category, you need to set the category as the key
>>>> in your .groupBy() -- in your code snippet, you actually don't change the
>>>> key (this seems to be the root cause of the issue you see).
>>>>
>>>> Does this help?
>>>>
>>>> -Matthias
>>>>
>>>> On 6/15/17 11:48 PM, Sameer Kumar wrote:
>>>>> Ok, let me try to explain it again.
>>>>>
>>>>> Let's say my source processor has a different key. The value it contains
>>>>> has an identifier type, which basically denotes a category, and I am
>>>>> counting on different categories. A specific case would be that I do a
>>>>> filter and output only a specific category, let's say cat1. Now, starting
>>>>> from the source processor, which is being processed across multiple
>>>>> nodes, cat1 would be written across multiple nodes, and since the state
>>>>> store is local, it will be starting from 1 on each of the nodes.
>>>>>
>>>>> Does that clarify the doubt? I think after a processor the same gets
>>>>> written to an intermediate Kafka topic and this should be taken care of,
>>>>> but this still happens; I am not sure about it.
>>>>>
>>>>> Processor 1
>>>>> key1, value:{xyz....cat1}
>>>>> key2, value:{xyz....cat2}
>>>>> key3, value:{xyz....cat3}
>>>>>
>>>>> Processor 2
>>>>> Machine1
>>>>> key: cat1, 1
>>>>>
>>>>> Machine2
>>>>> key: cat1, 1
>>>>>
>>>>> -Sameer.
>>>>>
>>>>> On Thu, Jun 15, 2017 at 10:27 PM, Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>
>>>>>> I'm not sure if I fully understand this, but let me check:
>>>>>>
>>>>>> - if you start 2 instances, one instance will process half of the
>>>>>>   partitions, the other instance will process the other half
>>>>>> - for any given key, like key 100, it will only be processed on one of
>>>>>>   the instances, not both.
>>>>>>
>>>>>> Does this help?
>>>>>>
>>>>>> Eno
>>>>>>
>>>>>>> On 15 Jun 2017, at 07:40, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>>>>>>>
>>>>>>> Also, I am writing a single key in the output all the time. I believe
>>>>>>> machine2 will have to write a key, and since a state store is local it
>>>>>>> wouldn't know about the counter state on another machine. So I guess
>>>>>>> this will happen.
>>>>>>>
>>>>>>> -Sameer.
>>>>>>>
>>>>>>> On Thu, Jun 15, 2017 at 11:11 AM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>>>>>>>
>>>>>>>> The input topic contains 60 partitions and data is distributed well
>>>>>>>> across different partitions on different keys. During consumption, I am
>>>>>>>> doing some filtering and writing only single-key data.
>>>>>>>>
>>>>>>>> Output would be something of the form:
>>>>>>>>
>>>>>>>> Machine 1
>>>>>>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
>>>>>>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652
>>>>>>>>
>>>>>>>> Machine 2
>>>>>>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
>>>>>>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2
>>>>>>>>
>>>>>>>> I am sharing a snippet of code:
>>>>>>>>
>>>>>>>> private KTable<String, Integer> extractLICount(KStream<Windowed<String>, AdLog> joinedImprLogs) {
>>>>>>>>   KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
>>>>>>>>     List<KeyValue<String, Integer>> l = new ArrayList<>();
>>>>>>>>     if (value == null) {
>>>>>>>>       return l;
>>>>>>>>     }
>>>>>>>>     String date = new SimpleDateFormat("yyyy_MM_dd").format(new Date(key.window().end()));
>>>>>>>>     // Line item ids
>>>>>>>>     if (value != null && value.getAdLogType() == 3) {
>>>>>>>>       // log.info("Invalid data: " + value);
>>>>>>>>       return l;
>>>>>>>>     }
>>>>>>>>     if (value.getAdLogType() == 2) {
>>>>>>>>       long lineitemid = value.getAdClickLog().getItmClmbLId();
>>>>>>>>       if (lineitemid == TARGETED_LI) {
>>>>>>>>         String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
>>>>>>>>         l.add(new KeyValue<>(liKey, 1));
>>>>>>>>       }
>>>>>>>>       return l;
>>>>>>>>     } else if (value.getAdLogType() == 1) {
>>>>>>>>       long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>>>>>>>>       if (value.getAdImprLog().isVisible()) {
>>>>>>>>         for (int i = 0; i < lineitemids.length; i++) {
>>>>>>>>           long li = lineitemids[i];
>>>>>>>>           if (li == TARGETED_LI) {
>>>>>>>>             // log.info("valid impression ids= " + value.getAdImprLog().toString());
>>>>>>>>             String liKey = String.format("P:LIS:%s:%s:I", li, date);
>>>>>>>>             l.add(new KeyValue<>(liKey, 1));
>>>>>>>>           }
>>>>>>>>         }
>>>>>>>>       }
>>>>>>>>       return l;
>>>>>>>>     }
>>>>>>>>     return l;
>>>>>>>>   }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>>>>>>>     .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
>>>>>>>>   return liCount;
>>>>>>>> }
>>>>>>>>
>>>>>>>> On Wed, Jun 14, 2017 at 10:55 AM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> The input topic contains 60 partitions and data is distributed well
>>>>>>>>> across different partitions on different keys. During consumption, I am
>>>>>>>>> doing some filtering and writing only single-key data.
>>>>>>>>>
>>>>>>>>> Output would be something of the form:
>>>>>>>>>
>>>>>>>>> Machine 1
>>>>>>>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
>>>>>>>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652
>>>>>>>>>
>>>>>>>>> Machine 2
>>>>>>>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
>>>>>>>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2
>>>>>>>>>
>>>>>>>>> I am sharing a snippet of code:
>>>>>>>>>
>>>>>>>>> private KTable<String, Integer> extractLICount(KStream<Windowed<String>, AdLog> joinedImprLogs) {
>>>>>>>>>   KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
>>>>>>>>>     List<KeyValue<String, Integer>> l = new ArrayList<>();
>>>>>>>>>     if (value == null) {
>>>>>>>>>       return l;
>>>>>>>>>     }
>>>>>>>>>     String date = new SimpleDateFormat("yyyy_MM_dd").format(new Date(key.window().end()));
>>>>>>>>>     // Line item ids
>>>>>>>>>     if (value != null && value.getAdLogType() == 3) {
>>>>>>>>>       // log.info("Invalid data: " + value);
>>>>>>>>>       return l;
>>>>>>>>>     }
>>>>>>>>>     if (value.getAdLogType() == 2) {
>>>>>>>>>       long lineitemid = value.getAdClickLog().getItmClmbLId();
>>>>>>>>>       if (lineitemid == TARGETED_LI) {
>>>>>>>>>         String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
>>>>>>>>>         l.add(new KeyValue<>(liKey, 1));
>>>>>>>>>       }
>>>>>>>>>       return l;
>>>>>>>>>     } else if (value.getAdLogType() == 1) {
>>>>>>>>>       long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>>>>>>>>>       if (value.getAdImprLog().isVisible()) {
>>>>>>>>>         for (int i = 0; i < lineitemids.length; i++) {
>>>>>>>>>           long li = lineitemids[i];
>>>>>>>>>           if (li == TARGETED_LI) {
>>>>>>>>>             // log.info("valid impression ids= " + value.getAdImprLog().toString());
>>>>>>>>>             String liKey = String.format("P:LIS:%s:%s:I", li, date);
>>>>>>>>>             l.add(new KeyValue<>(liKey, 1));
>>>>>>>>>           }
>>>>>>>>>         }
>>>>>>>>>       }
>>>>>>>>>       return l;
>>>>>>>>>     }
>>>>>>>>>     return l;
>>>>>>>>>   }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>>>>>>>>     .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
>>>>>>>>>   return liCount;
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> -Sameer.
>>>>>>>>>
>>>>>>>>> On Tue, Jun 13, 2017 at 10:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>>> Sameer,
>>>>>>>>>>
>>>>>>>>>> if you write a single key, all your input data should be in a single
>>>>>>>>>> partition. As Streams scales out via partitions, you cannot have a
>>>>>>>>>> second instance, as data from one partition is never split. Thus, all
>>>>>>>>>> data will go to one instance while the second instance should be idle.
>>>>>>>>>>
>>>>>>>>>> So what I don't understand is why you see machine 2 output a
>>>>>>>>>> counter of 1 -- it should not output anything. Maybe you can give some
>>>>>>>>>> more details about your setup?
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 6/13/17 5:00 AM, Sameer Kumar wrote:
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I witnessed a strange behaviour in Kafka Streams; I need help in
>>>>>>>>>>> understanding the same.
>>>>>>>>>>>
>>>>>>>>>>> I created an application for aggregating clicks per user, and I want
>>>>>>>>>>> to process it only for one user (I was writing only a single key).
>>>>>>>>>>> When I ran the application on one machine, it was running fine. Now,
>>>>>>>>>>> to load-balance it, I started another node.
>>>>>>>>>>>
>>>>>>>>>>> My expectation was that the counters should be in sync, i.e. if the
>>>>>>>>>>> output from one machine is key, 100, then the other machine should
>>>>>>>>>>> read this and the value should be key, 101. But that didn't happen.
>>>>>>>>>>> Instead, on machine 2, the counter started with 1.
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> -Sameer.
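[The behaviour discussed in this thread -- each machine keeping its own counter for the same derived key -- is what happens when flatMap changes the key but no repartition step runs before reduce: each instance aggregates only the source partitions it owns. Matthias's fix (#through() on 0.10.0.x, automatic repartitioning on later versions) re-routes records by the new key first. A plain-Java simulation of the two cases follows; it is not the Streams API, and the round-robin instance assignment and the keys are illustrative assumptions only.]

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RepartitionSketch {

    // Simplified stand-in for Kafka's default hash partitioner.
    static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    // No repartition: each instance reduces records from its own source
    // partitions into its own local store, so one derived key ends up
    // with an independent counter on every instance that sees it.
    static Map<Integer, Map<String, Integer>> countWithoutRepartition(
            List<String> sourceKeys, String newKey, int instances) {
        Map<Integer, Map<String, Integer>> stores = new HashMap<>();
        for (int i = 0; i < sourceKeys.size(); i++) {
            int instance = i % instances; // source partitions spread round-robin
            stores.computeIfAbsent(instance, x -> new HashMap<>())
                  .merge(newKey, 1, Integer::sum);
        }
        return stores;
    }

    // With a repartition step before the aggregation: records are first
    // re-routed by the NEW key, so they all land on one instance and
    // there is a single counter.
    static Map<Integer, Map<String, Integer>> countWithRepartition(
            List<String> sourceKeys, String newKey, int instances) {
        Map<Integer, Map<String, Integer>> stores = new HashMap<>();
        for (String ignored : sourceKeys) {
            int instance = partitionFor(newKey, instances);
            stores.computeIfAbsent(instance, x -> new HashMap<>())
                  .merge(newKey, 1, Integer::sum);
        }
        return stores;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("key1", "key2", "key3", "key4");
        System.out.println(countWithoutRepartition(keys, "cat1", 2)); // two independent counters
        System.out.println(countWithRepartition(keys, "cat1", 2));    // one counter
    }
}
```

[In the snippet quoted above, flatMap already emits the line-item key, so the re-keying itself is done; what splits the counters across machines is the missing repartition between flatMap and reduce.]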