Hi Ankur,

Could you add an additional peek method logging what's going into the
groupBy call and share the logs?

Thanks,
Bill

On Fri, Feb 8, 2019 at 7:48 AM Ankur Rana <ankur.r...@getfareye.com> wrote:

> Hi All,
>
> One of my Kafka streams application is returning negative values for
> count() method.
> How is that possible?
>
>
> is there any known issue? I cannot think of any reason that this count
> could be negative.
> Is it possible if the state store is corrupted?
>
> idAndJobTransaction
>         .filter((k,v) -> v!=null)
>         .mapValues(jobTransaction -> {
>             jobTransaction.setCount(0);
>             jobTransaction.setId(0L);
>             jobTransaction.setRunsheet_id(0L);
>             jobTransaction.setTimestamp(0L);
>             if(jobTransaction.getDelete_flag() == 1)
>                 return null;
>             else
>                 return jobTransaction;
>         } )
>         .groupBy((id,jobTransaction)->new
>
> KeyValue<>(jobTransaction,jobTransaction),Serialized.with(jobTransactionSerde,jobTransactionSerde))
>         .count()
>         .toStream()
>         .mapValues((k,v)-> new JobSummary(k,v))
>         .peek((k,v)->{
>             log.info(k.toString());
>             log.info(v.toString());
>         }).selectKey((k,v)-> v.getCompany_id())  // So that the count
> is consumed in order for each company
>         .to(JOB_SUMMARY,Produced.with(Serdes.Long(),jobSummarySerde));
>
>
> --
> Thanks,
>
> Ankur Rana
> Software Developer
> FarEye
>

Reply via email to