Thanks for the explanation. Did you mean that process() would return a
SingleOutputWithSideOutputOperator?

Anyway, that should be enough to avoid the problem that I hit (and it also
seems like the best & only way).
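
To check my understanding, here is a minimal, self-contained sketch of that
idea. It is a toy model, not Flink code: ToyStream, the list-backed operators
and the tag-less getSideOutput() are made up for illustration. Only the two
class names SingleOutputOperator and SingleOutputWithSideOutputOperator come
from your mail, and I am assuming process() is what returns the subtype.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Toy model only: none of these classes are Flink's real API.
class SideOutputTypingSketch {

    // Plain operator type: it deliberately has no getSideOutput().
    static class SingleOutputOperator<T> {
        private final List<T> elements;

        SingleOutputOperator(List<T> elements) {
            this.elements = elements;
        }

        // Ordinary transformations return the plain type again.
        SingleOutputOperator<T> filter(Predicate<T> keep) {
            List<T> kept = new ArrayList<>();
            for (T element : elements) {
                if (keep.test(element)) {
                    kept.add(element);
                }
            }
            return new SingleOutputOperator<>(kept);
        }

        List<T> collect() {
            return elements;
        }
    }

    // Hypothetical subtype: the only type that offers getSideOutput().
    static class SingleOutputWithSideOutputOperator<T> extends SingleOutputOperator<T> {
        private final List<T> side;

        SingleOutputWithSideOutputOperator(List<T> main, List<T> side) {
            super(main);
            this.side = side;
        }

        // Simplification: a single untagged side output instead of an OutputTag.
        SingleOutputOperator<T> getSideOutput() {
            return new SingleOutputOperator<>(side);
        }
    }

    // Hypothetical source stream; process() is the only way to get the subtype.
    static class ToyStream<T> {
        private final List<T> elements;

        ToyStream(List<T> elements) {
            this.elements = elements;
        }

        // Elements matching toMain go to the main output, the rest to the side output.
        SingleOutputWithSideOutputOperator<T> process(Predicate<T> toMain) {
            List<T> main = new ArrayList<>();
            List<T> side = new ArrayList<>();
            for (T element : elements) {
                if (toMain.test(element)) {
                    main.add(element);
                } else {
                    side.add(element);
                }
            }
            return new SingleOutputWithSideOutputOperator<>(main, side);
        }
    }

    public static void main(String[] args) {
        ToyStream<String> stream = new ToyStream<>(Arrays.asList("a", "b"));

        SingleOutputWithSideOutputOperator<String> processed = stream.process("a"::equals);
        SingleOutputOperator<String> filtered = processed.filter(s -> true);
        SingleOutputOperator<String> filteredSideOutput =
                processed.getSideOutput().filter(s -> true);

        System.out.println(filtered.collect());           // prints [a]
        System.out.println(filteredSideOutput.collect()); // prints [b]

        // Would not compile, because filter() returns the plain type:
        // processed.filter(s -> true).getSideOutput();
    }
}
```

With this shape, the mistake in my original program is caught by the compiler
instead of silently producing an empty side output.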

Maybe the name should be something more generic though, like
ProcessedSingleOutputOperator or something, I wouldn't know..

Would this deserve an improvement ticket in JIRA?

On Mon, Jan 15, 2018 at 12:43 PM, Chesnay Schepler <ches...@apache.org>
wrote:

> It would mean that getSideOutput() would return a
> SingleOutputWithSideOutputOperator which extends SingleOutputOperator
> offering getSideOutput(). Other transformations would still return a
> SingleOutputOperator.
>
> With this the following code wouldn't compile.
>
> stream
>     .process(...)
>     .filter(...)
>     .getSideOutput(...) // compile error
>
> You would have to explicitly define the code as below, which makes the
> behavior unambiguous:
>
> processed = stream
>     .process(...)
>
> filtered = processed
>     .filter(...)
>
> filteredSideOutput = processed
>     .getSideOutput(...)
>     .filter(...)
>
>
> On 15.01.2018 09:55, Juho Autio wrote:
>
> > sideoutput might deserve a separate class which inherits from
> > singleoutput. It might prevent a lot of confusion
>
> Thanks, but how could that be done? Do you mean that if one calls
> .process(), then the stream would change to another class which would only
> allow calls like .getMainOutput() or .getSideOutput("name")? Of course
> compile time error would be even better than a runtime error, but I don't
> see yet how it could be done in practice.
>
> On Sun, Jan 14, 2018 at 4:55 AM, Chen Qin <qinnc...@gmail.com> wrote:
>
>> Hi Juho,
>>
>> I think sideoutput might deserve a separate class which inherits from
>> singleoutput. It might prevent a lot of confusion. A more generic question
>> is whether the datastream api can support multiple ins and multiple outs
>> natively. It's more like the scheduling problem you hit when moving from a
>> single-process system to a multi-process system: which one should get
>> resources, and how much, when sharing the same hardware? I guess it will
>> open the gate to lots of edge cases -> strategies -> more edge cases :)
>>
>> Chen
>>
>> On Fri, Jan 12, 2018 at 6:34 AM, Juho Autio <juho.au...@rovio.com> wrote:
>>
>>> Maybe I could express it in a slightly different way: if adding the
>>> .filter() after .process() causes the side output to be somehow totally
>>> "lost", then I believe the .getSideOutput() could be aware that there is
>>> no such side output to be listened to from upstream, and throw an
>>> exception. I mean, this should be possible when building the DAG, it
>>> shouldn't require starting the stream to detect? Thanks..
>>>
>>> On Fri, Jan 12, 2018 at 2:48 PM, Tzu-Li (Gordon) Tai <
>>> tzuli...@apache.org> wrote:
>>>
>>>> Hi Juho,
>>>>
>>>> Now that I think of it this seems like a bug to me: why does the call
>>>> to getSideOutput succeed if it doesn't provide _any_ input?
>>>>
>>>>
>>>> With the way side outputs work, I don’t think this is possible (or
>>>> would make sense). An operator does not know whether or not it would ever
>>>> emit some element with a given tag.
>>>> As far as I understand it, calling `getSideOutput` essentially adds a
>>>> virtual node to the result stream graph that listens to the specified tag
>>>> from the upstream input.
>>>>
>>>> While I’m not aware whether or not your observation is expected
>>>> behavior, from an API perspective, I can see why it is a bit confusing for
>>>> you.
>>>> Aljoscha would be the expert here, maybe he’ll have more insights. I’ve
>>>> looped him in on cc.
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>>
>>>> On 12 January 2018 at 4:05:13 PM, Juho Autio (juho.au...@rovio.com)
>>>> wrote:
>>>>
>>>> When I run the code below (Flink 1.4.0 or 1.3.1), only "a" is printed.
>>>> If I switch the position of .process() & .filter() (ie. filter first, then
>>>> process), both "a" & "b" are printed, as expected.
>>>>
>>>> I guess it's a bit hard to say what the side output should include in
>>>> this case: the stream before filtering or after it?
>>>>
>>>> What I would suggest is for Flink to protect against this kind of user
>>>> error, which is hard to debug. Would it be possible for Flink to throw an
>>>> exception if one tries to call .getSideOutput() on anything that doesn't
>>>> actually provide that side output? Now that I think of it this seems like a
>>>> bug to me: why does the call to getSideOutput succeed if it doesn't provide
>>>> _any_ input? I would expect it to get the side output data stream _before_
>>>> applying .filter().
>>>>
>>>> import org.apache.flink.api.common.functions.FilterFunction;
>>>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>>>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>>>> import org.apache.flink.util.Collector;
>>>> import org.apache.flink.util.OutputTag;
>>>>
>>>> public class SideOutputProblem {
>>>>
>>>>     public static void main(String[] args) throws Exception {
>>>>
>>>>         StreamExecutionEnvironment env =
>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>         DataStreamSource<String> stream = env.fromElements("a", "b");
>>>>         OutputTag<String> sideOutputTag = new OutputTag<String>("side-output"){};
>>>>
>>>>         SingleOutputStreamOperator<String> processed = stream
>>>>
>>>>                 .process(new ProcessFunction<String, String>() {
>>>>                     @Override
>>>>                     public void processElement(String s, Context context,
>>>>                             Collector<String> collector) throws Exception {
>>>>                         if ("a".equals(s)) {
>>>>                             collector.collect(s);
>>>>                         } else {
>>>>                             context.output(sideOutputTag, s);
>>>>                         }
>>>>                     }
>>>>                 })
>>>>
>>>>                 .filter(new FilterFunction<String>() {
>>>>                     @Override
>>>>                     public boolean filter(String s) throws Exception {
>>>>                         return true;
>>>>                     }
>>>>                 });
>>>>
>>>>         processed.getSideOutput(sideOutputTag).printToErr();
>>>>
>>>>         processed.print();
>>>>
>>>>         env.execute();
>>>>     }
>>>>
>>>> }
>>>>
>>>> Cheers,
>>>> Juho
>>>>
>>>>
>>>
>>
>
>
