I like this proposal!

- The side output API seems more powerful, because it allows different output types.
- It would be nice to eventually have only one construct, because multiple variants for the same thing tend to confuse users.
- One can probably implement split/select with side outputs as a special case, where instead of "select(string)" one has "select(tag)".
- As a migration step, we can keep a deprecated "select(string)" and make it use a tag that is just the result type of the stream.
- For the window operator, I like the second variant better, which does not require users to explicitly declare a tag.

On Mon, Feb 27, 2017 at 5:47 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> I'm curious to know what people think about the OutputTag API for the
> general side-output implementation.
>
> One thing that might easily be overlooked is that I changed ProcessFunction
> from an interface to an abstract class so that I could provide a default
> onTimer() method. This would also require allowing ProcessFunction on a
> non-keyed stream, as I mentioned in my first mail (I hope).
>
> On Mon, 27 Feb 2017 at 17:45 Aljoscha Krettek <aljos...@apache.org> wrote:
>
> > @Jamie I must have mistyped my last API proposal. This piece of code:
> >
> >     WindowedOperator<T> windowedResult = input
> >         .keyBy(...)
> >         .window(...)
> >         .apply(...);
> >
> >     DataStream<IN> lateData = windowedResult.getSideOutput();
> >
> > should actually have been:
> >
> >     WindowedOperator<T> windowedResult = input
> >         .keyBy(...)
> >         .window(...)
> >         .apply(...);
> >
> >     DataStream<IN> lateData = windowedResult.getLateDataSideOutput();
> >
> > So apart from the naming it's pretty much the same as your suggestion,
> > right? The reason why I preferred the explicit OutputTag is that we
> > otherwise have to create another layer of OutputTags that are internal to
> > the system, so that users cannot accidentally also send data to the same
> > side output. It just means writing more code for us and introducing the
> > more concrete return type for the WindowedStream operations.
> > But that's fine if y'all prefer that variant. :-)
> >
> > On Sat, 25 Feb 2017 at 04:19 Chen Qin <qinnc...@gmail.com> wrote:
> >
> > Hi Jamie,
> >
> > I think it does make consuming late-arriving events more explicit! It comes
> > at the cost of a fixed, predefined OutputTag<IN> that the user neither
> > controls nor defines, and of an extra UDF that essentially filters out all
> > main outputs and only lets the side output pass (like a FilterFunction).
> >
> > Thanks,
> > Chen
> >
> > > On Feb 24, 2017, at 1:17 PM, Jamie Grier <ja...@data-artisans.com> wrote:
> > >
> > > I prefer the ProcessFunction and side outputs solution over split() and
> > > select(), which I've never liked, primarily due to the lack of type
> > > safety; they also don't really seem to fit with the rest of Flink's API.
> > >
> > > On the late data question, I strongly prefer the late data concept being
> > > explicit in the API. Could we not also do something like:
> > >
> > >     WindowedStream<> windowedStream = input
> > >         .keyBy(...)
> > >         .window(...);
> > >
> > >     DataStream<> mainOutput = windowedStream
> > >         .apply(...);
> > >
> > >     DataStream<> lateOutput = windowedStream
> > >         .lateStream();
> > >
> > > On Thu, Feb 23, 2017 at 7:08 AM, Gyula Fóra <gyf...@apache.org> wrote:
> > >
> > >> Hi,
> > >>
> > >> Thanks for the nice proposal. I like the idea of side outputs, and it
> > >> would make a lot of topologies much simpler.
> > >>
> > >> Regarding the API, I think we should come up with a way of making side
> > >> outputs accessible from all sorts of operators in a similar way, for
> > >> instance through the RichFunction interface with a special collector that
> > >> we invalidate when the user should not be collecting to it.
> > >> (just a quick idea)
> > >>
> > >> I personally wouldn't deprecate the "universal" split/select API, which
> > >> can be used on any DataStream, in favor of functionality that is only
> > >> accessible through the ProcessFunction or a few select operators. I think
> > >> the split/select pattern is also very nice, and I use it in many
> > >> different contexts to get efficient multiway filtering (after map/co
> > >> operators, for example).
> > >>
> > >> Regards,
> > >> Gyula
> > >>
> > >> On Thu, Feb 23, 2017 at 15:42, Aljoscha Krettek <aljos...@apache.org> wrote:
> > >>
> > >>> Hi Folks,
> > >>> Chen and I have been working for a while now on making FLIP-13 (side
> > >>> outputs) [1] a reality. We think we have a pretty good internal
> > >>> implementation and also a proposal for an API, but now we need to
> > >>> discuss how we want to go forward with this, especially how we should
> > >>> deal with split/select, which does some of the same things side outputs
> > >>> can do. I'll first quickly describe what the split/select API looks
> > >>> like, so that we're all on the same page. Then I'll present the new
> > >>> proposed side output API, and then I'll present a new API for getting
> > >>> dropped late data from a windowed operation, which was the original
> > >>> motivation for adding side outputs.
> > >>>
> > >>> Split/select consists of two API calls: DataStream.split(OutputSelector)
> > >>> and SplitStream.select().
> > >>> You can use it like this:
> > >>>
> > >>>     DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
> > >>>
> > >>>     final String EVEN_SELECTOR = "even";
> > >>>     final String ODD_SELECTOR = "odd";
> > >>>
> > >>>     SplitStream<Integer> split = input.split(
> > >>>         new OutputSelector<Integer>() {
> > >>>             @Override
> > >>>             public Iterable<String> select(Integer value) {
> > >>>                 if (value % 2 == 0) {
> > >>>                     return Collections.singleton(EVEN_SELECTOR);
> > >>>                 } else {
> > >>>                     return Collections.singleton(ODD_SELECTOR);
> > >>>                 }
> > >>>             }
> > >>>         });
> > >>>
> > >>>     DataStream<Integer> evenStream = split.select(EVEN_SELECTOR);
> > >>>     DataStream<Integer> oddStream = split.select(ODD_SELECTOR);
> > >>>
> > >>> The stream is split according to an OutputSelector that returns an
> > >>> Iterable of Strings. Then you can use select() to get a new stream that
> > >>> only contains elements with the given selector. Notice how the element
> > >>> type for all the split streams is the same.
> > >>>
> > >>> The new side output API proposal adds a new type OutputTag<T> and relies
> > >>> on extending ProcessFunction to allow emitting data to outputs besides
> > >>> the main output.
> > >>> I think it's best explained with an example as well:
> > >>>
> > >>>     DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
> > >>>
> > >>>     final OutputTag<String> sideOutput1 = new OutputTag<String>("side-output-1"){};
> > >>>     final OutputTag<Integer> sideOutput2 = new OutputTag<Integer>("side-output-2"){};
> > >>>
> > >>>     SingleOutputStreamOperator<String> mainOutputStream = input
> > >>>         .process(new ProcessFunction<Integer, String>() {
> > >>>
> > >>>             @Override
> > >>>             public void processElement(
> > >>>                     Integer value,
> > >>>                     Context ctx,
> > >>>                     Collector<String> out) throws Exception {
> > >>>
> > >>>                 ctx.output(sideOutput1, "WE GOT: " + value);
> > >>>                 ctx.output(sideOutput2, value);
> > >>>                 out.collect("MAIN OUTPUT: " + value);
> > >>>             }
> > >>>
> > >>>         });
> > >>>
> > >>>     DataStream<String> sideOutputStream1 =
> > >>>         mainOutputStream.getSideOutput(sideOutput1);
> > >>>     DataStream<Integer> sideOutputStream2 =
> > >>>         mainOutputStream.getSideOutput(sideOutput2);
> > >>>
> > >>> Notice how the OutputTags are anonymous inner classes, similar to
> > >>> TypeHint. We need this to be able to analyse the type of the side-output
> > >>> streams. (The type argument is spelled out because Java 8 does not allow
> > >>> the diamond operator on anonymous classes.) Also notice how the types of
> > >>> the side-output streams can be independent of the main-output stream, and
> > >>> how everything is correctly type checked by the Java compiler.
> > >>>
> > >>> This change requires making ProcessFunction an abstract base class so
> > >>> that not every user has to implement the onTimer() method. We would also
> > >>> need to allow ProcessFunction on a non-keyed stream.
> > >>>
> > >>> Chen also implemented an API based on FlatMapFunction that looks like
> > >>> the one proposed in the FLIP. This relies on CollectorWrapper, which can
> > >>> be used to "pimp" a Collector to also allow emitting to side outputs.
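The OutputTag proposal above can be illustrated without a Flink dependency. The following is a toy, in-memory model in plain Java, not Flink's implementation: `OutputTag`, `SideOutputContext`, `process()` and `getSideOutput()` are hypothetical stand-ins that only mirror the names used in the proposal. It shows why the tags are created as anonymous subclasses (the subclass records the element type in its generic superclass, which survives erasure, as with TypeHint), and how each tag can carry a type independent of the main output while `output(tag, value)` stays checked by the compiler.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class SideOutputSketch {

    // Hypothetical stand-in for the proposed OutputTag<T>. Being abstract, it can
    // only be created as an anonymous subclass (new OutputTag<String>("id") {}),
    // and that subclass records T in its generic superclass despite erasure.
    abstract static class OutputTag<T> {
        final String id;
        final Type elementType;

        OutputTag(String id) {
            this.id = id;
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            this.elementType = superType.getActualTypeArguments()[0];
        }
    }

    // Hypothetical context: one main output plus any number of tagged side outputs.
    static final class SideOutputContext<OUT> {
        final List<OUT> mainOutput = new ArrayList<>();
        private final Map<String, List<Object>> sideOutputs = new HashMap<>();

        void collect(OUT value) {
            mainOutput.add(value);
        }

        // The compiler forces the value's type to match the tag's type parameter.
        <X> void output(OutputTag<X> tag, X value) {
            sideOutputs.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
        }

        // The unchecked cast is safe here because only output(tag, X) writes under tag.id.
        @SuppressWarnings("unchecked")
        <X> List<X> getSideOutput(OutputTag<X> tag) {
            List<Object> values = sideOutputs.getOrDefault(tag.id, Collections.emptyList());
            return (List<X>) (List<?>) values;
        }
    }

    // Applies fn to every element, loosely like process() on a non-keyed stream.
    static <IN, OUT> SideOutputContext<OUT> process(
            List<IN> input, BiConsumer<IN, SideOutputContext<OUT>> fn) {
        SideOutputContext<OUT> ctx = new SideOutputContext<>();
        for (IN value : input) {
            fn.accept(value, ctx);
        }
        return ctx;
    }

    static final OutputTag<String> SIDE_OUTPUT_1 = new OutputTag<String>("side-output-1") {};
    static final OutputTag<Integer> SIDE_OUTPUT_2 = new OutputTag<Integer>("side-output-2") {};

    // Mirrors the ProcessFunction example from the proposal.
    static SideOutputContext<String> runExample() {
        return SideOutputSketch.<Integer, String>process(Arrays.asList(1, 2, 3), (value, ctx) -> {
            ctx.output(SIDE_OUTPUT_1, "WE GOT: " + value);
            ctx.output(SIDE_OUTPUT_2, value);
            ctx.collect("MAIN OUTPUT: " + value);
        });
    }

    public static void main(String[] args) {
        SideOutputContext<String> result = runExample();
        System.out.println(result.mainOutput);                   // [MAIN OUTPUT: 1, MAIN OUTPUT: 2, MAIN OUTPUT: 3]
        System.out.println(result.getSideOutput(SIDE_OUTPUT_1)); // [WE GOT: 1, WE GOT: 2, WE GOT: 3]
        System.out.println(result.getSideOutput(SIDE_OUTPUT_2)); // [1, 2, 3]
        System.out.println(SIDE_OUTPUT_2.elementType);           // class java.lang.Integer
    }
}
```

One caveat the sketch makes visible: side outputs are keyed by the tag's id only, so a real implementation would also have to reject two tags that share an id but declare different types.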
> > >>>
> > >>> For WindowedStream we have two proposals: make OutputTag visible on the
> > >>> WindowedStream API, or make the result type of WindowedStream operations
> > >>> more specific to allow a getDroppedDataSideOutput() method. For the first
> > >>> proposal it would look like this:
> > >>>
> > >>>     final OutputTag<IN> lateDataTag = new OutputTag<IN>("side-output-1"){};
> > >>>
> > >>>     SingleOutputStreamOperator<T> windowedResult = input
> > >>>         .keyBy(...)
> > >>>         .window(...)
> > >>>         .sideOutputLateData(lateDataTag)
> > >>>         .apply(...);
> > >>>
> > >>>     DataStream<IN> lateData = windowedResult.getSideOutput(lateDataTag);
> > >>>
> > >>> For the second proposal it would look like this:
> > >>>
> > >>>     WindowedOperator<T> windowedResult = input
> > >>>         .keyBy(...)
> > >>>         .window(...)
> > >>>         .apply(...);
> > >>>
> > >>>     DataStream<IN> lateData = windowedResult.getSideOutput();
> > >>>
> > >>> Right now, the result of window operations is a
> > >>> SingleOutputStreamOperator<T>, same as it is for all DataStream
> > >>> operations. Making the result type more specific, i.e. a
> > >>> WindowedOperator, would allow us to add extra methods there. This would
> > >>> require wrapping a SingleOutputStreamOperator and forwarding all the
> > >>> method calls to the wrapped operator, which can be a bit of a hassle for
> > >>> future changes. The first proposal requires additional boilerplate code.
> > >>>
> > >>> Sorry for the long mail, but I think it's necessary to get everyone on
> > >>> the same page. The question is now: how should we proceed with the
> > >>> proposed API and the old split/select API? I propose to deprecate
> > >>> split/select and only have side outputs going forward. Of course, I'm a
> > >>> bit biased on this. ;-) If we decide to do this, we also need to decide
> > >>> what the side output API should look like.
> > >>>
> > >>> Happy discussing! Feedback very welcome.
> > >>> :-)
> > >>>
> > >>> Best,
> > >>> Aljoscha
> > >>>
> > >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-13+Side+Outputs+in+Flink
> > >>
> > >
> > > --
> > >
> > > Jamie Grier
> > > data Artisans, Director of Applications Engineering
> > > @jamiegrier <https://twitter.com/jamiegrier>
> > > ja...@data-artisans.com
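The suggestion at the top of the thread, implementing split/select as a special case of side outputs with "select(tag)" instead of "select(string)" and keeping a deprecated "select(string)" for migration, can be sketched in plain Java as an in-memory toy. This is an illustration under assumptions, not Flink code: `Tag`, `SplitResult`, `split()` and both `select()` overloads are hypothetical names. Because every split output shares the stream's element type, each tag is simply a `Tag<T>`, and the old string selector becomes a tag of that type. The selector still returns an `Iterable<String>`, so one element can reach several outputs, which keeps the multiway-filtering use case raised in the thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class SplitViaTagsSketch {

    // Hypothetical tag. For split/select every output shares the stream's
    // element type T, so the tag needs no type information beyond T itself.
    static final class Tag<T> {
        final String id;

        Tag(String id) {
            this.id = id;
        }
    }

    // Hypothetical result of split(): one side output per selector name.
    static final class SplitResult<T> {
        private final Map<String, List<T>> outputs = new HashMap<>();

        void emit(String name, T value) {
            outputs.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
        }

        // Tag-based select, i.e. "select(tag)" as suggested in the thread.
        List<T> select(Tag<T> tag) {
            return outputs.getOrDefault(tag.id, Collections.emptyList());
        }

        // Migration shim: the old string selector becomes a tag of the stream's type.
        @Deprecated
        List<T> select(String name) {
            return select(new Tag<T>(name));
        }
    }

    // Routes each element to every output its selector names, like OutputSelector.
    static <T> SplitResult<T> split(List<T> input, Function<T, Iterable<String>> selector) {
        SplitResult<T> result = new SplitResult<>();
        for (T value : input) {
            for (String name : selector.apply(value)) {
                result.emit(name, value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Tag<Integer> even = new Tag<>("even");
        Tag<Integer> odd = new Tag<>("odd");

        SplitResult<Integer> parity = split(Arrays.asList(1, 2, 3),
                value -> Collections.singleton(value % 2 == 0 ? even.id : odd.id));

        System.out.println(parity.select(even));  // [2]
        System.out.println(parity.select(odd));   // [1, 3]
        System.out.println(parity.select("odd")); // [1, 3], via the deprecated shim
    }
}
```

The design choice here is that the deprecated string variant does no routing of its own; it only builds a tag and delegates, so there is a single code path to maintain.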