Hi,

In the following example given in the Flink documentation:

    object ExampleCountWindowAverage extends App {
      val env = StreamExecutionEnvironment.getExecutionEnvironment

      env.fromCollection(List(
        (1L, 3L),
        (1L, 5L),
        (1L, 7L),
        (1L, 4L),
        (1L, 2L)
      )).keyBy(_._1)
        .flatMap(new CountWindowAverage())
        .print()
      // the printed output will be (1,4) and (1,5)

      env.execute("ExampleManagedState")
    }

There is only one state because there is only one key. The CountWindowAverage class has one state descriptor, which is given the name "average":

    new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])

To implement this in a generic way, should I change the constructor to CountWindowAverage(keyName: String), so that the descriptor is created as follows?

    new ValueStateDescriptor[(Long, Long)](keyName, createTypeInformation[(Long, Long)])

And how do I configure TTL for this? Inside this class?

The example at https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl shows only a standalone ValueStateDescriptor. How can I use TTL inside CountWindowAverage() per key?

Regards,
Bhaskar
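For reference, here is a minimal sketch of what the question seems to be asking for. It assumes the Flink version described in the linked documentation, where StateTtlConfig.newBuilder takes an org.apache.flink.api.common.time.Time and open takes a Configuration. The constructor parameters stateName and ttlSeconds are hypothetical names introduced only for illustration. Note that state obtained after keyBy is already scoped to the current key, so the descriptor name identifies the state, not the key, and a TTL enabled on the descriptor applies to each key's entry separately.

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// stateName and ttlSeconds are hypothetical parameters, not part of the
// documented CountWindowAverage example.
class CountWindowAverage(stateName: String, ttlSeconds: Long)
    extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  // Holds (count, running sum) for the key currently being processed.
  private var sum: ValueState[(Long, Long)] = _

  override def open(parameters: Configuration): Unit = {
    // Build the TTL configuration, following the pattern in the TTL docs.
    val ttlConfig = StateTtlConfig
      .newBuilder(Time.seconds(ttlSeconds))
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      .build()

    val descriptor = new ValueStateDescriptor[(Long, Long)](
      stateName, createTypeInformation[(Long, Long)])

    // TTL must be enabled on the descriptor before the state is requested.
    descriptor.enableTimeToLive(ttlConfig)

    sum = getRuntimeContext.getState(descriptor)
  }

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
    // value() returns null if the state was never set or has expired.
    val current = Option(sum.value()).getOrElse((0L, 0L))

    // Increment the count and add the new value to the running sum.
    val updated = (current._1 + 1, current._2 + input._2)
    sum.update(updated)

    // Once two elements have been seen, emit the average and clear the state.
    if (updated._1 >= 2) {
      out.collect((input._1, updated._2 / updated._1))
      sum.clear()
    }
  }
}
```

With this shape, the pipeline above would use something like .flatMap(new CountWindowAverage("average", 60)), where 60 is an arbitrary illustrative TTL in seconds.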