Yes, I meant that process() returns the special operator. This would definitely deserve a JIRA issue.

On 15.01.2018 14:09, Juho Autio wrote:
Thanks for the explanation. Did you mean that process() would return a SingleOutputWithSideOutputOperator?

Anyway, that should be enough to avoid the problem that I hit (and it also seems like the best & only way).

Maybe the name should be something more generic, though, like ProcessedSingleOutputOperator or something; I wouldn't know.

Would this deserve an improvement ticket in JIRA?

On Mon, Jan 15, 2018 at 12:43 PM, Chesnay Schepler <ches...@apache.org> wrote:

    It would mean that getSideOutput() would return a
    SingleOutputWithSideOutputOperator which extends
    SingleOutputOperator offering getSideOutput(). Other
    transformations would still return a SingleOutputOperator.

    With this, the following code wouldn't compile:

    stream
        .process(...)
        .filter(...)
        .getSideOutput(...) // compile error

    You would have to explicitly define the code as below, which makes
    the behavior unambiguous:

    processed = stream
        .process(...)

    filtered = processed
        .filter(...)

    filteredSideOutput = processed
        .getSideOutput(...)
        .filter(...)
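Chesnay's proposal is about static types: getSideOutput() would exist only on the type that process() returns, so the compiler rejects calling it after any other transformation. The sketch below models just that type structure. The class names follow the proposal in this thread, but the classes themselves are hypothetical toy stand-ins that carry a description string instead of a stream; they are not Flink's API (Flink's actual class is SingleOutputStreamOperator).

```java
// Hypothetical, heavily simplified stand-ins for Flink's stream classes.
// Only the type structure matters: getSideOutput() exists solely on the
// type returned by process().
class SingleOutputOperator {
    final String name;

    SingleOutputOperator(String name) {
        this.name = name;
    }

    // Every transformation, including filter(), returns the plain type...
    SingleOutputOperator filter(String description) {
        return new SingleOutputOperator(name + " -> filter(" + description + ")");
    }

    // ...except process(), which returns the richer subtype.
    SingleOutputWithSideOutputOperator process(String description) {
        return new SingleOutputWithSideOutputOperator(name + " -> process(" + description + ")");
    }
}

class SingleOutputWithSideOutputOperator extends SingleOutputOperator {
    SingleOutputWithSideOutputOperator(String name) {
        super(name);
    }

    SingleOutputOperator getSideOutput(String tag) {
        return new SingleOutputOperator(name + " -> side output '" + tag + "'");
    }
}

public class SideOutputTypesSketch {
    public static void main(String[] args) {
        SingleOutputOperator stream = new SingleOutputOperator("source");

        SingleOutputWithSideOutputOperator processed = stream.process("split");
        SingleOutputOperator filtered = processed.filter("keep all");
        SingleOutputOperator filteredSideOutput =
                processed.getSideOutput("side-output").filter("keep all");

        // Does not compile: filter() returns SingleOutputOperator,
        // which has no getSideOutput() method.
        // stream.process("split").filter("keep all").getSideOutput("side-output");

        System.out.println(filtered.name);           // source -> process(split) -> filter(keep all)
        System.out.println(filteredSideOutput.name); // source -> process(split) -> side output 'side-output' -> filter(keep all)
    }
}
```

Because filter() is inherited unchanged, chaining on the main output keeps working exactly as before; only the ambiguous getSideOutput() call is ruled out.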


    On 15.01.2018 09:55, Juho Autio wrote:
    > sideoutput might deserve a separate class which inherits from
    > singleoutput. It might prevent a lot of confusion.

    Thanks, but how could that be done? Do you mean that if one calls
    .process(), then the stream would change to another class which
    would only allow calls like .getMainOutput() or
    .getSideOutput("name")? Of course, a compile-time error would be
    even better than a runtime error, but I don't yet see how it
    could be done in practice.
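The variant Juho describes is stricter: the result of process() would offer no transformations at all, only a way to pick one output. A minimal sketch of that shape, using hypothetical toy classes (SimpleStream, ProcessedStream, getMainOutput()) that are not part of Flink's API:

```java
// Hypothetical API shape, not Flink's: process() returns a ProcessedStream
// that has no transformations of its own, so the caller must pick one
// output explicitly before chaining further.
class SimpleStream {
    final String lineage;

    SimpleStream(String lineage) {
        this.lineage = lineage;
    }

    SimpleStream filter(String description) {
        return new SimpleStream(lineage + " -> filter(" + description + ")");
    }

    ProcessedStream process(String description) {
        return new ProcessedStream(lineage + " -> process(" + description + ")");
    }
}

final class ProcessedStream {
    private final String lineage;

    ProcessedStream(String lineage) {
        this.lineage = lineage;
    }

    SimpleStream getMainOutput() {
        return new SimpleStream(lineage + " [main]");
    }

    SimpleStream getSideOutput(String name) {
        return new SimpleStream(lineage + " [side: " + name + "]");
    }
}

public class ExplicitOutputSketch {
    public static void main(String[] args) {
        ProcessedStream processed = new SimpleStream("source").process("split");

        // Does not compile: ProcessedStream has no filter() method.
        // processed.filter("keep all");

        SimpleStream main = processed.getMainOutput().filter("keep all");
        SimpleStream side = processed.getSideOutput("side-output").filter("keep all");

        System.out.println(main.lineage); // source -> process(split) [main] -> filter(keep all)
        System.out.println(side.lineage); // source -> process(split) [side: side-output] -> filter(keep all)
    }
}
```

The trade-off is that even the common single-output case needs an extra getMainOutput() call, whereas a subclass that merely adds getSideOutput() leaves existing main-output chains untouched.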

    On Sun, Jan 14, 2018 at 4:55 AM, Chen Qin <qinnc...@gmail.com>
    wrote:

        Hi Juho,

        I think sideoutput might deserve a separate class which
        inherits from singleoutput. It might prevent a lot of
        confusion. A more generic question is whether the DataStream
        API can natively support multiple inputs and multiple
        outputs. It's more of a scheduling problem when you go from a
        single-process system to a multi-process system: which one
        should get resources, and how much of the same hardware
        resources they share. I guess it will open the gate to lots
        of edge cases -> strategies -> more edge cases :)

        Chen

        On Fri, Jan 12, 2018 at 6:34 AM, Juho Autio
        <juho.au...@rovio.com> wrote:

            Maybe I could express it in a slightly different way: if
            adding the .filter() after .process() causes the side
            output to be somehow totally "lost", then I believe
            .getSideOutput() could be aware that there is no such
            side output to be listened to from upstream, and throw an
            exception. I mean, this should be possible when building
            the DAG; it shouldn't require starting the stream to
            detect it? Thanks.
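A build-time check like this could probably only be coarse. A ProcessFunction decides at runtime which tag, if any, each element goes to, so a graph builder cannot know per tag what will be emitted; it could at most reject getSideOutput() on operator types that never emit side outputs at all, such as a filter. The sketch below models that idea with a hypothetical Node class; it is an assumption-laden illustration, not how Flink builds its stream graph.

```java
// Hypothetical model of a build-time check, not Flink's API. Nodes do not
// store edges; each only records whether its operator type is able to
// emit side outputs at all.
final class Node {
    final String name;
    final boolean canEmitSideOutputs;

    Node(String name, boolean canEmitSideOutputs) {
        this.name = name;
        this.canEmitSideOutputs = canEmitSideOutputs;
    }

    // A process-style operator may emit to arbitrary tags at runtime.
    Node process(String opName) {
        return new Node(opName, true);
    }

    // A filter never emits side outputs.
    Node filter(String opName) {
        return new Node(opName, false);
    }

    // Validated while the pipeline is being declared, before anything runs.
    Node getSideOutput(String tag) {
        if (!canEmitSideOutputs) {
            throw new IllegalStateException("Operator '" + name
                    + "' cannot emit side outputs, so side output '" + tag + "' would always be empty");
        }
        return new Node(name + "[" + tag + "]", false);
    }
}

public class BuildTimeCheckSketch {
    public static void main(String[] args) {
        Node source = new Node("source", false);

        // Side output read directly from process(): accepted.
        Node side = source.process("process").getSideOutput("side-output");
        System.out.println("built " + side.name); // built process[side-output]

        // filter() in between: rejected while declaring, not at runtime.
        try {
            source.process("process").filter("filter").getSideOutput("side-output");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```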

            On Fri, Jan 12, 2018 at 2:48 PM, Tzu-Li (Gordon) Tai
            <tzuli...@apache.org> wrote:

                Hi Juho,

                > Now that I think of it, this seems like a bug to me:
                > why does the call to getSideOutput succeed if it
                > doesn't provide _any_ input?

                With the way side outputs work, I don’t think this is
                possible (or would make sense). An operator does not
                know whether or not it would ever emit some element
                with a given tag.
                As far as I understand it, calling `getSideOutput`
                essentially adds a virtual node to the result stream
                graph that listens to the specified tag from the
                upstream input.
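This description can be illustrated with an eager, in-memory toy model: each operator's result carries its main output plus whatever it emitted under each tag, and reading a side output only looks at the operator it is called on. It is a hypothetical simplification (plain lists, no parallelism, the tag name hard-coded to match the report), not Flink's runtime, but it reproduces the reported behaviour.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Flink): an operator's result holds its main output and
// whatever it emitted under each side-output tag.
class OpResult {
    final List<String> main = new ArrayList<>();
    final Map<String, List<String>> side = new HashMap<>();

    // Like getSideOutput(tag): reads the tag from *this* operator only.
    List<String> getSideOutput(String tag) {
        return side.getOrDefault(tag, Collections.emptyList());
    }
}

public class SideOutputRoutingSketch {
    // Mirrors the ProcessFunction in the report: "a" goes to the main
    // output, everything else to the "side-output" tag.
    static OpResult process(List<String> input) {
        OpResult out = new OpResult();
        for (String s : input) {
            if ("a".equals(s)) {
                out.main.add(s);
            } else {
                out.side.computeIfAbsent("side-output", k -> new ArrayList<>()).add(s);
            }
        }
        return out;
    }

    // A pass-through filter consumes only its input's main output and
    // never writes to any side-output tag.
    static OpResult filterKeepAll(OpResult upstream) {
        OpResult out = new OpResult();
        out.main.addAll(upstream.main);
        return out;
    }

    public static void main(String[] args) {
        OpResult processed = process(Arrays.asList("a", "b"));
        OpResult filtered = filterKeepAll(processed);

        System.out.println("process side output: " + processed.getSideOutput("side-output")); // [b]
        System.out.println("filter side output:  " + filtered.getSideOutput("side-output"));  // []
        System.out.println("filter main output:  " + filtered.main);                          // [a]
    }
}
```

In this model "b" is never lost; it simply lives on the process() result, which is why reading the side output from that result rather than from the filter's result recovers it.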

                While I’m not sure whether or not your observation
                is expected behavior, from an API perspective I can
                see why it is a bit confusing for you.
                Aljoscha would be the expert here; maybe he’ll have
                more insights. I’ve looped him in (cc’ed).

                Cheers,
                Gordon


                On 12 January 2018 at 4:05:13 PM, Juho Autio
                (juho.au...@rovio.com) wrote:

                When I run the code below (Flink 1.4.0 or 1.3.1),
                only "a" is printed. If I switch the position of
                .process() & .filter() (i.e. filter first, then
                process), both "a" & "b" are printed, as expected.

                I guess it's a bit hard to say what the side output
                should include in this case: the stream before
                filtering or after it?

                What I would suggest is that Flink protect against
                this kind of user error, which is hard to debug.
                Would it be possible for Flink to throw an exception
                if one tries to call .getSideOutput() on anything
                that doesn't actually provide that side output? Now
                that I think of it, this seems like a bug to me: why
                does the call to getSideOutput succeed if it doesn't
                provide _any_ input? I would expect it to get the
                side output data stream _before_ applying .filter().

                import org.apache.flink.api.common.functions.FilterFunction;
                import org.apache.flink.streaming.api.datastream.DataStreamSource;
                import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
                import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                import org.apache.flink.streaming.api.functions.ProcessFunction;
                import org.apache.flink.util.Collector;
                import org.apache.flink.util.OutputTag;

                public class SideOutputProblem {

                    public static void main(String[] args) throws Exception {
                        StreamExecutionEnvironment env =
                                StreamExecutionEnvironment.getExecutionEnvironment();
                        DataStreamSource<String> stream = env.fromElements("a", "b");
                        // Anonymous subclass so that Flink can capture the element type.
                        OutputTag<String> sideOutputTag = new OutputTag<String>("side-output") {};

                        SingleOutputStreamOperator<String> processed = stream
                                // Route "a" to the main output, everything else to the side output.
                                .process(new ProcessFunction<String, String>() {
                                    @Override
                                    public void processElement(String s, Context context,
                                            Collector<String> collector) throws Exception {
                                        if ("a".equals(s)) {
                                            collector.collect(s);
                                        } else {
                                            context.output(sideOutputTag, s);
                                        }
                                    }
                                })
                                // Pass-through filter; `processed` now refers to this operator.
                                .filter(new FilterFunction<String>() {
                                    @Override
                                    public boolean filter(String s) throws Exception {
                                        return true;
                                    }
                                });

                        // Expected to print "b", but nothing is printed.
                        processed.getSideOutput(sideOutputTag).printToErr();

                        processed.print();

                        env.execute();
                    }
                }

                Cheers,
                Juho





