Hi Juho,

> Now that I think of it, this seems like a bug to me: why does the call to
> getSideOutput succeed if it doesn't provide _any_ input?

Given the way side outputs work, I don't think this is possible (or that it
would make sense). An operator does not know in advance whether it will ever
emit an element with a given tag.
As far as I understand it, calling `getSideOutput` essentially adds a virtual
node to the resulting stream graph that listens for the specified tag on the
operator it is called on. In your example that is the filter, not the process
function further upstream.
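To make that routing concrete, here is a deliberately simplified plain-Java
model of your two orderings. It is NOT Flink's API: the `Op` class, the static
`process`/`filter` helpers and the string tag are hypothetical stand-ins, built
on the assumption described above that a side output is looked up only on the
operator `getSideOutput` is called on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical toy model, NOT Flink's API: every operator owns its main output
// and its own side outputs, keyed by tag id.
public class SideOutputModel {

    static final class Op {
        final List<String> main = new ArrayList<>();
        final Map<String, List<String>> side = new HashMap<>();

        // Models getSideOutput(tag): only THIS operator's side outputs are consulted.
        List<String> getSideOutput(String tag) {
            return side.getOrDefault(tag, Collections.emptyList());
        }
    }

    // Mirrors the ProcessFunction in the repro: "a" goes to the main output,
    // everything else goes to the side output with the given tag.
    static Op process(List<String> input, String tag) {
        Op op = new Op();
        for (String s : input) {
            if ("a".equals(s)) {
                op.main.add(s);
            } else {
                op.side.computeIfAbsent(tag, k -> new ArrayList<>()).add(s);
            }
        }
        return op;
    }

    // A filter consumes only its input's main output and emits no side outputs.
    static Op filter(List<String> input, Predicate<String> keep) {
        Op op = new Op();
        for (String s : input) {
            if (keep.test(s)) {
                op.main.add(s);
            }
        }
        return op;
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("a", "b");
        String tag = "side-output";

        // process() then filter(): the side output is requested from the filter.
        Op viaFilter = filter(process(source, tag).main, s -> true);
        System.out.println("process->filter side: " + viaFilter.getSideOutput(tag)); // prints: process->filter side: []

        // filter() then process(): the side output is requested from process().
        Op viaProcess = process(filter(source, s -> true).main, tag);
        System.out.println("filter->process side: " + viaProcess.getSideOutput(tag)); // prints: filter->process side: [b]
    }
}
```

In the real repro, the corresponding change would be to keep the
`SingleOutputStreamOperator` returned by `.process(...)` in its own variable,
call `getSideOutput(sideOutputTag)` on that, and apply `.filter(...)` to it
separately.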

While I'm not sure whether what you observed is intended behavior, I can see
why it is confusing from an API perspective.
Aljoscha is the expert here and may have more insight; I've cc'ed him.

Cheers,
Gordon

On 12 January 2018 at 4:05:13 PM, Juho Autio (juho.au...@rovio.com) wrote:

When I run the code below (Flink 1.4.0 or 1.3.1), only "a" is printed. If I
swap the order of .process() and .filter() (i.e., filter first, then process),
both "a" and "b" are printed, as expected.

I guess it's a bit hard to say what the side output should include in this 
case: the stream before filtering or after it?

I would suggest that Flink protect against this kind of user error, which is
hard to debug. Would it be possible for Flink to throw an exception if one
calls .getSideOutput() on an operator that doesn't actually provide that side
output? Now that I think of it, this seems like a bug to me: why does the call
to getSideOutput succeed if it doesn't provide _any_ input? I would expect it
to return the side output data stream from _before_ .filter() is applied.
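The proposed fail-fast check could be sketched as follows. This is a
hypothetical plain-Java illustration, NOT Flink's API, and it rests on a loud
assumption: that every operator declares up front which side-output tags it
may emit. The ProcessFunction in the repro declares nothing of the kind; it
only calls context.output(...) at runtime, so Flink itself would need some
such declaration for a check like this to work.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the proposed check; NOT Flink's API.
// Assumption: each operator declares the side-output tags it may emit.
public class StrictSideOutputs {

    static final class Op {
        final String name;
        final Set<String> declaredTags;

        Op(String name, Set<String> declaredTags) {
            this.name = name;
            this.declaredTags = declaredTags;
        }

        // Proposed behavior: reject a tag this operator never declared instead
        // of silently returning a stream that will always stay empty.
        String getSideOutput(String tag) {
            if (!declaredTags.contains(tag)) {
                throw new IllegalArgumentException(
                        "Operator '" + name + "' does not declare side output '" + tag + "'");
            }
            return name + "[" + tag + "]"; // placeholder for the side-output stream
        }
    }

    public static void main(String[] args) {
        Op process = new Op("process", new HashSet<>(Collections.singletonList("side-output")));
        Op filter = new Op("filter", Collections.<String>emptySet());

        System.out.println(process.getSideOutput("side-output")); // prints: process[side-output]
        try {
            filter.getSideOutput("side-output");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```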

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputProblem {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = env.fromElements("a", "b");
        // Anonymous subclass ({}) so that the tag's element type can be captured.
        OutputTag<String> sideOutputTag = new OutputTag<String>("side-output"){};

        SingleOutputStreamOperator<String> processed = stream

                // Sends "a" to the main output and everything else ("b") to the side output.
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String s, Context context, Collector<String> collector) throws Exception {
                        if ("a".equals(s)) {
                            collector.collect(s);
                        } else {
                            context.output(sideOutputTag, s);
                        }
                    }
                })

                // Pass-through filter; from here on, `processed` refers to this
                // filter operator, not to the process function above.
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String s) throws Exception {
                        return true;
                    }
                });

        // Requests the side output of the filter operator; as observed, nothing is printed.
        processed.getSideOutput(sideOutputTag).printToErr();

        // Prints only "a".
        processed.print();

        env.execute();
    }

}

Cheers,
Juho
