Hi All,

One of my Kafka Streams applications is returning negative values from the
count() method.
How is that possible?


Is there a known issue? I cannot think of any reason why this count
could be negative.
Could this happen if the state store is corrupted?

idAndJobTransaction
        .filter((k,v) -> v!=null)
        .mapValues(jobTransaction -> {
            jobTransaction.setCount(0);
            jobTransaction.setId(0L);
            jobTransaction.setRunsheet_id(0L);
            jobTransaction.setTimestamp(0L);
            if(jobTransaction.getDelete_flag() == 1)
                return null;
            else
                return jobTransaction;
        } )
        .groupBy((id,jobTransaction)->new KeyValue<>(jobTransaction,jobTransaction),
                Serialized.with(jobTransactionSerde,jobTransactionSerde))
        .count()
        .toStream()
        .mapValues((k,v)-> new JobSummary(k,v))
        .peek((k,v)->{
            log.info(k.toString());
            log.info(v.toString());
        }).selectKey((k,v)-> v.getCompany_id())  // So that the count is consumed in order for each company
        .to(JOB_SUMMARY,Produced.with(Serdes.Long(),jobSummarySerde));


-- 
Thanks,

Ankur Rana
Software Developer
FarEye
