You can look into KStreamReduceProcessor#process() and
KStreamAggregateProcessor#process(). That should let you see the input
data each instance receives.
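
If it is easier, you can also log the records right before the
aggregation to see which instance handles which key. A rough, untested
sketch -- "stream", "log", "instanceName", and the store name are just
placeholders for your own code, and the usual org.apache.kafka.streams
imports are assumed:

// pass-through filter used purely for logging -- key, value, and
// partitioning stay untouched
KStream<String, Integer> logged = stream.filter((key, value) -> {
  log.info("instance=" + instanceName + " key=" + key + " value=" + value);
  return true; // keep every record
});

// the aggregation itself stays unchanged
logged.groupByKey(Serdes.String(), Serdes.Integer())
      .reduce((v1, v2) -> v1 + v2, "line-item-count-store");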

count() and aggregate() will use KStreamAggregateProcessor, while
reduce() will use KStreamReduceProcessor.
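
For example, both of these counting variants (store names are just
placeholders, and "grouped" stands for the KGroupedStream returned by
your groupBy()) end up in those two different processors:

// goes through KStreamAggregateProcessor
grouped.count("li-count-store");

// goes through KStreamReduceProcessor (assumes the values are already 1s)
grouped.reduce((v1, v2) -> v1 + v2, "li-reduce-store");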


Hope this helps.


-Matthias

On 6/20/17 10:54 PM, Sameer Kumar wrote:
> Hi Matthias,
> 
> 
> I am working on 0.10.2.1, and I do see the same key show up on both
> instances, albeit not always -- it seems a bit random.
> 
> I shall try to code a simpler example free from domain level code and share
> it with you in a while.
> 
> I am willing to debug this further as well, if you could tell me the
> classes that I should be looking into.
> 
> -Sameer.
> 
> On Tue, Jun 20, 2017 at 3:51 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Hi Sameer,
>>
>> With regard to
>>
>>>>> What I saw was that while on Machine1 the counter was at 100, on another
>>>>> machine it was at 1. I saw it as inconsistent.
>>
>> If you really see the same key on different machines, that would be
>> incorrect. All records with the same key must be processed by the same
>> machine. Thus, there cannot be two counters, but only one.
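>>
>> The reason is partitioning: the producer's default partitioner hashes
>> the serialized key, so equal keys always land in the same partition,
>> and each partition is processed by exactly one Streams instance.
>> Roughly (just a sketch of the default behavior, not the literal
>> producer code):
>>
>> // equal keyBytes => same partition => processed by exactly one instance
>> int partition(byte[] keyBytes, int numPartitions) {
>>   return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
>> }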
>>
>> I am wondering what version of Kafka Streams you are using. Version
>> 0.10.0.x does not have the auto-repartitioning feature, and this might
>> explain what you see. For this case, you need to call #through() before
>> doing the aggregation.
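>>
>> The idea is to route the records through an intermediate topic once the
>> new key is set, so that the aggregation reads data that is already
>> partitioned by that key. A rough sketch ("li-repartition" is a
>> placeholder for a topic you would create yourself, with as many
>> partitions as you want for the aggregation; on 0.10.0.x the
>> grouping/aggregation calls are named slightly differently):
>>
>> stream.map(...) // sets the new key
>>       .through(Serdes.String(), Serdes.Integer(), "li-repartition")
>>       .groupByKey(Serdes.String(), Serdes.Integer())
>>       .reduce((v1, v2) -> v1 + v2, "li-count-store");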
>>
>>
>> -Matthias
>>
>>
>> On 6/17/17 3:28 AM, Sameer Kumar wrote:
>>> Continued from my last mail...
>>>
>>> The code snippet that I shared was after joining impression and
>>> notification logs. Here I am picking the line item and concatenating it
>>> with the date. You can also see there is a check for a TARGETED_LINE_ITEM;
>>> I am not emitting the data otherwise.
>>>
>>> -Sameer.
>>>
>>> On Sat, Jun 17, 2017 at 3:55 PM, Sameer Kumar <sam.kum.w...@gmail.com>
>>> wrote:
>>>
>>>> The example I gave was just for illustration. I have impression logs and
>>>> notification logs. Notification logs are essentially tied to impressions
>>>> served. An impression would serve multiple items.
>>>>
>>>> I was just trying to aggregate across a single line item. This means I
>>>> am always generating a single key, and since the data is streaming, my
>>>> counter should keep on increasing.
>>>>
>>>> What I saw was that while on Machine1 the counter was at 100, on another
>>>> machine it was at 1. I saw it as inconsistent.
>>>>
>>>>
>>>> -Sameer.
>>>>
>>>> On Fri, Jun 16, 2017 at 10:47 PM, Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>> I just double-checked your example code from an earlier email. There you
>>>>> are using:
>>>>>
>>>>> stream.flatMap(...)
>>>>>       .groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>>>>       .reduce((value1, value2) -> value1 + value2,
>>>>>
>>>>> In your last email, you say that you want to count on a category that is
>>>>> contained in your value, while your key is something different.
>>>>>
>>>>> If you want to count by category, you need to set the category as the
>>>>> key in your .groupBy() -- in your code snippet, you actually don't
>>>>> change the key (this seems to be the root cause of the issue you see).
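>>>>>
>>>>> Something along these lines (just a sketch -- getCategory() and
>>>>> valueSerde are placeholders for however your value exposes the
>>>>> category and gets serialized):
>>>>>
>>>>> stream.groupBy((key, value) -> value.getCategory(), // category as new key
>>>>>                Serdes.String(), valueSerde)
>>>>>       .count("category-count-store");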
>>>>>
>>>>> Does this help?
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 6/15/17 11:48 PM, Sameer Kumar wrote:
>>>>>> Ok, let me try to explain it again.
>>>>>>
>>>>>> So, let's say my source processor has a different key, and the value it
>>>>>> contains carries an identifier (a type field) that basically denotes a
>>>>>> category, and I am counting on the different categories. A specific case
>>>>>> would be that I filter and output only one category, let's say cat1.
>>>>>> Now, since the source topic is processed across multiple nodes and the
>>>>>> state store is local, cat1 would be written on multiple nodes and the
>>>>>> count would start from 1 on each of them.
>>>>>>
>>>>>> Does that clarify the doubt? I thought that after a processor the data
>>>>>> gets written to an intermediate Kafka topic and that this should be
>>>>>> taken care of, but this still happens -- I am not sure about it.
>>>>>>
>>>>>> Processor 1
>>>>>> key1, value:{xyz....cat1}
>>>>>> key2, value:{xyz....cat2}
>>>>>> key3, value:{xyz....cat3}
>>>>>>
>>>>>> Processor 2
>>>>>> Machine1
>>>>>> key: cat1, 1
>>>>>>
>>>>>> Machine2
>>>>>> key:cat1,1
>>>>>>
>>>>>>
>>>>>> -Sameer.
>>>>>>
>>>>>> On Thu, Jun 15, 2017 at 10:27 PM, Eno Thereska <eno.there...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm not sure if I fully understand this but let me check:
>>>>>>>
>>>>>>> - if you start 2 instances, one instance will process half of the
>>>>>>> partitions, the other instance will process the other half
>>>>>>> - for any given key, like key 100, it will only be processed on one of
>>>>>>> the instances, not both.
>>>>>>>
>>>>>>> Does this help?
>>>>>>>
>>>>>>> Eno
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> On 15 Jun 2017, at 07:40, Sameer Kumar <sam.kum.w...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Also, I am writing a single key in the output all the time. I believe
>>>>>>>> machine 2 will have to write that key too, and since a state store is
>>>>>>>> local, it wouldn't know about the counter state on the other machine.
>>>>>>>> So, I guess this is why it happens.
>>>>>>>>
>>>>>>>> -Sameer.
>>>>>>>>
>>>>>>>> On Thu, Jun 15, 2017 at 11:11 AM, Sameer Kumar <sam.kum.w...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> The input topic contains 60 partitions and the data is distributed
>>>>>>>>> well across different partitions on different keys. While consuming,
>>>>>>>>> I am doing some filtering and writing out only single-key data.
>>>>>>>>>
>>>>>>>>> Output would be something of the form:
>>>>>>>>>
>>>>>>>>> Machine 1
>>>>>>>>>
>>>>>>>>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
>>>>>>>>> k=P:LIS:1236667:2017_06_13:I,v=651
>>>>>>>>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
>>>>>>>>> k=P:LIS:1236667:2017_06_13:I,v=652
>>>>>>>>>
>>>>>>>>> Machine 2
>>>>>>>>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
>>>>>>>>> k=P:LIS:1236667:2017_06_13:I,v=1
>>>>>>>>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
>>>>>>>>> k=P:LIS:1236667:2017_06_13:I,v=2
>>>>>>>>>
>>>>>>>>> I am sharing a snippet of code,
>>>>>>>>>
>>>>>>>>> private KTable<String, Integer> extractLICount(
>>>>>>>>>     KStream<Windowed<String>, AdLog> joinedImprLogs) {
>>>>>>>>>   KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
>>>>>>>>>     List<KeyValue<String, Integer>> l = new ArrayList<>();
>>>>>>>>>     if (value == null) {
>>>>>>>>>       return l;
>>>>>>>>>     }
>>>>>>>>>     String date = new SimpleDateFormat("yyyy_MM_dd")
>>>>>>>>>         .format(new Date(key.window().end()));
>>>>>>>>>     // Lineitemids
>>>>>>>>>     if (value != null && value.getAdLogType() == 3) {
>>>>>>>>>       // log.info("Invalid data: " + value);
>>>>>>>>>       return l;
>>>>>>>>>     }
>>>>>>>>>     if (value.getAdLogType() == 2) {
>>>>>>>>>       long lineitemid = value.getAdClickLog().getItmClmbLId();
>>>>>>>>>       if (lineitemid == TARGETED_LI) {
>>>>>>>>>         String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
>>>>>>>>>         l.add(new KeyValue<>(liKey, 1));
>>>>>>>>>       }
>>>>>>>>>       return l;
>>>>>>>>>     } else if (value.getAdLogType() == 1) {
>>>>>>>>>       long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>>>>>>>>>       if (value.getAdImprLog().isVisible()) {
>>>>>>>>>         for (int i = 0; i < lineitemids.length; i++) {
>>>>>>>>>           long li = lineitemids[i];
>>>>>>>>>           if (li == TARGETED_LI) {
>>>>>>>>>             // log.info("valid impression ids= " + value.getAdImprLog().toString());
>>>>>>>>>             String liKey = String.format("P:LIS:%s:%s:I", li, date);
>>>>>>>>>             l.add(new KeyValue<>(liKey, 1));
>>>>>>>>>           }
>>>>>>>>>         }
>>>>>>>>>       }
>>>>>>>>>       return l;
>>>>>>>>>     }
>>>>>>>>>     return l;
>>>>>>>>>   }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>>>>>>>>       .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
>>>>>>>>>   return liCount;
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> On Wed, Jun 14, 2017 at 10:55 AM, Sameer Kumar <sam.kum.w...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 13, 2017 at 10:35 PM, Matthias J. Sax <matth...@confluent.io>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Sameer,
>>>>>>>>>>>
>>>>>>>>>>> if you write a single key, all your input data should be in a single
>>>>>>>>>>> partition. As Streams scales out via partitions, you cannot make use
>>>>>>>>>>> of a second instance, as data from one partition is never split.
>>>>>>>>>>> Thus, all data will go to one instance while the second instance
>>>>>>>>>>> should be idle.
>>>>>>>>>>>
>>>>>>>>>>> So what I don't understand is why you see machine 2 output a counter
>>>>>>>>>>> of 1 -- it should not output anything. Maybe you can give some more
>>>>>>>>>>> details about your setup?
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 6/13/17 5:00 AM, Sameer Kumar wrote:
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I witnessed a strange behaviour in Kafka Streams and need help in
>>>>>>>>>>>> understanding it.
>>>>>>>>>>>>
>>>>>>>>>>>> I created an application for aggregating clicks per user, and I want
>>>>>>>>>>>> to process it only for one user (I was writing only a single key).
>>>>>>>>>>>> When I ran the application on one machine, it was running fine. Now,
>>>>>>>>>>>> to load-balance it, I started another node.
>>>>>>>>>>>>
>>>>>>>>>>>> My expectation was that the counter should be in sync, i.e. if the
>>>>>>>>>>>> output from one machine is (key, 100), the other machine should read
>>>>>>>>>>>> this and the next value should be (key, 101).
>>>>>>>>>>>> But that didn't happen. Instead, on machine 2, the counter started
>>>>>>>>>>>> with 1.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> -Sameer.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
> 
