Hi Bill,

I will try to make that change, but since the negative values are really rare, it will be very difficult to capture the logs at the moment they occur. I am seeing this issue only in production. I will see what I can do.
Also, the negative counts are not limited to small values; I have seen values as low as -300. I cannot get my head around this behavior.

On Fri, Feb 8, 2019 at 5:56 PM Bill Bejeck <b...@confluent.io> wrote:

> Hi Ankur,
>
> Could you add an additional peek method logging what's going into the
> groupBy call and share the logs?
>
> Thanks,
> Bill
>
> On Fri, Feb 8, 2019 at 7:48 AM Ankur Rana <ankur.r...@getfareye.com>
> wrote:
>
> > Hi All,
> >
> > One of my Kafka streams application is returning negative values for
> > count() method.
> > How is that possible?
> >
> > is there any known issue? I cannot think of any reason that this count
> > could be negative.
> > Is it possible if the state store is corrupted?
> >
> > idAndJobTransaction
> >         .filter((k,v) -> v!=null)
> >         .mapValues(jobTransaction -> {
> >             jobTransaction.setCount(0);
> >             jobTransaction.setId(0L);
> >             jobTransaction.setRunsheet_id(0L);
> >             jobTransaction.setTimestamp(0L);
> >             if(jobTransaction.getDelete_flag() == 1)
> >                 return null;
> >             else
> >                 return jobTransaction;
> >         } )
> >         .groupBy((id,jobTransaction)->new KeyValue<>(jobTransaction,jobTransaction),
> >                 Serialized.with(jobTransactionSerde,jobTransactionSerde))
> >         .count()
> >         .toStream()
> >         .mapValues((k,v)-> new JobSummary(k,v))
> >         .peek((k,v)->{
> >             log.info(k.toString());
> >             log.info(v.toString());
> >         })
> >         // So that the count is consumed in order for each company
> >         .selectKey((k,v)-> v.getCompany_id())
> >         .to(JOB_SUMMARY,Produced.with(Serdes.Long(),jobSummarySerde));
> >
> > --
> > Thanks,
> >
> > Ankur Rana
> > Software Developer
> > FarEye

--
Thanks,

Ankur Rana
Software Developer
FarEye
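
On the question of how a count could become negative at all, here is one possible mechanism, offered as an assumption and not as a diagnosis of this application. The groupBy above returns a KeyValue, which suggests idAndJobTransaction is a KTable. A count over a grouped KTable is maintained by subtracting one for a record's old value and adding one for its new value. If the grouping key derived from the old value differs from the key that originally received the add (for example, because the key object is mutated or does not serialize to the same bytes each time), the subtract lands on a key that never got the add. The plain-Java toy model below uses no Kafka classes; the class name TableCountSketch and its methods are hypothetical and only illustrate the arithmetic.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a table-style count: subtract for the old value, add for the new one. */
public class TableCountSketch {
    // Running count per grouping key; stands in for the count() state store.
    private final Map<String, Long> counts = new HashMap<>();

    /**
     * Apply one table update. oldGroup is the grouping key derived from the
     * previous value (null on first insert); newGroup is derived from the new
     * value (null when the row is deleted, i.e. a tombstone).
     */
    public void update(String oldGroup, String newGroup) {
        if (oldGroup != null) {
            counts.merge(oldGroup, -1L, Long::sum); // subtract for the old value
        }
        if (newGroup != null) {
            counts.merge(newGroup, 1L, Long::sum);  // add for the new value
        }
    }

    /** Current count for a group; 0 if the group was never touched. */
    public long count(String group) {
        return counts.getOrDefault(group, 0L);
    }

    public static void main(String[] args) {
        // Consistent case: the row moves from group A to group B.
        TableCountSketch ok = new TableCountSketch();
        ok.update(null, "A");
        ok.update("A", "B");
        System.out.println("A=" + ok.count("A") + " B=" + ok.count("B"));

        // Inconsistent case (assumed scenario): the add went to A, but the
        // later subtract is routed to A' because the old value no longer
        // maps to the same grouping key.
        TableCountSketch bad = new TableCountSketch();
        bad.update(null, "A");
        bad.update("A'", "B");
        System.out.println("A=" + bad.count("A") + " A'=" + bad.count("A'")
                + " B=" + bad.count("B"));
    }
}
```

In the second scenario the group A' ends at -1 while A stays at 1. Logging both key and value just before the groupBy, as Bill suggested, is one way to check whether old and new records for the same id produce the grouping keys you expect.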