I just double-checked your example code from an earlier email. There you
are using:

stream.flatMap(...)
      .groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
      .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);

In your last email, you say that you want to count by a category that is
contained in your value, and that your key is something different.

If you want to count by category, you need to set the category as the key
in your .groupBy(). In your code snippet, you don't actually change the key,
which seems to be the root cause of the issue you see.
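
To illustrate the difference, here is a minimal plain-Java sketch. It does
not use the Kafka Streams API: the Event class and the countBy() helper are
made up for this example, and countBy() only mimics what a groupBy() with a
key selector followed by a summing reduce() would compute. The sample
records follow the key1/cat1 shape from your email, with one category
repeated on purpose.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class RekeyCountSketch {

    // Hypothetical record shape: an arbitrary source key plus a value
    // that carries the category we actually want to count by.
    public static final class Event {
        private final String key;
        private final String category;

        public Event(String key, String category) {
            this.key = key;
            this.category = category;
        }

        public String key() { return key; }
        public String category() { return category; }
    }

    // Adds 1 per record under whatever key the selector returns.
    // This stands in for "group by the selected key, then sum the values".
    public static Map<String, Integer> countBy(List<Event> events,
                                               Function<Event, String> keySelector) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Event e : events) {
            counts.merge(keySelector.apply(e), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("key1", "cat1"),
            new Event("key2", "cat2"),
            new Event("key3", "cat1"));

        // Selector returns the unchanged key: one counter per source key.
        System.out.println(countBy(events, Event::key));      // {key1=1, key2=1, key3=1}

        // Selector returns the category: one counter per category.
        System.out.println(countBy(events, Event::category)); // {cat1=2, cat2=1}
    }
}
```

With an identity selector like (k, v) -> k, the grouping stays on whatever
key the records already carry. Only a selector that returns the category
gives you a single counter per category.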

Does this help?


-Matthias


On 6/15/17 11:48 PM, Sameer Kumar wrote:
> Ok.. Let me try explain it again.
> 
> So, Lets say my source processor has a different key, now the value that it
> contains lets say contains an identifier type: which basically denotes
> category and I am counting on different categories. A specific case would
> be I do a filter and outputs only a specific category: lets say cat1...Now,
> starting from source processor which is being processed across multiple
> nodes, cat1 would be written across multiple nodes, since state store is
> local..it will be starting from 1 on each of the nodes.
> 
> Does it clarifies the doubt. I think after a processor the same gets
> written to a intermediate Kafka topic and this should be taken care of but
> this happens, I am not sure on it.
> 
> Processor 1
> key1, value:{xyz....cat1}
> key2, value:{xyz....cat2}
> key3, value:{xyz....cat3}
> 
> Processor 2
> Machine1
> key: cat1, 1
> 
> Machine2
> key:cat1,1
> 
> 
> -Sameer.
> 
> On Thu, Jun 15, 2017 at 10:27 PM, Eno Thereska <eno.there...@gmail.com>
> wrote:
> 
>> I'm not sure if I fully understand this but let me check:
>>
>> - if you start 2 instances, one instance will process half of the
>> partitions, the other instance will process the other half
>> - for any given key, like key 100, it will only be processed on one of the
>> instances, not both.
>>
>> Does this help?
>>
>> Eno
>>
>>
>>
>>> On 15 Jun 2017, at 07:40, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>>>
>>> Also, I am writing a single key in the output all the time. I believe
>>> machine2 will have to write a key and since a state store is local it
>>> wouldn't know about the counter state on another machine. So, I guess
>> this
>>> will happen.
>>>
>>> -Sameer.
>>>
>>> On Thu, Jun 15, 2017 at 11:11 AM, Sameer Kumar <sam.kum.w...@gmail.com>
>>> wrote:
>>>
>>>> The input topic contains 60 partitions and data is distributed well
>> across
>>>> different partitions on different keys. While consumption, I am doing
>> some
>>>> filtering and writing only single key data.
>>>>
>>>> Output would be something of the form:- Machine 1
>>>>
>>>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
>>>> k=P:LIS:1236667:2017_06_13:I,v=651
>>>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
>>>> k=P:LIS:1236667:2017_06_13:I,v=652
>>>>
>>>> Machine 2
>>>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
>>>> k=P:LIS:1236667:2017_06_13:I,v=1
>>>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
>>>> k=P:LIS:1236667:2017_06_13:I,v=2
>>>>
>>>> I am sharing a snippet of code,
>>>>
>>>> private KTable<String, Integer> extractLICount(KStream<
>> Windowed<String>,
>>>> AdLog> joinedImprLogs) {
>>>>    KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value)
>>>> -> {
>>>>      List<KeyValue<String, Integer>> l = new ArrayList<>();
>>>>      if (value == null) {
>>>>        return l;
>>>>      }
>>>>      String date = new SimpleDateFormat("yyyy_MM_dd").format(new
>>>> Date(key.window().end()));
>>>>      // Lineitemids
>>>>      if (value != null && value.getAdLogType() == 3) {
>>>>        // log.info("Invalid data: " + value);
>>>>        return l;
>>>>      }
>>>>      if (value.getAdLogType() == 2) {
>>>>        long lineitemid = value.getAdClickLog().getItmClmbLId();
>>>>        if (lineitemid == TARGETED_LI) {
>>>>          String liKey = String.format("P:LIS:%s:%s:C", lineitemid,
>> date);
>>>>          l.add(new KeyValue<>(liKey, 1));
>>>>        }
>>>>        return l;
>>>>      } else if (value.getAdLogType() == 1){
>>>>
>>>>        long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>>>>        if (value.getAdImprLog().isVisible()) {
>>>>          for (int i = 0; i < lineitemids.length; i++) {
>>>>            long li = lineitemids[i];
>>>>            if (li == TARGETED_LI) {
>>>>              // log.info("valid impression ids= " +
>>>> value.getAdImprLog().toString());
>>>>              String liKey = String.format("P:LIS:%s:%s:I", li, date);
>>>>              l.add(new KeyValue<>(liKey, 1));
>>>>            }
>>>>          }
>>>>        }
>>>>        return l;
>>>>      }
>>>>      return l;
>>>>    }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>>>        .reduce((value1, value2) -> value1 + value2,
>>>> LINE_ITEM_COUNT_STORE);
>>>>    return liCount;
>>>>  }
>>>>
>>>> On Wed, Jun 14, 2017 at 10:55 AM, Sameer Kumar <sam.kum.w...@gmail.com>
>>>> wrote:
>>>>
>>>>> The input topic contains 60 partitions and data is distributed well
>>>>> across different partitions on different keys. While consumption, I am
>>>>> doing some filtering and writing only single key data.
>>>>>
>>>>> Output would be something of the form:- Machine 1
>>>>>
>>>>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
>>>>> k=P:LIS:1236667:2017_06_13:I,v=651
>>>>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
>>>>> k=P:LIS:1236667:2017_06_13:I,v=652
>>>>>
>>>>> Machine 2
>>>>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
>>>>> k=P:LIS:1236667:2017_06_13:I,v=1
>>>>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
>>>>> k=P:LIS:1236667:2017_06_13:I,v=2
>>>>>
>>>>> I am sharing a snippet of code,
>>>>>
>>>>> private KTable<String, Integer> extractLICount(KStream<
>> Windowed<String>,
>>>>> AdLog> joinedImprLogs) {
>>>>>    KTable<String, Integer> liCount = joinedImprLogs.flatMap((key,
>> value)
>>>>> -> {
>>>>>      List<KeyValue<String, Integer>> l = new ArrayList<>();
>>>>>      if (value == null) {
>>>>>        return l;
>>>>>      }
>>>>>      String date = new SimpleDateFormat("yyyy_MM_dd").format(new
>>>>> Date(key.window().end()));
>>>>>      // Lineitemids
>>>>>      if (value != null && value.getAdLogType() == 3) {
>>>>>        // log.info("Invalid data: " + value);
>>>>>        return l;
>>>>>      }
>>>>>      if (value.getAdLogType() == 2) {
>>>>>        long lineitemid = value.getAdClickLog().getItmClmbLId();
>>>>>        if (lineitemid == TARGETED_LI) {
>>>>>          String liKey = String.format("P:LIS:%s:%s:C", lineitemid,
>> date);
>>>>>          l.add(new KeyValue<>(liKey, 1));
>>>>>        }
>>>>>        return l;
>>>>>      } else if (value.getAdLogType() == 1){
>>>>>
>>>>>        long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>>>>>        if (value.getAdImprLog().isVisible()) {
>>>>>          for (int i = 0; i < lineitemids.length; i++) {
>>>>>            long li = lineitemids[i];
>>>>>            if (li == TARGETED_LI) {
>>>>>              // log.info("valid impression ids= " +
>>>>> value.getAdImprLog().toString());
>>>>>              String liKey = String.format("P:LIS:%s:%s:I", li, date);
>>>>>              l.add(new KeyValue<>(liKey, 1));
>>>>>            }
>>>>>          }
>>>>>        }
>>>>>        return l;
>>>>>      }
>>>>>      return l;
>>>>>    }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>>>>        .reduce((value1, value2) -> value1 + value2,
>>>>> LINE_ITEM_COUNT_STORE);
>>>>>    return liCount;
>>>>>  }
>>>>>
>>>>> -Sameer.
>>>>>
>>>>> On Tue, Jun 13, 2017 at 10:35 PM, Matthias J. Sax <
>> matth...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Sameer,
>>>>>>
>>>>>> if you write a single key, all your input data should be in a single
>>>>>> partition. As Streams scales out via partitions, you cannot have a
>>>>>> second instance as data from one partition is never split. Thus, all
>>>>>> data will go to one instance while the second instance should be idle.
>>>>>>
>>>>>> So what I don't understand is, why you see that machine 2 output a
>>>>>> counter of 1 -- it should not output anything. Maybe you can give some
>>>>>> more details about your setup?
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 6/13/17 5:00 AM, Sameer Kumar wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> I witnessed a strange behaviour in KafkaStreams, need help in
>>>>>> understanding
>>>>>>> the same.
>>>>>>>
>>>>>>> I created an application for aggregating clicks per user, I want to
>>>>>> process
>>>>>>> it only for 1 user( i was writing only a single key).
>>>>>>> When I ran application on one machine, it was running fine.Now, to
>>>>>>> loadbalance it , I started another node.
>>>>>>>
>>>>>>> My expectation was that the counter should be in sync, i.e. if output
>>>>>> from
>>>>>>> one machine is key, 100 and another machine should read this and the
>>>>>> value
>>>>>>> should be key, 101.
>>>>>>> But, that didnt happened. Instead, on machine 2, the counter started
>>>>>> with 1.
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>> -Sameer.
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>
>>
> 
