OK, let me try to explain it again.

Let's say my source processor uses a different key, and the value it carries
contains an identifier that denotes a category; I am counting per category.
As a specific case, I filter and output only one category, say cat1. Since
the source processor is processed across multiple nodes, cat1 would be
written on multiple nodes, and because the state store is local, the count
will start from 1 on each of the nodes.

Does that clarify the doubt? I think that after a processor the data gets
written to an intermediate Kafka topic, which should take care of this, but
it still happens and I am not sure why.

Processor 1
key1, value:{xyz....cat1}
key2, value:{xyz....cat2}
key3, value:{xyz....cat3}

Processor 2
Machine1
key: cat1, 1

Machine2
key: cat1, 1
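
To make the question concrete, here is a minimal sketch in plain Java (not
the Kafka Streams API) of what I would expect if the records are re-keyed by
category and routed through an intermediate topic before counting. The class
RepartitionSketch, the hash-based partitionFor and the modulo-based
instanceFor are assumptions for illustration only; the real partitioner and
task assignment in Kafka differ in detail. Under those assumptions, every
cat1 record lands in the same partition, so only one instance should hold a
count for cat1.

```java
import java.util.HashMap;
import java.util.Map;

public class RepartitionSketch {

    // Hypothetical stand-in for a key-hash partitioner: the same key always
    // maps to the same partition. Kafka's own partitioner hashes the
    // serialized key; String.hashCode() is used here only for illustration.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical assignment: each partition is owned by exactly one instance.
    static int instanceFor(int partition, int numInstances) {
        return partition % numInstances;
    }

    // Simulates counting after re-keying by category.
    // Returns: instance id -> (category -> count) for that instance's local store.
    static Map<Integer, Map<String, Integer>> countByCategory(
            String[] categories, int numPartitions, int numInstances) {
        Map<Integer, Map<String, Integer>> stores = new HashMap<>();
        for (String category : categories) {
            int partition = partitionFor(category, numPartitions);
            int instance = instanceFor(partition, numInstances);
            stores.computeIfAbsent(instance, i -> new HashMap<>())
                  .merge(category, 1, Integer::sum);
        }
        return stores;
    }

    public static void main(String[] args) {
        // Categories extracted from the values of key1, key2, ... upstream.
        String[] categories = {"cat1", "cat2", "cat1", "cat3", "cat1"};
        Map<Integer, Map<String, Integer>> stores = countByCategory(categories, 60, 2);
        stores.forEach((instance, store) ->
                System.out.println("instance " + instance + " -> " + store));
    }
}
```

If the counting behaved like this sketch, Machine2 would never start its own
counter for cat1, which is why the output I see confuses me.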


-Sameer.

On Thu, Jun 15, 2017 at 10:27 PM, Eno Thereska <eno.there...@gmail.com>
wrote:

> I'm not sure if I fully understand this but let me check:
>
> - if you start 2 instances, one instance will process half of the
> partitions, the other instance will process the other half
> - for any given key, like key 100, it will only be processed on one of the
> instances, not both.
>
> Does this help?
>
> Eno
>
>
>
> > On 15 Jun 2017, at 07:40, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
> >
> > Also, I am writing a single key in the output all the time. I believe
> > machine2 will have to write a key, and since a state store is local it
> > wouldn't know about the counter state on another machine. So, I guess
> > this will happen.
> >
> > -Sameer.
> >
> > On Thu, Jun 15, 2017 at 11:11 AM, Sameer Kumar <sam.kum.w...@gmail.com>
> > wrote:
> >
> >> The input topic contains 60 partitions and data is distributed well
> >> across different partitions on different keys. During consumption, I am
> >> doing some filtering and writing only single-key data.
> >>
> >> Output would be something of the form:
> >>
> >> Machine 1
> >> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
> >> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652
> >>
> >> Machine 2
> >> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
> >> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2
> >>
> >> I am sharing a snippet of code:
> >>
> >> private KTable<String, Integer> extractLICount(KStream<Windowed<String>, AdLog> joinedImprLogs) {
> >>    KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
> >>      List<KeyValue<String, Integer>> l = new ArrayList<>();
> >>      if (value == null) {
> >>        return l;
> >>      }
> >>      String date = new SimpleDateFormat("yyyy_MM_dd").format(new Date(key.window().end()));
> >>      // Lineitemids
> >>      if (value != null && value.getAdLogType() == 3) {
> >>        // log.info("Invalid data: " + value);
> >>        return l;
> >>      }
> >>      if (value.getAdLogType() == 2) {
> >>        long lineitemid = value.getAdClickLog().getItmClmbLId();
> >>        if (lineitemid == TARGETED_LI) {
> >>          String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
> >>          l.add(new KeyValue<>(liKey, 1));
> >>        }
> >>        return l;
> >>      } else if (value.getAdLogType() == 1) {
> >>        long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
> >>        if (value.getAdImprLog().isVisible()) {
> >>          for (int i = 0; i < lineitemids.length; i++) {
> >>            long li = lineitemids[i];
> >>            if (li == TARGETED_LI) {
> >>              // log.info("valid impression ids= " + value.getAdImprLog().toString());
> >>              String liKey = String.format("P:LIS:%s:%s:I", li, date);
> >>              l.add(new KeyValue<>(liKey, 1));
> >>            }
> >>          }
> >>        }
> >>        return l;
> >>      }
> >>      return l;
> >>    }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
> >>        .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
> >>    return liCount;
> >>  }
> >>
> >> On Wed, Jun 14, 2017 at 10:55 AM, Sameer Kumar <sam.kum.w...@gmail.com>
> >> wrote:
> >>
> >>> -Sameer.
> >>>
> >>> On Tue, Jun 13, 2017 at 10:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>
> >>>> Sameer,
> >>>>
> >>>> if you write a single key, all your input data should be in a single
> >>>> partition. As Streams scales out via partitions, you cannot have a
> >>>> second instance as data from one partition is never split. Thus, all
> >>>> data will go to one instance while the second instance should be idle.
> >>>>
> >>>> So what I don't understand is why you see machine 2 output a
> >>>> counter of 1 -- it should not output anything. Maybe you can give some
> >>>> more details about your setup?
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 6/13/17 5:00 AM, Sameer Kumar wrote:
> >>>>> Hi,
> >>>>>
> >>>>> I witnessed a strange behaviour in KafkaStreams and need help
> >>>>> understanding it.
> >>>>>
> >>>>> I created an application for aggregating clicks per user, and I want to
> >>>>> process it for only 1 user (I was writing only a single key).
> >>>>> When I ran the application on one machine, it was running fine. Now, to
> >>>>> load-balance it, I started another node.
> >>>>>
> >>>>> My expectation was that the counter should be in sync, i.e. if the output
> >>>>> from one machine is (key, 100), the other machine should read this and
> >>>>> its value should be (key, 101).
> >>>>> But that didn't happen. Instead, on machine 2, the counter started with 1.
> >>>>>
> >>>>>
> >>>>> Regards,
> >>>>> -Sameer.
> >>>>>
> >>>>
> >>>>
> >>>
> >>
>
>
