I wrote the code below, which increments a counter for each record in the
data stream. When I print the counter, the value appears to be reinitialised
to 0 every time instead of incrementing. Any thoughts?

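import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class BookingCntFlatMapFunction
    extends RichFlatMapFunction[(Booking, Long, Long), (Booking, Long, Long)] {

  // State handle, assigned in open()
  @transient var bookingCnt: ValueState[Int] = null

  override def flatMap(in: (Booking, Long, Long),
                       out: Collector[(Booking, Long, Long)]): Unit = {
    // Read the current count, increment it, print it, and write it back
    var value = bookingCnt.value()
    value += 1
    System.out.println("current booking count " + value)

    bookingCnt.update(value)
    out.collect(in)
  }

  override def open(config: Configuration): Unit = {
    // Register the state under the name "bookingcnt" with a default value of 0
    val descriptor: ValueStateDescriptor[Int] =
      new ValueStateDescriptor[Int]("bookingcnt",
        TypeInformation.of(new TypeHint[Int]() {}), 0)
    bookingCnt = getRuntimeContext().getState(descriptor)
  }
}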

Output of the program:

current booking count 1
current booking count 1
current booking count 1
current booking count 1
current booking count 1
current booking count 1
current booking count 1
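
One thing that may explain this output (an assumption on my part, since the
job setup and the keyBy call are not shown above): a ValueState obtained from
getRuntimeContext().getState(...) is keyed state, so it is scoped to the key of
the record currently being processed. If every record carries a different key,
each one sees a fresh counter starting at the default 0, and the printed value
is always 1. The sketch below imitates that behaviour in plain Scala, with a
mutable map standing in for the keyed state. KeyedCounterSketch and
countsPerRecord are hypothetical names for illustration only, and no Flink API
is used.

```scala
import scala.collection.mutable

// Hypothetical stand-in for keyed state: one counter per key, default 0.
object KeyedCounterSketch {

  // Returns the count that would be printed for each record, in arrival order.
  def countsPerRecord(keys: Seq[String]): Seq[Int] = {
    val state = mutable.Map.empty[String, Int].withDefaultValue(0)
    keys.map { key =>
      val updated = state(key) + 1 // read the counter for this key only
      state(key) = updated         // write it back for this key only
      updated
    }
  }

  def main(args: Array[String]): Unit = {
    // Every record has its own key: each counter starts from 0.
    println(countsPerRecord(Seq("b1", "b2", "b3")).mkString(" ")) // prints "1 1 1"
    // All records share one key: the counter accumulates.
    println(countsPerRecord(Seq("b1", "b1", "b1")).mkString(" ")) // prints "1 2 3"
  }
}
```

If that is what is happening here, keying the stream by a field that is shared
across the bookings to be counted together should make the counter increase.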
