The example I gave was just for illustration. I have impression logs and
notification logs. Notification logs are essentially tied to the impressions
served, and a single impression can serve multiple items.

I was just trying to aggregate across a single line item. This means I
always generate the same single key, and since the data is streaming, my
counter should keep increasing.

What I saw was that while the counter was at 100 on Machine1, it was at 1 on
another machine. That looked inconsistent to me.
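
To spell out the behaviour I expected, here is a minimal sketch in plain Java
(it is not Kafka Streams code). It assumes that records are routed to a
partition by a hash of the key and that each partition is owned by exactly
one instance. The names partitionFor, instanceFor and countPerInstance, the
use of String.hashCode() and the round-robin assignment are all hypothetical
simplifications; the real default partitioner uses a different hash function.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class KeyRoutingSketch {

    // Hypothetical stand-in for a partitioner: the same key always maps to
    // the same partition (the real partitioner hashes differently).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical assignment: partitions are spread round-robin over instances.
    static int instanceFor(int partition, int numInstances) {
        return partition % numInstances;
    }

    // Routes each record by key and counts per (instance, key). Every instance
    // only sees the partitions it owns, much like a local state store.
    static Map<Integer, Map<String, Integer>> countPerInstance(
            String[] keys, int numPartitions, int numInstances) {
        Map<Integer, Map<String, Integer>> stores = new HashMap<>();
        for (String key : keys) {
            int instance = instanceFor(partitionFor(key, numPartitions), numInstances);
            stores.computeIfAbsent(instance, i -> new HashMap<>())
                  .merge(key, 1, Integer::sum);
        }
        return stores;
    }

    public static void main(String[] args) {
        // 100 records that all carry the same line item key.
        String[] keys = new String[100];
        Arrays.fill(keys, "P:LIS:1236667:2017_06_13:I");

        // 60 partitions and 2 instances, as in my setup.
        Map<Integer, Map<String, Integer>> stores = countPerInstance(keys, 60, 2);

        // Only one instance ends up holding the key, with the full count.
        System.out.println(stores);
    }
}
```

If the records are really routed by the output key before they are counted,
only one machine should ever hold a counter for that key, which is why
seeing 100 on one machine and 1 on another surprised me.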


On Fri, Jun 16, 2017 at 10:47 PM, Matthias J. Sax wrote:

> I just double-checked your example code from an earlier email. There you
> are using:
> stream.flatMap(...)
>       .groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>       .reduce((value1, value2) -> value1 + value2,
> In your last email, you say that you want to count by a category that is
> contained in your value, and that your key is something different.
> If you want to count by category, you need to set the category as the key
> in your .groupBy() -- in your code snippet, you don't actually change the
> key (this seems to be the root cause of the issue you see).
> Does this help?
> -Matthias
> On 6/15/17 11:48 PM, Sameer Kumar wrote:
> > Ok, let me try to explain it again.
> >
> > Let's say my source processor has a different key, and the value it
> > contains includes an identifier type that basically denotes the
> > category, and I am counting by category. A specific case would be that
> > I filter and output only a specific category, let's say cat1. Now,
> > starting from the source processor, which is being processed across
> > multiple nodes, cat1 would be written across multiple nodes, and since
> > the state store is local, the count will start from 1 on each of the
> > nodes.
> >
> > Does this clarify the doubt? I think that after a processor the same
> > gets written to an intermediate Kafka topic, and that should take care
> > of this, but it still happens, so I am not sure about it.
> >
> > Processor 1
> > key1, value:{xyz....cat1}
> > key2, value:{xyz....cat2}
> > key3, value:{xyz....cat3}
> >
> > Processor 2
> > Machine1
> > key: cat1, 1
> >
> > Machine2
> > key:cat1,1
> >
> >
> > -Sameer.
> >
> > On Thu, Jun 15, 2017 at 10:27 PM, Eno Thereska wrote:
> >
> >> I'm not sure if I fully understand this but let me check:
> >>
> >> - if you start 2 instances, one instance will process half of the
> >> partitions, the other instance will process the other half
> >> - for any given key, like key 100, it will only be processed on one of
> >> the instances, not both.
> >>
> >> Does this help?
> >>
> >> Eno
> >>
> >>
> >>
> >>> On 15 Jun 2017, at 07:40, Sameer Kumar wrote:
> >>>
> >>> Also, I am writing a single key in the output all the time. I believe
> >>> machine2 will have to write a key, and since a state store is local, it
> >>> wouldn't know about the counter state on another machine. So, I guess
> >>> this will happen.
> >>>
> >>> -Sameer.
> >>>
> >>> On Thu, Jun 15, 2017 at 11:11 AM, Sameer Kumar wrote:
> >>>
> >>>> The input topic contains 60 partitions, and the data is well
> >>>> distributed across the partitions on different keys. While consuming,
> >>>> I am doing some filtering and writing data for only a single key.
> >>>>
> >>>> The output looks something like this:
> >>>>
> >>>> Machine 1
> >>>>
> >>>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
> >>>> k=P:LIS:1236667:2017_06_13:I,v=651
> >>>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
> >>>> k=P:LIS:1236667:2017_06_13:I,v=652
> >>>>
> >>>> Machine 2
> >>>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
> >>>> k=P:LIS:1236667:2017_06_13:I,v=1
> >>>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
> >>>> k=P:LIS:1236667:2017_06_13:I,v=2
> >>>>
> >>>> I am sharing a snippet of code,
> >>>>
> >>>> private KTable<String, Integer> extractLICount(
> >>>>     KStream<Windowed<String>, AdLog> joinedImprLogs) {
> >>>>    KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
> >>>>      List<KeyValue<String, Integer>> l = new ArrayList<>();
> >>>>      if (value == null) {
> >>>>        return l;
> >>>>      }
> >>>>      String date = new SimpleDateFormat("yyyy_MM_dd")
> >>>>          .format(new Date(key.window().end()));
> >>>>      // Lineitemids
> >>>>      if (value != null && value.getAdLogType() == 3) {
> >>>>        //"Invalid data: " + value);
> >>>>        return l;
> >>>>      }
> >>>>      if (value.getAdLogType() == 2) {
> >>>>        long lineitemid = value.getAdClickLog().getItmClmbLId();
> >>>>        if (lineitemid == TARGETED_LI) {
> >>>>          String liKey =
> >>>>              String.format("P:LIS:%s:%s:C", lineitemid, date);
> >>>>          l.add(new KeyValue<>(liKey, 1));
> >>>>        }
> >>>>        return l;
> >>>>      } else if (value.getAdLogType() == 1){
> >>>>
> >>>>        long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
> >>>>        if (value.getAdImprLog().isVisible()) {
> >>>>          for (int i = 0; i < lineitemids.length; i++) {
> >>>>            long li = lineitemids[i];
> >>>>            if (li == TARGETED_LI) {
> >>>>              //"valid impression ids= " + value.getAdImprLog().toString());
> >>>>              String liKey = String.format("P:LIS:%s:%s:I", li, date);
> >>>>              l.add(new KeyValue<>(liKey, 1));
> >>>>            }
> >>>>          }
> >>>>        }
> >>>>        return l;
> >>>>      }
> >>>>      return l;
> >>>>    }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
> >>>>        .reduce((value1, value2) -> value1 + value2,
> >>>>    return liCount;
> >>>>  }
> >>>>
> >>>> On Wed, Jun 14, 2017 at 10:55 AM, Sameer Kumar wrote:
> >>>>
> >>>>> The input topic contains 60 partitions, and the data is well
> >>>>> distributed across the partitions on different keys. While consuming,
> >>>>> I am doing some filtering and writing data for only a single key.
> >>>>>
> >>>>> The output looks something like this:
> >>>>>
> >>>>> Machine 1
> >>>>>
> >>>>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
> >>>>> k=P:LIS:1236667:2017_06_13:I,v=651
> >>>>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
> >>>>> k=P:LIS:1236667:2017_06_13:I,v=652
> >>>>>
> >>>>> Machine 2
> >>>>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
> >>>>> k=P:LIS:1236667:2017_06_13:I,v=1
> >>>>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
> >>>>> k=P:LIS:1236667:2017_06_13:I,v=2
> >>>>>
> >>>>> I am sharing a snippet of code,
> >>>>>
> >>>>> private KTable<String, Integer> extractLICount(
> >>>>>     KStream<Windowed<String>, AdLog> joinedImprLogs) {
> >>>>>    KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
> >>>>>      List<KeyValue<String, Integer>> l = new ArrayList<>();
> >>>>>      if (value == null) {
> >>>>>        return l;
> >>>>>      }
> >>>>>      String date = new SimpleDateFormat("yyyy_MM_dd")
> >>>>>          .format(new Date(key.window().end()));
> >>>>>      // Lineitemids
> >>>>>      if (value != null && value.getAdLogType() == 3) {
> >>>>>        //"Invalid data: " + value);
> >>>>>        return l;
> >>>>>      }
> >>>>>      if (value.getAdLogType() == 2) {
> >>>>>        long lineitemid = value.getAdClickLog().getItmClmbLId();
> >>>>>        if (lineitemid == TARGETED_LI) {
> >>>>>          String liKey =
> >>>>>              String.format("P:LIS:%s:%s:C", lineitemid, date);
> >>>>>          l.add(new KeyValue<>(liKey, 1));
> >>>>>        }
> >>>>>        return l;
> >>>>>      } else if (value.getAdLogType() == 1){
> >>>>>
> >>>>>        long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
> >>>>>        if (value.getAdImprLog().isVisible()) {
> >>>>>          for (int i = 0; i < lineitemids.length; i++) {
> >>>>>            long li = lineitemids[i];
> >>>>>            if (li == TARGETED_LI) {
> >>>>>              //"valid impression ids= " + value.getAdImprLog().toString());
> >>>>>              String liKey = String.format("P:LIS:%s:%s:I", li, date);
> >>>>>              l.add(new KeyValue<>(liKey, 1));
> >>>>>            }
> >>>>>          }
> >>>>>        }
> >>>>>        return l;
> >>>>>      }
> >>>>>      return l;
> >>>>>    }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
> >>>>>        .reduce((value1, value2) -> value1 + value2,
> >>>>>    return liCount;
> >>>>>  }
> >>>>>
> >>>>> -Sameer.
> >>>>>
> >>>>> On Tue, Jun 13, 2017 at 10:35 PM, Matthias J. Sax wrote:
> >>>>>
> >>>>>> Sameer,
> >>>>>>
> >>>>>> if you write a single key, all your input data should be in a single
> >>>>>> partition. As Streams scales out via partitions, you cannot have a
> >>>>>> second instance, as data from one partition is never split. Thus, all
> >>>>>> data will go to one instance while the second instance should be
> >>>>>> idle.
> >>>>>>
> >>>>>> So what I don't understand is why you see machine 2 output a counter
> >>>>>> of 1 -- it should not output anything. Maybe you can give some more
> >>>>>> details about your setup?
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 6/13/17 5:00 AM, Sameer Kumar wrote:
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> I witnessed a strange behaviour in Kafka Streams and need help
> >>>>>>> understanding it.
> >>>>>>>
> >>>>>>> I created an application for aggregating clicks per user, and I want
> >>>>>>> to process it for only one user (I was writing only a single key).
> >>>>>>> When I ran the application on one machine, it ran fine. Then, to
> >>>>>>> load-balance it, I started another node.
> >>>>>>>
> >>>>>>> My expectation was that the counter would be in sync, i.e. if the
> >>>>>>> output from one machine is (key, 100), the other machine should read
> >>>>>>> this and its next value should be (key, 101).
> >>>>>>> But that didn't happen. Instead, on machine 2, the counter started
> >>>>>>> with 1.
> >>>>>>>
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> -Sameer.
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> >>
> >
