Hi All,

I have been playing with queryable state. Since a queryable state stream
cannot be modified, how do I use the output of, say, my reduce function for
further processing?

Below is an example. As far as I can see, I am processing the data twice:
once for the queryable stream and once for my original stream. Does that
mean the state is kept twice as well?

In simple terms, I would like to query the state from the rf reduce function,
and I would also like my stream to be written to Kafka. With the code below I
am able to do so, but it seems to me that the state is stored twice and the
same work is done twice.

Is there an alternative way to achieve this?

Thanks

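import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class reducetest {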

    public static void main(String[] args) throws Exception {


        // set up streaming execution environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        List<Integer> nums = Arrays.asList(1,2,3,4,5,6,7,8,9,10);

        ReducingStateDescriptor<Tuple2<Integer, Integer>> descriptor =
                new ReducingStateDescriptor<Tuple2<Integer, Integer>>(
                        "sum", // the state name
                        new rf(), // the reduce function
                        TypeInformation.of(
                                new TypeHint<Tuple2<Integer, Integer>>() {})); // type information
        descriptor.setQueryable("reduce");

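        // key by parity (field 0); keyBy returns a KeyedStream, so declare it as one
        KeyedStream<Tuple2<Integer, Integer>, Tuple> ds1 =
                env.fromCollection(nums)
                        .map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
                            @Override
                            public Tuple2<Integer, Integer> map(Integer integer) throws Exception {
                                return Tuple2.of(integer % 2, integer);
                            }
                        })
                        .keyBy(0);

        // first use of the keyed stream: expose the reducing state as queryable state
        ds1.asQueryableState("reduce", descriptor);

        // second use of the keyed stream: reduce again to get a stream for further processing
        DataStream<Tuple2<Integer, Integer>> ds2 = ds1.reduce(descriptor.getReduceFunction());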

        //ds1.print();
        ds2.print();

        System.out.println(env.getExecutionPlan());

        env.execute("re");
    }



    // keeps the key (f0) and sums the value field (f1)
    static class rf implements ReduceFunction<Tuple2<Integer, Integer>> {

        @Override
        public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> e,
                Tuple2<Integer, Integer> n) throws Exception {
            return Tuple2.of(e.f0, e.f1 + n.f1);
        }
    }

}
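
To make clear what the reduce step in the job above computes, here is a minimal plain-Java sketch with no Flink dependency. It only mimics the keyed rolling reduce on the input 1..10 and is not how Flink implements it. The names KeyedReduceSketch and rollingReduce are made up for illustration, and the HashMap merely stands in for the per-key reducing state (one running value per key).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class KeyedReduceSketch {

    // Hypothetical helper: keys each number by parity (n % 2) and keeps a
    // running sum per key, emitting the updated (key, sum) pair per element.
    static List<int[]> rollingReduce(List<Integer> nums) {
        // stand-in for the keyed reducing state: one running value per key
        Map<Integer, Integer> state = new HashMap<>();
        List<int[]> emitted = new ArrayList<>();
        for (int n : nums) {
            int key = n % 2;
            // same arithmetic as rf.reduce: keep the key, add the values
            int sum = state.merge(key, n, Integer::sum);
            emitted.add(new int[] {key, sum});
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        for (int[] t : rollingReduce(nums)) {
            System.out.println("(" + t[0] + "," + t[1] + ")");
        }
    }
}
```

For the input above, the last value emitted for key 1 is 25 and for key 0 is 30. These are the per-key values I would like to query and, at the same time, pass on for further processing.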
