Hi
In the following example from the Flink documentation:
import org.apache.flink.streaming.api.scala._

object ExampleCountWindowAverage extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromCollection(List(
    (1L, 3L),
    (1L, 5L),
    (1L, 7L),
    (1L, 4L),
    (1L, 2L)
  )).keyBy(_._1)
    .flatMap(new CountWindowAverage())
    .print()
  // the printed output will be (1,4) and (1,5)

  env.execute("ExampleManagedState")
}

There is only one state instance here because there is only one key. Inside
the CountWindowAverage class there is a single state descriptor, named
"average":

new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])

To make this generic, should I change the constructor to

CountWindowAverage(keyName: String)

so that it creates
new ValueStateDescriptor[(Long, Long)](keyName, createTypeInformation[(Long, Long)])
instead? And how do I configure TTL for this state? Inside this class?
In the example at
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
you show a standalone ValueStateDescriptor. How can I use TTL inside
CountWindowAverage() per key?

Regards
Bhaskar
