Hi Bhaskar,

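You don't need to change anything to handle multiple keys; Flink does that
for you. ValueState is keyed state, so every access inside flatMap() is
automatically scoped to the key of the element being processed. You can
think of Keyed State
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-keyed-state>
as Operator State that has been partitioned, or sharded, with exactly one
state-partition per key. The descriptor name ("average") identifies the
state, not the key, so it does not have to vary per key.
TTL works the same way: enable it on the ValueStateDescriptor inside
CountWindowAverage, and it applies to each key's value independently.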
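
As a rough sketch of how this could look, here is the CountWindowAverage
from the documentation with TTL enabled on its descriptor, fed with two keys
instead of one. The one-hour TTL, the second key, and the object and job
names are assumptions made for illustration only, and the builder calls
follow the state TTL API as documented around Flink 1.6 (newer releases may
differ):

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  // Assumed TTL of one hour; pick whatever suits your job.
  val ttlConfig: StateTtlConfig = StateTtlConfig
    .newBuilder(Time.hours(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build

  // Holds (count, running sum) for the key of the current element.
  private var sum: ValueState[(Long, Long)] = _

  override def open(parameters: Configuration): Unit = {
    // One descriptor with one fixed name; Flink keeps a separate value per key.
    val descriptor = new ValueStateDescriptor[(Long, Long)](
      "average", createTypeInformation[(Long, Long)])
    descriptor.enableTimeToLive(ttlConfig)
    sum = getRuntimeContext.getState(descriptor)
  }

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
    // value() returns null if nothing is stored for this key (or it has expired).
    val stored = sum.value
    val current = if (stored != null) stored else (0L, 0L)
    val updated = (current._1 + 1, current._2 + input._2)
    sum.update(updated)

    // Emit the average after every two elements of the same key, then reset.
    if (updated._1 >= 2) {
      out.collect((input._1, updated._2 / updated._1))
      sum.clear()
    }
  }
}

object MultiKeyTtlExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromCollection(List(
      (1L, 3L),
      (1L, 5L),
      (2L, 10L),
      (2L, 20L)
    )).keyBy(_._1)
      .flatMap(new CountWindowAverage())
      .print()

    env.execute("MultiKeyTtlExample")
  }
}
```

With this input, key 1 and key 2 should each produce their own average,
because each key has its own copy of the "average" state and its own TTL
timer. No keyName constructor parameter is needed.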

Best, Hequn


On Fri, Sep 14, 2018 at 10:29 PM bhaskar.eba...@gmail.com <
bhaskar.eba...@gmail.com> wrote:

> Hi
> In the following example given in the Flink documentation:
> object ExampleCountWindowAverage extends App {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>   env.fromCollection(List(
>     (1L, 3L),
>     (1L, 5L),
>     (1L, 7L),
>     (1L, 4L),
>     (1L, 2L)
>   )).keyBy(_._1)
>     .flatMap(new CountWindowAverage())
>     .print()
>   // the printed output will be (1,4) and (1,5)
>
>   env.execute("ExampleManagedState")
> }
>
> There is only one state because there is only one key. The
> CountWindowAverage class has one state descriptor, named "average":
> new ValueStateDescriptor[(Long, Long)]("average",
> createTypeInformation[(Long, Long)])
> To implement this in a generic way, should I change the constructor to
>
> CountWindowAverage(keyName: String)
>
> so that new ValueStateDescriptor[(Long, Long)](keyName,
> createTypeInformation[(Long, Long)]) is created? And how do I configure
> TTL for this? Inside this class?
> In the example at
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
> you show a standalone ValueStateDescriptor. How can I use TTL inside
> CountWindowAverage() per key?
>
> Regards
> Bhaskar
>
