I have used connected streams where one part of the connected stream maintains 
state and the other part consumes it. 

However, that state was not queryable externally. For state that needs to be 
queryable externally, you are right: you probably need another operator to 
store the state and support querying it. 
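
For the example in the quoted mail below, a minimal, untested sketch of that 
"single operator" idea could look like the following: a RichFlatMapFunction 
keeps the running sum in a ReducingState that is marked queryable (reusing the 
state name "reduce" and the sum logic from the example) and also emits the 
updated value downstream, e.g. to a Kafka sink. That way only one copy of the 
state is kept and the data flows through the pipeline once. The class name 
QueryableSum and the kafkaSink variable are placeholders, not anything from 
your code.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class QueryableSum
        extends RichFlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {

    private transient ReducingState<Tuple2<Integer, Integer>> sum;

    @Override
    public void open(Configuration parameters) {
        ReducingStateDescriptor<Tuple2<Integer, Integer>> descriptor =
                new ReducingStateDescriptor<>(
                        "sum",
                        // same logic as the rf reduce function in the example below
                        (a, b) -> Tuple2.of(a.f0, a.f1 + b.f1),
                        TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {}));
        descriptor.setQueryable("reduce");   // make this operator's state externally queryable
        sum = getRuntimeContext().getReducingState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Integer, Integer> value,
                        Collector<Tuple2<Integer, Integer>> out) throws Exception {
        sum.add(value);            // update the single copy of the state
        out.collect(sum.get());    // forward the running sum for further processing
    }
}

// Usage, replacing both asQueryableState(...) and reduce(...):
//   ds1.flatMap(new QueryableSum()).addSink(kafkaSink);   // kafkaSink is a placeholder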


> On Sep 5, 2018, at 7:26 AM, Darshan Singh <darshan.m...@gmail.com> wrote:
> 
> Hi All,
> 
> I was playing with queryable state. Since a queryable stream cannot be 
> modified, how do I use the output of, say, my reduce function for further 
> processing?
> 
> Below is one example. I can see that I am processing the data twice: once for 
> the queryable stream and once for my original stream. Does that mean the state 
> will be kept twice as well?
> 
> In simple terms, I would like to query the state from the rf reduce function 
> and also write my stream to Kafka. With the code below I am able to do so, but 
> it seems to me that the state is stored twice and duplicate work is done.
> 
> Is there an alternative way to achieve this?
> 
> Thanks
> 
> import java.util.Arrays;
> import java.util.List;
> 
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.functions.ReduceFunction;
> import org.apache.flink.api.common.state.ReducingStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.KeyedStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> public class reducetest {
> 
>     public static void main(String[] args) throws Exception {
> 
>         // set up the streaming execution environment
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>         env.setParallelism(1);
> 
>         List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
> 
>         // reducing state descriptor, marked as externally queryable under the name "reduce"
>         ReducingStateDescriptor<Tuple2<Integer, Integer>> descriptor =
>                 new ReducingStateDescriptor<>(
>                         "sum",          // the state name
>                         new rf(),       // the reduce function
>                         TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {}));
>         descriptor.setQueryable("reduce");
> 
>         // key the stream by integer % 2 (even/odd)
>         KeyedStream<Tuple2<Integer, Integer>, Tuple> ds1 = env.fromCollection(nums)
>                 .map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
>                     @Override
>                     public Tuple2<Integer, Integer> map(Integer integer) throws Exception {
>                         return Tuple2.of(integer % 2, integer);
>                     }
>                 })
>                 .keyBy(0);
> 
>         // expose the reduced state for external queries ...
>         ds1.asQueryableState("reduce", descriptor);
> 
>         // ... and reduce the same stream again for further processing
>         DataStream<Tuple2<Integer, Integer>> ds2 = ds1.reduce(descriptor.getReduceFunction());
> 
>         //ds1.print();
>         ds2.print();
> 
>         System.out.println(env.getExecutionPlan());
> 
>         env.execute("re");
>     }
> 
>     static class rf implements ReduceFunction<Tuple2<Integer, Integer>> {
> 
>         @Override
>         public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> e,
>                                                Tuple2<Integer, Integer> n) throws Exception {
>             return Tuple2.of(e.f0, e.f1 + n.f1);
>         }
>     }
> }
