OK, let me try to explain it again. Say my source processor uses one key, and the value it carries contains an identifier that denotes a category; I am counting per category. A specific case: I filter and output only one category, say cat1. Since the source processor runs across multiple nodes, cat1 would be written on multiple nodes, and because the state store is local, the count would start from 1 on each node.
Does that clarify the doubt? I think that after a processor the data gets written to an intermediate Kafka topic, which should take care of this, but it still happens; I am not sure about it.

Processor 1
key1, value:{xyz....cat1}
key2, value:{xyz....cat2}
key3, value:{xyz....cat3}

Processor 2
Machine1 key: cat1, 1
Machine2 key: cat1, 1

-Sameer.

On Thu, Jun 15, 2017 at 10:27 PM, Eno Thereska <eno.there...@gmail.com> wrote:

> I'm not sure if I fully understand this but let me check:
>
> - if you start 2 instances, one instance will process half of the
>   partitions, the other instance will process the other half
> - for any given key, like key 100, it will only be processed on one of the
>   instances, not both.
>
> Does this help?
>
> Eno
>
> > On 15 Jun 2017, at 07:40, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
> >
> > Also, I am writing a single key in the output all the time. I believe
> > machine2 will have to write a key and since a state store is local it
> > wouldn't know about the counter state on another machine. So, I guess this
> > will happen.
> >
> > -Sameer.
> >
> > On Thu, Jun 15, 2017 at 11:11 AM, Sameer Kumar <sam.kum.w...@gmail.com>
> > wrote:
> >
> >> The input topic contains 60 partitions and data is distributed well across
> >> different partitions on different keys. While consumption, I am doing some
> >> filtering and writing only single key data.
> >>
> >> Output would be something of the form:-
> >>
> >> Machine 1
> >>
> >> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
> >> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652
> >>
> >> Machine 2
> >>
> >> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
> >> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2
> >>
> >> I am sharing a snippet of code,
> >>
> >> private KTable<String, Integer> extractLICount(KStream<Windowed<String>, AdLog> joinedImprLogs) {
> >>     KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
> >>         List<KeyValue<String, Integer>> l = new ArrayList<>();
> >>         if (value == null) {
> >>             return l;
> >>         }
> >>         String date = new SimpleDateFormat("yyyy_MM_dd").format(new Date(key.window().end()));
> >>         // Lineitemids
> >>         if (value != null && value.getAdLogType() == 3) {
> >>             // log.info("Invalid data: " + value);
> >>             return l;
> >>         }
> >>         if (value.getAdLogType() == 2) {
> >>             long lineitemid = value.getAdClickLog().getItmClmbLId();
> >>             if (lineitemid == TARGETED_LI) {
> >>                 String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
> >>                 l.add(new KeyValue<>(liKey, 1));
> >>             }
> >>             return l;
> >>         } else if (value.getAdLogType() == 1){
> >>
> >>             long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
> >>             if (value.getAdImprLog().isVisible()) {
> >>                 for (int i = 0; i < lineitemids.length; i++) {
> >>                     long li = lineitemids[i];
> >>                     if (li == TARGETED_LI) {
> >>                         // log.info("valid impression ids= " + value.getAdImprLog().toString());
> >>                         String liKey = String.format("P:LIS:%s:%s:I", li, date);
> >>                         l.add(new KeyValue<>(liKey, 1));
> >>                     }
> >>                 }
> >>             }
> >>             return l;
> >>         }
> >>         return l;
> >>     }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
> >>       .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
> >>     return liCount;
> >> }
> >>
> >> On Wed, Jun 14, 2017 at 10:55 AM, Sameer Kumar <sam.kum.w...@gmail.com>
> >> wrote:
> >>
> >>> The input topic contains 60 partitions and data is distributed well
> >>> across different partitions on different keys. While consumption, I am
> >>> doing some filtering and writing only single key data.
> >>>
> >>> Output would be something of the form:-
> >>>
> >>> Machine 1
> >>>
> >>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
> >>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652
> >>>
> >>> Machine 2
> >>>
> >>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
> >>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2
> >>>
> >>> I am sharing a snippet of code,
> >>>
> >>> private KTable<String, Integer> extractLICount(KStream<Windowed<String>, AdLog> joinedImprLogs) {
> >>>     KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
> >>>         List<KeyValue<String, Integer>> l = new ArrayList<>();
> >>>         if (value == null) {
> >>>             return l;
> >>>         }
> >>>         String date = new SimpleDateFormat("yyyy_MM_dd").format(new Date(key.window().end()));
> >>>         // Lineitemids
> >>>         if (value != null && value.getAdLogType() == 3) {
> >>>             // log.info("Invalid data: " + value);
> >>>             return l;
> >>>         }
> >>>         if (value.getAdLogType() == 2) {
> >>>             long lineitemid = value.getAdClickLog().getItmClmbLId();
> >>>             if (lineitemid == TARGETED_LI) {
> >>>                 String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
> >>>                 l.add(new KeyValue<>(liKey, 1));
> >>>             }
> >>>             return l;
> >>>         } else if (value.getAdLogType() == 1){
> >>>
> >>>             long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
> >>>             if (value.getAdImprLog().isVisible()) {
> >>>                 for (int i = 0; i < lineitemids.length; i++) {
> >>>                     long li = lineitemids[i];
> >>>                     if (li == TARGETED_LI) {
> >>>                         // log.info("valid impression ids= " + value.getAdImprLog().toString());
> >>>                         String liKey = String.format("P:LIS:%s:%s:I", li, date);
> >>>                         l.add(new KeyValue<>(liKey, 1));
> >>>                     }
> >>>                 }
> >>>             }
> >>>             return l;
> >>>         }
> >>>         return l;
> >>>     }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
> >>>       .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
> >>>     return liCount;
> >>> }
> >>>
> >>> -Sameer.
> >>>
> >>> On Tue, Jun 13, 2017 at 10:35 PM, Matthias J. Sax <matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> Sameer,
> >>>>
> >>>> if you write a single key, all your input data should be in a single
> >>>> partition. As Streams scales out via partitions, you cannot have a
> >>>> second instance as data from one partition is never split. Thus, all
> >>>> data will go to one instance while the second instance should be idle.
> >>>>
> >>>> So what I don't understand is, why you see that machine 2 output a
> >>>> counter of 1 -- it should not output anything. Maybe you can give some
> >>>> more details about your setup?
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 6/13/17 5:00 AM, Sameer Kumar wrote:
> >>>>> Hi,
> >>>>>
> >>>>> I witnessed a strange behaviour in KafkaStreams, need help in understanding
> >>>>> the same.
> >>>>>
> >>>>> I created an application for aggregating clicks per user, I want to process
> >>>>> it only for 1 user( i was writing only a single key).
> >>>>> When I ran application on one machine, it was running fine.Now, to
> >>>>> loadbalance it , I started another node.
> >>>>>
> >>>>> My expectation was that the counter should be in sync, i.e. if output from
> >>>>> one machine is key, 100 and another machine should read this and the value
> >>>>> should be key, 101.
> >>>>> But, that didnt happened. Instead, on machine 2, the counter started with 1.
> >>>>>
> >>>>>
> >>>>> Regards,
> >>>>> -Sameer.
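
The point made in this thread, that any given key is processed on only one instance, can be illustrated with a small simulation. This is a rough sketch in plain Java with no Kafka dependency, not Kafka Streams code. It assumes that records are re-keyed by category and routed to a partition by that new key before they are counted, which as far as I understand is what a key-changing operation followed by a grouping does through an internal repartition topic. `partitionFor` and `instanceFor` are hypothetical helpers: `String.hashCode()` only stands in for the hashing Kafka's default partitioner applies to the serialized key, and the round-robin assignment of partitions to instances is likewise an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RepartitionSketch {

    // Hypothetical stand-in for the default partitioner: same key, same partition.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical assignment: partitions are spread round-robin over instances.
    static int instanceFor(int partition, int numInstances) {
        return partition % numInstances;
    }

    // Counts records per key, with one separate (local) store per instance.
    static List<Map<String, Integer>> countPerInstance(
            List<String> keys, int numPartitions, int numInstances) {
        List<Map<String, Integer>> stores = new ArrayList<>();
        for (int i = 0; i < numInstances; i++) {
            stores.add(new HashMap<>());
        }
        for (String key : keys) {
            int instance = instanceFor(partitionFor(key, numPartitions), numInstances);
            // Only the owning instance's local store is updated for this key.
            stores.get(instance).merge(key, 1, Integer::sum);
        }
        return stores;
    }

    public static void main(String[] args) {
        // Records after re-keying by category, as in the cat1/cat2/cat3 example.
        List<String> regrouped = Arrays.asList("cat1", "cat2", "cat1", "cat3", "cat1");
        List<Map<String, Integer>> stores = countPerInstance(regrouped, 60, 2);
        for (int i = 0; i < stores.size(); i++) {
            System.out.println("instance " + i + " -> " + stores.get(i));
        }
    }
}
```

In this model every `cat1` record lands in the same local store, so its count keeps increasing on a single instance instead of restarting from 1 on each machine.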