I wrote the code below, which is meant to increment a counter for each element in the data stream. However, every time I print the counter, its value appears to have been reinitialised to 0, so it never gets past 1. Any thoughts?
    class BookingCntFlatMapFunction extends RichFlatMapFunction[(Booking, Long, Long), (Booking, Long, Long)] {

      @transient var bookingCnt: ValueState[Int] = null

      override def flatMap(in: (Booking, Long, Long), out: Collector[(Booking, Long, Long)]): Unit = {
        var value = bookingCnt.value()
        value += 1
        System.out.println("current booking count " + value)
        bookingCnt.update(value)
        out.collect(in)
      }

      override def open(config: Configuration): Unit = {
        val descriptor: ValueStateDescriptor[Int] =
          new ValueStateDescriptor[Int]("bookingcnt", TypeInformation.of(new TypeHint[Int]() {}), 0)
        bookingCnt = getRuntimeContext().getState(descriptor)
      }
    }

Output of the program:

    current booking count 1
    current booking count 1
    current booking count 1
    current booking count 1
    current booking count 1
    current booking count 1
    current booking count 1