Hi Taher,

As long as you don't put something into the state, ValueState#value() will return null. ctx.globalState() (1) and ctx.windowState() (2) do not expose the window's contents; they exist so that users can store some state of their own, scoped to the key (1) or to the key and window (2), respectively. If you want to access all elements assigned to the window, you can iterate over them with "itr" in your example.
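
A minimal sketch of both points, assuming String elements keyed by a String and a TimeWindow. The class name FiringCounter and the state name "firings" are made up for illustration and are not taken from your job:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Hypothetical example types: IN = String, OUT = String, KEY = String.
class FiringCounter extends ProcessWindowFunction<String, String, String, TimeWindow> {

    // Descriptor for user-defined per-key state; the name "firings" is an assumption.
    private final ValueStateDescriptor<Long> firingsDesc =
            new ValueStateDescriptor<>("firings", Long.class);

    @Override
    public void process(String key, Context ctx, Iterable<String> elements,
                        Collector<String> out) throws Exception {
        // The window's contents arrive through the Iterable, not through state.
        long inWindow = 0;
        for (String ignored : elements) {
            inWindow++;
        }

        // globalState() is scoped to the key. value() stays null until update() is called.
        ValueState<Long> firings = ctx.globalState().getState(firingsDesc);
        Long previous = firings.value();
        long current = (previous == null ? 0L : previous) + 1;
        firings.update(current);

        out.collect(key + ": " + inWindow + " elements in this window, firing #" + current);
    }
}
```

The first time process() runs for a key, firings.value() is null, which is what you are seeing; from the second firing on it returns whatever was last passed to update(). Using ctx.windowState() instead would scope the counter to a single window of that key.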
Best,
Dawid
On 25/09/18 15:07, Taher Koitawala wrote:
> Hi All,
> I am trying to access elements stored in the state of the
> window. As the window itself is a stateful operator, I think I should be
> able to get the records in the process function after the window is
> triggered. Can someone tell me why the state of the window is null in
> the following code?
>
> Below is a mocked piece of code we are using. Am I doing something
> wrong here?
>
> env.enableCheckpointing(20000L);
> env.setStateBackend(new FsStateBackend("path"));
> DataStream<S> stream1 = env.addSource(new FlinkKafkaConsumer);
> DataStream<S> stream2 = env.addSource(new FlinkKafkaConsumer);
>
> stream1.union(stream2)
>     .keyBy()
>     .timeWindow(Time.milliseconds(30L))
>     .allowedLateness(Time.minutes(1))
>     .process(new ProcessWindowFunction<T>() {
>         public void process(T t, ProcessWindowFunction<T>.Context ctx,
>                 Iterable<R> itr, Collector<R> collector) {
>             KeyedStateStore globalState = ctx.globalState();
>             ValueState<Tuple6<Long, String, String, String, String, String>> valueState =
>                 ctx.globalState().getState(new ValueStateDescriptor<>("valueState",
>                     TypeInformation.of(new TypeHint<T>() {})));
>
>             System.out.println(valueState.value());
>             collector.collect(T);
>         }
>     });
>
>
>
> Regards,
> Taher Koitawala
> GS Lab Pune
> +91 8407979163
