Thanks Kostas!

Regards
Bhaskar

On Mon, Sep 17, 2018 at 9:05 PM Kostas Kloudas <k.klou...@data-artisans.com> wrote:

> Hi Bhaskar,
>
> If you want different TTLs per key, then you should use timers with a
> process function, as shown in [1]. That presentation is old, though:
> what it calls RichProcessFunction is now KeyedProcessFunction.
> Also, please have a look at the training material in [2] and the process
> function documentation in [3].
>
> Cheers,
> Kostas
>
> [1] https://www.slideshare.net/dataArtisans/apache-flink-training-datastream-api-processfunction
> [2] https://training.data-artisans.com/
> [3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>
> On Sep 17, 2018, at 8:50 AM, Vijay Bhaskar <bhaskar.eba...@gmail.com> wrote:
>
> Thanks Hequn. But I want to give a random TTL to each partitioned key. How
> can I achieve that?
>
> Regards
> Bhaskar
>
> On Mon, Sep 17, 2018 at 7:30 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi bhaskar,
>>
>> You don't need to change anything to handle multiple keys. Flink will do
>> it for you. The ValueState is a keyed state. You can think of Keyed State
>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-keyed-state>
>> as Operator State that has been partitioned, or sharded, with exactly one
>> state-partition per key.
>> TTL can be used in the same way.
>>
>> Best, Hequn
>>
>>
>> On Fri, Sep 14, 2018 at 10:29 PM bhaskar.eba...@gmail.com <
>> bhaskar.eba...@gmail.com> wrote:
>>
>>> Hi
>>> In the following example given in Flink:
>>>
>>> object ExampleCountWindowAverage extends App {
>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>
>>>   env.fromCollection(List(
>>>     (1L, 3L),
>>>     (1L, 5L),
>>>     (1L, 7L),
>>>     (1L, 4L),
>>>     (1L, 2L)
>>>   )).keyBy(_._1)
>>>     .flatMap(new CountWindowAverage())
>>>     .print()
>>>   // the printed output will be (1,4) and (1,5)
>>>
>>>   env.execute("ExampleManagedState")
>>> }
>>>
>>> There is only one state because there is only one key. In the
>>> CountWindowAverage class there is one state descriptor, named "average":
>>> new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
>>>
>>> To implement this in a generic way, should I change the constructor to
>>> CountWindowAverage(keyName: String), so that
>>> new ValueStateDescriptor[(Long, Long)](keyName, createTypeInformation[(Long, Long)])
>>> is created? And how do I configure TTL for this? Inside this class?
>>> In the example at
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>>> you have given a standalone ValueStateDescriptor. How can I use the
>>> TTL inside CountWindowAverage() per key?
>>>
>>> Regards
>>> Bhaskar
>>>
>>
>
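
For reference, the timer-based approach Kostas describes could look roughly like the sketch below. It is only an illustration under stated assumptions, not code from the thread or from the Flink docs: the class name PerKeyTtlAverage, the helper ttlMillisFor (a placeholder for however you choose a TTL per key), the state name "expiry", and the choice to refresh the TTL on every element are all made up for this example. It keeps the (count, sum) state of CountWindowAverage, emits a running average, and clears the state of a key when that key's processing-time timer fires.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical function: running average per key, with a per-key TTL
// enforced by processing-time timers instead of StateTtlConfig.
class PerKeyTtlAverage
    extends KeyedProcessFunction[Long, (Long, Long), (Long, Long)] {

  // (count, sum) for the current key, as in CountWindowAverage
  private var sum: ValueState[(Long, Long)] = _
  // processing-time timestamp at which the current key's state expires
  private var expiry: ValueState[java.lang.Long] = _

  // Assumption: placeholder for your own per-key TTL rule (1 to 5 minutes here).
  private def ttlMillisFor(key: Long): Long = 60000L * (1 + math.abs(key % 5))

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)](
        "average", createTypeInformation[(Long, Long)]))
    expiry = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("expiry", classOf[java.lang.Long]))
  }

  override def processElement(
      in: (Long, Long),
      ctx: KeyedProcessFunction[Long, (Long, Long), (Long, Long)]#Context,
      out: Collector[(Long, Long)]): Unit = {
    // State is scoped to the current key; value() is null if nothing is stored.
    val current = Option(sum.value()).getOrElse((0L, 0L))
    val updated = (current._1 + 1, current._2 + in._2)
    sum.update(updated)

    // Refresh the TTL on every write and remember the latest deadline.
    val expiresAt =
      ctx.timerService().currentProcessingTime() + ttlMillisFor(in._1)
    expiry.update(java.lang.Long.valueOf(expiresAt))
    ctx.timerService().registerProcessingTimeTimer(expiresAt)

    // Emit the running average (integer division, as in the original example).
    out.collect((in._1, updated._2 / updated._1))
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, (Long, Long), (Long, Long)]#OnTimerContext,
      out: Collector[(Long, Long)]): Unit = {
    // Older timers for this key may still fire; only the latest deadline counts.
    val expiresAt = expiry.value()
    if (expiresAt != null && expiresAt.longValue() == timestamp) {
      sum.clear()
      expiry.clear()
    }
  }
}

object PerKeyTtlExample extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromCollection(List((1L, 3L), (1L, 5L), (2L, 7L)))
    .keyBy(_._1)
    .process(new PerKeyTtlAverage)
    .print()

  env.execute("PerKeyTtlExample")
}
```

Storing the deadline in state and comparing it in onTimer avoids having to delete earlier timers when the TTL is refreshed. Note that with a bounded fromCollection source the job will most likely finish before any processing-time timer fires, so the cleanup is only observable on a long-running stream.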