Also, I always write the same single key to the output. I believe machine 2 has to write that key as well, and since a state store is local to each instance, it wouldn't know about the counter state on the other machine. So I guess this behaviour is to be expected.
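To make the guess above concrete, here is a minimal plain-Java sketch. It uses no Kafka classes: `Instance` is a hypothetical stand-in for one machine with its own local store, and the record counts are made up to match the logged values. It only illustrates the assumption that each machine counts in isolation; it says nothing about how Streams actually routes records between instances.

```java
import java.util.HashMap;
import java.util.Map;

class LocalStoreSketch {

    /** Hypothetical stand-in for one application instance and its local state store. */
    static final class Instance {
        private final Map<String, Integer> store = new HashMap<>();

        /** Adds value to the running total for key and returns the new total. */
        int add(String key, int value) {
            return store.merge(key, value, Integer::sum);
        }
    }

    public static void main(String[] args) {
        String key = "P:LIS:1236667:2017_06_13:I";

        Instance machine1 = new Instance();
        Instance machine2 = new Instance();

        // Machine 1 has already counted 650 records for this key (made-up number).
        for (int i = 0; i < 650; i++) {
            machine1.add(key, 1);
        }

        // Each machine now sees two more records for the same key.
        // Machine 2 starts from an empty store, so it starts counting at 1.
        for (int i = 0; i < 2; i++) {
            System.out.println("machine1 k=" + key + ",v=" + machine1.add(key, 1));
            System.out.println("machine2 k=" + key + ",v=" + machine2.add(key, 1));
        }
    }
}
```

If the two stores really are independent like this, the counters for the same key can never line up, which would match the 651/652 versus 1/2 output in the logs.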
-Sameer.

On Thu, Jun 15, 2017 at 11:11 AM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:

> The input topic contains 60 partitions and data is distributed well across
> different partitions on different keys. During consumption, I am doing some
> filtering and writing data for only a single key.
>
> Output would be something of the form:
>
> Machine 1
> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652
>
> Machine 2
> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2
>
> I am sharing a snippet of code:
>
> private KTable<String, Integer> extractLICount(KStream<Windowed<String>, AdLog> joinedImprLogs) {
>     KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
>         List<KeyValue<String, Integer>> l = new ArrayList<>();
>         if (value == null) {
>             return l;
>         }
>         String date = new SimpleDateFormat("yyyy_MM_dd").format(new Date(key.window().end()));
>         // Lineitemids
>         if (value != null && value.getAdLogType() == 3) {
>             // log.info("Invalid data: " + value);
>             return l;
>         }
>         if (value.getAdLogType() == 2) {
>             long lineitemid = value.getAdClickLog().getItmClmbLId();
>             if (lineitemid == TARGETED_LI) {
>                 String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
>                 l.add(new KeyValue<>(liKey, 1));
>             }
>             return l;
>         } else if (value.getAdLogType() == 1) {
>             long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>             if (value.getAdImprLog().isVisible()) {
>                 for (int i = 0; i < lineitemids.length; i++) {
>                     long li = lineitemids[i];
>                     if (li == TARGETED_LI) {
>                         // log.info("valid impression ids= " + value.getAdImprLog().toString());
>                         String liKey = String.format("P:LIS:%s:%s:I", li, date);
>                         l.add(new KeyValue<>(liKey, 1));
>                     }
>                 }
>             }
>             return l;
>         }
>         return l;
>     }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>       .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
>     return liCount;
> }
>
> On Wed, Jun 14, 2017 at 10:55 AM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>
>> The input topic contains 60 partitions and data is distributed well across
>> different partitions on different keys. During consumption, I am doing some
>> filtering and writing data for only a single key.
>>
>> Output would be something of the form:
>>
>> Machine 1
>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652
>>
>> Machine 2
>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2
>>
>> I am sharing a snippet of code:
>>
>> private KTable<String, Integer> extractLICount(KStream<Windowed<String>, AdLog> joinedImprLogs) {
>>     KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
>>         List<KeyValue<String, Integer>> l = new ArrayList<>();
>>         if (value == null) {
>>             return l;
>>         }
>>         String date = new SimpleDateFormat("yyyy_MM_dd").format(new Date(key.window().end()));
>>         // Lineitemids
>>         if (value != null && value.getAdLogType() == 3) {
>>             // log.info("Invalid data: " + value);
>>             return l;
>>         }
>>         if (value.getAdLogType() == 2) {
>>             long lineitemid = value.getAdClickLog().getItmClmbLId();
>>             if (lineitemid == TARGETED_LI) {
>>                 String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
>>                 l.add(new KeyValue<>(liKey, 1));
>>             }
>>             return l;
>>         } else if (value.getAdLogType() == 1) {
>>             long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>>             if (value.getAdImprLog().isVisible()) {
>>                 for (int i = 0; i < lineitemids.length; i++) {
>>                     long li = lineitemids[i];
>>                     if (li == TARGETED_LI) {
>>                         // log.info("valid impression ids= " + value.getAdImprLog().toString());
>>                         String liKey = String.format("P:LIS:%s:%s:I", li, date);
>>                         l.add(new KeyValue<>(liKey, 1));
>>                     }
>>                 }
>>             }
>>             return l;
>>         }
>>         return l;
>>     }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>       .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
>>     return liCount;
>> }
>>
>> -Sameer.
>>
>> On Tue, Jun 13, 2017 at 10:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>
>>> Sameer,
>>>
>>> if you write a single key, all your input data should be in a single
>>> partition. As Streams scales out via partitions, you cannot have a
>>> second instance as data from one partition is never split. Thus, all
>>> data will go to one instance while the second instance should be idle.
>>>
>>> So what I don't understand is why you see machine 2 output a
>>> counter of 1 -- it should not output anything. Maybe you can give some
>>> more details about your setup?
>>>
>>> -Matthias
>>>
>>> On 6/13/17 5:00 AM, Sameer Kumar wrote:
>>> > Hi,
>>> >
>>> > I witnessed a strange behaviour in KafkaStreams and need help in
>>> > understanding it.
>>> >
>>> > I created an application for aggregating clicks per user. I want to
>>> > process it for only 1 user (I was writing only a single key).
>>> > When I ran the application on one machine, it was running fine. Now, to
>>> > load-balance it, I started another node.
>>> >
>>> > My expectation was that the counter should be in sync, i.e. if the output
>>> > from one machine is key, 100, then the other machine should read this and
>>> > the value should be key, 101.
>>> > But that didn't happen. Instead, on machine 2, the counter started with 1.
>>> >
>>> > Regards,
>>> > -Sameer.