> sideoutput might deserve a separate class which inherits
> from singleoutput. It might prevent a lot of confusion
Thanks, but how could that be done? Do you mean that if one
calls .process(), then the stream would change to another
class which would only allow calls like .getMainOutput() or
.getSideOutput("name")? Of course compile time error would
be even better than a runtime error, but I don't see yet how
it could be done in practice.
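One way the compile-time variant could look is sketched below in plain Java. This is only a toy model under assumptions: PlainStream, ProcessedStream and process() are hypothetical names made up for illustration, not Flink classes. The idea is that only the type returned by process() offers getSideOutput(), while filter() returns the plain type, so the problematic chain would not compile.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical toy types for illustration only; none of these are Flink classes.
final class TypedSideOutputSketch {

    /** A plain stream: it can be filtered, but it exposes no side output. */
    static class PlainStream<T> {
        final List<T> elements;

        PlainStream(List<T> elements) {
            this.elements = elements;
        }

        /** Filtering always yields a PlainStream, so no side output is reachable from the result. */
        PlainStream<T> filter(Predicate<T> keep) {
            List<T> kept = new ArrayList<>();
            for (T element : elements) {
                if (keep.test(element)) {
                    kept.add(element);
                }
            }
            return new PlainStream<>(kept);
        }
    }

    /** The result of process(): the only type that offers getSideOutput(). */
    static final class ProcessedStream<T> extends PlainStream<T> {
        private final List<T> sideElements;

        ProcessedStream(List<T> mainElements, List<T> sideElements) {
            super(mainElements);
            this.sideElements = sideElements;
        }

        PlainStream<T> getSideOutput() {
            return new PlainStream<>(sideElements);
        }
    }

    /** Routes each element to the main output if toMain accepts it, otherwise to the side output. */
    static <T> ProcessedStream<T> process(List<T> input, Predicate<T> toMain) {
        List<T> mainElements = new ArrayList<>();
        List<T> sideElements = new ArrayList<>();
        for (T element : input) {
            if (toMain.test(element)) {
                mainElements.add(element);
            } else {
                sideElements.add(element);
            }
        }
        return new ProcessedStream<>(mainElements, sideElements);
    }

    public static void main(String[] args) {
        ProcessedStream<String> processed = process(Arrays.asList("a", "b"), "a"::equals);
        System.out.println(processed.getSideOutput().elements);   // side output taken before filtering
        System.out.println(processed.filter(s -> true).elements); // main output after filtering
        // processed.filter(s -> true).getSideOutput();
        // The line above would not compile: PlainStream has no getSideOutput().
    }
}
```

The trade-off in this sketch is that every operator after process() loses access to the side output by type, so the side output has to be taken from the process() result itself.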
On Sun, Jan 14, 2018 at 4:55 AM, Chen Qin
<qinnc...@gmail.com> wrote:
Hi Juho,
I think sideoutput might deserve a separate class which
inherits from singleoutput. It might prevent a lot of
confusion. A more generic question is whether the
datastream api can support multiple ins and multiple outs
natively. It's more like a scheduling problem when you
come from a single process system to a multiple process
system: which one should get resources, and how much, when
sharing the same hardware resources? I guess it will open
the gate to lots of edge cases -> strategies -> more edge
cases :)
Chen
On Fri, Jan 12, 2018 at 6:34 AM, Juho Autio
<juho.au...@rovio.com> wrote:
Maybe I could express it in a slightly different
way: if adding the .filter() after .process() causes
the side output to be somehow totally "lost", then I
believe the .getSideOutput() could be aware that
there is no such side output to be listened to from
upstream, and throw an exception. I mean, this
should be possible when building the DAG, it
shouldn't require starting the stream to detect?
Thanks..
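A rough sketch of what such a check while building the DAG could look like, in plain Java. This is a toy graph under a loud assumption: each node declares its side output tags up front (the Node class and its declaredTags are hypothetical, not part of Flink). Whether an operator can know its tags at graph-building time is exactly the open question in this thread, so this only shows the shape of the check.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical toy graph for illustration only; not Flink's stream graph.
final class BuildTimeSideOutputCheck {

    static final class Node {
        final String name;
        // Assumption: the tags a node may emit to are declared when the node is created.
        final Set<String> declaredTags;

        Node(String name, String... declaredTags) {
            this.name = name;
            this.declaredTags = new HashSet<>(Arrays.asList(declaredTags));
        }

        /** Adds a process-like node that declares the given side output tags. */
        Node process(String... tags) {
            return new Node(name + " -> process", tags);
        }

        /** Adds a filter-like node, which declares no side output tags. */
        Node filter() {
            return new Node(name + " -> filter");
        }

        /** Fails while the graph is being built if this node does not declare the tag. */
        Node getSideOutput(String tag) {
            if (!declaredTags.contains(tag)) {
                throw new IllegalStateException(
                        "Node '" + name + "' does not provide side output '" + tag + "'");
            }
            return new Node(name + " -> side(" + tag + ")");
        }
    }

    public static void main(String[] args) {
        Node processed = new Node("source").process("side-output");
        processed.getSideOutput("side-output"); // accepted: process declares the tag

        try {
            processed.filter().getSideOutput("side-output");
        } catch (IllegalStateException e) {
            // Rejected before anything runs: filter declares no tags.
            System.err.println(e.getMessage());
        }
    }
}
```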
On Fri, Jan 12, 2018 at 2:48 PM, Tzu-Li (Gordon) Tai
<tzuli...@apache.org>
wrote:
Hi Juho,
> Now that I think of it this seems like a bug to
> me: why does the call to getSideOutput succeed
> if it doesn't provide _any_ input?
With the way side outputs work, I don’t think
this is possible (or would make sense). An
operator does not know whether or not it would
ever emit some element with a given tag.
As far as I understand it, calling
`getSideOutput` essentially adds a virtual node
to the result stream graph that listens to the
specified tag from the upstream input.
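To make that description concrete, here is a small toy model in plain Java. It is only a sketch of the behavior as described above, not of Flink's internals; Tagged, run() and select() are hypothetical names. Each operator emits elements under a tag, and the "virtual node" forwards only what its direct upstream emitted under the requested tag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Toy model for illustration; hypothetical names, not how Flink is implemented.
final class VirtualSideOutputModel {
    static final String MAIN = "main";
    static final String SIDE = "side-output";

    /** One emitted element together with the tag it was emitted under. */
    static final class Tagged {
        final String tag;
        final String value;

        Tagged(String tag, String value) {
            this.tag = tag;
            this.value = value;
        }
    }

    /** Runs an operator over its input; the operator decides the tag of each element. */
    static List<Tagged> run(List<String> input, Function<String, Tagged> operator) {
        List<Tagged> emitted = new ArrayList<>();
        for (String value : input) {
            emitted.add(operator.apply(value));
        }
        return emitted;
    }

    /** The "virtual node": forwards only what the given upstream emitted under the tag. */
    static List<String> select(List<Tagged> upstream, String tag) {
        List<String> selected = new ArrayList<>();
        for (Tagged element : upstream) {
            if (element.tag.equals(tag)) {
                selected.add(element.value);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // process(): "a" goes to the main output, everything else to the side output.
        List<Tagged> processOut = run(Arrays.asList("a", "b"),
                s -> "a".equals(s) ? new Tagged(MAIN, s) : new Tagged(SIDE, s));
        // filter(): consumes only the main output of process() and emits only under MAIN.
        List<Tagged> filterOut = run(select(processOut, MAIN), s -> new Tagged(MAIN, s));

        System.out.println(select(processOut, SIDE)); // listening on process(): sees "b"
        System.out.println(select(filterOut, SIDE));  // listening on filter(): sees nothing
    }
}
```

In this model, listening on the filter finds nothing under the tag because the filter itself never emits to it, while listening on the process result does. In terms of the program at the bottom of this thread, that would correspond to keeping a reference to the result of .process() and calling getSideOutput on it before applying .filter().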
While I’m not aware whether or not your
observation is expected behavior, from an API
perspective, I can see why it is a bit confusing
for you.
Aljoscha would be the expert here, maybe he’ll
have more insights. I’ve cc’ed him.
Cheers,
Gordon
On 12 January 2018 at 4:05:13 PM, Juho Autio
(juho.au...@rovio.com) wrote:
When I run the code below (Flink 1.4.0 or
1.3.1), only "a" is printed. If I switch the
position of .process() & .filter() (i.e. filter
first, then process), both "a" & "b" are
printed, as expected.
I guess it's a bit hard to say what the side
output should include in this case: the stream
before filtering or after it?
What I would suggest is for Flink to protect
against this kind of user error, which is hard
to debug. Would it be possible that Flink
throws an exception if one tries to call
.getSideOutput() on anything that doesn't
actually provide that side output? Now that I
think of it this seems like a bug to me: why
does the call to getSideOutput succeed if it
doesn't provide _any_ input? I would expect it
to get the side output data stream _before_
applying .filter().
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputProblem {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = env.fromElements("a", "b");
        OutputTag<String> sideOutputTag = new OutputTag<String>("side-output"){};

        SingleOutputStreamOperator<String> processed = stream
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String s, Context context,
                            Collector<String> collector) throws Exception {
                        if ("a".equals(s)) {
                            collector.collect(s);
                        } else {
                            context.output(sideOutputTag, s);
                        }
                    }
                })
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String s) throws Exception {
                        return true;
                    }
                });

        processed.getSideOutput(sideOutputTag).printToErr();
        processed.print();

        env.execute();
    }
}
Cheers,
Juho