I'm curious to know what people think about the OutputTag API for the
general side-output implementation?

One thing that might easily be overlooked is that I changed ProcessFunction
from an interface to an abstract class so that I could provide a default
onTimer() method. This would also require allowing ProcessFunction on a
non-keyed stream, as I mentioned in my first mail (I hope).
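
To make the interface-versus-abstract-class point concrete, here is a minimal
sketch. It does not use Flink's real classes: SketchCollector,
SketchProcessFunction and ProcessFunctionSketch are hypothetical stand-ins,
and the checked exceptions and Context argument of the real API are left out.
It only shows that a no-op onTimer() in an abstract base class lets a user
override processElement() alone.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a collector; not Flink's Collector.
interface SketchCollector<T> {
    void collect(T element);
}

// Hypothetical stand-in for ProcessFunction as an abstract base class.
abstract class SketchProcessFunction<I, O> {
    // Every user function must say what to do with an element.
    abstract void processElement(I value, SketchCollector<O> out);

    // Default no-op: only functions that actually use timers override this.
    void onTimer(long timestamp, SketchCollector<O> out) {
    }
}

class ProcessFunctionSketch {
    static List<String> run() {
        List<String> results = new ArrayList<>();

        // The user function overrides processElement() only.
        SketchProcessFunction<Integer, String> fn =
                new SketchProcessFunction<Integer, String>() {
                    @Override
                    void processElement(Integer value, SketchCollector<String> out) {
                        out.collect("MAIN OUTPUT: " + value);
                    }
                };

        SketchCollector<String> out = results::add;
        fn.processElement(1, out);
        fn.onTimer(0L, out); // inherited no-op, emits nothing
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With an interface, the anonymous class above would also have to spell out an
empty onTimer().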

On Mon, 27 Feb 2017 at 17:45 Aljoscha Krettek <aljos...@apache.org> wrote:

> @Jamie I must have mistyped my last API proposal. This piece of code:
> WindowedOperator<T> windowedResult = input
>   .keyBy(...)
>   .window(...)
>   .apply(...)
>
> DataStream<IN> lateData = windowedResult.getSideOutput();
>
> should actually have been:
>
> WindowedOperator<T> windowedResult = input
>   .keyBy(...)
>   .window(...)
>   .apply(...)
>
> DataStream<IN> lateData = windowedResult.getLateDataSideOutput();
>
> So apart from the naming it's pretty much the same as your suggestion,
> right? The reason why I preferred the explicit OutputTag is that we
> otherwise have to create another layer of OutputTags that are internal to
> the system so that users cannot accidentally also send data to the same
> side output. It just means writing more code for us and introducing the
> more concrete return type for the WindowedStream operations. But that's
> fine if y'all prefer that variant. :-)
>
> On Sat, 25 Feb 2017 at 04:19 Chen Qin <qinnc...@gmail.com> wrote:
>
> Hi Jamie,
>
> I think it does make consuming late-arriving events more explicit! It comes
> at the cost of fixing a predefined OutputTag<IN>, which the user can neither
> control nor define, and of an extra UDF that essentially filters out all
> main outputs and only lets the side output pass (like a filter function).
>
> Thanks,
> Chen
>
> > On Feb 24, 2017, at 1:17 PM, Jamie Grier <ja...@data-artisans.com> wrote:
> >
> > I prefer the ProcessFunction and side outputs solution over split() and
> > select(), which I've never liked, primarily due to the lack of type safety.
> > It also doesn't really seem to fit with the rest of Flink's API.
> >
> > On the late data question I strongly prefer the late data concept being
> > explicit in the API.  Could we not also do something like:
> >
> > WindowedStream<> windowedStream = input
> >  .keyBy(...)
> >  .window(...);
> >
> > DataStream<> mainOutput = windowedStream
> >  .apply(...);
> >
> > DataStream<> lateOutput = windowedStream
> >  .lateStream();
> >
> > On Thu, Feb 23, 2017 at 7:08 AM, Gyula Fóra <gyf...@apache.org> wrote:
> >
> >> Hi,
> >>
> >> Thanks for the nice proposal. I like the idea of side outputs, and it would
> >> make a lot of topologies much simpler.
> >>
> >> Regarding the API, I think we should come up with a way of making side
> >> outputs accessible from all sorts of operators in a similar way, for
> >> instance through the RichFunction interface with a special collector that
> >> we invalidate when the user should not be collecting to it. (Just a quick
> >> idea.)
> >>
> >> I personally wouldn't deprecate the "universal" Split/Select API, which can
> >> be used on any DataStream, in favor of functionality that is only
> >> accessible through the process function or a few select operators. I think
> >> the Split/Select pattern is also very nice, and I use it in many different
> >> contexts to get efficient multiway filtering (after map/co operators, for
> >> example).
> >>
> >> Regards,
> >> Gyula
> >>
> >> On Thu, 23 Feb 2017 at 15:42 Aljoscha Krettek <aljos...@apache.org> wrote:
> >>
> >>> Hi Folks,
> >>> Chen and I have been working for a while now on making FLIP-13 (side
> >>> outputs) [1] a reality. We think we have a pretty good internal
> >>> implementation and also a proposal for an API, but now we need to discuss
> >>> how we want to go forward with this, especially how we should deal with
> >>> split/select, which does some of the same things side outputs can do. I'll
> >>> first quickly describe what the split/select API looks like, so that we're
> >>> all on the same page. Then I'll present the newly proposed side output API,
> >>> and then I'll present a new API for getting dropped late data from a
> >>> windowed operation, which was the original motivation for adding side
> >>> outputs.
> >>>
> >>> Split/select consists of two API calls: DataStream.split(OutputSelector)
> >>> and SplitStream.select(). You can use it like this:
> >>>
> >>> DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
> >>>
> >>> final String EVEN_SELECTOR = "even";
> >>> final String ODD_SELECTOR = "odd";
> >>>
> >>> SplitStream<Integer> split = input.split(
> >>>        new OutputSelector<Integer>() {
> >>>            @Override
> >>>            public Iterable<String> select(Integer value) {
> >>>                if (value % 2 == 0) {
> >>>                    return Collections.singleton(EVEN_SELECTOR);
> >>>                } else {
> >>>                    return Collections.singleton(ODD_SELECTOR);
> >>>                }
> >>>            }
> >>>        });
> >>>
> >>> DataStream<Integer> evenStream = split.select(EVEN_SELECTOR);
> >>> DataStream<Integer> oddStream = split.select(ODD_SELECTOR);
> >>>
> >>> The stream is split according to an OutputSelector that returns an Iterable
> >>> of Strings. Then you can use select() to get a new stream that only
> >>> contains elements with the given selector. Notice how the element type for
> >>> all the split streams is the same.
> >>>
> >>> The new side output API proposal adds a new type, OutputTag<T>, and relies
> >>> on extending ProcessFunction to allow emitting data to outputs besides the
> >>> main output. I think it's best explained with an example as well:
> >>>
> >>> DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
> >>>
> >>> final OutputTag<String> sideOutput1 = new OutputTag<String>("side-output-1"){};
> >>> final OutputTag<Integer> sideOutput2 = new OutputTag<Integer>("side-output-2"){};
> >>>
> >>> SingleOutputStreamOperator<String> mainOutputStream = input
> >>>        .process(new ProcessFunction<Integer, String>() {
> >>>
> >>>            @Override
> >>>            public void processElement(
> >>>                    Integer value,
> >>>                    Context ctx,
> >>>                    Collector<String> out) throws Exception {
> >>>
> >>>                ctx.output(sideOutput1, "WE GOT: " + value);
> >>>                ctx.output(sideOutput2, value);
> >>>                out.collect("MAIN OUTPUT: " + value);
> >>>            }
> >>>
> >>>        });
> >>>
> >>> DataStream<String> sideOutputStream1 =
> >>> mainOutputStream.getSideOutput(sideOutput1);
> >>> DataStream<Integer> sideOutputStream2 =
> >>> mainOutputStream.getSideOutput(sideOutput2);
> >>>
> >>> Notice how the OutputTags are anonymous inner classes, similar to TypeHint.
> >>> We need this to be able to analyse the type of the side-output streams.
> >>> Also notice how the types of the side-output streams can be independent of
> >>> the main-output stream, and how everything is correctly type checked by
> >>> the Java compiler.
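
As an illustration of why the trailing {} on the tags matters, here is a
minimal sketch of how an anonymous subclass keeps its type argument available
at runtime. SketchTag and SketchTagDemo are hypothetical names; this is an
assumption about the general technique (the same one TypeHint-style classes
rely on), not the actual OutputTag implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for OutputTag; not Flink's actual implementation.
abstract class SketchTag<T> {
    private final String id;
    private final Type elementType;

    protected SketchTag(String id) {
        this.id = id;
        // getClass() is the anonymous subclass. Its generic superclass is
        // SketchTag<String>, SketchTag<Integer>, ... with T filled in.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException(
                    "Create the tag as an anonymous subclass with an explicit type argument");
        }
        this.elementType = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    String getId() {
        return id;
    }

    Type getElementType() {
        return elementType;
    }
}

class SketchTagDemo {
    public static void main(String[] args) {
        // The {} creates an anonymous subclass, which records the type argument.
        SketchTag<String> sideOutput1 = new SketchTag<String>("side-output-1") {};
        SketchTag<Integer> sideOutput2 = new SketchTag<Integer>("side-output-2") {};

        System.out.println(sideOutput1.getId() + " -> " + sideOutput1.getElementType());
        System.out.println(sideOutput2.getId() + " -> " + sideOutput2.getElementType());
    }
}
```

Without the anonymous subclass, the type argument would be erased and there
would be nothing to read back through reflection.
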
> >>>
> >>> This change requires making ProcessFunction an abstract base class so that
> >>> not every user has to implement the onTimer() method. We would also need to
> >>> allow ProcessFunction on a non-keyed stream.
> >>>
> >>> Chen also implemented an API based on FlatMapFunction that looks like the
> >>> one proposed in the FLIP. This relies on CollectorWrapper, which can be
> >>> used to "pimp" a Collector to also allow emitting to side outputs.
> >>>
> >>> For WindowedStream we have two proposals: make OutputTag visible on the
> >>> WindowedStream API, or make the result type of WindowedStream operations
> >>> more specific to allow a getDroppedDataSideOutput() method. For the first
> >>> proposal it would look like this:
> >>>
> >>> final OutputTag<String> lateDataTag = new OutputTag<String>("side-output-1"){};
> >>>
> >>> DataStream<T> windowedResult = input
> >>>  .keyBy(...)
> >>>  .window(...)
> >>>  .sideOutputLateData(lateDataTag)
> >>>  .apply(...)
> >>>
> >>> DataStream<IN> lateData = windowedResult.getSideOutput(lateDataTag);
> >>>
> >>> For the second proposal it would look like this:
> >>>
> >>> WindowedOperator<T> windowedResult = input
> >>>  .keyBy(...)
> >>>  .window(...)
> >>>  .apply(...)
> >>>
> >>> DataStream<IN> lateData = windowedResult.getSideOutput();
> >>>
> >>> Right now, the result of window operations is a
> >>> SingleOutputStreamOperator<T>, same as it is for all DataStream operations.
> >>> Making the result type more specific, i.e. a WindowedOperator, would allow
> >>> us to add extra methods there. This would require wrapping a
> >>> SingleOutputStreamOperator and forwarding all the method calls to the
> >>> wrapped operator, which can be a bit of a hassle for future changes. The
> >>> first proposal requires additional boilerplate code.
> >>>
> >>> Sorry for the long mail but I think it's necessary to get everyone on the
> >>> same page. The question is now: how should we proceed with the proposed API
> >>> and the old split/select API? I propose to deprecate split/select and only
> >>> have side outputs, going forward. Of course, I'm a bit biased on this. ;-)
> >>> If we decide to do this, we also need to decide on what the side output API
> >>> should look like.
> >>>
> >>> Happy discussing! Feedback very welcome. :-)
> >>>
> >>> Best,
> >>> Aljoscha
> >>>
> >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-13+Side+Outputs+in+Flink
> >>>
> >>
> >
> >
> >
> > --
> >
> > Jamie Grier
> > data Artisans, Director of Applications Engineering
> > @jamiegrier <https://twitter.com/jamiegrier>
> > ja...@data-artisans.com
>
>
