(Booking(te7uc4,compact,j...@gmail.com,Mon Feb 29 19:19:40 IST 2016),1458730980000,1458731040000)current booking count 1 (Booking(tdr1ym,compact,er...@gmail.com,Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1 (Booking(t9zvqw,compact,yas...@gmail.com,Mon Feb 29 19:19:40 IST 2016),1458730980000,1458731040000)current booking count 1 (Booking(tdr1e8,compact,k....@gmail.com,Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1 (Booking(tdntcj,compact,e...@gmail.com,Mon Feb 29 19:19:40 IST 2016),1458730980000,1458731040000)current booking count 1 (Booking(tdr1wv,compact,e...@gmail.com,Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1 (Booking(tdr1wv,compact,er...@yahoo.in,Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1
The key is email id from the booking object. On Wed, Mar 23, 2016 at 4:32 PM, Aljoscha Krettek <aljos...@apache.org> wrote: > Hi, > what is the input for each of those outputs? Could you maybe print this: > > System.out.println(in + “, current booking count "+value) > > Also, what is the key that you specify for your KeyedStream? > > Cheers, > Aljoscha > > On 23 Mar 2016, at 11:53, Balaji Rajagopalan < > balaji.rajagopa...@olacabs.com> wrote: > > > > I wrote the below code which will increment a counter for the data in > the datastream, and when I print the counter each time it seems the value > is reinitialised to 0, and it is not incrementing, any thoughts. > > > > class BookingCntFlatMapFunction extends > RichFlatMapFunction[(Booking,Long,Long),(Booking,Long,Long)] > > { > > > > > > @transient var bookingCnt:ValueState[Int] = null > > > > override def flatMap(in: (Booking, Long, Long), out: > Collector[(Booking, Long, Long)]): Unit = { > > var value = bookingCnt.value() > > value += 1 > > System.out.println("current booking count "+value) > > > > bookingCnt.update(value) > > out.collect(in) > > } > > > > override def open( config:Configuration): Unit = { > > val descriptor: ValueStateDescriptor[Int] = new > ValueStateDescriptor[Int]("bookingcnt", > > TypeInformation.of(new TypeHint[Int]() {}),0) > > bookingCnt = getRuntimeContext().getState(descriptor); > > > > } > > > > > > } > > Output of the program: > > current booking count 1 > > current booking count 1 > > current booking count 1 > > current booking count 1 > > current booking count 1 > > current booking count 1 > > current booking count 1 > > > >