Could someone with knowledge of the right terms create this in JIRA, please? I guess I could also create it myself if needed.
On Mon, Jan 15, 2018 at 3:15 PM, Chesnay Schepler <ches...@apache.org> wrote:
> Yes, I meant that process() returns the special operator. This would
> definitely deserve a JIRA issue.
>
>
> On 15.01.2018 14:09, Juho Autio wrote:
>
> Thanks for the explanation. Did you mean that process() would return a
> SingleOutputWithSideOutputOperator?
>
> Anyway, that should be enough to avoid the problem that I hit (and it
> also seems like the best & only way).
>
> Maybe the name should be something more generic though, like
> ProcessedSingleOutputOperator or something, I wouldn't know.
>
> Would this deserve an improvement ticket in JIRA?
>
> On Mon, Jan 15, 2018 at 12:43 PM, Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> It would mean that getSideOutput() would return a
>> SingleOutputWithSideOutputOperator which extends SingleOutputOperator
>> offering getSideOutput(). Other transformations would still return a
>> SingleOutputOperator.
>>
>> With this, the following code wouldn't compile:
>>
>> stream
>>     .process(...)
>>     .filter(...)
>>     .getSideOutput(...) // compile error
>>
>> You would have to explicitly define the code as below, which makes the
>> behavior unambiguous:
>>
>> processed = stream
>>     .process(...)
>>
>> filtered = processed
>>     .filter(...)
>>
>> filteredSideOutput = processed
>>     .getSideOutput(...)
>>     .filter(...)
>>
>>
>> On 15.01.2018 09:55, Juho Autio wrote:
>>
>> > sideoutput might deserve a separate class which inherits from
>> > singleoutput. It might prevent a lot of confusion
>>
>> Thanks, but how could that be done? Do you mean that if one calls
>> .process(), then the stream would change to another class which would only
>> allow calls like .getMainOutput() or .getSideOutput("name")? Of course a
>> compile-time error would be even better than a runtime error, but I don't
>> see yet how it could be done in practice.
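A minimal sketch of the type split Chesnay describes, written in plain Java rather than against Flink: `SimpleStream`, `ProcessedStream`, `Tag` and `Ctx` are hypothetical stand-ins (not Flink classes), and the "stream" is just an eager in-memory list. The point it illustrates is that only `process()` returns the subtype offering `getSideOutput()`, so calling `getSideOutput()` after `filter()` becomes a compile error instead of silently yielding an empty stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

// Hypothetical, simplified types (NOT Flink's API): an eager, in-memory "stream"
// used only to show the compile-time effect of the proposed type split.
public class SideOutputTypesSketch {

    /** Hypothetical tag identifying a side output by name. */
    static final class Tag<T> {
        final String id;
        Tag(String id) { this.id = id; }
    }

    /** Hypothetical context handed to a process function. */
    interface Ctx<T> {
        void collect(T value);                 // main output
        <S> void output(Tag<S> tag, S value);  // side output
    }

    /** Base type: transformations such as filter() return this, without getSideOutput(). */
    static class SimpleStream<T> {
        final List<T> elements;
        SimpleStream(List<T> elements) { this.elements = elements; }

        SimpleStream<T> filter(Predicate<T> predicate) {
            List<T> out = new ArrayList<>();
            for (T t : elements) {
                if (predicate.test(t)) out.add(t);
            }
            return new SimpleStream<>(out);
        }

        <R> ProcessedStream<R> process(BiConsumer<T, Ctx<R>> fn) {
            List<R> main = new ArrayList<>();
            Map<String, List<Object>> side = new HashMap<>();
            Ctx<R> ctx = new Ctx<R>() {
                @Override public void collect(R value) { main.add(value); }
                @Override public <S> void output(Tag<S> tag, S value) {
                    side.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
                }
            };
            for (T t : elements) fn.accept(t, ctx);
            return new ProcessedStream<>(main, side);
        }
    }

    /** Only process() returns this subtype, so only its result offers getSideOutput(). */
    static final class ProcessedStream<T> extends SimpleStream<T> {
        private final Map<String, List<Object>> side;

        ProcessedStream(List<T> main, Map<String, List<Object>> side) {
            super(main);
            this.side = side;
        }

        @SuppressWarnings("unchecked")
        <S> SimpleStream<S> getSideOutput(Tag<S> tag) {
            List<?> values = side.getOrDefault(tag.id, Collections.emptyList());
            return new SimpleStream<>((List<S>) values);
        }
    }

    public static void main(String[] args) {
        Tag<String> sideTag = new Tag<>("side-output");
        ProcessedStream<String> processed = new SimpleStream<>(Arrays.asList("a", "b"))
                .process((String s, Ctx<String> ctx) -> {
                    if ("a".equals(s)) ctx.collect(s);
                    else ctx.output(sideTag, s);
                });

        SimpleStream<String> filtered = processed.filter(s -> true);
        SimpleStream<String> sideOutput = processed.getSideOutput(sideTag);
        // processed.filter(s -> true).getSideOutput(sideTag);  // would not compile

        System.out.println(filtered.elements);   // [a]
        System.out.println(sideOutput.elements); // [b]
    }
}
```

The commented-out line in `main` is the chain from the original report; uncommenting it fails to compile because `filter()` returns the base `SimpleStream` type, which has no `getSideOutput()`.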
>>
>> On Sun, Jan 14, 2018 at 4:55 AM, Chen Qin <qinnc...@gmail.com> wrote:
>>
>>> Hi Juho,
>>>
>>> I think sideoutput might deserve a separate class which inherits from
>>> singleoutput. It might prevent a lot of confusion. A more generic question
>>> is whether the DataStream API can natively support multiple inputs and
>>> multiple outputs. It's more of a scheduling problem when you go from a
>>> single-process system to a multi-process system: which one should get
>>> resources, and how much, when sharing the same hardware. I guess it will
>>> open the gate to lots of edge cases -> strategies -> more edge cases :)
>>>
>>> Chen
>>>
>>> On Fri, Jan 12, 2018 at 6:34 AM, Juho Autio <juho.au...@rovio.com>
>>> wrote:
>>>
>>>> Maybe I could express it in a slightly different way: if adding the
>>>> .filter() after .process() causes the side output to be somehow totally
>>>> "lost", then I believe the .getSideOutput() could be aware that there is
>>>> no such side output to be listened to from upstream, and throw an
>>>> exception. I mean, this should be possible when building the DAG; it
>>>> shouldn't require starting the stream to detect it? Thanks.
>>>>
>>>> On Fri, Jan 12, 2018 at 2:48 PM, Tzu-Li (Gordon) Tai <
>>>> tzuli...@apache.org> wrote:
>>>>
>>>>> Hi Juho,
>>>>>
>>>>> Now that I think of it this seems like a bug to me: why does the call
>>>>> to getSideOutput succeed if it doesn't provide _any_ input?
>>>>>
>>>>>
>>>>> With the way side outputs work, I don’t think this is possible (or
>>>>> would make sense). An operator does not know whether or not it would ever
>>>>> emit some element with a given tag.
>>>>> As far as I understand it, calling `getSideOutput` essentially adds a
>>>>> virtual node to the result stream graph that listens to the specified tag
>>>>> from the upstream input.
>>>>>
>>>>> While I’m not sure whether your observation is expected behavior, from
>>>>> an API perspective I can see why it is a bit confusing for you.
>>>>>
>>>>> Aljoscha would be the expert here; maybe he’ll have more insights.
>>>>> I’ve looped him in (cc’ed).
>>>>>
>>>>> Cheers,
>>>>> Gordon
>>>>>
>>>>>
>>>>> On 12 January 2018 at 4:05:13 PM, Juho Autio (juho.au...@rovio.com)
>>>>> wrote:
>>>>>
>>>>> When I run the code below (Flink 1.4.0 or 1.3.1), only "a" is printed.
>>>>> If I switch the position of .process() & .filter() (i.e. filter first,
>>>>> then process), both "a" & "b" are printed, as expected.
>>>>>
>>>>> I guess it's a bit hard to say what the side output should include in
>>>>> this case: the stream before filtering or after it?
>>>>>
>>>>> What I would suggest is for Flink to protect against this kind of user
>>>>> error, which is hard to debug. Would it be possible for Flink to throw an
>>>>> exception if one tries to call .getSideOutput() on anything that doesn't
>>>>> actually provide that side output? Now that I think of it this seems like
>>>>> a bug to me: why does the call to getSideOutput succeed if it doesn't
>>>>> provide _any_ input? I would expect it to get the side output data stream
>>>>> _before_ applying .filter().
>>>>>
>>>>> import org.apache.flink.api.common.functions.FilterFunction;
>>>>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>>>>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>>>>> import org.apache.flink.util.Collector;
>>>>> import org.apache.flink.util.OutputTag;
>>>>>
>>>>> public class SideOutputProblem {
>>>>>
>>>>>     public static void main(String[] args) throws Exception {
>>>>>
>>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>         DataStreamSource<String> stream = env.fromElements("a", "b");
>>>>>         OutputTag<String> sideOutputTag = new OutputTag<String>("side-output"){};
>>>>>
>>>>>         SingleOutputStreamOperator<String> processed = stream
>>>>>
>>>>>                 .process(new ProcessFunction<String, String>() {
>>>>>                     @Override
>>>>>                     public void processElement(String s, Context context, Collector<String> collector) throws Exception {
>>>>>                         if ("a".equals(s)) {
>>>>>                             collector.collect(s);
>>>>>                         } else {
>>>>>                             context.output(sideOutputTag, s);
>>>>>                         }
>>>>>                     }
>>>>>                 })
>>>>>
>>>>>                 .filter(new FilterFunction<String>() {
>>>>>                     @Override
>>>>>                     public boolean filter(String s) throws Exception {
>>>>>                         return true;
>>>>>                     }
>>>>>                 });
>>>>>
>>>>>         // "processed" is the filter's output here, not the process() output
>>>>>         processed.getSideOutput(sideOutputTag).printToErr();
>>>>>
>>>>>         processed.print();
>>>>>
>>>>>         env.execute();
>>>>>     }
>>>>>
>>>>> }
>>>>>
>>>>> Cheers,
>>>>> Juho
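To illustrate Gordon's explanation of why only "a" is printed, here is a toy model, assuming (as he describes) that a side output is a virtual node listening for a tag on its direct upstream operator only. All names (`Tagged`, `Op`, `run`, `sideOutput`, `mainOutput`) are hypothetical and this is not Flink's StreamGraph; operators forward only main-output records downstream, so a side output taken after `filter()` finds nothing, while the filter-then-process order yields both elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model only: these names are hypothetical and are NOT Flink's StreamGraph API.
public class VirtualSideOutputSketch {

    static final String SIDE_TAG = "side-output";

    /** A record emitted by an operator; tag == null means main output. */
    static final class Tagged {
        final String tag;
        final String value;
        Tagged(String tag, String value) { this.tag = tag; this.value = value; }
    }

    /** A hypothetical operator: one input element -> zero or more tagged outputs. */
    interface Op {
        List<Tagged> apply(String in);
    }

    // Mirrors the ProcessFunction in the report: "a" to main output, anything else to the side tag.
    static final Op PROCESS = in -> "a".equals(in)
            ? Collections.singletonList(new Tagged(null, in))
            : Collections.singletonList(new Tagged(SIDE_TAG, in));

    // Mirrors the FilterFunction that always returns true; it never emits tagged records.
    static final Op FILTER_ALL = in -> Collections.singletonList(new Tagged(null, in));

    /** Runs a linear chain; returns what the LAST operator emitted. */
    static List<Tagged> run(List<String> input, List<Op> chain) {
        List<Tagged> emitted = new ArrayList<>();
        for (String s : input) emitted.add(new Tagged(null, s));
        for (Op op : chain) {
            List<Tagged> next = new ArrayList<>();
            for (Tagged t : emitted) {
                // Only main-output records feed the next operator; side records stop here.
                if (t.tag == null) next.addAll(op.apply(t.value));
            }
            emitted = next;
        }
        return emitted;
    }

    /** The "virtual node": selects records with the given tag from its direct upstream. */
    static List<String> sideOutput(List<Tagged> upstream, String tag) {
        List<String> out = new ArrayList<>();
        for (Tagged t : upstream) if (tag.equals(t.tag)) out.add(t.value);
        return out;
    }

    static List<String> mainOutput(List<Tagged> upstream) {
        List<String> out = new ArrayList<>();
        for (Tagged t : upstream) if (t.tag == null) out.add(t.value);
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b");

        List<Tagged> processThenFilter = run(input, Arrays.asList(PROCESS, FILTER_ALL));
        System.out.println(mainOutput(processThenFilter));           // [a]
        System.out.println(sideOutput(processThenFilter, SIDE_TAG)); // []

        List<Tagged> filterThenProcess = run(input, Arrays.asList(FILTER_ALL, PROCESS));
        System.out.println(mainOutput(filterThenProcess));           // [a]
        System.out.println(sideOutput(filterThenProcess, SIDE_TAG)); // [b]
    }
}
```

In this model the "b" record is emitted by the process step but never forwarded, because the filter only consumes main-output records and emits nothing under the tag; that matches the behavior reported in the thread, though it is only a simplification of how Flink actually wires the graph.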