The input topic contains 60 partitions, and the data is well distributed across partitions on different keys. During consumption, I am doing some filtering and writing out data for only a single key.
The output looks something like this:

Machine 1
2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652

Machine 2
2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2

I am sharing a snippet of the code:

private KTable<String, Integer> extractLICount(KStream<Windowed<String>, AdLog> joinedImprLogs) {
  KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
    List<KeyValue<String, Integer>> l = new ArrayList<>();
    if (value == null) {
      return l;
    }
    String date = new SimpleDateFormat("yyyy_MM_dd").format(new Date(key.window().end()));
    // Line item ids: type 3 is invalid data, type 2 is a click, type 1 is an impression
    if (value.getAdLogType() == 3) {
      // log.info("Invalid data: " + value);
      return l;
    }
    if (value.getAdLogType() == 2) {
      long lineitemid = value.getAdClickLog().getItmClmbLId();
      if (lineitemid == TARGETED_LI) {
        String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
        l.add(new KeyValue<>(liKey, 1));
      }
      return l;
    } else if (value.getAdLogType() == 1) {
      long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
      if (value.getAdImprLog().isVisible()) {
        for (long li : lineitemids) {
          if (li == TARGETED_LI) {
            // log.info("valid impression ids= " + value.getAdImprLog().toString());
            String liKey = String.format("P:LIS:%s:%s:I", li, date);
            l.add(new KeyValue<>(liKey, 1));
          }
        }
      }
      return l;
    }
    return l;
  }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
    .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
  return liCount;
}

On Wed, Jun 14, 2017 at 10:55 AM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:

> The input topic contains 60 partitions and data is distributed well
> across different partitions on different keys.
> During consumption, I am doing some filtering and writing out data for
> only a single key.
>
> [...]
>
> -Sameer.
>
> On Tue, Jun 13, 2017 at 10:35 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> Sameer,
>>
>> if you write a single key, all your input data should be in a single
>> partition. As Streams scales out via partitions, you cannot use a
>> second instance, as data from one partition is never split. Thus, all
>> data will go to one instance while the second instance should be idle.
>>
>> So what I don't understand is why you see machine 2 output a counter
>> of 1 -- it should not output anything. Maybe you can give some more
>> details about your setup?
>>
>> -Matthias
>>
>> On 6/13/17 5:00 AM, Sameer Kumar wrote:
>>> Hi,
>>>
>>> I witnessed some strange behaviour in Kafka Streams and need help
>>> understanding it.
>>>
>>> I created an application for aggregating clicks per user, and I want
>>> to process data for only one user (I was writing only a single key).
>>> When I ran the application on one machine, it was running fine. Then,
>>> to load-balance it, I started another node.
>>>
>>> My expectation was that the counters should be in sync, i.e. if the
>>> output from one machine is (key, 100), the other machine should read
>>> this and output (key, 101).
>>> But that didn't happen. Instead, on machine 2, the counter started
>>> at 1.
>>>
>>> Regards,
>>> -Sameer.
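For anyone following the thread, the reason all records for a single key should land on one partition (and hence one Streams instance, as Matthias describes) is that the producer's default partitioner deterministically hashes the key and takes it modulo the partition count. Below is a minimal plain-Java illustration of that idea; the class name and helper are hypothetical, and String.hashCode is used only as a stand-in for Kafka's actual murmur2 hash over the serialized key bytes:

```java
public class SingleKeyPartitionDemo {

    // Simplified stand-in for Kafka's default partitioner. The real one
    // uses murmur2 over the serialized key bytes, but any deterministic
    // hash shows the point: identical keys always map to the same partition.
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the result is always in [0, numPartitions).
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 60; // matches the input topic in this thread
        String key = "P:LIS:1236667:2017_06_13:I";

        // Every record with this key maps to the same partition, so only
        // the one Streams instance owning that partition should see it.
        int p1 = partitionFor(key, numPartitions);
        int p2 = partitionFor(key, numPartitions);
        System.out.println(p1 == p2); // same key -> same partition
    }
}
```

This is why a second instance cannot share the load for a single-key workload: partition ownership, not key ranges, is the unit of parallelism in Streams.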