Hi All,

I have been playing with queryable state. Since a queryable state stream
cannot be modified, how do I use the output of, say, my reduce function for
further processing?

Below is one example. I am sure I have done it wrong :). Is it right to apply
the reduce function twice, as I do here, or do I need to use a rich reduce
function and use the queryable state there?

Thanks

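import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

public class reducetest {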

    public static void main(String[] args) throws Exception {


        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        List<Integer> nums = Arrays.asList(1,2,3,4,5,6,7,8,9,10);

        ReducingStateDescriptor<Tuple2<Integer, Integer>> descriptor =
                new ReducingStateDescriptor<Tuple2<Integer, Integer>>(
                        "sum", // the state name
                        new rf(),
                        TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {})); // type information
        descriptor.setQueryable("reduce");

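        // keyBy(0) returns a KeyedStream, so declare it as one instead of casting later
        KeyedStream<Tuple2<Integer, Integer>, Tuple> ds1 = env.fromCollection(nums)
                .map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(Integer integer) throws Exception {
                        return Tuple2.of(integer % 2, integer);
                    }
                })
                .keyBy(0);

        // first use of rf: inside the queryable ReducingState registered as "reduce"
        ds1.asQueryableState("reduce", descriptor);

        // second use of rf: a regular rolling reduce whose output I can process further
        DataStream<Tuple2<Integer, Integer>> ds2 = ds1.reduce(new rf());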

        //ds1.print();
        ds2.print();

        System.out.println(env.getExecutionPlan());

        env.execute("re");
    }



    static class rf implements ReduceFunction<Tuple2<Integer, Integer>> {

        @Override
        public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> e, Tuple2<Integer, Integer> n) throws Exception {
            // keep the key (f0) and add up the values (f1)
            return Tuple2.of(e.f0, e.f1 + n.f1);
        }
    }

}
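
To make clear what output I want to process further: with parallelism 1 I
would expect the rolling reduce to emit one running sum per key for every
input element. Here is a minimal plain-Java sketch of that expectation. It
does not use Flink at all, and RollingReduceSketch and runningSums are names
I made up just for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; no Flink classes are used here.
class RollingReduceSketch {

    // Returns the "(key,sum)" records a per-key rolling sum emits,
    // one record per input element, in input order.
    static List<String> runningSums(List<Integer> nums) {
        Map<Integer, Integer> sumPerKey = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (int n : nums) {
            int key = n % 2;                                  // same key as the map() in my job
            int sum = sumPerKey.merge(key, n, Integer::sum);  // same addition as rf: e.f1 + n.f1
            emitted.add("(" + key + "," + sum + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        runningSums(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
                .forEach(System.out::println);
    }
}
```

For the numbers 1 to 10 this prints (1,1), (0,2), (1,4), (0,6), (1,9),
(0,12), (1,16), (0,20), (1,25), (0,30), one per line. That is the stream I
would like to keep working with while the same sums stay queryable.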
