1. I like the variant without the explicit OutputTag for the WindowedOperator:

WindowedOperator<T> windowedResult = input
  .keyBy(...)
  .window(...)
  .apply(...);

DataStream<IN> lateData = windowedResult.getLateDataSideOutput();

I like Jamie's proposal getLateStream() a little better though. On the
other hand I see that it makes sense to make it explicit that a side
output is consumed.
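
To make the trade-off concrete, here is a minimal plain-Java sketch (no
Flink dependency) of the variant without an explicit OutputTag.
WindowedResult, dropLate() and the watermark check are hypothetical
stand-ins, not the actual WindowOperator logic. The only point is that the
late-data channel belongs to the result type, so there is no user-visible
tag that other code could emit to by accident.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the proposed WindowedOperator<T>; not a Flink class. */
final class WindowedResult<IN, OUT> {
    private final List<OUT> main = new ArrayList<>();
    // Plays the role of the system-internal tag: no user-visible OutputTag exists for it.
    private final List<IN> late = new ArrayList<>();

    void emit(OUT value) { main.add(value); }
    void dropLate(IN element) { late.add(element); }

    List<OUT> getMainOutput() { return main; }
    List<IN> getLateDataSideOutput() { return late; }
}

final class LateDataSketch {
    /** Toy rule (an assumption): a timestamp behind the watermark counts as late. */
    static WindowedResult<Long, String> run(long[] timestamps, long watermark) {
        WindowedResult<Long, String> result = new WindowedResult<>();
        for (long ts : timestamps) {
            if (ts < watermark) {
                result.dropLate(ts);          // routed to the late-data side output
            } else {
                result.emit("on-time:" + ts); // regular window output
            }
        }
        return result;
    }
}
```

The late elements keep the input type (IN) while the main output has its
own type (OUT), which mirrors the DataStream<IN> lateData line above.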

2. I would keep the split/select API but deprecate it, ideally
re-implementing it on top of side outputs.
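
A rough sketch of what "on top of side outputs" could mean, in plain Java
without Flink. Tag, SideOutputs, split() and select() are hypothetical
stand-ins for OutputTag, the operator's side-output machinery,
DataStream.split() and SplitStream.select(). I am assuming that each
selector name simply maps to one tag of the input element type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for the proposed OutputTag<T>; not Flink's class. */
final class Tag<T> {
    final String id;
    Tag(String id) { this.id = id; }
}

/** Hypothetical store that keeps the emitted elements per tag id. */
final class SideOutputs {
    private final Map<String, List<Object>> byTag = new HashMap<>();

    <T> void output(Tag<T> tag, T value) {
        byTag.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
    }

    @SuppressWarnings("unchecked")
    <T> List<T> get(Tag<T> tag) {
        List<?> values = byTag.getOrDefault(tag.id, Collections.emptyList());
        return (List<T>) values; // unchecked: the tag is trusted to carry the right type
    }
}

final class SplitOnSideOutputs {
    /** split(): emit each element to the side output of every selector name returned. */
    static <T> SideOutputs split(List<T> input, Function<T, Iterable<String>> selector) {
        SideOutputs outputs = new SideOutputs();
        for (T value : input) {
            for (String name : selector.apply(value)) {
                outputs.output(new Tag<T>(name), value);
            }
        }
        return outputs;
    }

    /** select(): read back the side output registered under the selector name. */
    static <T> List<T> select(SideOutputs outputs, String name) {
        return outputs.get(new Tag<T>(name));
    }

    public static void main(String[] args) {
        SideOutputs split = split(Arrays.asList(1, 2, 3),
                v -> Collections.singleton(v % 2 == 0 ? "even" : "odd"));
        List<Integer> even = select(split, "even");
        List<Integer> odd = select(split, "odd");
        System.out.println(even + " " + odd);
    }
}
```

All tags created this way share the input element type, so the sketch keeps
the existing split/select restriction that every selected stream has the
same type as the input.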

3. What about Gyula's question about exposing side outputs for regular
RichFunctions as well?

I think it makes sense not to force users onto the ProcessFunction just
to use side outputs. If, on the other hand, we think that the main use
case will be the late data stream from windows, then requiring the
ProcessFunction is probably fine. I think we have two options for
RichFunctions: either the runtime context or the collector, both of
which are shared with the DataSet API. I would be OK with throwing an
UnsupportedOperationException if a batch API user tries to access side
outputs.
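
As an illustration of the runtime-context option, here is a plain-Java
sketch. SharedContext, StreamingContext and BatchContext are hypothetical
names, not Flink's RuntimeContext. The assumption is one interface shared
by both APIs, where only the streaming implementation supports side
outputs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical context shared by streaming and batch user functions. */
interface SharedContext {
    void sideOutput(String tagId, Object value);
}

/** Streaming side: records what was emitted to side outputs. */
final class StreamingContext implements SharedContext {
    final List<String> emitted = new ArrayList<>();

    @Override
    public void sideOutput(String tagId, Object value) {
        emitted.add(tagId + ":" + value);
    }
}

/** Batch side: the method exists on the shared interface but is rejected. */
final class BatchContext implements SharedContext {
    @Override
    public void sideOutput(String tagId, Object value) {
        throw new UnsupportedOperationException(
                "Side outputs are not supported in the batch API");
    }
}

final class SharedContextSketch {
    /** A user function written once against the shared context. */
    static void userFunction(SharedContext ctx, int value) {
        if (value < 0) {
            ctx.sideOutput("rejected", value);
        }
    }

    /** Returns true if running the function on the batch context is rejected. */
    static boolean failsOnBatch() {
        try {
            userFunction(new BatchContext(), -1);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }
}
```

The downside this makes visible is that the failure only shows up at
runtime, and only for inputs that actually reach the side-output call.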



On Mon, Feb 27, 2017 at 8:56 PM, Jamie Grier <ja...@data-artisans.com> wrote:
> Aljoscha,
>
> Ahh, that is much better.  As long as it's explicitly referring to late
> data I think it's fine.  I also like the second variant where a user
> doesn't have to explicitly create the OutputTag.
>
>
>
> On Mon, Feb 27, 2017 at 8:45 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> @Jamie I must have mistyped my last API proposal. This piece of code:
>> WindowedOperator<T> windowedResult = input
>>   .keyBy(...)
>>   .window(...)
>>   .apply(...)
>>
>> DataStream<IN> lateData = windowedResult.getSideOutput();
>>
>> should actually have been:
>>
>> WindowedOperator<T> windowedResult = input
>>   .keyBy(...)
>>   .window(...)
>>   .apply(...)
>>
>> DataStream<IN> lateData = windowedResult.getLateDataSideOutput();
>>
>> So apart from the naming it's pretty much the same as your suggestion,
>> right? The reason why I preferred the explicit OutputTag is that we
>> otherwise have to create another layer of OutputTags that are internal to
>> the system so that users cannot accidentally also send data to the same
>> side output. It just means writing more code for us and introducing the
>> more concrete return type for the WindowedStream operations. But that's
>> fine if y'all prefer that variant. :-)
>>
>> On Sat, 25 Feb 2017 at 04:19 Chen Qin <qinnc...@gmail.com> wrote:
>>
>> > Hi Jamie,
>> >
>> > I think it does make consuming late-arriving events more explicit! It
>> > comes at the cost of fixing a predefined OutputTag<IN>, which the user
>> > can neither control nor define, and of an extra UDF that essentially
>> > filters out all main outputs and only lets the side output pass (like
>> > a FilterFunction).
>> >
>> > Thanks,
>> > Chen
>> >
>> > > On Feb 24, 2017, at 1:17 PM, Jamie Grier <ja...@data-artisans.com>
>> > wrote:
>> > >
>> > > I prefer the ProcessFunction and side outputs solution over split() and
>> > > select(), which I've never liked, primarily due to the lack of type
>> > > safety. It also doesn't really seem to fit with the rest of Flink's API.
>> > >
>> > > On the late data question I strongly prefer the late data concept being
>> > > explicit in the API.  Could we not also do something like:
>> > >
>> > > WindowedStream<> windowedStream = input
>> > >  .keyBy(...)
>> > >  .window(...);
>> > >
>> > > DataStream<> mainOutput = windowedStream
>> > >  .apply(...);
>> > >
>> > > DataStream<> lateOutput = windowedStream
>> > >  .lateStream();
>> > >
>> > > On Thu, Feb 23, 2017 at 7:08 AM, Gyula Fóra <gyf...@apache.org> wrote:
>> > >
>> > >> Hi,
>> > >>
>> > >> Thanks for the nice proposal. I like the idea of side outputs, and it
>> > >> would make a lot of topologies much simpler.
>> > >>
>> > >> Regarding the API, I think we should come up with a way of making side
>> > >> outputs accessible from all sorts of operators in a similar way, for
>> > >> instance through the RichFunction interface with a special collector
>> > >> that we invalidate when the user should not be collecting to it (just
>> > >> a quick idea).
>> > >>
>> > >> I personally wouldn't deprecate the "universal" Split/Select API, which
>> > >> can be used on any DataStream, in favor of functionality that is only
>> > >> accessible through the process function or a few select operators. I
>> > >> think the Split/Select pattern is also very nice, and I use it in many
>> > >> different contexts to get efficient multiway filtering (after map/co
>> > >> operators, for example).
>> > >>
>> > >> Regards,
>> > >> Gyula
>> > >>
>> > >> Aljoscha Krettek <aljos...@apache.org> wrote (on Thu, Feb 23, 2017,
>> > >> 15:42):
>> > >>
>> > >>> Hi Folks,
>> > >>> Chen and I have been working for a while now on making FLIP-13 (side
>> > >>> outputs) [1] a reality. We think we have a pretty good internal
>> > >>> implementation and also a proposal for an API, but now we need to
>> > >>> discuss how we want to go forward with this, especially how we should
>> > >>> deal with split/select, which does some of the same things side
>> > >>> outputs can do. I'll first quickly describe what the split/select API
>> > >>> looks like, so that we're all on the same page. Then I'll present the
>> > >>> new proposed side output API, and then I'll present a new API for
>> > >>> getting dropped late data from a windowed operation, which was the
>> > >>> original motivation for adding side outputs.
>> > >>>
>> > >>> Split/select consists of two API calls: DataStream.split(OutputSelector)
>> > >>> and SplitStream.select(). You can use it like this:
>> > >>>
>> > >>> DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
>> > >>>
>> > >>> final String EVEN_SELECTOR = "even";
>> > >>> final String ODD_SELECTOR = "odd";
>> > >>>
>> > >>> SplitStream<Integer> split = input.split(
>> > >>>        new OutputSelector<Integer>() {
>> > >>>            @Override
>> > >>>            public Iterable<String> select(Integer value) {
>> > >>>                if (value % 2 == 0) {
>> > >>>                    return Collections.singleton(EVEN_SELECTOR);
>> > >>>                } else {
>> > >>>                    return Collections.singleton(ODD_SELECTOR);
>> > >>>                }
>> > >>>            }
>> > >>>        });
>> > >>>
>> > >>> DataStream<Integer> evenStream = split.select(EVEN_SELECTOR);
>> > >>> DataStream<Integer> oddStream = split.select(ODD_SELECTOR);
>> > >>>
>> > >>> The stream is split according to an OutputSelector that returns an
>> > >>> Iterable of Strings. Then you can use select() to get a new stream
>> > >>> that only contains elements with the given selector. Notice how the
>> > >>> element type for all the split streams is the same.
>> > >>>
>> > >>> The new side output API proposal adds a new type OutputTag<T> and
>> > >>> relies on extending ProcessFunction to allow emitting data to outputs
>> > >>> besides the main output. I think it's best explained with an example
>> > >>> as well:
>> > >>>
>> > >>> DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
>> > >>>
>> > >>> final OutputTag<String> sideOutput1 =
>> > >>>        new OutputTag<String>("side-output-1") {};
>> > >>> final OutputTag<Integer> sideOutput2 =
>> > >>>        new OutputTag<Integer>("side-output-2") {};
>> > >>>
>> > >>> SingleOutputStreamOperator<String> mainOutputStream = input
>> > >>>        .process(new ProcessFunction<Integer, String>() {
>> > >>>
>> > >>>            @Override
>> > >>>            public void processElement(
>> > >>>                    Integer value,
>> > >>>                    Context ctx,
>> > >>>                    Collector<String> out) throws Exception {
>> > >>>
>> > >>>                ctx.output(sideOutput1, "WE GOT: " + value);
>> > >>>                ctx.output(sideOutput2, value);
>> > >>>                out.collect("MAIN OUTPUT: " + value);
>> > >>>            }
>> > >>>
>> > >>>        });
>> > >>>
>> > >>> DataStream<String> sideOutputStream1 =
>> > >>> mainOutputStream.getSideOutput(sideOutput1);
>> > >>> DataStream<Integer> sideOutputStream2 =
>> > >>> mainOutputStream.getSideOutput(sideOutput2);
>> > >>>
>> > >>> Notice how the OutputTags are anonymous inner classes, similar to
>> > >>> TypeHint. We need this to be able to analyse the type of the
>> > >>> side-output streams. Also notice how the types of the side-output
>> > >>> streams can be independent of the main-output stream, and how
>> > >>> everything is correctly type checked by the Java compiler.
>> > >>>
>> > >>> This change requires making ProcessFunction an abstract base class so
>> > >>> that not every user has to implement the onTimer() method. We would
>> > >>> also need to allow ProcessFunction on a non-keyed stream.
>> > >>>
>> > >>> Chen also implemented an API based on FlatMapFunction that looks like
>> > >>> the one proposed in the FLIP. This relies on CollectorWrapper, which
>> > >>> can be used to "pimp" a Collector to also allow emitting to side
>> > >>> outputs.
>> > >>>
>> > >>> For WindowedStream we have two proposals: make OutputTag visible on
>> > >>> the WindowedStream API or make the result type of WindowedStream
>> > >>> operations more specific to allow a getDroppedDataSideOutput() method.
>> > >>> For the first proposal it would look like this:
>> > >>>
>> > >>> final OutputTag<IN> lateDataTag =
>> > >>>        new OutputTag<IN>("side-output-1") {};
>> > >>>
>> > >>> DataStream<T> windowedResult = input
>> > >>>  .keyBy(...)
>> > >>>  .window(...)
>> > >>>  .sideOutputLateData(lateDataTag)
>> > >>>  .apply(...)
>> > >>>
>> > >>> DataStream<IN> lateData = windowedResult.getSideOutput(lateDataTag);
>> > >>>
>> > >>> For the second proposal it would look like this:
>> > >>>
>> > >>> WindowedOperator<T> windowedResult = input
>> > >>>  .keyBy(...)
>> > >>>  .window(...)
>> > >>>  .apply(...)
>> > >>>
>> > >>> DataStream<IN> lateData = windowedResult.getSideOutput();
>> > >>>
>> > >>> Right now, the result of window operations is a
>> > >>> SingleOutputStreamOperator<T>, same as it is for all DataStream
>> > >>> operations. Making the result type more specific, i.e. a
>> > >>> WindowedOperator, would allow us to add extra methods there. This
>> > >>> would require wrapping a SingleOutputStreamOperator and forwarding all
>> > >>> the method calls to the wrapped operator which can be a bit of a
>> > >>> hassle for future changes. The first proposal requires additional
>> > >>> boilerplate code.
>> > >>>
>> > >>> Sorry for the long mail but I think it's necessary to get everyone on
>> > >>> the same page. The question is now: how should we proceed with the
>> > >>> proposed API and the old split/select API? I propose to deprecate
>> > >>> split/select and only have side outputs, going forward. Of course, I'm
>> > >>> a bit biased on this. ;-) If we decide to do this, we also need to
>> > >>> decide on what the side output API should look like.
>> > >>>
>> > >>> Happy discussing! Feedback very welcome. :-)
>> > >>>
>> > >>> Best,
>> > >>> Aljoscha
>> > >>>
>> > >>> [1]
>> > >>>
>> > >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-13+Side+Outputs+in+Flink
>> > >>>
>> > >>
>> > >
>> > >
>> > >
>> > > --
>> > >
>> > > Jamie Grier
>> > > data Artisans, Director of Applications Engineering
>> > > @jamiegrier <https://twitter.com/jamiegrier>
>> > > ja...@data-artisans.com
>> >
>> >
>>
>
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> ja...@data-artisans.com
