Continued from m last mail...

The code snippet that I shared was after joining impression and
notification logs. Here I am picking the line item and concatenating it
with date. You can also see there is a check for a TARGETED_LINE_ITEM, I am
not emitting the data otherwise.

-Sameer.

On Sat, Jun 17, 2017 at 3:55 PM, Sameer Kumar <sam.kum.w...@gmail.com>
wrote:

> The example I gave was just for illustration. I have impression logs and
> notification logs. Notification logs are essentially tied to impressions
> served. An impression would serve multiple items.
>
> I was just trying to aggregate across a single line item, this means I am
> always generating a single key all the time and since data is streaming, my
> counter should keep on increasing.
>
> What I saw was that while on Machine1, the counter was 100 , another
> machine it was at 1. I saw it as inconsistent.
>
>
> -Sameer.
>
> On Fri, Jun 16, 2017 at 10:47 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> I just double checked you example code from an email before. There you
>> are using:
>>
>> stream.flatMap(...)
>>       .groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>       .reduce((value1, value2) -> value1 + value2,
>>
>> In you last email, you say that you want to count on category that is
>> contained in you value and your key is something different.
>>
>> If you want to count by category, you need to set the category as key in
>> you .groupBy() -- in you code snipped, you actually don't change the key
>> (seems to be the root cause of the issue you see).
>>
>> Does this help?
>>
>>
>> -Matthias
>>
>>
>> On 6/15/17 11:48 PM, Sameer Kumar wrote:
>> > Ok.. Let me try explain it again.
>> >
>> > So, Lets say my source processor has a different key, now the value
>> that it
>> > contains lets say contains an identifier type: which basically denotes
>> > category and I am counting on different categories. A specific case
>> would
>> > be I do a filter and outputs only a specific category: lets say
>> cat1...Now,
>> > starting from source processor which is being processed across multiple
>> > nodes, cat1 would be written across multiple nodes, since state store is
>> > local..it will be starting from 1 on each of the nodes.
>> >
>> > Does it clarifies the doubt. I think after a processor the same gets
>> > written to a intermediate Kafka topic and this should be taken care of
>> but
>> > this happens, I am not sure on it.
>> >
>> > Processor 1
>> > key1, value:{xyz....cat1}
>> > key2, value:{xyz....cat2}
>> > key3, value:{xyz....cat3}
>> >
>> > Processor 2
>> > Machine1
>> > key: cat1, 1
>> >
>> > Machine2
>> > key:cat1,1
>> >
>> >
>> > -Sameer.
>> >
>> > On Thu, Jun 15, 2017 at 10:27 PM, Eno Thereska <eno.there...@gmail.com>
>> > wrote:
>> >
>> >> I'm not sure if I fully understand this but let me check:
>> >>
>> >> - if you start 2 instances, one instance will process half of the
>> >> partitions, the other instance will process the other half
>> >> - for any given key, like key 100, it will only be processed on one of
>> the
>> >> instances, not both.
>> >>
>> >> Does this help?
>> >>
>> >> Eno
>> >>
>> >>
>> >>
>> >>> On 15 Jun 2017, at 07:40, Sameer Kumar <sam.kum.w...@gmail.com>
>> wrote:
>> >>>
>> >>> Also, I am writing a single key in the output all the time. I believe
>> >>> machine2 will have to write a key and since a state store is local it
>> >>> wouldn't know about the counter state on another machine. So, I guess
>> >> this
>> >>> will happen.
>> >>>
>> >>> -Sameer.
>> >>>
>> >>> On Thu, Jun 15, 2017 at 11:11 AM, Sameer Kumar <
>> sam.kum.w...@gmail.com>
>> >>> wrote:
>> >>>
>> >>>> The input topic contains 60 partitions and data is distributed well
>> >> across
>> >>>> different partitions on different keys. While consumption, I am doing
>> >> some
>> >>>> filtering and writing only single key data.
>> >>>>
>> >>>> Output would be something of the form:- Machine 1
>> >>>>
>> >>>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
>> >>>> k=P:LIS:1236667:2017_06_13:I,v=651
>> >>>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
>> >>>> k=P:LIS:1236667:2017_06_13:I,v=652
>> >>>>
>> >>>> Machine 2
>> >>>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
>> >>>> k=P:LIS:1236667:2017_06_13:I,v=1
>> >>>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
>> >>>> k=P:LIS:1236667:2017_06_13:I,v=2
>> >>>>
>> >>>> I am sharing a snippet of code,
>> >>>>
>> >>>> private KTable<String, Integer> extractLICount(KStream<
>> >> Windowed<String>,
>> >>>> AdLog> joinedImprLogs) {
>> >>>>    KTable<String, Integer> liCount = joinedImprLogs.flatMap((key,
>> value)
>> >>>> -> {
>> >>>>      List<KeyValue<String, Integer>> l = new ArrayList<>();
>> >>>>      if (value == null) {
>> >>>>        return l;
>> >>>>      }
>> >>>>      String date = new SimpleDateFormat("yyyy_MM_dd").format(new
>> >>>> Date(key.window().end()));
>> >>>>      // Lineitemids
>> >>>>      if (value != null && value.getAdLogType() == 3) {
>> >>>>        // log.info("Invalid data: " + value);
>> >>>>        return l;
>> >>>>      }
>> >>>>      if (value.getAdLogType() == 2) {
>> >>>>        long lineitemid = value.getAdClickLog().getItmClmbLId();
>> >>>>        if (lineitemid == TARGETED_LI) {
>> >>>>          String liKey = String.format("P:LIS:%s:%s:C", lineitemid,
>> >> date);
>> >>>>          l.add(new KeyValue<>(liKey, 1));
>> >>>>        }
>> >>>>        return l;
>> >>>>      } else if (value.getAdLogType() == 1){
>> >>>>
>> >>>>        long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>> >>>>        if (value.getAdImprLog().isVisible()) {
>> >>>>          for (int i = 0; i < lineitemids.length; i++) {
>> >>>>            long li = lineitemids[i];
>> >>>>            if (li == TARGETED_LI) {
>> >>>>              // log.info("valid impression ids= " +
>> >>>> value.getAdImprLog().toString());
>> >>>>              String liKey = String.format("P:LIS:%s:%s:I", li, date);
>> >>>>              l.add(new KeyValue<>(liKey, 1));
>> >>>>            }
>> >>>>          }
>> >>>>        }
>> >>>>        return l;
>> >>>>      }
>> >>>>      return l;
>> >>>>    }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>> >>>>        .reduce((value1, value2) -> value1 + value2,
>> >>>> LINE_ITEM_COUNT_STORE);
>> >>>>    return liCount;
>> >>>>  }
>> >>>>
>> >>>> On Wed, Jun 14, 2017 at 10:55 AM, Sameer Kumar <
>> sam.kum.w...@gmail.com>
>> >>>> wrote:
>> >>>>
>> >>>>> The input topic contains 60 partitions and data is distributed well
>> >>>>> across different partitions on different keys. While consumption, I
>> am
>> >>>>> doing some filtering and writing only single key data.
>> >>>>>
>> >>>>> Output would be something of the form:- Machine 1
>> >>>>>
>> >>>>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
>> >>>>> k=P:LIS:1236667:2017_06_13:I,v=651
>> >>>>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
>> >>>>> k=P:LIS:1236667:2017_06_13:I,v=652
>> >>>>>
>> >>>>> Machine 2
>> >>>>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
>> >>>>> k=P:LIS:1236667:2017_06_13:I,v=1
>> >>>>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
>> >>>>> k=P:LIS:1236667:2017_06_13:I,v=2
>> >>>>>
>> >>>>> I am sharing a snippet of code,
>> >>>>>
>> >>>>> private KTable<String, Integer> extractLICount(KStream<
>> >> Windowed<String>,
>> >>>>> AdLog> joinedImprLogs) {
>> >>>>>    KTable<String, Integer> liCount = joinedImprLogs.flatMap((key,
>> >> value)
>> >>>>> -> {
>> >>>>>      List<KeyValue<String, Integer>> l = new ArrayList<>();
>> >>>>>      if (value == null) {
>> >>>>>        return l;
>> >>>>>      }
>> >>>>>      String date = new SimpleDateFormat("yyyy_MM_dd").format(new
>> >>>>> Date(key.window().end()));
>> >>>>>      // Lineitemids
>> >>>>>      if (value != null && value.getAdLogType() == 3) {
>> >>>>>        // log.info("Invalid data: " + value);
>> >>>>>        return l;
>> >>>>>      }
>> >>>>>      if (value.getAdLogType() == 2) {
>> >>>>>        long lineitemid = value.getAdClickLog().getItmClmbLId();
>> >>>>>        if (lineitemid == TARGETED_LI) {
>> >>>>>          String liKey = String.format("P:LIS:%s:%s:C", lineitemid,
>> >> date);
>> >>>>>          l.add(new KeyValue<>(liKey, 1));
>> >>>>>        }
>> >>>>>        return l;
>> >>>>>      } else if (value.getAdLogType() == 1){
>> >>>>>
>> >>>>>        long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>> >>>>>        if (value.getAdImprLog().isVisible()) {
>> >>>>>          for (int i = 0; i < lineitemids.length; i++) {
>> >>>>>            long li = lineitemids[i];
>> >>>>>            if (li == TARGETED_LI) {
>> >>>>>              // log.info("valid impression ids= " +
>> >>>>> value.getAdImprLog().toString());
>> >>>>>              String liKey = String.format("P:LIS:%s:%s:I", li,
>> date);
>> >>>>>              l.add(new KeyValue<>(liKey, 1));
>> >>>>>            }
>> >>>>>          }
>> >>>>>        }
>> >>>>>        return l;
>> >>>>>      }
>> >>>>>      return l;
>> >>>>>    }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>> >>>>>        .reduce((value1, value2) -> value1 + value2,
>> >>>>> LINE_ITEM_COUNT_STORE);
>> >>>>>    return liCount;
>> >>>>>  }
>> >>>>>
>> >>>>> -Sameer.
>> >>>>>
>> >>>>> On Tue, Jun 13, 2017 at 10:35 PM, Matthias J. Sax <
>> >> matth...@confluent.io>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> Sameer,
>> >>>>>>
>> >>>>>> if you write a single key, all your input data should be in a
>> single
>> >>>>>> partition. As Streams scales out via partitions, you cannot have a
>> >>>>>> second instance as data from one partition is never split. Thus,
>> all
>> >>>>>> data will go to one instance while the second instance should be
>> idle.
>> >>>>>>
>> >>>>>> So what I don't understand is, why you see that machine 2 output a
>> >>>>>> counter of 1 -- it should not output anything. Maybe you can give
>> some
>> >>>>>> more details about your setup?
>> >>>>>>
>> >>>>>> -Matthias
>> >>>>>>
>> >>>>>> On 6/13/17 5:00 AM, Sameer Kumar wrote:
>> >>>>>>> Hi,
>> >>>>>>>
>> >>>>>>> I witnessed a strange behaviour in KafkaStreams, need help in
>> >>>>>> understanding
>> >>>>>>> the same.
>> >>>>>>>
>> >>>>>>> I created an application for aggregating clicks per user, I want
>> to
>> >>>>>> process
>> >>>>>>> it only for 1 user( i was writing only a single key).
>> >>>>>>> When I ran application on one machine, it was running fine.Now, to
>> >>>>>>> loadbalance it , I started another node.
>> >>>>>>>
>> >>>>>>> My expectation was that the counter should be in sync, i.e. if
>> output
>> >>>>>> from
>> >>>>>>> one machine is key, 100 and another machine should read this and
>> the
>> >>>>>> value
>> >>>>>>> should be key, 101.
>> >>>>>>> But, that didnt happened. Instead, on machine 2, the counter
>> started
>> >>>>>> with 1.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> Regards,
>> >>>>>>> -Sameer.
>> >>>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>
>> >>
>> >
>>
>>
>

Reply via email to