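Hi bhaskar,

You don't need to change anything to handle multiple keys. Flink will do it for you. The ValueState is a keyed state. You can think of Keyed State <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-keyed-state> as Operator State that has been partitioned, or sharded, with exactly one state-partition per key. The name passed to the ValueStateDescriptor ("average") identifies the state, not the key, so there is no need to pass a key name into CountWindowAverage.

TTL can be used in the same way: enable it on the ValueStateDescriptor inside CountWindowAverage, and it applies to the state of each key separately.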
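A minimal sketch of how this could look, based on the CountWindowAverage example from the docs. It assumes the Flink 1.6-era Scala API (StateTtlConfig built from org.apache.flink.api.common.time.Time); the 1-second TTL, the second key in the input, and the object name are only illustrative choices, not something from your job:

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  // Keyed state: Flink keeps one (count, sum) value per key of the keyed stream.
  private var sum: ValueState[(Long, Long)] = _

  override def open(parameters: Configuration): Unit = {
    // Illustrative TTL settings; pick a duration that fits your use case.
    val ttlConfig = StateTtlConfig
      .newBuilder(Time.seconds(1))
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      .build

    // "average" names the state, not the key. The same descriptor serves all keys.
    val descriptor = new ValueStateDescriptor[(Long, Long)](
      "average", createTypeInformation[(Long, Long)])
    descriptor.enableTimeToLive(ttlConfig)

    sum = getRuntimeContext.getState(descriptor)
  }

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
    // value() returns null if nothing is stored for the current key (or it has expired).
    val stored = sum.value
    val current = if (stored != null) stored else (0L, 0L)

    // Increment the count and add the new value to the running sum.
    val updated = (current._1 + 1, current._2 + input._2)
    sum.update(updated)

    // After two elements for this key, emit the average and reset this key's state.
    if (updated._1 >= 2) {
      out.collect((input._1, updated._2 / updated._1))
      sum.clear()
    }
  }
}

object ExampleCountWindowAverageWithTtl {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Two keys (1L and 2L): each key gets its own count/sum and its own TTL timer.
    env.fromCollection(List(
      (1L, 3L), (2L, 10L), (1L, 5L), (2L, 20L), (1L, 7L), (1L, 4L)
    )).keyBy(_._1)
      .flatMap(new CountWindowAverage())
      .print()

    env.execute("ExampleManagedStateWithTtl")
  }
}
```

Inside flatMap, sum.value and sum.update always act on the state of the key of the element being processed, so the function itself never needs to know which keys exist.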
Best, Hequn

On Fri, Sep 14, 2018 at 10:29 PM bhaskar.eba...@gmail.com <bhaskar.eba...@gmail.com> wrote:

> Hi
> In the following example given in flink:
>
> object ExampleCountWindowAverage extends App {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>   env.fromCollection(List(
>     (1L, 3L),
>     (1L, 5L),
>     (1L, 7L),
>     (1L, 4L),
>     (1L, 2L)
>   )).keyBy(_._1)
>     .flatMap(new CountWindowAverage())
>     .print()
>   // the printed output will be (1,4) and (1,5)
>
>   env.execute("ExampleManagedState")
> }
>
> There is only 1 state because there is one key. In the CountWindowAverage
> method there is one state descriptor: new ValueStateDescriptor[(Long,
> Long)]("average", createTypeInformation[(Long, Long)])
> Name given as "average". In order to implement this in a generic way, shall
> I modify the method:
>
> CountWindowAverage(keyName:String) so that new
> ValueStateDescriptor[(Long, Long)](keyName, createTypeInformation[(Long,
> Long)]) is created. But how to configure TTL for this? Inside this method?
> In the example:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
> you have given a standalone ValueStateDescriptor. How can I use the
> TTL inside CountWindowAverage() per key?
>
> Regards
> Bhaskar