Also, I always write a single key to the output. I believe machine 2 will
also have to write that key, and since a state store is local, it wouldn't
know about the counter state on the other machine. So I guess this is bound
to happen.

-Sameer.
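
To make the hypothesis above concrete, here is a minimal plain-Java sketch. It is a simulation only and uses no Kafka classes: the names `LocalStoreSketch`, `Instance`, `roundRobin`, and `routedByKey` are hypothetical, and `String.hashCode()` stands in for whatever partitioner is actually configured. It contrasts two cases: records for one key reaching both instances (each local store keeps its own count), and records routed by key so that only one instance ever sees them.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java simulation of per-instance local stores; not Kafka Streams code. */
final class LocalStoreSketch {

    /** One simulated application instance with its own local counter store. */
    static final class Instance {
        private final Map<String, Integer> store = new HashMap<>();

        /** Adds 1 to the local count for the key, like summing a stream of 1s. */
        void process(String key) {
            store.merge(key, 1, Integer::sum);
        }

        int count(String key) {
            return store.getOrDefault(key, 0);
        }
    }

    /** Records for the same key alternate between two instances. */
    static int[] roundRobin(String key, int records) {
        Instance[] instances = {new Instance(), new Instance()};
        for (int i = 0; i < records; i++) {
            instances[i % instances.length].process(key);
        }
        return new int[] {instances[0].count(key), instances[1].count(key)};
    }

    /** Records are routed by key, so a single instance owns the key. */
    static int[] routedByKey(String key, int records) {
        Instance[] instances = {new Instance(), new Instance()};
        // Assumption: hashCode() is a stand-in for the real partitioner.
        int owner = Math.floorMod(key.hashCode(), instances.length);
        for (int i = 0; i < records; i++) {
            instances[owner].process(key);
        }
        return new int[] {instances[0].count(key), instances[1].count(key)};
    }

    public static void main(String[] args) {
        String key = "P:LIS:1236667:2017_06_13:I";
        System.out.println(Arrays.toString(roundRobin(key, 5)));
        System.out.println(Arrays.toString(routedByKey(key, 5)));
    }
}
```

With five records, `roundRobin` leaves the two stores at 3 and 2, which resembles the diverging counters in the logs. `routedByKey` leaves one store at 5 and the other at 0, which is what one would expect if all records for the key end up in a single partition. The sketch does not establish which case applies to the actual topology.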

On Thu, Jun 15, 2017 at 11:11 AM, Sameer Kumar <sam.kum.w...@gmail.com>
wrote:

> The input topic contains 60 partitions, and the data is well distributed
> across the partitions on different keys. While consuming, I do some
> filtering and write data for only a single key.
>
> The output looks something like this:
>
> Machine 1
> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652
>
> Machine 2
> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2
>
> Here is a snippet of the code:
>
> private KTable<String, Integer> extractLICount(
>     KStream<Windowed<String>, AdLog> joinedImprLogs) {
>   KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
>     List<KeyValue<String, Integer>> l = new ArrayList<>();
>     if (value == null) {
>       return l;
>     }
>     String date = new SimpleDateFormat("yyyy_MM_dd")
>         .format(new Date(key.window().end()));
>     // Lineitemids
>     if (value.getAdLogType() == 3) {
>       // log.info("Invalid data: " + value);
>       return l;
>     }
>     if (value.getAdLogType() == 2) {
>       // Click log: emit one record if it belongs to the targeted line item.
>       long lineitemid = value.getAdClickLog().getItmClmbLId();
>       if (lineitemid == TARGETED_LI) {
>         String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
>         l.add(new KeyValue<>(liKey, 1));
>       }
>       return l;
>     } else if (value.getAdLogType() == 1) {
>       // Impression log: emit one record per matching visible line item.
>       long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>       if (value.getAdImprLog().isVisible()) {
>         for (long li : lineitemids) {
>           if (li == TARGETED_LI) {
>             // log.info("valid impression ids= " + value.getAdImprLog().toString());
>             String liKey = String.format("P:LIS:%s:%s:I", li, date);
>             l.add(new KeyValue<>(liKey, 1));
>           }
>         }
>       }
>       return l;
>     }
>     return l;
>   }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>       .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
>   return liCount;
> }
>
> On Wed, Jun 14, 2017 at 10:55 AM, Sameer Kumar <sam.kum.w...@gmail.com>
> wrote:
>
>> The input topic contains 60 partitions, and the data is well distributed
>> across the partitions on different keys. While consuming, I do some
>> filtering and write data for only a single key.
>>
>> The output looks something like this:
>>
>> Machine 1
>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652
>>
>> Machine 2
>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2
>>
>> Here is a snippet of the code:
>>
>> private KTable<String, Integer> extractLICount(
>>     KStream<Windowed<String>, AdLog> joinedImprLogs) {
>>   KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
>>     List<KeyValue<String, Integer>> l = new ArrayList<>();
>>     if (value == null) {
>>       return l;
>>     }
>>     String date = new SimpleDateFormat("yyyy_MM_dd")
>>         .format(new Date(key.window().end()));
>>     // Lineitemids
>>     if (value.getAdLogType() == 3) {
>>       // log.info("Invalid data: " + value);
>>       return l;
>>     }
>>     if (value.getAdLogType() == 2) {
>>       // Click log: emit one record if it belongs to the targeted line item.
>>       long lineitemid = value.getAdClickLog().getItmClmbLId();
>>       if (lineitemid == TARGETED_LI) {
>>         String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
>>         l.add(new KeyValue<>(liKey, 1));
>>       }
>>       return l;
>>     } else if (value.getAdLogType() == 1) {
>>       // Impression log: emit one record per matching visible line item.
>>       long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>>       if (value.getAdImprLog().isVisible()) {
>>         for (long li : lineitemids) {
>>           if (li == TARGETED_LI) {
>>             // log.info("valid impression ids= " + value.getAdImprLog().toString());
>>             String liKey = String.format("P:LIS:%s:%s:I", li, date);
>>             l.add(new KeyValue<>(liKey, 1));
>>           }
>>         }
>>       }
>>       return l;
>>     }
>>     return l;
>>   }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>       .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
>>   return liCount;
>> }
>>
>> -Sameer.
>>
>> On Tue, Jun 13, 2017 at 10:35 PM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Sameer,
>>>
>>> if you write a single key, all your input data should be in a single
>>> partition. As Streams scales out via partitions, you cannot have a
>>> second instance as data from one partition is never split. Thus, all
>>> data will go to one instance while the second instance should be idle.
>>>
>>> So what I don't understand is why you see machine 2 output a counter
>>> of 1 -- it should not output anything. Maybe you can give some more
>>> details about your setup?
>>>
>>> -Matthias
>>>
>>> On 6/13/17 5:00 AM, Sameer Kumar wrote:
>>> > Hi,
>>> >
>>> > I witnessed some strange behaviour in Kafka Streams and need help
>>> > understanding it.
>>> >
>>> > I created an application for aggregating clicks per user, and I want to
>>> > process it for only one user (I was writing only a single key).
>>> > When I ran the application on one machine, it ran fine. Then, to
>>> > load-balance it, I started another node.
>>> >
>>> > My expectation was that the counter would stay in sync, i.e. if the
>>> > output from one machine is (key, 100), the other machine should read
>>> > this and its value should be (key, 101).
>>> > But that didn't happen. Instead, on machine 2, the counter started at 1.
>>> >
>>> >
>>> > Regards,
>>> > -Sameer.
>>> >
>>>
>>>
>>
>
