Hi Matthias,

I am working on 10.2.1. and I do see same outputs on same keys albeit not
always and a bit random.

I shall try to code a simpler example free from domain level code and share
it with you in a while.

I am willing to debug this further as well, if you could tell me the
classes that I should be looking into.

-Sameer.

On Tue, Jun 20, 2017 at 3:51 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Hi Sameer,
>
> With regard to
>
> >>> What I saw was that while on Machine1, the counter was 100 , another
> >>> machine it was at 1. I saw it as inconsistent.
>
> If you really see the same key on different machines, that would be
> incorrect. All record with the same key, must be processed by the same
> machine. Thus, there not not be two counters, but only one.
>
> I am wondering what version of Kafka Streams you are using? Version
> 0.10.0.x does not have auto-repartitioning feature, and this might
> explain what you see. For this case, you need to call #through() before
> doing the aggregation.
>
>
> -Matthias
>
>
> On 6/17/17 3:28 AM, Sameer Kumar wrote:
> > Continued from m last mail...
> >
> > The code snippet that I shared was after joining impression and
> > notification logs. Here I am picking the line item and concatenating it
> > with date. You can also see there is a check for a TARGETED_LINE_ITEM, I
> am
> > not emitting the data otherwise.
> >
> > -Sameer.
> >
> > On Sat, Jun 17, 2017 at 3:55 PM, Sameer Kumar <sam.kum.w...@gmail.com>
> > wrote:
> >
> >> The example I gave was just for illustration. I have impression logs and
> >> notification logs. Notification logs are essentially tied to impressions
> >> served. An impression would serve multiple items.
> >>
> >> I was just trying to aggregate across a single line item, this means I
> am
> >> always generating a single key all the time and since data is
> streaming, my
> >> counter should keep on increasing.
> >>
> >> What I saw was that while on Machine1, the counter was 100 , another
> >> machine it was at 1. I saw it as inconsistent.
> >>
> >>
> >> -Sameer.
> >>
> >> On Fri, Jun 16, 2017 at 10:47 PM, Matthias J. Sax <
> matth...@confluent.io>
> >> wrote:
> >>
> >>> I just double checked you example code from an email before. There you
> >>> are using:
> >>>
> >>> stream.flatMap(...)
> >>>       .groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
> >>>       .reduce((value1, value2) -> value1 + value2,
> >>>
> >>> In you last email, you say that you want to count on category that is
> >>> contained in you value and your key is something different.
> >>>
> >>> If you want to count by category, you need to set the category as key
> in
> >>> you .groupBy() -- in you code snipped, you actually don't change the
> key
> >>> (seems to be the root cause of the issue you see).
> >>>
> >>> Does this help?
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 6/15/17 11:48 PM, Sameer Kumar wrote:
> >>>> Ok.. Let me try explain it again.
> >>>>
> >>>> So, Lets say my source processor has a different key, now the value
> >>> that it
> >>>> contains lets say contains an identifier type: which basically denotes
> >>>> category and I am counting on different categories. A specific case
> >>> would
> >>>> be I do a filter and outputs only a specific category: lets say
> >>> cat1...Now,
> >>>> starting from source processor which is being processed across
> multiple
> >>>> nodes, cat1 would be written across multiple nodes, since state store
> is
> >>>> local..it will be starting from 1 on each of the nodes.
> >>>>
> >>>> Does it clarifies the doubt. I think after a processor the same gets
> >>>> written to a intermediate Kafka topic and this should be taken care of
> >>> but
> >>>> this happens, I am not sure on it.
> >>>>
> >>>> Processor 1
> >>>> key1, value:{xyz....cat1}
> >>>> key2, value:{xyz....cat2}
> >>>> key3, value:{xyz....cat3}
> >>>>
> >>>> Processor 2
> >>>> Machine1
> >>>> key: cat1, 1
> >>>>
> >>>> Machine2
> >>>> key:cat1,1
> >>>>
> >>>>
> >>>> -Sameer.
> >>>>
> >>>> On Thu, Jun 15, 2017 at 10:27 PM, Eno Thereska <
> eno.there...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> I'm not sure if I fully understand this but let me check:
> >>>>>
> >>>>> - if you start 2 instances, one instance will process half of the
> >>>>> partitions, the other instance will process the other half
> >>>>> - for any given key, like key 100, it will only be processed on one
> of
> >>> the
> >>>>> instances, not both.
> >>>>>
> >>>>> Does this help?
> >>>>>
> >>>>> Eno
> >>>>>
> >>>>>
> >>>>>
> >>>>>> On 15 Jun 2017, at 07:40, Sameer Kumar <sam.kum.w...@gmail.com>
> >>> wrote:
> >>>>>>
> >>>>>> Also, I am writing a single key in the output all the time. I
> believe
> >>>>>> machine2 will have to write a key and since a state store is local
> it
> >>>>>> wouldn't know about the counter state on another machine. So, I
> guess
> >>>>> this
> >>>>>> will happen.
> >>>>>>
> >>>>>> -Sameer.
> >>>>>>
> >>>>>> On Thu, Jun 15, 2017 at 11:11 AM, Sameer Kumar <
> >>> sam.kum.w...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> The input topic contains 60 partitions and data is distributed well
> >>>>> across
> >>>>>>> different partitions on different keys. While consumption, I am
> doing
> >>>>> some
> >>>>>>> filtering and writing only single key data.
> >>>>>>>
> >>>>>>> Output would be something of the form:- Machine 1
> >>>>>>>
> >>>>>>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
> >>>>>>> k=P:LIS:1236667:2017_06_13:I,v=651
> >>>>>>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
> >>>>>>> k=P:LIS:1236667:2017_06_13:I,v=652
> >>>>>>>
> >>>>>>> Machine 2
> >>>>>>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
> >>>>>>> k=P:LIS:1236667:2017_06_13:I,v=1
> >>>>>>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
> >>>>>>> k=P:LIS:1236667:2017_06_13:I,v=2
> >>>>>>>
> >>>>>>> I am sharing a snippet of code,
> >>>>>>>
> >>>>>>> private KTable<String, Integer> extractLICount(KStream<
> >>>>> Windowed<String>,
> >>>>>>> AdLog> joinedImprLogs) {
> >>>>>>>    KTable<String, Integer> liCount = joinedImprLogs.flatMap((key,
> >>> value)
> >>>>>>> -> {
> >>>>>>>      List<KeyValue<String, Integer>> l = new ArrayList<>();
> >>>>>>>      if (value == null) {
> >>>>>>>        return l;
> >>>>>>>      }
> >>>>>>>      String date = new SimpleDateFormat("yyyy_MM_dd").format(new
> >>>>>>> Date(key.window().end()));
> >>>>>>>      // Lineitemids
> >>>>>>>      if (value != null && value.getAdLogType() == 3) {
> >>>>>>>        // log.info("Invalid data: " + value);
> >>>>>>>        return l;
> >>>>>>>      }
> >>>>>>>      if (value.getAdLogType() == 2) {
> >>>>>>>        long lineitemid = value.getAdClickLog().getItmClmbLId();
> >>>>>>>        if (lineitemid == TARGETED_LI) {
> >>>>>>>          String liKey = String.format("P:LIS:%s:%s:C", lineitemid,
> >>>>> date);
> >>>>>>>          l.add(new KeyValue<>(liKey, 1));
> >>>>>>>        }
> >>>>>>>        return l;
> >>>>>>>      } else if (value.getAdLogType() == 1){
> >>>>>>>
> >>>>>>>        long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
> >>>>>>>        if (value.getAdImprLog().isVisible()) {
> >>>>>>>          for (int i = 0; i < lineitemids.length; i++) {
> >>>>>>>            long li = lineitemids[i];
> >>>>>>>            if (li == TARGETED_LI) {
> >>>>>>>              // log.info("valid impression ids= " +
> >>>>>>> value.getAdImprLog().toString());
> >>>>>>>              String liKey = String.format("P:LIS:%s:%s:I", li,
> date);
> >>>>>>>              l.add(new KeyValue<>(liKey, 1));
> >>>>>>>            }
> >>>>>>>          }
> >>>>>>>        }
> >>>>>>>        return l;
> >>>>>>>      }
> >>>>>>>      return l;
> >>>>>>>    }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
> >>>>>>>        .reduce((value1, value2) -> value1 + value2,
> >>>>>>> LINE_ITEM_COUNT_STORE);
> >>>>>>>    return liCount;
> >>>>>>>  }
> >>>>>>>
> >>>>>>> On Wed, Jun 14, 2017 at 10:55 AM, Sameer Kumar <
> >>> sam.kum.w...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> The input topic contains 60 partitions and data is distributed
> well
> >>>>>>>> across different partitions on different keys. While consumption,
> I
> >>> am
> >>>>>>>> doing some filtering and writing only single key data.
> >>>>>>>>
> >>>>>>>> Output would be something of the form:- Machine 1
> >>>>>>>>
> >>>>>>>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
> >>>>>>>> k=P:LIS:1236667:2017_06_13:I,v=651
> >>>>>>>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
> >>>>>>>> k=P:LIS:1236667:2017_06_13:I,v=652
> >>>>>>>>
> >>>>>>>> Machine 2
> >>>>>>>> 2017-06-13 16:49:10 INFO  LICountClickImprMR2:116 - licount
> >>>>>>>> k=P:LIS:1236667:2017_06_13:I,v=1
> >>>>>>>> 2017-06-13 16:49:30 INFO  LICountClickImprMR2:116 - licount
> >>>>>>>> k=P:LIS:1236667:2017_06_13:I,v=2
> >>>>>>>>
> >>>>>>>> I am sharing a snippet of code,
> >>>>>>>>
> >>>>>>>> private KTable<String, Integer> extractLICount(KStream<
> >>>>> Windowed<String>,
> >>>>>>>> AdLog> joinedImprLogs) {
> >>>>>>>>    KTable<String, Integer> liCount = joinedImprLogs.flatMap((key,
> >>>>> value)
> >>>>>>>> -> {
> >>>>>>>>      List<KeyValue<String, Integer>> l = new ArrayList<>();
> >>>>>>>>      if (value == null) {
> >>>>>>>>        return l;
> >>>>>>>>      }
> >>>>>>>>      String date = new SimpleDateFormat("yyyy_MM_dd").format(new
> >>>>>>>> Date(key.window().end()));
> >>>>>>>>      // Lineitemids
> >>>>>>>>      if (value != null && value.getAdLogType() == 3) {
> >>>>>>>>        // log.info("Invalid data: " + value);
> >>>>>>>>        return l;
> >>>>>>>>      }
> >>>>>>>>      if (value.getAdLogType() == 2) {
> >>>>>>>>        long lineitemid = value.getAdClickLog().getItmClmbLId();
> >>>>>>>>        if (lineitemid == TARGETED_LI) {
> >>>>>>>>          String liKey = String.format("P:LIS:%s:%s:C", lineitemid,
> >>>>> date);
> >>>>>>>>          l.add(new KeyValue<>(liKey, 1));
> >>>>>>>>        }
> >>>>>>>>        return l;
> >>>>>>>>      } else if (value.getAdLogType() == 1){
> >>>>>>>>
> >>>>>>>>        long[] lineitemids = value.getAdImprLog().
> getItmClmbLIds();
> >>>>>>>>        if (value.getAdImprLog().isVisible()) {
> >>>>>>>>          for (int i = 0; i < lineitemids.length; i++) {
> >>>>>>>>            long li = lineitemids[i];
> >>>>>>>>            if (li == TARGETED_LI) {
> >>>>>>>>              // log.info("valid impression ids= " +
> >>>>>>>> value.getAdImprLog().toString());
> >>>>>>>>              String liKey = String.format("P:LIS:%s:%s:I", li,
> >>> date);
> >>>>>>>>              l.add(new KeyValue<>(liKey, 1));
> >>>>>>>>            }
> >>>>>>>>          }
> >>>>>>>>        }
> >>>>>>>>        return l;
> >>>>>>>>      }
> >>>>>>>>      return l;
> >>>>>>>>    }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
> >>>>>>>>        .reduce((value1, value2) -> value1 + value2,
> >>>>>>>> LINE_ITEM_COUNT_STORE);
> >>>>>>>>    return liCount;
> >>>>>>>>  }
> >>>>>>>>
> >>>>>>>> -Sameer.
> >>>>>>>>
> >>>>>>>> On Tue, Jun 13, 2017 at 10:35 PM, Matthias J. Sax <
> >>>>> matth...@confluent.io>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Sameer,
> >>>>>>>>>
> >>>>>>>>> if you write a single key, all your input data should be in a
> >>> single
> >>>>>>>>> partition. As Streams scales out via partitions, you cannot have
> a
> >>>>>>>>> second instance as data from one partition is never split. Thus,
> >>> all
> >>>>>>>>> data will go to one instance while the second instance should be
> >>> idle.
> >>>>>>>>>
> >>>>>>>>> So what I don't understand is, why you see that machine 2 output
> a
> >>>>>>>>> counter of 1 -- it should not output anything. Maybe you can give
> >>> some
> >>>>>>>>> more details about your setup?
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>> On 6/13/17 5:00 AM, Sameer Kumar wrote:
> >>>>>>>>>> Hi,
> >>>>>>>>>>
> >>>>>>>>>> I witnessed a strange behaviour in KafkaStreams, need help in
> >>>>>>>>> understanding
> >>>>>>>>>> the same.
> >>>>>>>>>>
> >>>>>>>>>> I created an application for aggregating clicks per user, I want
> >>> to
> >>>>>>>>> process
> >>>>>>>>>> it only for 1 user( i was writing only a single key).
> >>>>>>>>>> When I ran application on one machine, it was running fine.Now,
> to
> >>>>>>>>>> loadbalance it , I started another node.
> >>>>>>>>>>
> >>>>>>>>>> My expectation was that the counter should be in sync, i.e. if
> >>> output
> >>>>>>>>> from
> >>>>>>>>>> one machine is key, 100 and another machine should read this and
> >>> the
> >>>>>>>>> value
> >>>>>>>>>> should be key, 101.
> >>>>>>>>>> But, that didnt happened. Instead, on machine 2, the counter
> >>> started
> >>>>>>>>> with 1.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> -Sameer.
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>
> >
>
>

Reply via email to