I'm not sure if I fully understand this but let me check:

- if you start 2 instances, one instance will process half of the partitions, 
the other instance will process the other half
- for any given key, like key 100, it will only be processed on one of the 
instances, not both.

Does this help?

Eno



> On 15 Jun 2017, at 07:40, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
> 
> Also, I am writing a single key in the output all the time. I believe
> machine2 will have to write a key and since a state store is local it
> wouldn't know about the counter state on another machine. So, I guess this
> will happen.
> 
> -Sameer.
> 
> On Thu, Jun 15, 2017 at 11:11 AM, Sameer Kumar <sam.kum.w...@gmail.com>
> wrote:
> 
>> The input topic contains 60 partitions and data is distributed well across
>> different partitions on different keys. While consumption, I am doing some
>> filtering and writing only single key data.
>> 
>> Output would be something of the form:- Machine 1
>> 
>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
>> k=P:LIS:1236667:2017_06_13:I,v=651
>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
>> k=P:LIS:1236667:2017_06_13:I,v=652
>> 
>> Machine 2
>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
>> k=P:LIS:1236667:2017_06_13:I,v=1
>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
>> k=P:LIS:1236667:2017_06_13:I,v=2
>> 
>> I am sharing a snippet of code,
>> 
>> private KTable<String, Integer> extractLICount(KStream<Windowed<String>,
>> AdLog> joinedImprLogs) {
>>    KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value)
>> -> {
>>      List<KeyValue<String, Integer>> l = new ArrayList<>();
>>      if (value == null) {
>>        return l;
>>      }
>>      String date = new SimpleDateFormat("yyyy_MM_dd").format(new
>> Date(key.window().end()));
>>      // Lineitemids
>>      if (value != null && value.getAdLogType() == 3) {
>>        // log.info("Invalid data: " + value);
>>        return l;
>>      }
>>      if (value.getAdLogType() == 2) {
>>        long lineitemid = value.getAdClickLog().getItmClmbLId();
>>        if (lineitemid == TARGETED_LI) {
>>          String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
>>          l.add(new KeyValue<>(liKey, 1));
>>        }
>>        return l;
>>      } else if (value.getAdLogType() == 1){
>> 
>>        long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>>        if (value.getAdImprLog().isVisible()) {
>>          for (int i = 0; i < lineitemids.length; i++) {
>>            long li = lineitemids[i];
>>            if (li == TARGETED_LI) {
>>              // log.info("valid impression ids= " +
>> value.getAdImprLog().toString());
>>              String liKey = String.format("P:LIS:%s:%s:I", li, date);
>>              l.add(new KeyValue<>(liKey, 1));
>>            }
>>          }
>>        }
>>        return l;
>>      }
>>      return l;
>>    }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>        .reduce((value1, value2) -> value1 + value2,
>> LINE_ITEM_COUNT_STORE);
>>    return liCount;
>>  }
>> 
>> On Wed, Jun 14, 2017 at 10:55 AM, Sameer Kumar <sam.kum.w...@gmail.com>
>> wrote:
>> 
>>> The input topic contains 60 partitions and data is distributed well
>>> across different partitions on different keys. While consumption, I am
>>> doing some filtering and writing only single key data.
>>> 
>>> Output would be something of the form:- Machine 1
>>> 
>>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
>>> k=P:LIS:1236667:2017_06_13:I,v=651
>>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
>>> k=P:LIS:1236667:2017_06_13:I,v=652
>>> 
>>> Machine 2
>>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
>>> k=P:LIS:1236667:2017_06_13:I,v=1
>>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
>>> k=P:LIS:1236667:2017_06_13:I,v=2
>>> 
>>> I am sharing a snippet of code,
>>> 
>>> private KTable<String, Integer> extractLICount(KStream<Windowed<String>,
>>> AdLog> joinedImprLogs) {
>>>    KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value)
>>> -> {
>>>      List<KeyValue<String, Integer>> l = new ArrayList<>();
>>>      if (value == null) {
>>>        return l;
>>>      }
>>>      String date = new SimpleDateFormat("yyyy_MM_dd").format(new
>>> Date(key.window().end()));
>>>      // Lineitemids
>>>      if (value != null && value.getAdLogType() == 3) {
>>>        // log.info("Invalid data: " + value);
>>>        return l;
>>>      }
>>>      if (value.getAdLogType() == 2) {
>>>        long lineitemid = value.getAdClickLog().getItmClmbLId();
>>>        if (lineitemid == TARGETED_LI) {
>>>          String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
>>>          l.add(new KeyValue<>(liKey, 1));
>>>        }
>>>        return l;
>>>      } else if (value.getAdLogType() == 1){
>>> 
>>>        long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>>>        if (value.getAdImprLog().isVisible()) {
>>>          for (int i = 0; i < lineitemids.length; i++) {
>>>            long li = lineitemids[i];
>>>            if (li == TARGETED_LI) {
>>>              // log.info("valid impression ids= " +
>>> value.getAdImprLog().toString());
>>>              String liKey = String.format("P:LIS:%s:%s:I", li, date);
>>>              l.add(new KeyValue<>(liKey, 1));
>>>            }
>>>          }
>>>        }
>>>        return l;
>>>      }
>>>      return l;
>>>    }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>>        .reduce((value1, value2) -> value1 + value2,
>>> LINE_ITEM_COUNT_STORE);
>>>    return liCount;
>>>  }
>>> 
>>> -Sameer.
>>> 
>>> On Tue, Jun 13, 2017 at 10:35 PM, Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>> 
>>>> Sameer,
>>>> 
>>>> if you write a single key, all your input data should be in a single
>>>> partition. As Streams scales out via partitions, you cannot have a
>>>> second instance as data from one partition is never split. Thus, all
>>>> data will go to one instance while the second instance should be idle.
>>>> 
>>>> So what I don't understand is, why you see that machine 2 output a
>>>> counter of 1 -- it should not output anything. Maybe you can give some
>>>> more details about your setup?
>>>> 
>>>> -Matthias
>>>> 
>>>> On 6/13/17 5:00 AM, Sameer Kumar wrote:
>>>>> Hi,
>>>>> 
>>>>> I witnessed a strange behaviour in KafkaStreams, need help in
>>>> understanding
>>>>> the same.
>>>>> 
>>>>> I created an application for aggregating clicks per user, I want to
>>>> process
>>>>> it only for 1 user( i was writing only a single key).
>>>>> When I ran application on one machine, it was running fine.Now, to
>>>>> loadbalance it , I started another node.
>>>>> 
>>>>> My expectation was that the counter should be in sync, i.e. if output
>>>> from
>>>>> one machine is key, 100 and another machine should read this and the
>>>> value
>>>>> should be key, 101.
>>>>> But, that didnt happened. Instead, on machine 2, the counter started
>>>> with 1.
>>>>> 
>>>>> 
>>>>> Regards,
>>>>> -Sameer.
>>>>> 
>>>> 
>>>> 
>>> 
>> 

Reply via email to