Hi all, I was playing with queryable state. Since a queryable state stream cannot be modified, how do I use the output of, say, my reduce function for further processing?
Below is 1 example. I am sure I have done it wrong :). I am using reduce function twice or do I need to use rich reduce function and use the queryable state there. Thanks public class reducetest { public static void main(String[] args) throws Exception { // set up streaming execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); List<Integer> nums = Arrays.asList(1,2,3,4,5,6,7,8,9,10); ReducingStateDescriptor<Tuple2<Integer, Integer>> descriptor = new ReducingStateDescriptor<Tuple2<Integer, Integer>>( "sum", // the state name new rf(), TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {})); // type information descriptor.setQueryable("reduce"); DataStream<Tuple2<Integer, Integer>> ds1 = env.fromCollection(nums).map(new MapFunction<Integer, Tuple2<Integer, Integer>>() { @Override public Tuple2<Integer, Integer> map(Integer integer) throws Exception { return Tuple2.of(integer%2,integer); } }).keyBy(0) ; ((KeyedStream) ds1).asQueryableState("reduce", descriptor).getStateDescriptor(); DataStream<Integer> ds2 = ((KeyedStream) ds1).reduce( new rf()); //ds1.print(); ds2.print(); System.out.println(env.getExecutionPlan()); env.execute("re"); } static class rf implements ReduceFunction<Tuple2<Integer, Integer>> { @Override public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> e, Tuple2<Integer, Integer> n) throws Exception { return Tuple2.of(e.f0, e.f1 + n.f1); } } }