When I run the code below (Flink 1.4.0 or 1.3.1), only "a" is printed. If I
switch the order of .process() and .filter() (i.e., filter first, then
process), both "a" and "b" are printed, as expected.

I guess it's a bit hard to say what the side output should include in this
case: the stream before filtering or after it?

What I would suggest is that Flink protect against this kind of user error,
which is hard to debug. Would it be possible for Flink to throw an exception
if one calls .getSideOutput() on an operator that doesn't actually provide
that side output? Now that I think of it, this seems like a bug to me: why
does the call to getSideOutput() succeed if the side output doesn't receive
_any_ input? I would expect it to return the side output data stream from
_before_ .filter() is applied.
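To make the behavior concrete, here is a minimal, stdlib-only Java sketch that models it under stated assumptions. ToyOperator, process, filter, getSideOutput and getSideOutputStrict are hypothetical stand-ins, not the Flink API. As I understand Flink's model, side outputs belong to the operator that emits them, so a filter chained after the process function only sees the main output and has nothing under the tag. getSideOutputStrict sketches the fail-fast check suggested above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical toy model of operator outputs; NOT the Flink API.
public class SideOutputModel {

    static final class ToyOperator {
        final List<String> main;
        final Map<String, List<String>> sideOutputs;

        ToyOperator(List<String> main, Map<String, List<String>> sideOutputs) {
            this.main = main;
            this.sideOutputs = sideOutputs;
        }

        // Mimics the ProcessFunction: keep "a", route everything else to the tag.
        static ToyOperator process(List<String> input, String tag) {
            List<String> main = new ArrayList<>();
            List<String> side = new ArrayList<>();
            for (String s : input) {
                if ("a".equals(s)) main.add(s); else side.add(s);
            }
            Map<String, List<String>> outputs = new HashMap<>();
            outputs.put(tag, side);
            return new ToyOperator(main, outputs);
        }

        // A filter is a new operator: it sees only the main output and emits no side outputs.
        ToyOperator filter(Predicate<String> p) {
            List<String> kept = new ArrayList<>();
            for (String s : main) if (p.test(s)) kept.add(s);
            return new ToyOperator(kept, Collections.<String, List<String>>emptyMap());
        }

        // Lenient lookup (models the observed effect): an unknown tag yields an empty stream.
        List<String> getSideOutput(String tag) {
            List<String> side = sideOutputs.get(tag);
            return side == null ? Collections.<String>emptyList() : side;
        }

        // Proposed strict lookup: fail fast if this operator never emits the tag.
        List<String> getSideOutputStrict(String tag) {
            List<String> side = sideOutputs.get(tag);
            if (side == null) {
                throw new IllegalArgumentException("operator does not emit side output '" + tag + "'");
            }
            return side;
        }
    }

    public static void main(String[] args) {
        ToyOperator processed = ToyOperator.process(Arrays.asList("a", "b"), "side-output");
        ToyOperator filtered = processed.filter(s -> true);

        System.out.println(processed.getSideOutput("side-output")); // [b]
        System.out.println(filtered.getSideOutput("side-output"));  // []
        try {
            filtered.getSideOutputStrict("side-output");
        } catch (IllegalArgumentException e) {
            System.out.println("error: " + e.getMessage());
        }
    }
}
```

In the toy model the fix on the user side is the same as in Flink: read the side output from the operator that writes it (processed), not from whatever is chained after it.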

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputProblem {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = env.fromElements("a", "b");
        OutputTag<String> sideOutputTag = new OutputTag<String>("side-output"){};

        SingleOutputStreamOperator<String> processed = stream

                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String s, Context context, Collector<String> collector) throws Exception {
                        if ("a".equals(s)) {
                            collector.collect(s);
                        } else {
                            context.output(sideOutputTag, s);
                        }
                    }
                })

                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String s) throws Exception {
                        return true;
                    }
                });

        // Note: processed is the filter operator, not the process operator that emits to sideOutputTag
        processed.getSideOutput(sideOutputTag).printToErr();

        processed.print();

        env.execute();
    }

}

Cheers,
Juho
