No, I do not have any logs before call to mapValues call but I will add
them and let you know.
Although I did see the negative values in the peek method above. Job
Summary class is a very simple class and doesn't do any processing.

[image: image.png]

Here's the constructor for JobSummary class used in this app.

public JobSummary(JobTransaction j, Long count){
    this.setUser_id(j.getUser_id());
    this.setHub_id(j.getHub_id());
    this.setCity_id(j.getCity_id());
    this.setCompany_id(j.getCompany_id());
    this.setJob_master_id(j.getJob_master_id());
    this.setJob_status_id(j.getJob_status_id());
    this.setCount(count);
    this.setDate(j.getDate());
}

I have added some more information in the StackOverflow thread below in
case you'd like to add there.
https://stackoverflow.com/questions/54592303/kafka-streams-kgroupedtable-count-returning-negative-value-hows-that-possibl


On Fri, Feb 8, 2019 at 7:25 PM Bill Bejeck <b...@confluent.io> wrote:

> Hi Ankur,
>
> I understand. Let's see if we can narrow things down some without any
> logging.
>
> Where exactly are you seeing the negative number from the code above?
> Have you confirmed the count is negative by observing the results of the
> count().toStream() before the mapValues call?
>
>
> Thanks!
> Bill
>
> On Fri, Feb 8, 2019 at 1:31 PM Ankur Rana <ankur.r...@getfareye.com>
> wrote:
>
> > Hi Bill,
> >
> > I will try to make that change but since the negative values are really
> > rare, It would be very difficult to capture the logs at that point. I am
> > seeing this issue only in production. I will see what I can do.
> >
> > Also, the negative count is not limited to smaller values, but I have
> seen
> > values as high as -300.
> > I cannot get my head around this behavior.
> >
> >
> > On Fri, Feb 8, 2019 at 5:56 PM Bill Bejeck <b...@confluent.io> wrote:
> >
> > > Hi Ankur,
> > >
> > > Could you add an additional peek method logging what's going into the
> > > groupBy call and share the logs?
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Fri, Feb 8, 2019 at 7:48 AM Ankur Rana <ankur.r...@getfareye.com>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > One of my Kafka streams application is returning negative values for
> > > > count() method.
> > > > How is that possible?
> > > >
> > > >
> > > > is there any known issue? I cannot think of any reason that this
> count
> > > > could be negative.
> > > > Is it possible if the state store is corrupted?
> > > >
> > > > idAndJobTransaction
> > > >         .filter((k,v) -> v!=null)
> > > >         .mapValues(jobTransaction -> {
> > > >             jobTransaction.setCount(0);
> > > >             jobTransaction.setId(0L);
> > > >             jobTransaction.setRunsheet_id(0L);
> > > >             jobTransaction.setTimestamp(0L);
> > > >             if(jobTransaction.getDelete_flag() == 1)
> > > >                 return null;
> > > >             else
> > > >                 return jobTransaction;
> > > >         } )
> > > >         .groupBy((id,jobTransaction)->new
> > > >
> > > >
> > >
> >
> KeyValue<>(jobTransaction,jobTransaction),Serialized.with(jobTransactionSerde,jobTransactionSerde))
> > > >         .count()
> > > >         .toStream()
> > > >         .mapValues((k,v)-> new JobSummary(k,v))
> > > >         .peek((k,v)->{
> > > >             log.info(k.toString());
> > > >             log.info(v.toString());
> > > >         }).selectKey((k,v)-> v.getCompany_id())  // So that the count
> > > > is consumed in order for each company
> > > >
>  .to(JOB_SUMMARY,Produced.with(Serdes.Long(),jobSummarySerde));
> > > >
> > > >
> > > > --
> > > > Thanks,
> > > >
> > > > Ankur Rana
> > > > Software Developer
> > > > FarEye
> > > >
> > >
> >
> >
> > --
> > Thanks,
> >
> > Ankur Rana
> > Software Developer
> > FarEye
> >
>


-- 
Thanks,

Ankur Rana
Software Developer
FarEye

Reply via email to