I like this proposal!

  - The side output API seems more powerful, because it allows different
output types.

  - It would be nice to eventually have only one construct, because
multiple variants for the same thing tend to confuse users.
  - One can probably implement split/select with side outputs as a special
case, where instead of "select(string)" one has "select(tag)".
  - As a migration step, we can keep a deprecated "select(string)" and make
it use a tag whose type is just the result type of the stream.

  - For the window operator, I like the second variant better, which does
not require users to explicitly declare a tag.
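
To make the "select(tag)" special case concrete, here is a rough sketch in
plain Java with no Flink dependency. Tag, Outputs and Split are hypothetical
stand-ins invented for illustration; they are not the proposed OutputTag or
the existing SplitStream classes, only a model of how a deprecated
"select(string)" could delegate to a typed tag.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SplitSelectViaTags {

    /** Hypothetical typed tag: the type parameter is the element type of that output. */
    static final class Tag<T> {
        final String id;
        final Class<T> type;

        Tag(String id, Class<T> type) {
            this.id = id;
            this.type = type;
        }
    }

    /** Hypothetical stand-in for an operator's side outputs, stored per tag id. */
    static final class Outputs {
        private final Map<String, List<Object>> byId = new HashMap<>();

        <T> void emit(Tag<T> tag, T value) {
            byId.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
        }

        /** Typed selection: the tag determines the element type of the result. */
        <T> List<T> select(Tag<T> tag) {
            List<T> result = new ArrayList<>();
            for (Object o : byId.getOrDefault(tag.id, Collections.emptyList())) {
                result.add(tag.type.cast(o));
            }
            return result;
        }
    }

    /** Split/select as a special case: every tag has the stream's own element type. */
    static final class Split<T> {
        private final Outputs outputs;
        private final Class<T> elementType;

        Split(Outputs outputs, Class<T> elementType) {
            this.outputs = outputs;
            this.elementType = elementType;
        }

        /** Migration path: the string selector becomes a tag of the stream's type. */
        @Deprecated
        List<T> select(String name) {
            return outputs.select(new Tag<>(name, elementType));
        }
    }

    public static void main(String[] args) {
        Tag<Integer> even = new Tag<>("even", Integer.class);
        Tag<String> label = new Tag<>("label", String.class);

        Outputs out = new Outputs();
        for (int v : new int[] {1, 2, 3}) {
            if (v % 2 == 0) {
                out.emit(even, v);
            }
            out.emit(label, "WE GOT: " + v);
        }

        System.out.println(out.select(even));   // [2]
        System.out.println(out.select(label));  // [WE GOT: 1, WE GOT: 2, WE GOT: 3]
        System.out.println(new Split<>(out, Integer.class).select("even")); // [2]
    }
}
```

The point of the sketch is only that the string-based call can be a thin
wrapper, while the tag-based call is the one that can carry a different
element type per output.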



On Mon, Feb 27, 2017 at 5:47 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> I'm curious to know what people think about the OutputTag API for the
> general side-output implementation?
>
> One thing that might easily be overlooked is that I changed ProcessFunction
> from an interface to an abstract class so that I could provide a default
> onTimer() method. This would also require allowing ProcessFunction on a
> non-keyed stream, as I mentioned in my first mail (I hope).
>
> On Mon, 27 Feb 2017 at 17:45 Aljoscha Krettek <aljos...@apache.org> wrote:
>
> > @Jamie I must have mistyped my last API proposal. This piece of code:
> > WindowedOperator<T> windowedResult = input
> >   .keyBy(...)
> >   .window(...)
> >   .apply(...)
> >
> > DataStream<IN> lateData = windowedResult.getSideOutput();
> >
> > should actually have been:
> >
> > WindowedOperator<T> windowedResult = input
> >   .keyBy(...)
> >   .window(...)
> >   .apply(...)
> >
> > DataStream<IN> lateData = windowedResult.getLateDataSideOutput();
> >
> > So apart from the naming it's pretty much the same as your suggestion,
> > right? The reason why I preferred the explicit OutputTag is that we
> > otherwise have to create another layer of OutputTags that are internal to
> > the system so that users cannot accidentally also send data to the same
> > side output. It just means writing more code for us and introducing the
> > more concrete return type for the WindowedStream operations. But that's
> > fine if y'all prefer that variant. :-)
> >
> > On Sat, 25 Feb 2017 at 04:19 Chen Qin <qinnc...@gmail.com> wrote:
> >
> > Hi Jamie,
> >
> > I think it does make consuming late-arriving events more explicit! At the
> > cost of fixing a predefined OutputTag<IN>, which the user has no control
> > over and does not define, and an extra UDF that essentially filters out all
> > main outputs and only lets the side output pass (like a FilterFunction).
> >
> > Thanks,
> > Chen
> >
> > > On Feb 24, 2017, at 1:17 PM, Jamie Grier <ja...@data-artisans.com> wrote:
> > >
> > > I prefer the ProcessFunction and side outputs solution over split() and
> > > select(), which I've never liked, primarily due to the lack of type
> > > safety; it also doesn't really seem to fit with the rest of Flink's API.
> > >
> > > On the late data question I strongly prefer the late data concept being
> > > explicit in the API.  Could we not also do something like:
> > >
> > > WindowedStream<> windowedStream = input
> > >  .keyBy(...)
> > >  .window(...);
> > >
> > > DataStream<> mainOutput = windowedStream
> > >  .apply(...);
> > >
> > > DataStream<> lateOutput = windowedStream
> > >  .lateStream();
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Thu, Feb 23, 2017 at 7:08 AM, Gyula Fóra <gyf...@apache.org> wrote:
> > >
> > >> Hi,
> > >>
> > >> Thanks for the nice proposal. I like the idea of side outputs, and it
> > >> would make a lot of topologies much simpler.
> > >>
> > >> Regarding the API, I think we should come up with a way of making side
> > >> outputs accessible from all sorts of operators in a similar way. For
> > >> instance, through the RichFunction interface with a special collector
> > >> that we invalidate when the user should not be collecting to it (just a
> > >> quick idea).
> > >>
> > >> I personally wouldn't deprecate the "universal" Split/Select API that
> > >> can be used on any DataStream in favor of functionality that is only
> > >> accessible through the process function or a few select operators. I
> > >> think the Split/Select pattern is also very nice, and I use it in many
> > >> different contexts to get efficient multiway filtering (after map/co
> > >> operators, for example).
> > >>
> > >> Regards,
> > >> Gyula
> > >>
> > >> Aljoscha Krettek <aljos...@apache.org> wrote (on Thu, Feb 23, 2017, 15:42):
> > >>
> > >>> Hi Folks,
> > >>> Chen and I have been working for a while now on making FLIP-13 (side
> > >>> outputs) [1] a reality. We think we have a pretty good internal
> > >>> implementation and also a proposal for an API, but now we need to
> > >>> discuss how we want to go forward with this, especially how we should
> > >>> deal with split/select, which does some of the same things side outputs
> > >>> can do. I'll first quickly describe what the split/select API looks
> > >>> like, so that we're all on the same page. Then I'll present the newly
> > >>> proposed side output API, and then I'll present a new API for getting
> > >>> dropped late data from a windowed operation, which was the original
> > >>> motivation for adding side outputs.
> > >>>
> > >>> Split/select consists of two API calls:
> > >>> DataStream.split(OutputSelector) and SplitStream.select(). You can use
> > >>> it like this:
> > >>>
> > >>> DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
> > >>>
> > >>> final String EVEN_SELECTOR = "even";
> > >>> final String ODD_SELECTOR = "odd";
> > >>>
> > >>> SplitStream<Integer> split = input.split(
> > >>>        new OutputSelector<Integer>() {
> > >>>            @Override
> > >>>            public Iterable<String> select(Integer value) {
> > >>>                if (value % 2 == 0) {
> > >>>                    return Collections.singleton(EVEN_SELECTOR);
> > >>>                } else {
> > >>>                    return Collections.singleton(ODD_SELECTOR);
> > >>>                }
> > >>>            }
> > >>>        });
> > >>>
> > >>> DataStream<Integer> evenStream = split.select(EVEN_SELECTOR);
> > >>> DataStream<Integer> oddStream = split.select(ODD_SELECTOR);
> > >>>
> > >>> The stream is split according to an OutputSelector that returns an
> > >>> Iterable of Strings. Then you can use select() to get a new stream that
> > >>> only contains elements with the given selector. Notice how the element
> > >>> type for all the split streams is the same.
> > >>>
> > >>> The new side output API proposal adds a new type OutputTag<T> and
> > >>> relies on extending ProcessFunction to allow emitting data to outputs
> > >>> besides the main output. I think it's best explained with an example as
> > >>> well:
> > >>>
> > >>> DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
> > >>>
> > >>> // Explicit type arguments: Java 8 rejects "<>" on anonymous classes.
> > >>> final OutputTag<String> sideOutput1 =
> > >>>        new OutputTag<String>("side-output-1") {};
> > >>> final OutputTag<Integer> sideOutput2 =
> > >>>        new OutputTag<Integer>("side-output-2") {};
> > >>>
> > >>> SingleOutputStreamOperator<String> mainOutputStream = input
> > >>>        .process(new ProcessFunction<Integer, String>() {
> > >>>
> > >>>            @Override
> > >>>            public void processElement(
> > >>>                    Integer value,
> > >>>                    Context ctx,
> > >>>                    Collector<String> out) throws Exception {
> > >>>
> > >>>                ctx.output(sideOutput1, "WE GOT: " + value);
> > >>>                ctx.output(sideOutput2, value);
> > >>>                out.collect("MAIN OUTPUT: " + value);
> > >>>            }
> > >>>
> > >>>        });
> > >>>
> > >>> DataStream<String> sideOutputStream1 =
> > >>> mainOutputStream.getSideOutput(sideOutput1);
> > >>> DataStream<Integer> sideOutputStream2 =
> > >>> mainOutputStream.getSideOutput(sideOutput2);
> > >>>
> > >>> Notice how the OutputTags are anonymous inner classes, similar to
> > >>> TypeHint. We need this to be able to analyse the type of the
> > >>> side-output streams. Also notice how the types of the side-output
> > >>> streams can be independent of the main-output stream, and how
> > >>> everything is correctly type checked by the Java compiler.
> > >>>
> > >>> This change requires making ProcessFunction an abstract base class so
> > >>> that not every user has to implement the onTimer() method. We would
> > >>> also need to allow ProcessFunction on a non-keyed stream.
> > >>>
> > >>> Chen also implemented an API based on FlatMapFunction that looks like
> > >>> the one proposed in the FLIP. This relies on CollectorWrapper, which
> > >>> can be used to "pimp" a Collector to also allow emitting to side
> > >>> outputs.
> > >>>
> > >>> For WindowedStream we have two proposals: make OutputTag visible on
> > >>> the WindowedStream API, or make the result type of WindowedStream
> > >>> operations more specific to allow a getDroppedDataSideOutput() method.
> > >>> For the first proposal it would look like this:
> > >>>
> > >>> final OutputTag<IN> lateDataTag = new OutputTag<IN>("side-output-1") {};
> > >>>
> > >>> SingleOutputStreamOperator<T> windowedResult = input
> > >>>  .keyBy(...)
> > >>>  .window(...)
> > >>>  .sideOutputLateData(lateDataTag)
> > >>>  .apply(...)
> > >>>
> > >>> DataStream<IN> lateData = windowedResult.getSideOutput(lateDataTag);
> > >>>
> > >>> For the second proposal it would look like this:
> > >>>
> > >>> WindowedOperator<T> windowedResult = input
> > >>>  .keyBy(...)
> > >>>  .window(...)
> > >>>  .apply(...)
> > >>>
> > >>> DataStream<IN> lateData = windowedResult.getSideOutput();
> > >>>
> > >>> Right now, the result of window operations is a
> > >>> SingleOutputStreamOperator<T>, same as it is for all DataStream
> > >>> operations. Making the result type more specific, i.e. a
> > >>> WindowedOperator, would allow us to add extra methods there. This would
> > >>> require wrapping a SingleOutputStreamOperator and forwarding all the
> > >>> method calls to the wrapped operator, which can be a bit of a hassle
> > >>> for future changes. The first proposal requires additional boilerplate
> > >>> code.
> > >>>
> > >>> Sorry for the long mail, but I think it's necessary to get everyone on
> > >>> the same page. The question is now: how should we proceed with the
> > >>> proposed API and the old split/select API? I propose to deprecate
> > >>> split/select and only have side outputs going forward. Of course, I'm a
> > >>> bit biased on this. ;-) If we decide to do this, we also need to decide
> > >>> on what the side output API should look like.
> > >>>
> > >>> Happy discussing! Feedback very welcome. :-)
> > >>>
> > >>> Best,
> > >>> Aljoscha
> > >>>
> > >>> [1]
> > >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-13+Side+Outputs+in+Flink
> > >>>
> > >>
> > >
> > >
> > >
> > > --
> > >
> > > Jamie Grier
> > > data Artisans, Director of Applications Engineering
> > > @jamiegrier <https://twitter.com/jamiegrier>
> > > ja...@data-artisans.com
> >
> >
>
