What do people think about the OutputTag API for the general side-output implementation?
One thing that might easily be overlooked is that I changed ProcessFunction from an interface to an abstract class so that I could provide a default onTimer() method. This would also require allowing ProcessFunction on a non-keyed stream, as I mentioned in my first mail (I hope).

On Mon, 27 Feb 2017 at 17:45 Aljoscha Krettek <aljos...@apache.org> wrote:

> @Jamie I must have mistyped my last API proposal. This piece of code:
>
> WindowedOperator<T> windowedResult = input
>     .keyBy(...)
>     .window(...)
>     .apply(...);
>
> DataStream<IN> lateData = windowedResult.getSideOutput();
>
> should actually have been:
>
> WindowedOperator<T> windowedResult = input
>     .keyBy(...)
>     .window(...)
>     .apply(...);
>
> DataStream<IN> lateData = windowedResult.getLateDataSideOutput();
>
> So apart from the naming it's pretty much the same as your suggestion,
> right? The reason why I preferred the explicit OutputTag is that we
> otherwise have to create another layer of OutputTags that are internal to
> the system, so that users cannot accidentally also send data to the same
> side output. It just means writing more code for us and introducing the
> more concrete return type for the WindowedStream operations. But that's
> fine if y'all prefer that variant. :-)
>
> On Sat, 25 Feb 2017 at 04:19 Chen Qin <qinnc...@gmail.com> wrote:
>
>> Hi Jamie,
>>
>> I think it does make consuming late-arriving events more explicit! It comes
>> at the cost of a fixed, predefined OutputTag<IN> that users neither define
>> nor control, plus an extra UDF that essentially filters out all main
>> outputs and only lets the side output pass (like a FilterFunction).
>>
>> Thanks,
>> Chen
>>
>> On Feb 24, 2017, at 1:17 PM, Jamie Grier <ja...@data-artisans.com> wrote:
>>
>>> I prefer the ProcessFunction and side outputs solution over split() and
>>> select(), which I've never liked, primarily due to the lack of type
>>> safety; it also doesn't really seem to fit with the rest of Flink's API.
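The type-safety argument against split()/select() can be made concrete with a toy, in-memory model in plain Java. This is an illustrative sketch, not Flink's API: the hypothetical `Tag` and `SideOutputs` classes only stand in for `OutputTag` and `ctx.output(...)`. Each tag carries its own element type, so a `String` side output and an `Integer` side output can coexist, and emitting a value of the wrong type is a compile error. With split/select, by contrast, selectors are plain strings and every selected stream has the input's element type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for OutputTag<T>: the type parameter is what makes
// side outputs type-safe. Equality is object identity, for simplicity.
final class Tag<T> {
    final String id;

    Tag(String id) {
        this.id = id;
    }
}

// Hypothetical stand-in for the side-output part of a ProcessFunction
// context: it buffers values per tag instead of sending them downstream.
final class SideOutputs {
    private final Map<Tag<?>, List<Object>> buffers = new HashMap<>();

    // Compile-time checked: a Tag<Integer> only accepts Integer values.
    <T> void output(Tag<T> tag, T value) {
        buffers.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
    }

    // The unchecked cast is safe because output() only stores values of
    // type T under a Tag<T>.
    @SuppressWarnings("unchecked")
    <T> List<T> get(Tag<T> tag) {
        List<Object> values = buffers.getOrDefault(tag, new ArrayList<>());
        return (List<T>) (List<?>) values;
    }
}

class TypedSideOutputDemo {
    static final Tag<String> LABELS = new Tag<>("labels");
    static final Tag<Integer> EVENS = new Tag<>("evens");

    // Plays the role of processElement(): odd numbers go to the main output,
    // while the two side outputs receive differently typed data.
    static List<Integer> process(List<Integer> input, SideOutputs side) {
        List<Integer> mainOutput = new ArrayList<>();
        for (int value : input) {
            side.output(LABELS, "WE GOT: " + value);
            if (value % 2 == 0) {
                side.output(EVENS, value);
            } else {
                mainOutput.add(value);
            }
        }
        return mainOutput;
    }

    public static void main(String[] args) {
        SideOutputs side = new SideOutputs();
        List<Integer> mainOutput = process(Arrays.asList(1, 2, 3), side);
        List<String> labels = side.get(LABELS);
        List<Integer> evens = side.get(EVENS);
        System.out.println("main=" + mainOutput + " evens=" + evens + " labels=" + labels);
        // side.output(EVENS, "two"); // would not compile: String is not Integer
    }
}
```

The commented-out last line is the point of the exercise. Because `Tag<T>` is invariant, the compiler rejects it. A string-selector design can only catch that kind of mistake at runtime, if at all.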
>>>
>>> On the late data question, I strongly prefer the late data concept being
>>> explicit in the API. Could we not also do something like:
>>>
>>> WindowedStream<> windowedStream = input
>>>     .keyBy(...)
>>>     .window(...);
>>>
>>> DataStream<> mainOutput = windowedStream
>>>     .apply(...);
>>>
>>> DataStream<> lateOutput = windowedStream
>>>     .lateStream();
>>>
>>> On Thu, Feb 23, 2017 at 7:08 AM, Gyula Fóra <gyf...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> Thanks for the nice proposal. I like the idea of side outputs, and it
>>>> would make a lot of topologies much simpler.
>>>>
>>>> Regarding the API, I think we should come up with a way of making side
>>>> outputs accessible from all sorts of operators in a similar way, for
>>>> instance through the RichFunction interface with a special collector
>>>> that we invalidate when the user should not be collecting to it (just a
>>>> quick idea).
>>>>
>>>> I personally wouldn't deprecate the "universal" Split/Select API, which
>>>> can be used on any DataStream, in favor of functionality that is only
>>>> accessible through the process function or a few select operators. I
>>>> think the Split/Select pattern is also very nice, and I use it in many
>>>> different contexts to get efficient multiway filtering (after map/co
>>>> operators, for example).
>>>>
>>>> Regards,
>>>> Gyula
>>>>
>>>> Aljoscha Krettek <aljos...@apache.org> wrote (on Thu, 23 Feb 2017, 15:42):
>>>>
>>>>> Hi Folks,
>>>>> Chen and I have been working for a while now on making FLIP-13 (side
>>>>> outputs) [1] a reality. We think we have a pretty good internal
>>>>> implementation and also a proposal for an API, but now we need to
>>>>> discuss how we want to go forward with this, especially how we should
>>>>> deal with split/select, which does some of the same things side outputs
>>>>> can do. I'll first quickly describe what the split/select API looks
>>>>> like, so that we're all on the same page.
>>>>> Then I'll present the new proposed side output API, and then I'll
>>>>> present a new API for getting dropped late data from a windowed
>>>>> operation, which was the original motivation for adding side outputs.
>>>>>
>>>>> Split/select consists of two API calls: DataStream.split(OutputSelector)
>>>>> and SplitStream.select(). You can use it like this:
>>>>>
>>>>> DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
>>>>>
>>>>> final String EVEN_SELECTOR = "even";
>>>>> final String ODD_SELECTOR = "odd";
>>>>>
>>>>> SplitStream<Integer> split = input.split(
>>>>>     new OutputSelector<Integer>() {
>>>>>         @Override
>>>>>         public Iterable<String> select(Integer value) {
>>>>>             if (value % 2 == 0) {
>>>>>                 return Collections.singleton(EVEN_SELECTOR);
>>>>>             } else {
>>>>>                 return Collections.singleton(ODD_SELECTOR);
>>>>>             }
>>>>>         }
>>>>>     });
>>>>>
>>>>> DataStream<Integer> evenStream = split.select(EVEN_SELECTOR);
>>>>> DataStream<Integer> oddStream = split.select(ODD_SELECTOR);
>>>>>
>>>>> The stream is split according to an OutputSelector that returns an
>>>>> Iterable of Strings. Then you can use select() to get a new stream that
>>>>> only contains elements with the given selector. Notice how the element
>>>>> type for all the split streams is the same.
>>>>>
>>>>> The new side output API proposal adds a new type, OutputTag<T>, and
>>>>> relies on extending ProcessFunction to allow emitting data to outputs
>>>>> besides the main output.
>>>>> I think it's best explained with an example as well:
>>>>>
>>>>> DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
>>>>>
>>>>> final OutputTag<String> sideOutput1 = new OutputTag<String>("side-output-1") {};
>>>>> final OutputTag<Integer> sideOutput2 = new OutputTag<Integer>("side-output-2") {};
>>>>>
>>>>> SingleOutputStreamOperator<String> mainOutputStream = input
>>>>>     .process(new ProcessFunction<Integer, String>() {
>>>>>
>>>>>         @Override
>>>>>         public void processElement(
>>>>>                 Integer value,
>>>>>                 Context ctx,
>>>>>                 Collector<String> out) throws Exception {
>>>>>
>>>>>             ctx.output(sideOutput1, "WE GOT: " + value);
>>>>>             ctx.output(sideOutput2, value);
>>>>>             out.collect("MAIN OUTPUT: " + value);
>>>>>         }
>>>>>
>>>>>     });
>>>>>
>>>>> DataStream<String> sideOutputStream1 =
>>>>>     mainOutputStream.getSideOutput(sideOutput1);
>>>>> DataStream<Integer> sideOutputStream2 =
>>>>>     mainOutputStream.getSideOutput(sideOutput2);
>>>>>
>>>>> Notice how the OutputTags are anonymous inner classes, similar to
>>>>> TypeHint. We need this to be able to analyse the type of the
>>>>> side-output streams. Also notice how the types of the side-output
>>>>> streams can be independent of the main-output stream, and how
>>>>> everything is correctly type-checked by the Java compiler.
>>>>>
>>>>> This change requires making ProcessFunction an abstract base class so
>>>>> that not every user has to implement the onTimer() method. We would also
>>>>> need to allow ProcessFunction on a non-keyed stream.
>>>>>
>>>>> Chen also implemented an API based on FlatMapFunction that looks like
>>>>> the one proposed in the FLIP. This relies on CollectorWrapper, which can
>>>>> be used to "pimp" a Collector to also allow emitting to side outputs.
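The requirement that OutputTags be anonymous inner classes, like TypeHint, relies on a standard Java reflection trick often called a super type token. Generics are erased at runtime, but an anonymous subclass records its superclass's type argument, and `getClass().getGenericSuperclass()` can read it back. Below is a minimal sketch of that trick using a hypothetical `TypedTag` class. It is not Flink's `OutputTag` or `TypeHint`, whose actual implementation may differ in detail.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical stand-in for OutputTag<T>, not Flink's class. Declaring it
// abstract forces callers to write `new TypedTag<X>("id") {}`, i.e. to create
// an anonymous subclass whose generic superclass still names X at runtime.
abstract class TypedTag<T> {
    final String id;
    final Type elementType;

    TypedTag(String id) {
        this.id = id;
        // For `new TypedTag<String>("id") {}` this is TypedTag<java.lang.String>.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            // e.g. a named raw subclass declared as `class Raw extends TypedTag`
            throw new IllegalStateException(
                    "Create TypedTag as an anonymous subclass: new TypedTag<X>(id) {}");
        }
        this.elementType = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }
}

class TypeTokenDemo {
    public static void main(String[] args) {
        TypedTag<String> strings = new TypedTag<String>("side-output-1") {};
        TypedTag<List<Integer>> lists = new TypedTag<List<Integer>>("side-output-2") {};

        // Erasure would normally hide these type arguments; the anonymous
        // subclasses keep them available via reflection.
        System.out.println(strings.id + " -> " + strings.elementType.getTypeName());
        System.out.println(lists.id + " -> " + lists.elementType.getTypeName());
    }
}
```

Making the class `abstract` is one way to force the trailing `{}` at compile time. A concrete class would have to rely on a runtime check like the one in the constructor. Also, `new TypedTag<T>(...) {}` written inside a generic method only captures the type variable `T`, not a concrete type, so the caller still has to supply a concrete type somewhere.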
>>>>>
>>>>> For WindowedStream we have two proposals: make OutputTag visible on the
>>>>> WindowedStream API, or make the result type of WindowedStream operations
>>>>> more specific to allow a getDroppedDataSideOutput() method. For the
>>>>> first proposal it would look like this:
>>>>>
>>>>> final OutputTag<IN> lateDataTag = new OutputTag<IN>("side-output-1") {};
>>>>>
>>>>> SingleOutputStreamOperator<T> windowedResult = input
>>>>>     .keyBy(...)
>>>>>     .window(...)
>>>>>     .sideOutputLateData(lateDataTag)
>>>>>     .apply(...);
>>>>>
>>>>> DataStream<IN> lateData = windowedResult.getSideOutput(lateDataTag);
>>>>>
>>>>> For the second proposal it would look like this:
>>>>>
>>>>> WindowedOperator<T> windowedResult = input
>>>>>     .keyBy(...)
>>>>>     .window(...)
>>>>>     .apply(...);
>>>>>
>>>>> DataStream<IN> lateData = windowedResult.getSideOutput();
>>>>>
>>>>> Right now, the result of window operations is a
>>>>> SingleOutputStreamOperator<T>, same as it is for all DataStream
>>>>> operations. Making the result type more specific, i.e. a
>>>>> WindowedOperator, would allow us to add extra methods there. This would
>>>>> require wrapping a SingleOutputStreamOperator and forwarding all the
>>>>> method calls to the wrapped operator, which can be a bit of a hassle for
>>>>> future changes. The first proposal requires additional boilerplate code.
>>>>>
>>>>> Sorry for the long mail, but I think it's necessary to get everyone on
>>>>> the same page. The question is now: how should we proceed with the
>>>>> proposed API and the old split/select API? I propose to deprecate
>>>>> split/select and only have side outputs going forward. Of course, I'm a
>>>>> bit biased on this. ;-) If we decide to do this, we also need to decide
>>>>> on what the side output API should look like.
>>>>>
>>>>> Happy discussing! Feedback very welcome.
>>>>> :-)
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-13+Side+Outputs+in+Flink
>>>
>>> --
>>>
>>> Jamie Grier
>>> data Artisans, Director of Applications Engineering
>>> @jamiegrier <https://twitter.com/jamiegrier>
>>> ja...@data-artisans.com