Could someone with knowledge of the right terms create this in JIRA,
please? I guess I could also create it myself if needed.

> Yes, I meant that process() returns the special operator. This would
> definitely deserve a JIRA issue.
> Thanks for the explanation. Did you mean that process() would return a
> SingleOutputWithSideOutputOperator?
> Anyway, that should be enough to avoid the problem that I hit (and it
> also seems like the best and only way).
> Maybe the name should be something more generic though, like
> ProcessedSingleOutputOperator or something; I'm not sure.
> Would this deserve an improvement ticket in JIRA?
>> It would mean that getSideOutput() would return a
>> SingleOutputWithSideOutputOperator which extends SingleOutputOperator
>> offering getSideOutput(). Other transformations would still return a
>> SingleOutputOperator.
>> With this, the following code wouldn't compile:
>> stream
>>     .process(...)
>>     .filter(...)
>>     .getSideOutput(...) // compile error
>> You would have to explicitly define the code as below, which makes the
>> behavior unambiguous:
>> processed = stream
>>     .process(...)
>> filtered = processed
>>     .filter(...)
>> filteredSideOutput = processed
>>     .getSideOutput(...)
>>     .filter(...)
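A minimal plain-Java sketch of the typing idea above, with no Flink dependency. All class names here (SimpleOperator, ProcessedOperator, SourceStream, TypedSideOutputDemo) are hypothetical stand-ins, not Flink API, and the side output is reduced to a single unnamed channel instead of an OutputTag. Only process() returns the subtype that offers getSideOutput(), so asking for a side output after filter() is rejected by the compiler instead of silently producing an empty stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for SingleOutputStreamOperator: every transformation
// returns at least this type, but it deliberately has no getSideOutput().
class SimpleOperator<T> {
    final List<T> main;

    SimpleOperator(List<T> main) {
        this.main = main;
    }

    SimpleOperator<T> filter(Predicate<T> predicate) {
        List<T> kept = new ArrayList<>();
        for (T element : main) {
            if (predicate.test(element)) {
                kept.add(element);
            }
        }
        // Returns the plain type, so side outputs are not reachable from here.
        return new SimpleOperator<>(kept);
    }
}

// Hypothetical SingleOutputWithSideOutputOperator: only process() returns it.
class ProcessedOperator<T> extends SimpleOperator<T> {
    private final List<T> side;

    ProcessedOperator(List<T> main, List<T> side) {
        super(main);
        this.side = side;
    }

    SimpleOperator<T> getSideOutput() {
        return new SimpleOperator<>(side);
    }
}

// Hypothetical source; process() sends an element to the main output when
// toMain accepts it, otherwise to the single side output.
class SourceStream<T> {
    private final List<T> elements;

    SourceStream(List<T> elements) {
        this.elements = elements;
    }

    ProcessedOperator<T> process(Predicate<T> toMain) {
        List<T> main = new ArrayList<>();
        List<T> side = new ArrayList<>();
        for (T element : elements) {
            (toMain.test(element) ? main : side).add(element);
        }
        return new ProcessedOperator<>(main, side);
    }
}

class TypedSideOutputDemo {
    public static void main(String[] args) {
        SourceStream<String> stream = new SourceStream<>(Arrays.asList("a", "b"));
        ProcessedOperator<String> processed = stream.process("a"::equals);
        SimpleOperator<String> filtered = processed.filter(s -> true);
        SimpleOperator<String> sideOutput = processed.getSideOutput();

        // stream.process("a"::equals).filter(s -> true).getSideOutput();
        // ^ would not compile: filter() returns SimpleOperator, which has no getSideOutput()

        System.out.println(filtered.main);   // [a]
        System.out.println(sideOutput.main); // [b]
    }
}
```

The design choice is simply to move the capability into the return type of process(), so the explicit form (keep a reference to the processed operator, then branch from it) is the only one that type-checks.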
>> > sideoutput might deserve a separate class which inherits from
>> singleoutput. It might prevent a lot of confusion.
>> Thanks, but how could that be done? Do you mean that if one calls
>> .process(), then the stream would change to another class which would only
>> allow calls like .getMainOutput() or .getSideOutput("name")? Of course, a
>> compile-time error would be even better than a runtime error, but I don't
>> see yet how it could be done in practice.
>>> Hi Juho,
>>> I think sideoutput might deserve a separate class which inherits from
>>> singleoutput. It might prevent a lot of confusion. A more generic question
>>> is whether the DataStream API can natively support multiple inputs and
>>> multiple outputs. It's more like a scheduling problem when you go from a
>>> single-process system to a multi-process system: which one should get
>>> resources, and how much, when sharing the same hardware. I guess it will
>>> open the gate to lots of edge cases -> strategies -> more edge cases :)
>>> Chen
>>>> Maybe I could express it in a slightly different way: if adding the
>>>> .filter() after .process() causes the side output to be somehow totally
>>>> "lost", then I believe the .getSideOutput() could be aware that there is
>>>> no such side output to be listened to from upstream, and throw an
>>>> exception. I mean, this should be possible when building the DAG; it
>>>> shouldn't require starting the stream to detect it, right? Thanks.
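The suggestion above can be sketched as a check performed while the graph is assembled. This is a hypothetical, heavily simplified graph builder in plain Java (GraphNode, GraphBuilder, BuildTimeCheckDemo and the kind strings are invented for illustration, not Flink classes). It assumes, as a simplification, that only nodes created by process() can emit side outputs; in Flink other operators can too (for example, a window operation configured with sideOutputLateData). In line with Gordon's point below, it can only check what kind of node is asked, not whether that node will ever emit under a given tag.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical node of a simplified job graph; kind records which
// transformation created it ("source", "process", "filter", ...).
class GraphNode {
    final int id;
    final String kind;
    final GraphNode upstream; // null for sources

    GraphNode(int id, String kind, GraphNode upstream) {
        this.id = id;
        this.kind = kind;
        this.upstream = upstream;
    }
}

// Hypothetical graph builder illustrating a build-time check for side outputs.
class GraphBuilder {
    private final List<GraphNode> nodes = new ArrayList<>();

    GraphNode add(String kind, GraphNode upstream) {
        GraphNode node = new GraphNode(nodes.size(), kind, upstream);
        nodes.add(node);
        return node;
    }

    // Fails while the graph is being built, before anything runs, if the node
    // was not created by process(). It cannot tell whether the function will
    // ever emit under this particular tag.
    GraphNode getSideOutput(GraphNode node, String tag) {
        if (!"process".equals(node.kind)) {
            throw new IllegalStateException("Side output '" + tag + "' requested from a '"
                    + node.kind + "' node, which never emits side outputs");
        }
        return add("side-output:" + tag, node);
    }
}

class BuildTimeCheckDemo {
    public static void main(String[] args) {
        GraphBuilder graph = new GraphBuilder();
        GraphNode source = graph.add("source", null);
        GraphNode processed = graph.add("process", source);
        GraphNode filtered = graph.add("filter", processed);

        System.out.println(graph.getSideOutput(processed, "side-output").kind);
        try {
            graph.getSideOutput(filtered, "side-output");
        } catch (IllegalStateException e) {
            // Reached for the reproducer's shape: side output requested from the filter.
            System.out.println(e.getMessage());
        }
    }
}
```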
>>>>> Hi Juho,
>>>>> Now that I think of it, this seems like a bug to me: why does the call
>>>>> to getSideOutput succeed if it doesn't provide _any_ input?
>>>>> With the way side outputs work, I don’t think this is possible (or
>>>>> would make sense). An operator does not know whether or not it would ever
>>>>> emit some element with a given tag.
>>>>> As far as I understand it, calling `getSideOutput` essentially adds a
>>>>> virtual node to the result stream graph that listens to the specified tag
>>>>> from the upstream input.
>>>>> While I’m not sure whether your observation is expected
>>>>> behavior, from an API perspective I can see why it is a bit confusing for
>>>>> you.
>>>>> Aljoscha would be the expert here; maybe he’ll have more insights.
>>>>> I’ve cc’ed him.
>>>>> Cheers,
>>>>> Gordon
>>>>> When I run the code below (Flink 1.4.0 or 1.3.1), only "a" is printed.
>>>>> If I switch the position of .process() & .filter() (i.e. filter first, then
>>>>> process), both "a" & "b" are printed, as expected.
>>>>> I guess it's a bit hard to say what the side output should include in
>>>>> this case: the stream before filtering or after it?
>>>>> What I would suggest is that Flink protect against this kind of user
>>>>> error, which is hard to debug. Would it be possible for Flink to throw an
>>>>> exception if one tries to call .getSideOutput() on anything that doesn't
>>>>> actually provide that side output? Now that I think of it, this seems like
>>>>> a bug to me: why does the call to getSideOutput succeed if it doesn't
>>>>> provide _any_ input? I would expect it to get the side output data stream
>>>>> _before_ applying .filter().
>>>>> import org.apache.flink.api.common.functions.FilterFunction;
>>>>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>>>>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>>>>> import org.apache.flink.util.Collector;
>>>>> import org.apache.flink.util.OutputTag;
>>>>>
>>>>> public class SideOutputProblem {
>>>>>     public static void main(String[] args) throws Exception {
>>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>         DataStreamSource<String> stream = env.fromElements("a", "b");
>>>>>         OutputTag<String> sideOutputTag = new OutputTag<String>("side-output"){};
>>>>>         SingleOutputStreamOperator<String> processed = stream
>>>>>                 .process(new ProcessFunction<String, String>() {
>>>>>                     @Override
>>>>>                     public void processElement(String s, Context context, Collector<String> collector) throws Exception {
>>>>>                         if ("a".equals(s)) {
>>>>>                             collector.collect(s);              // "a" goes to the main output
>>>>>                         } else {
>>>>>                             context.output(sideOutputTag, s);  // "b" goes to the side output
>>>>>                         }
>>>>>                     }
>>>>>                 })
>>>>>                 .filter(new FilterFunction<String>() {
>>>>>                     @Override
>>>>>                     public boolean filter(String s) throws Exception {
>>>>>                         return true;
>>>>>                     }
>>>>>                 });
>>>>>         // "processed" refers to the filter operator here, not the process operator,
>>>>>         // so this side output stream receives nothing and "b" is never printed.
>>>>>         processed.getSideOutput(sideOutputTag).printToErr();
>>>>>         processed.print();
>>>>>         env.execute();
>>>>>     }
>>>>> }
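To see why only "a" is printed, here is a plain-Java model of the behavior Gordon describes, where getSideOutput() listens for a tag on one specific upstream operator. It has no Flink dependency; OperatorOutput, SideOutputModel and its methods are hypothetical names, and the model is a simplification, not Flink's runtime. The filter only ever receives the main output of process(), so a side output requested from the filter is empty, while requesting it from the process() result yields "b".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical model of what one operator emits: a main output plus
// elements emitted under side-output tags.
class OperatorOutput {
    final List<String> main = new ArrayList<>();
    final Map<String, List<String>> side = new HashMap<>();
}

class SideOutputModel {
    // Mirrors the ProcessFunction in the reproducer: "a" goes to the main
    // output, everything else is emitted under the given tag.
    static OperatorOutput process(List<String> input, String tag) {
        OperatorOutput out = new OperatorOutput();
        for (String s : input) {
            if ("a".equals(s)) {
                out.main.add(s);
            } else {
                out.side.computeIfAbsent(tag, k -> new ArrayList<>()).add(s);
            }
        }
        return out;
    }

    // A filter consumes only its upstream's main output and never emits
    // anything under a side-output tag.
    static OperatorOutput filter(List<String> input, Predicate<String> predicate) {
        OperatorOutput out = new OperatorOutput();
        for (String s : input) {
            if (predicate.test(s)) {
                out.main.add(s);
            }
        }
        return out;
    }

    // Models getSideOutput(): listens for one tag on exactly one operator.
    static List<String> getSideOutput(OperatorOutput operator, String tag) {
        return operator.side.getOrDefault(tag, Collections.emptyList());
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("a", "b");
        String tag = "side-output";

        // Order from the reproducer: process -> filter, side output taken from the filter.
        OperatorOutput processed = process(source, tag);
        OperatorOutput filtered = filter(processed.main, s -> true);
        System.out.println(filtered.main + " " + getSideOutput(filtered, tag)); // [a] []

        // Swapped order: filter -> process, side output taken from process().
        OperatorOutput processedLast = process(filter(source, s -> true).main, tag);
        System.out.println(processedLast.main + " " + getSideOutput(processedLast, tag)); // [a] [b]

        // Original order, but the side output is taken from the process() result.
        System.out.println(getSideOutput(processed, tag)); // [b]
    }
}
```

In the actual Flink program, the corresponding fix is to keep a reference to the SingleOutputStreamOperator returned by .process(...), call getSideOutput(sideOutputTag) on that reference, and apply .filter(...) to it separately, as in the explicit version proposed in the thread.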
>>>>> Cheers,
>>>>> Juho

