Hi All, One of my Kafka streams application is returning negative values for count() method. How is that possible?
is there any known issue? I cannot think of any reason that this count could be negative. Is it possible if the state store is corrupted? idAndJobTransaction .filter((k,v) -> v!=null) .mapValues(jobTransaction -> { jobTransaction.setCount(0); jobTransaction.setId(0L); jobTransaction.setRunsheet_id(0L); jobTransaction.setTimestamp(0L); if(jobTransaction.getDelete_flag() == 1) return null; else return jobTransaction; } ) .groupBy((id,jobTransaction)->new KeyValue<>(jobTransaction,jobTransaction),Serialized.with(jobTransactionSerde,jobTransactionSerde)) .count() .toStream() .mapValues((k,v)-> new JobSummary(k,v)) .peek((k,v)->{ log.info(k.toString()); log.info(v.toString()); }).selectKey((k,v)-> v.getCompany_id()) // So that the count is consumed in order for each company .to(JOB_SUMMARY,Produced.with(Serdes.Long(),jobSummarySerde)); -- Thanks, Ankur Rana Software Developer FarEye