I'm not sure I fully understand this, but let me check:

- If you start 2 instances, one instance will process half of the partitions and the other instance will process the other half.
- Any given key, like key 100, will only be processed on one of the instances, not both.

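To illustrate the two points above, here is a minimal sketch in plain Java with no Kafka dependency. It rests on simplifying assumptions: `partitionFor` uses `String.hashCode()` as a stand-in for the murmur2 hash that Kafka's default partitioner applies to the serialized key, and `instanceFor` deals partitions out round-robin, which is only an approximation of how Streams assigns tasks. The class and method names are hypothetical. The 60 partitions and the key are taken from the thread.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    // Simplified stand-in for the default partitioner: a key always maps
    // to the same partition for a fixed partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Assumed round-robin assignment of partitions to instances.
    // Each partition is owned by exactly one instance.
    static int instanceFor(int partition, int numInstances) {
        return partition % numInstances;
    }

    // Lists the partitions a given instance would own under the assumption above.
    static List<Integer> partitionsOwnedBy(int instance, int numPartitions, int numInstances) {
        List<Integer> owned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (instanceFor(p, numInstances) == instance) {
                owned.add(p);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        int numPartitions = 60; // partition count mentioned in the thread
        int numInstances = 2;   // two machines
        String key = "P:LIS:1236667:2017_06_13:I";

        int partition = partitionFor(key, numPartitions);
        int instance = instanceFor(partition, numInstances);

        System.out.println("instance 0 owns " + partitionsOwnedBy(0, numPartitions, numInstances).size() + " partitions");
        System.out.println("instance 1 owns " + partitionsOwnedBy(1, numPartitions, numInstances).size() + " partitions");
        System.out.println("key " + key + " -> partition " + partition + " -> instance " + instance);
    }
}
```

Under these assumptions each instance owns half of the partitions, and every record carrying the same key is routed to a single partition and therefore to a single instance.
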
Does this help?

Eno

> On 15 Jun 2017, at 07:40, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>
> Also, I am writing a single key in the output all the time. I believe
> machine2 will have to write a key, and since a state store is local it
> wouldn't know about the counter state on another machine. So, I guess this
> will happen.
>
> -Sameer.
>
> On Thu, Jun 15, 2017 at 11:11 AM, Sameer Kumar <sam.kum.w...@gmail.com>
> wrote:
>
>> The input topic contains 60 partitions and data is distributed well across
>> different partitions on different keys. During consumption, I am doing some
>> filtering and writing only single-key data.
>>
>> Output would be something of the form:
>>
>> Machine 1
>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652
>>
>> Machine 2
>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2
>>
>> I am sharing a snippet of code:
>>
>> private KTable<String, Integer> extractLICount(KStream<Windowed<String>, AdLog> joinedImprLogs) {
>>     KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
>>         List<KeyValue<String, Integer>> l = new ArrayList<>();
>>         if (value == null) {
>>             return l;
>>         }
>>         String date = new SimpleDateFormat("yyyy_MM_dd").format(new Date(key.window().end()));
>>         // Lineitemids
>>         if (value != null && value.getAdLogType() == 3) {
>>             // log.info("Invalid data: " + value);
>>             return l;
>>         }
>>         if (value.getAdLogType() == 2) {
>>             long lineitemid = value.getAdClickLog().getItmClmbLId();
>>             if (lineitemid == TARGETED_LI) {
>>                 String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
>>                 l.add(new KeyValue<>(liKey, 1));
>>             }
>>             return l;
>>         } else if (value.getAdLogType() == 1) {
>>             long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>>             if (value.getAdImprLog().isVisible()) {
>>                 for (int i = 0; i < lineitemids.length; i++) {
>>                     long li = lineitemids[i];
>>                     if (li == TARGETED_LI) {
>>                         // log.info("valid impression ids= " + value.getAdImprLog().toString());
>>                         String liKey = String.format("P:LIS:%s:%s:I", li, date);
>>                         l.add(new KeyValue<>(liKey, 1));
>>                     }
>>                 }
>>             }
>>             return l;
>>         }
>>         return l;
>>     }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>       .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
>>     return liCount;
>> }
>>
>> On Wed, Jun 14, 2017 at 10:55 AM, Sameer Kumar <sam.kum.w...@gmail.com>
>> wrote:
>>
>>> The input topic contains 60 partitions and data is distributed well
>>> across different partitions on different keys. During consumption, I am
>>> doing some filtering and writing only single-key data.
>>>
>>> Output would be something of the form:
>>>
>>> Machine 1
>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652
>>>
>>> Machine 2
>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2
>>>
>>> I am sharing a snippet of code:
>>>
>>> private KTable<String, Integer> extractLICount(KStream<Windowed<String>, AdLog> joinedImprLogs) {
>>>     KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
>>>         List<KeyValue<String, Integer>> l = new ArrayList<>();
>>>         if (value == null) {
>>>             return l;
>>>         }
>>>         String date = new SimpleDateFormat("yyyy_MM_dd").format(new Date(key.window().end()));
>>>         // Lineitemids
>>>         if (value != null && value.getAdLogType() == 3) {
>>>             // log.info("Invalid data: " + value);
>>>             return l;
>>>         }
>>>         if (value.getAdLogType() == 2) {
>>>             long lineitemid = value.getAdClickLog().getItmClmbLId();
>>>             if (lineitemid == TARGETED_LI) {
>>>                 String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
>>>                 l.add(new KeyValue<>(liKey, 1));
>>>             }
>>>             return l;
>>>         } else if (value.getAdLogType() == 1) {
>>>             long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>>>             if (value.getAdImprLog().isVisible()) {
>>>                 for (int i = 0; i < lineitemids.length; i++) {
>>>                     long li = lineitemids[i];
>>>                     if (li == TARGETED_LI) {
>>>                         // log.info("valid impression ids= " + value.getAdImprLog().toString());
>>>                         String liKey = String.format("P:LIS:%s:%s:I", li, date);
>>>                         l.add(new KeyValue<>(liKey, 1));
>>>                     }
>>>                 }
>>>             }
>>>             return l;
>>>         }
>>>         return l;
>>>     }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>>       .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
>>>     return liCount;
>>> }
>>>
>>> -Sameer.
>>>
>>> On Tue, Jun 13, 2017 at 10:35 PM, Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> Sameer,
>>>>
>>>> if you write a single key, all your input data should be in a single
>>>> partition. As Streams scales out via partitions, you cannot have a
>>>> second instance, as data from one partition is never split. Thus, all
>>>> data will go to one instance while the second instance should be idle.
>>>>
>>>> So what I don't understand is why you see machine 2 output a
>>>> counter of 1 -- it should not output anything. Maybe you can give some
>>>> more details about your setup?
>>>>
>>>> -Matthias
>>>>
>>>> On 6/13/17 5:00 AM, Sameer Kumar wrote:
>>>>> Hi,
>>>>>
>>>>> I witnessed a strange behaviour in KafkaStreams and need help in
>>>>> understanding it.
>>>>>
>>>>> I created an application for aggregating clicks per user. I want to
>>>>> process it only for 1 user (I was writing only a single key).
>>>>> When I ran the application on one machine, it was running fine. Now, to
>>>>> load-balance it, I started another node.
>>>>>
>>>>> My expectation was that the counter should be in sync, i.e. if the
>>>>> output from one machine is (key, 100), the other machine should read
>>>>> this and its next value should be (key, 101).
>>>>> But that didn't happen. Instead, on machine 2, the counter started
>>>>> with 1.
>>>>>
>>>>> Regards,
>>>>> -Sameer.