Hi all, I have been experimenting with queryable state. Since a queryable state stream cannot be transformed any further, how can I use the output of, say, my reduce function for further processing?
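Below is an example. As far as I can tell, I am processing the data twice: once for the queryable stream and once for my original stream. Does that mean the state is kept twice as well? Put simply, I would like to query the state of the rf reduce function and also write the reduced stream to Kafka. The code below lets me do that, but it seems to store the state twice and do the work twice. Is there an alternative way to achieve this?

Thanks

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

public class reducetest {

    public static void main(String[] args) throws Exception {
        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        ReducingStateDescriptor<Tuple2<Integer, Integer>> descriptor =
                new ReducingStateDescriptor<Tuple2<Integer, Integer>>(
                        "sum", // the state name
                        new rf(),
                        TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {})); // type information
        descriptor.setQueryable("reduce");

        // key by parity (field 0); typed as KeyedStream so no raw casts are needed
        KeyedStream<Tuple2<Integer, Integer>, Tuple> ds1 = env.fromCollection(nums)
                .map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(Integer integer) throws Exception {
                        return Tuple2.of(integer % 2, integer);
                    }
                })
                .keyBy(0);

        // first use of the reduce function: the queryable state sink
        ds1.asQueryableState("reduce", descriptor);

        // second use of the same reduce function: the stream I want to process further
        // (the reduce emits Tuple2<Integer, Integer>, not Integer)
        DataStream<Tuple2<Integer, Integer>> ds2 = ds1.reduce(descriptor.getReduceFunction());

        //ds1.print();
        ds2.print();
        System.out.println(env.getExecutionPlan());
        env.execute("re");
    }

    // sums field 1 per key and keeps the key in field 0
    static class rf implements ReduceFunction<Tuple2<Integer, Integer>> {
        @Override
        public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> e, Tuple2<Integer, Integer> n) throws Exception {
            return Tuple2.of(e.f0, e.f1 + n.f1);
        }
    }
}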
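To make the goal concrete, here is a minimal plain-Java model of the behaviour being asked for, with no Flink involved: a single per-key store is updated once per element, and that same store both answers lookups and produces the value passed downstream. The class SingleStateSketch and its methods process and query are hypothetical names used only for illustration. In Flink terms this might correspond to a rich function on the keyed stream that registers a ReducingStateDescriptor marked with setQueryable and emits the state value after each update, but that mapping is an assumption and is not tested here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Flink): one per-key store backs both lookups and downstream output.
// All names here are hypothetical and exist only for this sketch.
class SingleStateSketch {

    // Stand-in for keyed reducing state: key -> running sum.
    private final Map<Integer, Integer> state = new HashMap<>();

    // Updates the running sum for the key exactly once and returns
    // {key, sum}, the value that would be emitted downstream (e.g. to a sink).
    int[] process(int key, int value) {
        int sum = state.merge(key, value, Integer::sum);
        return new int[] {key, sum};
    }

    // Stand-in for a queryable-state lookup; reads the same store.
    // Returns null for a key that has not been seen.
    Integer query(int key) {
        return state.get(key);
    }

    public static void main(String[] args) {
        SingleStateSketch op = new SingleStateSketch();
        List<int[]> emitted = new ArrayList<>();

        // Same input and keying as the job above: 1..10 keyed by n % 2.
        for (int n = 1; n <= 10; n++) {
            emitted.add(op.process(n % 2, n));
        }

        // Downstream view: every intermediate reduced value.
        for (int[] e : emitted) {
            System.out.println("(" + e[0] + "," + e[1] + ")");
        }

        // Query view: the latest value per key, read from the same store.
        System.out.println("query(0) = " + op.query(0));
        System.out.println("query(1) = " + op.query(1));
    }
}
```

In this model the reduction runs once per element and the sum is stored once per key, which is the property the question is after.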