Hi All,
One of my Kafka streams application is returning negative values for
count() method.
How is that possible?
is there any known issue? I cannot think of any reason that this count
could be negative.
Is it possible if the state store is corrupted?
idAndJobTransaction
.filter((k,v) -> v!=null)
.mapValues(jobTransaction -> {
jobTransaction.setCount(0);
jobTransaction.setId(0L);
jobTransaction.setRunsheet_id(0L);
jobTransaction.setTimestamp(0L);
if(jobTransaction.getDelete_flag() == 1)
return null;
else
return jobTransaction;
} )
.groupBy((id,jobTransaction)->new
KeyValue<>(jobTransaction,jobTransaction),Serialized.with(jobTransactionSerde,jobTransactionSerde))
.count()
.toStream()
.mapValues((k,v)-> new JobSummary(k,v))
.peek((k,v)->{
log.info(k.toString());
log.info(v.toString());
}).selectKey((k,v)-> v.getCompany_id()) // So that the count
is consumed in order for each company
.to(JOB_SUMMARY,Produced.with(Serdes.Long(),jobSummarySerde));
--
Thanks,
Ankur Rana
Software Developer
FarEye