Thanks for the explanation. Did you mean that process() would return a SingleOutputWithSideOutputOperator?
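This sketch reads the proposal as in the question above: process() returns a SingleOutputWithSideOutputOperator, while filter() and other transformations return a plain SingleOutputOperator. It is a toy in-memory model in plain Java. The two class names come from this thread and are hypothetical, not Flink classes. process(), filter(), Emitter and elements() are simplified stand-ins assumed only for illustration, and everything runs eagerly on lists rather than as a streaming job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

public class SideOutputTypesSketch {

    // Hypothetical plain single-output result: offers transformations, but no getSideOutput().
    static class SingleOutputOperator<T> {
        private final List<T> mainOutput;

        SingleOutputOperator(List<T> mainOutput) {
            this.mainOutput = mainOutput;
        }

        // Like filter(): the result is again a plain SingleOutputOperator,
        // so the upstream operator's side outputs are not reachable from it.
        SingleOutputOperator<T> filter(Predicate<T> predicate) {
            List<T> kept = new ArrayList<>();
            for (T value : mainOutput) {
                if (predicate.test(value)) {
                    kept.add(value);
                }
            }
            return new SingleOutputOperator<>(kept);
        }

        List<T> elements() {
            return mainOutput;
        }
    }

    // Hypothetical result of process(): the only type that exposes getSideOutput().
    static class SingleOutputWithSideOutputOperator<T> extends SingleOutputOperator<T> {
        private final Map<String, List<T>> sideOutputs;

        SingleOutputWithSideOutputOperator(List<T> mainOutput, Map<String, List<T>> sideOutputs) {
            super(mainOutput);
            this.sideOutputs = sideOutputs;
        }

        SingleOutputOperator<T> getSideOutput(String tag) {
            return new SingleOutputOperator<>(sideOutputs.getOrDefault(tag, Collections.emptyList()));
        }
    }

    // What the user function sees: emit to the main output or to a tagged side output.
    interface Emitter<T> {
        void collect(T value);

        void output(String tag, T value);
    }

    // Like process(): runs fn on every input and returns the side-output-aware type.
    static <T> SingleOutputWithSideOutputOperator<T> process(List<T> input, BiConsumer<T, Emitter<T>> fn) {
        List<T> mainOutput = new ArrayList<>();
        Map<String, List<T>> sideOutputs = new HashMap<>();
        Emitter<T> emitter = new Emitter<T>() {
            @Override
            public void collect(T value) {
                mainOutput.add(value);
            }

            @Override
            public void output(String tag, T value) {
                sideOutputs.computeIfAbsent(tag, k -> new ArrayList<>()).add(value);
            }
        };
        for (T value : input) {
            fn.accept(value, emitter);
        }
        return new SingleOutputWithSideOutputOperator<>(mainOutput, sideOutputs);
    }

    public static void main(String[] args) {
        SingleOutputWithSideOutputOperator<String> processed = process(Arrays.asList("a", "b"), (s, out) -> {
            if ("a".equals(s)) {
                out.collect(s);
            } else {
                out.output("side-output", s);
            }
        });

        SingleOutputOperator<String> filtered = processed.filter(s -> true);
        SingleOutputOperator<String> filteredSideOutput =
                processed.getSideOutput("side-output").filter(s -> true);

        // filtered.getSideOutput("side-output");  // compile error: no such method on SingleOutputOperator

        System.out.println(filtered.elements());           // [a]
        System.out.println(filteredSideOutput.elements()); // [b]
    }
}
```

filter() is declared to return SingleOutputOperator, so the compiler rejects getSideOutput() chained after it. That is the unambiguous behavior described in the reply quoted below.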
Anyway, that should be enough to avoid the problem that I hit (and it also seems like the best and only way). Maybe the name should be something more generic, though, like ProcessedSingleOutputOperator; I'm not sure. Would this deserve an improvement ticket in JIRA?

On Mon, Jan 15, 2018 at 12:43 PM, Chesnay Schepler <ches...@apache.org> wrote:

> It would mean that getSideOutput() would return a
> SingleOutputWithSideOutputOperator which extends SingleOutputOperator
> offering getSideOutput(). Other transformations would still return a
> SingleOutputOperator.
>
> With this, the following code wouldn't compile:
>
> stream
>     .process(...)
>     .filter(...)
>     .getSideOutput(...) // compile error
>
> You would have to explicitly define the code as below, which makes the
> behavior unambiguous:
>
> processed = stream
>     .process(...)
>
> filtered = processed
>     .filter(...)
>
> filteredSideOutput = processed
>     .getSideOutput(...)
>     .filter(...)
>
>
> On 15.01.2018 09:55, Juho Autio wrote:
>
> > sideoutput might deserve a separate class which inherits from
> > singleoutput. It might prevent a lot of confusion
>
> Thanks, but how could that be done? Do you mean that if one calls
> .process(), then the stream would change to another class which would only
> allow calls like .getMainOutput() or .getSideOutput("name")? Of course a
> compile-time error would be even better than a runtime error, but I don't
> see yet how it could be done in practice.
>
> On Sun, Jan 14, 2018 at 4:55 AM, Chen Qin <qinnc...@gmail.com> wrote:
>
>> Hi Juho,
>>
>> I think sideoutput might deserve a separate class which inherits from
>> singleoutput. It might prevent a lot of confusion. A more generic question
>> is whether the DataStream API can natively support multiple inputs and
>> multiple outputs.
>> It's more of a scheduling problem when you go from a single-process system
>> to a multi-process system: which one should get resources, and how much,
>> when sharing the same hardware. I guess it will open the gate to lots of edge
>> cases -> strategies -> more edge cases :)
>>
>> Chen
>>
>> On Fri, Jan 12, 2018 at 6:34 AM, Juho Autio <juho.au...@rovio.com> wrote:
>>
>>> Maybe I could express it in a slightly different way: if adding the
>>> .filter() after .process() causes the side output to be somehow totally
>>> "lost", then I believe .getSideOutput() could be aware that there is
>>> no such side output to be listened to from upstream, and throw an
>>> exception. I mean, this should be possible when building the DAG; it
>>> shouldn't require starting the stream to detect it? Thanks.
>>>
>>> On Fri, Jan 12, 2018 at 2:48 PM, Tzu-Li (Gordon) Tai <
>>> tzuli...@apache.org> wrote:
>>>
>>>> Hi Juho,
>>>>
>>>> Now that I think of it this seems like a bug to me: why does the call
>>>> to getSideOutput succeed if it doesn't provide _any_ input?
>>>>
>>>>
>>>> With the way side outputs work, I don't think this is possible (or
>>>> would make sense). An operator does not know whether or not it would ever
>>>> emit some element with a given tag.
>>>> As far as I understand it, calling `getSideOutput` essentially adds a
>>>> virtual node to the resulting stream graph that listens to the specified tag
>>>> from the upstream input.
>>>>
>>>> While I'm not sure whether your observation is expected
>>>> behavior, from an API perspective I can see why it is a bit confusing for
>>>> you.
>>>> Aljoscha would be the expert here; maybe he'll have more insights. I've
>>>> looped him in (cc'ed).
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>>
>>>> On 12 January 2018 at 4:05:13 PM, Juho Autio (juho.au...@rovio.com)
>>>> wrote:
>>>>
>>>> When I run the code below (Flink 1.4.0 or 1.3.1), only "a" is printed.
>>>> If I switch the position of .process() and .filter() (i.e. filter first, then
>>>> process), both "a" and "b" are printed, as expected.
>>>>
>>>> I guess it's a bit hard to say what the side output should include in
>>>> this case: the stream before filtering or after it?
>>>>
>>>> What I would suggest is for Flink to protect against this kind of user
>>>> error, which is hard to debug. Would it be possible for Flink to throw an
>>>> exception if one tries to call .getSideOutput() on anything that doesn't
>>>> actually provide that side output? Now that I think of it, this seems like a
>>>> bug to me: why does the call to getSideOutput succeed if it doesn't provide
>>>> _any_ input? I would expect it to get the side output data stream _before_
>>>> applying .filter().
>>>>
>>>> import org.apache.flink.api.common.functions.FilterFunction;
>>>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>>>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>>>> import org.apache.flink.util.Collector;
>>>> import org.apache.flink.util.OutputTag;
>>>>
>>>> public class SideOutputProblem {
>>>>
>>>>     public static void main(String[] args) throws Exception {
>>>>
>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>         DataStreamSource<String> stream = env.fromElements("a", "b");
>>>>         // Anonymous subclass ({}) so Flink can extract the tag's element type.
>>>>         OutputTag<String> sideOutputTag = new OutputTag<String>("side-output") {};
>>>>
>>>>         SingleOutputStreamOperator<String> processed = stream
>>>>
>>>>                 .process(new ProcessFunction<String, String>() {
>>>>                     @Override
>>>>                     public void processElement(String s, Context context, Collector<String> collector) throws Exception {
>>>>                         if ("a".equals(s)) {
>>>>                             collector.collect(s);              // main output
>>>>                         } else {
>>>>                             context.output(sideOutputTag, s);  // tagged side output
>>>>                         }
>>>>                     }
>>>>                 })
>>>>
>>>>                 .filter(new FilterFunction<String>() {
>>>>                     @Override
>>>>                     public boolean filter(String s) throws Exception {
>>>>                         return true;
>>>>                     }
>>>>                 });
>>>>
>>>>         // `processed` holds the filter() result, so the side output is requested from filter(), not process().
>>>>         processed.getSideOutput(sideOutputTag).printToErr();
>>>>
>>>>         processed.print();
>>>>
>>>>         env.execute();
>>>>     }
>>>>
>>>> }
>>>>
>>>> Cheers,
>>>> Juho
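Gordon's explanation is that calling getSideOutput() adds a virtual node that listens for the tag on its immediate upstream operator. A toy push-based graph in plain Java can illustrate this. Node, Emission, then(), sideOutput() and run() are hypothetical names for this sketch, not Flink's runtime, and the model simply assumes the behavior Gordon describes.

Reading the side output from the filter node receives nothing, matching the reproduction where only "a" is printed. Reading it from the process node receives "b". That corresponds to calling getSideOutput() on the operator returned by process(), as in Chesnay's example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class SideOutputGraphSketch {

    // One record leaving an operator; tag == null means the main output.
    static final class Emission {
        final String tag;
        final String value;
        Emission(String tag, String value) {
            this.tag = tag;
            this.value = value;
        }
    }

    // Toy graph node: applies its function and pushes results to its consumers.
    static final class Node {
        private final Function<String, List<Emission>> fn;
        private final List<Node> mainConsumers = new ArrayList<>();
        private final Map<String, List<Node>> sideConsumers = new HashMap<>();
        final List<String> received = new ArrayList<>();
        Node(Function<String, List<Emission>> fn) {
            this.fn = fn;
        }

        // Attaches a downstream operator to this node's main output.
        Node then(Function<String, List<Emission>> next) {
            Node child = new Node(next);
            mainConsumers.add(child);
            return child;
        }

        // Like getSideOutput(): a pass-through "virtual" node that listens for
        // one tag on THIS node only, never on nodes further upstream.
        Node sideOutput(String tag) {
            Node child = new Node(v -> Arrays.asList(new Emission(null, v)));
            sideConsumers.computeIfAbsent(tag, k -> new ArrayList<>()).add(child);
            return child;
        }

        void push(String value) {
            received.add(value);
            for (Emission e : fn.apply(value)) {
                // Tagged records only reach listeners registered on this node.
                List<Node> targets = e.tag == null
                        ? mainConsumers
                        : sideConsumers.getOrDefault(e.tag, Collections.emptyList());
                for (Node n : targets) {
                    n.push(e.value);
                }
            }
        }
    }

    // Same logic as the ProcessFunction in the reproduction: "a" to main, the rest to the tag.
    static List<Emission> processFn(String s) {
        String tag = "a".equals(s) ? null : "side-output";
        return Arrays.asList(new Emission(tag, s));
    }

    // Same as filter(s -> true): forwards main-output records and nothing else.
    static List<Emission> keepAll(String s) {
        return Arrays.asList(new Emission(null, s));
    }

    // A sink only records what it receives.
    static List<Emission> sink(String s) {
        return Collections.emptyList();
    }

    // Returns [main sink contents, side sink contents].
    static List<List<String>> run(boolean readSideOutputFromProcess) {
        Node source = new Node(SideOutputGraphSketch::keepAll);
        Node processed = source.then(SideOutputGraphSketch::processFn);
        Node filtered = processed.then(SideOutputGraphSketch::keepAll);
        Node mainSink = filtered.then(SideOutputGraphSketch::sink);
        Node sideSink = (readSideOutputFromProcess ? processed : filtered)
                .sideOutput("side-output")
                .then(SideOutputGraphSketch::sink);
        for (String s : Arrays.asList("a", "b")) {
            source.push(s);
        }
        return Arrays.asList(mainSink.received, sideSink.received);
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [[a], []]   side output read from filter()
        System.out.println(run(true));  // [[a], [b]]  side output read from process()
    }
}
```

The model also shows why a build-time check is hard: the filter node has no way to know that no tagged records will ever arrive.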