No, I do not have any logs from before the mapValues call, but I will add them and let you know. I did, however, see the negative values in the peek method above. The JobSummary class is very simple and doesn't do any processing.
Here's the constructor for the JobSummary class used in this app:

    public JobSummary(JobTransaction j, Long count){
        this.setUser_id(j.getUser_id());
        this.setHub_id(j.getHub_id());
        this.setCity_id(j.getCity_id());
        this.setCompany_id(j.getCompany_id());
        this.setJob_master_id(j.getJob_master_id());
        this.setJob_status_id(j.getJob_status_id());
        this.setCount(count);
        this.setDate(j.getDate());
    }

I have added some more information in the StackOverflow thread below in case you'd like to add there.

https://stackoverflow.com/questions/54592303/kafka-streams-kgroupedtable-count-returning-negative-value-hows-that-possibl

On Fri, Feb 8, 2019 at 7:25 PM Bill Bejeck <b...@confluent.io> wrote:

> Hi Ankur,
>
> I understand. Let's see if we can narrow things down some without any
> logging.
>
> Where exactly are you seeing the negative number from the code above?
> Have you confirmed the count is negative by observing the results of the
> count().toStream() before the mapValues call?
>
> Thanks!
> Bill
>
> On Fri, Feb 8, 2019 at 1:31 PM Ankur Rana <ankur.r...@getfareye.com>
> wrote:
>
> > Hi Bill,
> >
> > I will try to make that change but since the negative values are really
> > rare, It would be very difficult to capture the logs at that point. I am
> > seeing this issue only in production. I will see what I can do.
> >
> > Also, the negative count is not limited to smaller values, but I have
> > seen values as high as -300.
> > I cannot get my head around this behavior.
> >
> > On Fri, Feb 8, 2019 at 5:56 PM Bill Bejeck <b...@confluent.io> wrote:
> >
> > > Hi Ankur,
> > >
> > > Could you add an additional peek method logging what's going into the
> > > groupBy call and share the logs?
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Fri, Feb 8, 2019 at 7:48 AM Ankur Rana <ankur.r...@getfareye.com>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > One of my Kafka streams application is returning negative values for
> > > > count() method.
> > > > How is that possible?
> > > >
> > > > is there any known issue? I cannot think of any reason that this
> > > > count could be negative.
> > > > Is it possible if the state store is corrupted?
> > > >
> > > > idAndJobTransaction
> > > >     .filter((k,v) -> v!=null)
> > > >     .mapValues(jobTransaction -> {
> > > >         jobTransaction.setCount(0);
> > > >         jobTransaction.setId(0L);
> > > >         jobTransaction.setRunsheet_id(0L);
> > > >         jobTransaction.setTimestamp(0L);
> > > >         if(jobTransaction.getDelete_flag() == 1)
> > > >             return null;
> > > >         else
> > > >             return jobTransaction;
> > > >     } )
> > > >     .groupBy((id,jobTransaction)->new KeyValue<>(jobTransaction,jobTransaction),Serialized.with(jobTransactionSerde,jobTransactionSerde))
> > > >     .count()
> > > >     .toStream()
> > > >     .mapValues((k,v)-> new JobSummary(k,v))
> > > >     .peek((k,v)->{
> > > >         log.info(k.toString());
> > > >         log.info(v.toString());
> > > >     }).selectKey((k,v)-> v.getCompany_id()) // So that the count is consumed in order for each company
> > > >     .to(JOB_SUMMARY,Produced.with(Serdes.Long(),jobSummarySerde));
> > > >
> > > > --
> > > > Thanks,
> > > >
> > > > Ankur Rana
> > > > Software Developer
> > > > FarEye
> > >
> >
> > --
> > Thanks,
> >
> > Ankur Rana
> > Software Developer
> > FarEye
>

--
Thanks,

Ankur Rana
Software Developer
FarEye