Hi,

Thanks for the nice proposal. I like the idea of side outputs; it would make a lot of topologies much simpler.

Regarding the API, I think we should come up with a way of making side outputs accessible from all sorts of operators in a similar way, for instance through the RichFunction interface with a special collector that we invalidate when the user should not be collecting to it. (Just a quick idea.)

I personally wouldn't deprecate the "universal" Split/Select API, which can be used on any DataStream, in favor of functionality that is only accessible through the process function or a few select operators. I think the Split/Select pattern is also very nice, and I use it in many different contexts to get efficient multiway filtering (after map/co operators, for example).

Regards,
Gyula

Aljoscha Krettek <aljos...@apache.org> wrote (on Thu, Feb 23, 2017, 15:42):

> Hi Folks,
> Chen and I have been working for a while now on making FLIP-13 (side
> outputs) [1] a reality. We think we have a pretty good internal
> implementation and also a proposal for an API but now we need to discuss
> how we want to go forward with this, especially how we should deal with
> split/select which does some of the same things side outputs can do. I'll
> first quickly describe what the split/select API looks like, so that we're
> all on the same page. Then I'll present the new proposed side output API
> and then I'll present a new API for getting dropped late data from a windowed
> operation, which was the original motivation for adding side outputs.
>
> Split/select consists of two API calls: DataStream.split(OutputSelector)
> and SplitStream.select().
> You can use it like this:
>
> DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
>
> final String EVEN_SELECTOR = "even";
> final String ODD_SELECTOR = "odd";
>
> SplitStream<Integer> split = input.split(
>     new OutputSelector<Integer>() {
>       @Override
>       public Iterable<String> select(Integer value) {
>         if (value % 2 == 0) {
>           return Collections.singleton(EVEN_SELECTOR);
>         } else {
>           return Collections.singleton(ODD_SELECTOR);
>         }
>       }
>     });
>
> DataStream<Integer> evenStream = split.select(EVEN_SELECTOR);
> DataStream<Integer> oddStream = split.select(ODD_SELECTOR);
>
> The stream is split according to an OutputSelector that returns an Iterable
> of Strings. Then you can use select() to get a new stream that only
> contains elements with the given selector. Notice how the element type for
> all the split streams is the same.
>
> The new side output API proposal adds a new type OutputTag<T> and relies on
> extending ProcessFunction to allow emitting data to outputs besides the
> main output. I think it's best explained with an example as well:
>
> DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
>
> final OutputTag<String> sideOutput1 = new OutputTag<String>("side-output-1"){};
> final OutputTag<Integer> sideOutput2 = new OutputTag<Integer>("side-output-2"){};
>
> SingleOutputStreamOperator<String> mainOutputStream = input
>     .process(new ProcessFunction<Integer, String>() {
>
>       @Override
>       public void processElement(
>           Integer value,
>           Context ctx,
>           Collector<String> out) throws Exception {
>
>         ctx.output(sideOutput1, "WE GOT: " + value);
>         ctx.output(sideOutput2, value);
>         out.collect("MAIN OUTPUT: " + value);
>       }
>
>     });
>
> DataStream<String> sideOutputStream1 =
>     mainOutputStream.getSideOutput(sideOutput1);
> DataStream<Integer> sideOutputStream2 =
>     mainOutputStream.getSideOutput(sideOutput2);
>
> Notice how the OutputTags are anonymous inner classes, similar to TypeHint.
> We need this to be able to analyse the type of the side-output streams.
> Also notice how the types of the side-output streams can be independent of
> the main-output stream, and how everything is correctly type
> checked by the Java compiler.
>
> This change requires making ProcessFunction an abstract base class so that
> not every user has to implement the onTimer() method. We would also need to
> allow ProcessFunction on a non-keyed stream.
>
> Chen also implemented an API based on FlatMapFunction that looks like the
> one proposed in the FLIP. This relies on CollectorWrapper, which can be
> used to "pimp" a Collector to also allow emitting to side outputs.
>
> For WindowedStream we have two proposals: make OutputTag visible on the
> WindowedStream API or make the result type of WindowedStream operations
> more specific to allow a getDroppedDataSideOutput() method. For the first
> proposal it would look like this:
>
> final OutputTag<IN> lateDataTag = new OutputTag<IN>("side-output-1"){};
>
> DataStream<T> windowedResult = input
>     .keyBy(...)
>     .window(...)
>     .sideOutputLateData(lateDataTag)
>     .apply(...);
>
> DataStream<IN> lateData = windowedResult.getSideOutput(lateDataTag);
>
> For the second proposal it would look like this:
>
> WindowedOperator<T> windowedResult = input
>     .keyBy(...)
>     .window(...)
>     .apply(...);
>
> DataStream<IN> lateData = windowedResult.getSideOutput();
>
> Right now, the result of window operations is a
> SingleOutputStreamOperator<T>, same as it is for all DataStream operations.
> Making the result type more specific, i.e. a WindowedOperator, would allow
> us to add extra methods there. This would require wrapping a
> SingleOutputStreamOperator and forwarding all the method calls to the
> wrapped operator, which can be a bit of a hassle for future changes. The
> first proposal requires additional boilerplate code.
>
> Sorry for the long mail but I think it's necessary to get everyone on the
> same page.
> The question is now: how should we proceed with the proposed API
> and the old split/select API? I propose to deprecate split/select and only
> have side outputs, going forward. Of course, I'm a bit biased on this. ;-)
> If we decide to do this, we also need to decide on what the side output API
> should look like.
>
> Happy discussing! Feedback very welcome. :-)
>
> Best,
> Aljoscha
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-13+Side+Outputs+in+Flink
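
For readers who want to see why the quoted mail creates its OutputTags as anonymous inner classes, here is a minimal plain-Java sketch of that idea. It does not use Flink at all: Tag and Outputs are hypothetical stand-ins that I made up for illustration, not the proposed OutputTag or Context classes, and the real implementation may well work differently. The sketch only shows that an anonymous subclass keeps its type argument available at runtime, and that a generic output(tag, value) method lets the compiler check each side output's element type.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model only; none of these classes are Flink's.
public class SideOutputSketch {

    /** Hypothetical stand-in for OutputTag<T>; must be created as an anonymous subclass. */
    static abstract class Tag<T> {
        final String id;
        final Type elementType;

        protected Tag(String id) {
            this.id = id;
            // An anonymous subclass records e.g. Tag<String> as its generic superclass,
            // so the type argument survives erasure and can be read back here.
            Type superType = getClass().getGenericSuperclass();
            if (!(superType instanceof ParameterizedType)) {
                throw new IllegalStateException("Create tags as: new Tag<String>(\"id\") {}");
            }
            this.elementType = ((ParameterizedType) superType).getActualTypeArguments()[0];
        }
    }

    /** Hypothetical stand-in for the context: keeps one buffer per tag id. */
    static final class Outputs {
        private final Map<String, List<Object>> buffers = new HashMap<>();

        <T> void output(Tag<T> tag, T value) {
            buffers.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
        }

        @SuppressWarnings("unchecked")
        <T> List<T> getSideOutput(Tag<T> tag) {
            // The cast is only safe if tag ids are unique; output() accepts only T for a Tag<T>.
            return (List<T>) buffers.getOrDefault(tag.id, new ArrayList<>());
        }
    }

    static final Tag<String> SIDE_1 = new Tag<String>("side-output-1") {};
    static final Tag<Integer> SIDE_2 = new Tag<Integer>("side-output-2") {};

    /** Mirrors processElement from the mail: each input feeds the main output and both side outputs. */
    static List<String> run(List<Integer> input, Outputs ctx) {
        List<String> main = new ArrayList<>();
        for (Integer value : input) {
            ctx.output(SIDE_1, "WE GOT: " + value);
            ctx.output(SIDE_2, value);
            main.add("MAIN OUTPUT: " + value);
        }
        return main;
    }

    public static void main(String[] args) {
        Outputs ctx = new Outputs();
        System.out.println(run(Arrays.asList(1, 2, 3), ctx));
        System.out.println(ctx.getSideOutput(SIDE_1));
        System.out.println(ctx.getSideOutput(SIDE_2));
        System.out.println(SIDE_1.elementType + " / " + SIDE_2.elementType);
    }
}
```

In this model, a call such as ctx.output(SIDE_2, "text") is rejected at compile time, which is the property the mail points out for the proposed API. Split/select, by contrast, keeps a single element type for every selected stream.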