Thanks Hequn. But I want to give a random TTL to each partitioned key. How can I achieve that?
Regards
Bhaskar

On Mon, Sep 17, 2018 at 7:30 AM Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi bhaskar,
>
> You need to change nothing if you want to handle multiple keys. Flink will do it
> for you. The ValueState is a keyed state. You can think of Keyed State
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-keyed-state>
> as Operator State that has been partitioned, or sharded, with exactly one
> state-partition per key.
> TTL can be used in the same way.
>
> Best, Hequn
>
>
> On Fri, Sep 14, 2018 at 10:29 PM bhaskar.eba...@gmail.com <
> bhaskar.eba...@gmail.com> wrote:
>
>> Hi
>> In the following example given in Flink:
>> object ExampleCountWindowAverage extends App {
>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>
>>   env.fromCollection(List(
>>     (1L, 3L),
>>     (1L, 5L),
>>     (1L, 7L),
>>     (1L, 4L),
>>     (1L, 2L)
>>   )).keyBy(_._1)
>>     .flatMap(new CountWindowAverage())
>>     .print()
>>   // the printed output will be (1,4) and (1,5)
>>
>>   env.execute("ExampleManagedState")
>> }
>>
>> There is only 1 state because there is one key. In the CountWindowAverage
>> method there is one state descriptor: new ValueStateDescriptor[(Long,
>> Long)]("average", createTypeInformation[(Long, Long)])
>> Name given as "average". In order to implement this in a generic way, shall
>> I modify the method:
>>
>> CountWindowAverage(keyName:String) so that new
>> ValueStateDescriptor[(Long, Long)](keyName, createTypeInformation[(Long,
>> Long)]) is created. But how to configure TTL for this? Inside this method?
>> In the example:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>> , you have given a standalone ValueStateDescriptor. How can I use the
>> TTL inside CountWindowAverage() per key?
>>
>> Regards
>> Bhaskar
>>
>