> sideoutput might deserve a separate class which inherits from
singleoutput. It might prevent a lot of confusion.
Thanks, but how could that be done? Do you mean that if one calls
.process(), the stream would change to another class that
would only allow calls like .getMainOutput() or
.getSideOutput("name")? Of course a compile-time error would be
even better than a runtime error, but I don't yet see how it
could be done in practice.
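One way to read Chen's suggestion, sketched in plain Java under stated assumptions: `Stream`, `ProcessedStream`, `Tag`, `Out` and `ProcessFn` are hypothetical toy classes (not Flink's `DataStream`, `SingleOutputStreamOperator` or `OutputTag`), evaluated eagerly over a list. Only the direct result of `process()` has a `getSideOutput()` method; `filter()` returns a plain `Stream`, so the chain from the original report would fail to compile instead of silently yielding an empty side output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class TypedSideOutputSketch {

    // Hypothetical tag type, loosely modeled on Flink's OutputTag (not the real class).
    static final class Tag<S> {
        final String id;
        Tag(String id) { this.id = id; }
    }

    // Callback handed to the user function: one method per output.
    interface Out<T, S> {
        void main(T value);
        void side(S value);
    }

    interface ProcessFn<T, S> {
        void process(T value, Out<T, S> out);
    }

    // A plain stream: it deliberately has no getSideOutput() method.
    static class Stream<T> {
        final List<T> elements;
        Stream(List<T> elements) { this.elements = elements; }

        Stream<T> filter(Predicate<T> p) {
            List<T> kept = new ArrayList<>();
            for (T t : elements) if (p.test(t)) kept.add(t);
            return new Stream<>(kept); // side outputs are not carried over
        }

        <S> ProcessedStream<T, S> process(Tag<S> tag, ProcessFn<T, S> fn) {
            List<T> mainOut = new ArrayList<>();
            List<S> sideOut = new ArrayList<>();
            Out<T, S> out = new Out<T, S>() {
                @Override public void main(T value) { mainOut.add(value); }
                @Override public void side(S value) { sideOut.add(value); }
            };
            for (T t : elements) fn.process(t, out);
            return new ProcessedStream<>(mainOut, tag, sideOut);
        }
    }

    // Only the direct result of process() exposes its side output.
    static final class ProcessedStream<T, S> extends Stream<T> {
        private final Tag<S> tag;
        private final List<S> side;

        ProcessedStream(List<T> main, Tag<S> tag, List<S> side) {
            super(main);
            this.tag = tag;
            this.side = side;
        }

        Stream<S> getSideOutput(Tag<S> requested) {
            if (requested != tag) { // an unknown tag is still only a runtime error
                throw new IllegalArgumentException("unknown side output: " + requested.id);
            }
            return new Stream<>(side);
        }
    }

    public static void main(String[] args) {
        Tag<String> sideTag = new Tag<>("side-output");
        ProcessedStream<String, String> processed = new Stream<>(List.of("a", "b"))
                .process(sideTag, (s, out) -> {
                    if ("a".equals(s)) out.main(s); else out.side(s);
                });

        Stream<String> filtered = processed.filter(s -> true);
        System.out.println(filtered.elements);                         // [a]
        System.out.println(processed.getSideOutput(sideTag).elements); // [b]
        // filtered.getSideOutput(sideTag); // does not compile: Stream has no getSideOutput
    }
}
```

The trade-off of this design is that the type narrows back to a plain `Stream` as soon as another operator is chained, so the side output has to be fetched from the `ProcessedStream` before calling `.filter()`.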
On Sun, Jan 14, 2018 at 4:55 AM, Chen Qin <qinnc...@gmail.com> wrote:
Hi Juho,
I think sideoutput might deserve a separate class which
inherits from singleoutput. It might prevent a lot of
confusion. A more generic question is whether the DataStream
API can support multiple inputs and multiple outputs natively.
It becomes more of a scheduling problem when you move from a
single-process system to a multi-process system: which one
should get resources, and how much, when sharing the same
hardware. I guess it will open the gate to lots of edge
cases -> strategies -> more edge cases :)
Chen
On Fri, Jan 12, 2018 at 6:34 AM, Juho Autio
<juho.au...@rovio.com> wrote:
Maybe I could express it in a slightly different way: if
adding the .filter() after .process() causes the side
output to be somehow totally "lost", then I believe
.getSideOutput() could detect that there is no such
side output to listen to from upstream, and throw an
exception. I mean, this should be possible when building
the DAG; it shouldn't require starting the stream to
detect it, right? Thanks.
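A build-time check along these lines could be sketched as follows. The `Node` class, its `mayEmitSideOutputs` flag and the `getSideOutput` helper are hypothetical and not part of Flink's graph API. As Gordon explains in his reply, such a check can only reject operators that can never emit side outputs (like a filter); it cannot know whether a process function will actually emit a particular tag.

```java
public class SideOutputGraphCheck {

    // Hypothetical DAG node; not Flink's StreamGraph or Transformation classes.
    static final class Node {
        final String name;
        final Node upstream;              // null for a source
        final boolean mayEmitSideOutputs; // e.g. true for process(), false for filter()

        Node(String name, Node upstream, boolean mayEmitSideOutputs) {
            this.name = name;
            this.upstream = upstream;
            this.mayEmitSideOutputs = mayEmitSideOutputs;
        }
    }

    // Adds a virtual side-output node while the graph is being built, failing fast
    // if the upstream operator can never emit side outputs at all.
    static Node getSideOutput(Node upstream, String tagId) {
        if (!upstream.mayEmitSideOutputs) {
            throw new IllegalStateException(
                    "'" + upstream.name + "' cannot emit side output '" + tagId + "'");
        }
        return new Node("side:" + tagId, upstream, false);
    }

    public static void main(String[] args) {
        Node source = new Node("source", null, false);
        Node process = new Node("process", source, true);
        Node filter = new Node("filter", process, false);

        System.out.println(getSideOutput(process, "side-output").name); // side:side-output
        try {
            getSideOutput(filter, "side-output");
        } catch (IllegalStateException e) {
            // 'filter' cannot emit side output 'side-output'
            System.out.println(e.getMessage());
        }
    }
}
```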
On Fri, Jan 12, 2018 at 2:48 PM, Tzu-Li (Gordon) Tai
<tzuli...@apache.org> wrote:
Hi Juho,
> Now that I think of it, this seems like a bug to me:
> why does the call to getSideOutput succeed if it
> doesn't provide _any_ input?
With the way side outputs work, I don’t think this is
possible (or would make sense). An operator does not
know whether it will ever emit an element with a
given tag.
As far as I understand it, calling `getSideOutput`
essentially adds a virtual node to the result stream
graph that listens to the specified tag from the
upstream input.
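The "virtual node" behaviour can be modeled in a few lines of plain Java to show why the repro prints only "a". This is a simplified model, not Flink's runtime: `Tagged`, `process`, `filter` and `sideOutput` are hypothetical helpers. A side-output selector only sees tagged records produced by its direct upstream operator, and a regular edge carries only main-output records, so the filter never receives "b" and never emits anything under the tag.

```java
import java.util.ArrayList;
import java.util.List;

public class VirtualSideOutputModel {

    // A record flowing out of an operator; tag == null means "main output".
    static final class Tagged {
        final String tag;
        final String value;
        Tagged(String tag, String value) { this.tag = tag; this.value = value; }
    }

    // Models the ProcessFunction from the repro: "a" goes to the main output,
    // everything else is emitted under the "side-output" tag.
    static List<Tagged> process(List<String> input) {
        List<Tagged> out = new ArrayList<>();
        for (String s : input) {
            out.add("a".equals(s) ? new Tagged(null, s) : new Tagged("side-output", s));
        }
        return out;
    }

    // Models filter(s -> true): a regular edge delivers only main-output records,
    // and the filter itself only ever emits main-output records.
    static List<Tagged> filter(List<Tagged> upstream) {
        List<Tagged> out = new ArrayList<>();
        for (Tagged t : upstream) {
            if (t.tag == null) {
                out.add(new Tagged(null, t.value));
            }
        }
        return out;
    }

    // Models the virtual side-output node: selects the records that the
    // given upstream operator emitted under the requested tag.
    static List<String> sideOutput(List<Tagged> upstream, String tag) {
        List<String> out = new ArrayList<>();
        for (Tagged t : upstream) {
            if (tag.equals(t.tag)) {
                out.add(t.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Tagged> processed = process(List.of("a", "b"));
        List<Tagged> filtered = filter(processed);

        System.out.println(sideOutput(processed, "side-output")); // [b]
        System.out.println(sideOutput(filtered, "side-output"));  // []
    }
}
```

In the actual job, the corresponding fix is to keep a reference to the `SingleOutputStreamOperator` returned by `.process()`, call `getSideOutput(sideOutputTag)` on that operator, and chain `.filter()` onto it separately.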
While I’m not sure whether your observation is
expected behavior, from an API perspective I can
see why it is confusing.
Aljoscha would be the expert here; maybe he’ll have
more insights. I’ve cc’ed him.
Cheers,
Gordon
On 12 January 2018 at 4:05:13 PM, Juho Autio
(juho.au...@rovio.com) wrote:
When I run the code below (Flink 1.4.0 or 1.3.1),
only "a" is printed. If I swap the positions of
.process() and .filter() (i.e. filter first, then
process), both "a" and "b" are printed, as expected.
I guess it's a bit hard to say what the side output
should include in this case: the stream before
filtering or after it?
What I would suggest is that Flink protect against
this kind of user error, which is hard to debug.
Would it be possible for Flink to throw an exception
if one calls .getSideOutput() on anything that
doesn't actually provide that side output? Now that
I think of it, this seems like a bug to me: why
does the call to getSideOutput succeed if it doesn't
provide _any_ input? I would expect it to return the
side output data stream from _before_ .filter() is applied.
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputProblem {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = env.fromElements("a", "b");
        // Anonymous subclass ({}) so Flink can extract the tag's element type
        OutputTag<String> sideOutputTag = new OutputTag<String>("side-output"){};
        SingleOutputStreamOperator<String> processed = stream
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String s, Context context,
                            Collector<String> collector) throws Exception {
                        if ("a".equals(s)) {
                            collector.collect(s);             // main output
                        } else {
                            context.output(sideOutputTag, s); // side output
                        }
                    }
                })
                // `processed` ends up referring to this filter operator,
                // not to the process operator above
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String s) throws Exception {
                        return true;
                    }
                });
        // Listens for "side-output" on the filter, which never emits it: prints nothing
        processed.getSideOutput(sideOutputTag).printToErr();
        processed.print(); // prints "a"
        env.execute();
    }
}
Cheers,
Juho