The input topic has 60 partitions, and the data is well distributed across the
partitions on different keys. While consuming, I am doing some filtering and
writing out data for only a single key.

The output is of the following form:

Machine 1
2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
k=P:LIS:1236667:2017_06_13:I,v=651
2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
k=P:LIS:1236667:2017_06_13:I,v=652

Machine 2
2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
k=P:LIS:1236667:2017_06_13:I,v=1
2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
k=P:LIS:1236667:2017_06_13:I,v=2
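
Since the aggregation re-keys everything to this single liKey, my understanding
is that the default partitioner should send every record for that key to the
same partition of the repartition topic. Below is a minimal, self-contained
sketch of that hashing (the partition count used here is only an assumption,
and Utils is Kafka's org.apache.kafka.common.utils.Utils):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class SingleKeyPartition {
  public static void main(String[] args) {
    // Same hashing the DefaultPartitioner uses for non-null keys:
    // murmur2 over the serialized key, taken modulo the partition count.
    String liKey = "P:LIS:1236667:2017_06_13:I";  // key from the output above
    int numPartitions = 60;                       // assumed partition count
    byte[] keyBytes = liKey.getBytes(StandardCharsets.UTF_8);
    int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    // Every record with this key maps to the same partition (and hence task).
    System.out.println(liKey + " -> partition " + partition);
  }
}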

I am sharing a snippet of the code:

private KTable<String, Integer> extractLICount(KStream<Windowed<String>, AdLog> joinedImprLogs) {
    KTable<String, Integer> liCount = joinedImprLogs
        .flatMap((key, value) -> {
          List<KeyValue<String, Integer>> records = new ArrayList<>();
          if (value == null) {
            return records;
          }
          // Build the date part of the key from the window end time.
          String date = new SimpleDateFormat("yyyy_MM_dd").format(new Date(key.window().end()));
          if (value.getAdLogType() == 3) {
            // log.info("Invalid data: " + value);
            return records;
          }
          if (value.getAdLogType() == 2) {
            // Click log: emit one count for the targeted line item.
            long lineItemId = value.getAdClickLog().getItmClmbLId();
            if (lineItemId == TARGETED_LI) {
              records.add(new KeyValue<>(String.format("P:LIS:%s:%s:C", lineItemId, date), 1));
            }
          } else if (value.getAdLogType() == 1) {
            // Impression log: emit one count per targeted line item, if visible.
            if (value.getAdImprLog().isVisible()) {
              for (long lineItemId : value.getAdImprLog().getItmClmbLIds()) {
                if (lineItemId == TARGETED_LI) {
                  // log.info("valid impression ids= " + value.getAdImprLog().toString());
                  records.add(new KeyValue<>(String.format("P:LIS:%s:%s:I", lineItemId, date), 1));
                }
              }
            }
          }
          return records;
        })
        .groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
        .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
    return liCount;
  }
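
In case it helps to reproduce the behaviour in isolation, here is a minimal,
self-contained sketch of the same groupBy/reduce counting pattern on a plain
<String, Integer> stream, using the same older API as the snippet above. The
application id, topic names, and store name are placeholders, not the ones
from the real job:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class LiCountDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "licount-demo");        // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder

    KStreamBuilder builder = new KStreamBuilder();
    // Records are assumed to be <String, Integer> pairs already keyed like
    // the liKey above; the real job builds that key inside flatMap().
    KStream<String, Integer> counts =
        builder.stream(Serdes.String(), Serdes.Integer(), "licount-input"); // placeholder topic
    KTable<String, Integer> liCount = counts
        .groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
        .reduce((v1, v2) -> v1 + v2, "line-item-count-store");              // placeholder store
    // The reduced table is materialized in a local state store on whichever
    // instance owns the repartition-topic partition for that key.
    liCount.to(Serdes.String(), Serdes.Integer(), "licount-output");        // placeholder topic

    new KafkaStreams(builder, props).start();
  }
}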

-Sameer.

On Tue, Jun 13, 2017 at 10:35 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Sameer,
>
> if you write a single key, all your input data should be in a single
> partition. As Streams scales out via partitions, you cannot have a
> second instance, as data from one partition is never split. Thus, all
> data will go to one instance while the second instance should be idle.
>
> So what I don't understand is why you see machine 2 output a counter
> of 1 -- it should not output anything. Maybe you can give some more
> details about your setup?
>
> -Matthias
>
> On 6/13/17 5:00 AM, Sameer Kumar wrote:
> > Hi,
> >
> > I witnessed a strange behaviour in Kafka Streams and need help in
> > understanding it.
> >
> > I created an application for aggregating clicks per user, and I want
> > to process it for only one user (I was writing only a single key).
> > When I ran the application on one machine, it ran fine. Then, to
> > load-balance it, I started another node.
> >
> > My expectation was that the counters should be in sync, i.e. if the
> > output from one machine is (key, 100), the other machine should read
> > this and the value should be (key, 101).
> > But that didn't happen. Instead, on machine 2, the counter started
> > at 1.
> >
> > Regards,
> > -Sameer.