Hi Ankur, Could you add an additional peek method logging what's going into the groupBy call and share the logs?
Thanks, Bill On Fri, Feb 8, 2019 at 7:48 AM Ankur Rana <ankur.r...@getfareye.com> wrote: > Hi All, > > One of my Kafka streams application is returning negative values for > count() method. > How is that possible? > > > is there any known issue? I cannot think of any reason that this count > could be negative. > Is it possible if the state store is corrupted? > > idAndJobTransaction > .filter((k,v) -> v!=null) > .mapValues(jobTransaction -> { > jobTransaction.setCount(0); > jobTransaction.setId(0L); > jobTransaction.setRunsheet_id(0L); > jobTransaction.setTimestamp(0L); > if(jobTransaction.getDelete_flag() == 1) > return null; > else > return jobTransaction; > } ) > .groupBy((id,jobTransaction)->new > > KeyValue<>(jobTransaction,jobTransaction),Serialized.with(jobTransactionSerde,jobTransactionSerde)) > .count() > .toStream() > .mapValues((k,v)-> new JobSummary(k,v)) > .peek((k,v)->{ > log.info(k.toString()); > log.info(v.toString()); > }).selectKey((k,v)-> v.getCompany_id()) // So that the count > is consumed in order for each company > .to(JOB_SUMMARY,Produced.with(Serdes.Long(),jobSummarySerde)); > > > -- > Thanks, > > Ankur Rana > Software Developer > FarEye >