Thanks Hequn. But i want to give random TTL for each partitioned key. How
can i achieve it?

Regards
Bhaskar

On Mon, Sep 17, 2018 at 7:30 AM Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi bhaskar,
>
> You need change nothing if you want to handle multi keys. Flink will do it
> for you. The ValueState is a keyed state. You can think of Keyed State
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-keyed-state>
> as Operator State that has been partitioned, or sharded, with exactly one
> state-partition per key.
> TTL can be used in the same way.
>
> Best, Hequn
>
>
> On Fri, Sep 14, 2018 at 10:29 PM bhaskar.eba...@gmail.com <
> bhaskar.eba...@gmail.com> wrote:
>
>> Hi
>> In the following example given in flink:
>> object ExampleCountWindowAverage extends App {
>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>
>>   env.fromCollection(List(
>>     (1L, 3L),
>>     (1L, 5L),
>>     (1L, 7L),
>>     (1L, 4L),
>>     (1L, 2L)
>>   )).keyBy(_._1)
>>     .flatMap(new CountWindowAverage())
>>     .print()
>>   // the printed output will be (1,4) and (1,5)
>>
>>   env.execute("ExampleManagedState")
>> }
>>
>> There is only 1 state because there is one key. In the CountWindowAverage
>> method there is one state descriptor :  new ValueStateDescriptor[(Long,
>> Long)]("average", createTypeInformation[(Long, Long)])
>> Name given as "average". In order to implement this is generic way, shall
>> i modify the  method:
>>
>> CountWindowAverage(keyName:String)  so that  new
>> ValueStateDescriptor[(Long, Long)](keyName, createTypeInformation[(Long,
>> Long)]) is created. But how to configure TTL for this? Inside this method?
>> In the eample:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>> ,   you have given a stand alone ValueStateDescriptor.  How can i use the
>> TTL inside CountWindowAverage() per Key?
>>
>> Regards
>> Bhaskar
>>
>

Reply via email to