Hi Sameer,

With regard to

>>> What I saw was that while on Machine1, the counter was 100 , another
>>> machine it was at 1. I saw it as inconsistent.

If you really see the same key on different machines, that would be
incorrect. All records with the same key must be processed by the same
machine. Thus, there should not be two counters, but only one.

I am wondering what version of Kafka Streams you are using? Version
0.10.0.x does not have the auto-repartitioning feature, and this might
explain what you see. In that case, you need to call #through() before
doing the aggregation, so that the data is written to a topic and
repartitioned by the new key.


-Matthias

On 6/17/17 3:28 AM, Sameer Kumar wrote:
> Continued from m last mail...
>
> The code snippet that I shared was after joining impression and
> notification logs. Here I am picking the line item and concatenating it
> with date. You can also see there is a check for a TARGETED_LINE_ITEM, I am
> not emitting the data otherwise.
>
> -Sameer.
>
> On Sat, Jun 17, 2017 at 3:55 PM, Sameer Kumar <sam.kum.w...@gmail.com>
> wrote:
>
>> The example I gave was just for illustration. I have impression logs and
>> notification logs. Notification logs are essentially tied to impressions
>> served. An impression would serve multiple items.
>>
>> I was just trying to aggregate across a single line item, this means I am
>> always generating a single key all the time and since data is streaming, my
>> counter should keep on increasing.
>>
>> What I saw was that while on Machine1, the counter was 100 , another
>> machine it was at 1. I saw it as inconsistent.
>>
>>
>> -Sameer.
>>
>> On Fri, Jun 16, 2017 at 10:47 PM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> I just double checked you example code from an email before. There you
>>> are using:
>>>
>>> stream.flatMap(...)
>>>       .groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>>       .reduce((value1, value2) -> value1 + value2,
>>>
>>> In you last email, you say that you want to count on category that is
>>> contained in you value and your key is something different.
>>>
>>> If you want to count by category, you need to set the category as key in
>>> you .groupBy() -- in you code snipped, you actually don't change the key
>>> (seems to be the root cause of the issue you see).
>>>
>>> Does this help?
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 6/15/17 11:48 PM, Sameer Kumar wrote:
>>>> Ok.. Let me try explain it again.
>>>>
>>>> So, Lets say my source processor has a different key, now the value that it
>>>> contains lets say contains an identifier type: which basically denotes
>>>> category and I am counting on different categories. A specific case would
>>>> be I do a filter and outputs only a specific category: lets say cat1...Now,
>>>> starting from source processor which is being processed across multiple
>>>> nodes, cat1 would be written across multiple nodes, since state store is
>>>> local..it will be starting from 1 on each of the nodes.
>>>>
>>>> Does it clarifies the doubt. I think after a processor the same gets
>>>> written to a intermediate Kafka topic and this should be taken care of but
>>>> this happens, I am not sure on it.
>>>>
>>>> Processor 1
>>>> key1, value:{xyz....cat1}
>>>> key2, value:{xyz....cat2}
>>>> key3, value:{xyz....cat3}
>>>>
>>>> Processor 2
>>>> Machine1
>>>> key: cat1, 1
>>>>
>>>> Machine2
>>>> key:cat1,1
>>>>
>>>>
>>>> -Sameer.
>>>>
>>>> On Thu, Jun 15, 2017 at 10:27 PM, Eno Thereska <eno.there...@gmail.com>
>>>> wrote:
>>>>
>>>>> I'm not sure if I fully understand this but let me check:
>>>>>
>>>>> - if you start 2 instances, one instance will process half of the
>>>>> partitions, the other instance will process the other half
>>>>> - for any given key, like key 100, it will only be processed on one of the
>>>>> instances, not both.
>>>>>
>>>>> Does this help?
>>>>>
>>>>> Eno
>>>>>
>>>>>
>>>>>
>>>>>> On 15 Jun 2017, at 07:40, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>>>>>>
>>>>>> Also, I am writing a single key in the output all the time. I believe
>>>>>> machine2 will have to write a key and since a state store is local it
>>>>>> wouldn't know about the counter state on another machine. So, I guess this
>>>>>> will happen.
>>>>>>
>>>>>> -Sameer.
>>>>>>
>>>>>> On Thu, Jun 15, 2017 at 11:11 AM, Sameer Kumar <sam.kum.w...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> The input topic contains 60 partitions and data is distributed well across
>>>>>>> different partitions on different keys. While consumption, I am doing some
>>>>>>> filtering and writing only single key data.
>>>>>>>
>>>>>>> Output would be something of the form:- Machine 1
>>>>>>>
>>>>>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
>>>>>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652
>>>>>>>
>>>>>>> Machine 2
>>>>>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
>>>>>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2
>>>>>>>
>>>>>>> I am sharing a snippet of code,
>>>>>>>
>>>>>>> private KTable<String, Integer> extractLICount(KStream<Windowed<String>, AdLog> joinedImprLogs) {
>>>>>>>   KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
>>>>>>>     List<KeyValue<String, Integer>> l = new ArrayList<>();
>>>>>>>     if (value == null) {
>>>>>>>       return l;
>>>>>>>     }
>>>>>>>     String date = new SimpleDateFormat("yyyy_MM_dd").format(new Date(key.window().end()));
>>>>>>>     // Lineitemids
>>>>>>>     if (value != null && value.getAdLogType() == 3) {
>>>>>>>       // log.info("Invalid data: " + value);
>>>>>>>       return l;
>>>>>>>     }
>>>>>>>     if (value.getAdLogType() == 2) {
>>>>>>>       long lineitemid = value.getAdClickLog().getItmClmbLId();
>>>>>>>       if (lineitemid == TARGETED_LI) {
>>>>>>>         String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
>>>>>>>         l.add(new KeyValue<>(liKey, 1));
>>>>>>>       }
>>>>>>>       return l;
>>>>>>>     } else if (value.getAdLogType() == 1){
>>>>>>>
>>>>>>>       long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>>>>>>>       if (value.getAdImprLog().isVisible()) {
>>>>>>>         for (int i = 0; i < lineitemids.length; i++) {
>>>>>>>           long li = lineitemids[i];
>>>>>>>           if (li == TARGETED_LI) {
>>>>>>>             // log.info("valid impression ids= " + value.getAdImprLog().toString());
>>>>>>>             String liKey = String.format("P:LIS:%s:%s:I", li, date);
>>>>>>>             l.add(new KeyValue<>(liKey, 1));
>>>>>>>           }
>>>>>>>         }
>>>>>>>       }
>>>>>>>       return l;
>>>>>>>     }
>>>>>>>     return l;
>>>>>>>   }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>>>>>>     .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
>>>>>>>   return liCount;
>>>>>>> }
>>>>>>>
>>>>>>> On Wed, Jun 14, 2017 at 10:55 AM, Sameer Kumar <sam.kum.w...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> The input topic contains 60 partitions and data is distributed well
>>>>>>>> across different partitions on different keys. While consumption, I am
>>>>>>>> doing some filtering and writing only single key data.
>>>>>>>>
>>>>>>>> Output would be something of the form:- Machine 1
>>>>>>>>
>>>>>>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=651
>>>>>>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=652
>>>>>>>>
>>>>>>>> Machine 2
>>>>>>>> 2017-06-13 16:49:10 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=1
>>>>>>>> 2017-06-13 16:49:30 INFO LICountClickImprMR2:116 - licount k=P:LIS:1236667:2017_06_13:I,v=2
>>>>>>>>
>>>>>>>> I am sharing a snippet of code,
>>>>>>>>
>>>>>>>> private KTable<String, Integer> extractLICount(KStream<Windowed<String>, AdLog> joinedImprLogs) {
>>>>>>>>   KTable<String, Integer> liCount = joinedImprLogs.flatMap((key, value) -> {
>>>>>>>>     List<KeyValue<String, Integer>> l = new ArrayList<>();
>>>>>>>>     if (value == null) {
>>>>>>>>       return l;
>>>>>>>>     }
>>>>>>>>     String date = new SimpleDateFormat("yyyy_MM_dd").format(new Date(key.window().end()));
>>>>>>>>     // Lineitemids
>>>>>>>>     if (value != null && value.getAdLogType() == 3) {
>>>>>>>>       // log.info("Invalid data: " + value);
>>>>>>>>       return l;
>>>>>>>>     }
>>>>>>>>     if (value.getAdLogType() == 2) {
>>>>>>>>       long lineitemid = value.getAdClickLog().getItmClmbLId();
>>>>>>>>       if (lineitemid == TARGETED_LI) {
>>>>>>>>         String liKey = String.format("P:LIS:%s:%s:C", lineitemid, date);
>>>>>>>>         l.add(new KeyValue<>(liKey, 1));
>>>>>>>>       }
>>>>>>>>       return l;
>>>>>>>>     } else if (value.getAdLogType() == 1){
>>>>>>>>
>>>>>>>>       long[] lineitemids = value.getAdImprLog().getItmClmbLIds();
>>>>>>>>       if (value.getAdImprLog().isVisible()) {
>>>>>>>>         for (int i = 0; i < lineitemids.length; i++) {
>>>>>>>>           long li = lineitemids[i];
>>>>>>>>           if (li == TARGETED_LI) {
>>>>>>>>             // log.info("valid impression ids= " + value.getAdImprLog().toString());
>>>>>>>>             String liKey = String.format("P:LIS:%s:%s:I", li, date);
>>>>>>>>             l.add(new KeyValue<>(liKey, 1));
>>>>>>>>           }
>>>>>>>>         }
>>>>>>>>       }
>>>>>>>>       return l;
>>>>>>>>     }
>>>>>>>>     return l;
>>>>>>>>   }).groupBy((k, v) -> k, Serdes.String(), Serdes.Integer())
>>>>>>>>     .reduce((value1, value2) -> value1 + value2, LINE_ITEM_COUNT_STORE);
>>>>>>>>   return liCount;
>>>>>>>> }
>>>>>>>>
>>>>>>>> -Sameer.
>>>>>>>>
>>>>>>>> On Tue, Jun 13, 2017 at 10:35 PM, Matthias J. Sax <matth...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Sameer,
>>>>>>>>>
>>>>>>>>> if you write a single key, all your input data should be in a single
>>>>>>>>> partition. As Streams scales out via partitions, you cannot have a
>>>>>>>>> second instance as data from one partition is never split. Thus, all
>>>>>>>>> data will go to one instance while the second instance should be idle.
>>>>>>>>>
>>>>>>>>> So what I don't understand is, why you see that machine 2 output a
>>>>>>>>> counter of 1 -- it should not output anything. Maybe you can give some
>>>>>>>>> more details about your setup?
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 6/13/17 5:00 AM, Sameer Kumar wrote:
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I witnessed a strange behaviour in KafkaStreams, need help in understanding
>>>>>>>>>> the same.
>>>>>>>>>>
>>>>>>>>>> I created an application for aggregating clicks per user, I want to process
>>>>>>>>>> it only for 1 user( i was writing only a single key).
>>>>>>>>>> When I ran application on one machine, it was running fine.Now, to
>>>>>>>>>> loadbalance it , I started another node.
>>>>>>>>>>
>>>>>>>>>> My expectation was that the counter should be in sync, i.e. if output from
>>>>>>>>>> one machine is key, 100 and another machine should read this and the value
>>>>>>>>>> should be key, 101.
>>>>>>>>>> But, that didnt happened. Instead, on machine 2, the counter started with 1.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> -Sameer.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>
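
A minimal, self-contained sketch of the keying issue discussed in this thread. It is plain Java, not Kafka Streams code, and every name in it is hypothetical: instanceFor() is a stand-in for partition assignment (it uses String.hashCode, which is an assumption and not Kafka's actual partitioner), and each instance's local state store is modeled as a HashMap. Under those assumptions it shows that counting by category while records are still distributed by their original key gives one partial counter per instance, whereas routing records by the new key first (the effect of writing through a topic before the aggregation) gives a single counter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RepartitionSketch {
    // Two application instances, as in the setup described in the thread.
    static final int INSTANCES = 2;

    // Hypothetical stand-in for partition assignment: the same key always
    // maps to the same instance. (Assumption: plain hashCode, not Kafka's partitioner.)
    static int instanceFor(String key) {
        return Math.floorMod(key.hashCode(), INSTANCES);
    }

    // Counts records per category in each instance's local store.
    // Each record is {sourceKey, category}. If repartition is true, a record is
    // routed by its category (the new key); otherwise it stays on the instance
    // chosen by its original source key.
    static List<Map<String, Integer>> count(List<String[]> records, boolean repartition) {
        List<Map<String, Integer>> stores = new ArrayList<>();
        for (int i = 0; i < INSTANCES; i++) {
            stores.add(new HashMap<>());
        }
        for (String[] record : records) {
            String sourceKey = record[0];
            String category = record[1];
            int instance = instanceFor(repartition ? category : sourceKey);
            stores.get(instance).merge(category, 1, Integer::sum);
        }
        return stores;
    }

    public static void main(String[] args) {
        // 100 records with distinct source keys, all belonging to category "cat1".
        List<String[]> records = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            records.add(new String[] {"key" + i, "cat1"});
        }
        // Without repartitioning, each instance keeps its own partial "cat1" counter.
        System.out.println("without repartitioning: " + count(records, false));
        // With repartitioning, exactly one instance holds the full "cat1" counter.
        System.out.println("with repartitioning:    " + count(records, true));
    }
}
```

In the first case the two local counters only add up to the total; no instance ever sees the full count, which matches the symptom reported above.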