Could someone with knowledge of the right terms create this in JIRA,
please? I guess I could also create it if needed..
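
For whoever writes the ticket, here is a minimal sketch of the idea from the
thread below: only the result of process() exposes getSideOutput(), and
filter() returns the plain type. This is plain Java on lists, not Flink code.
SingleOutputOperator and SingleOutputWithSideOutputOperator are just the names
proposed in this thread; ToyStream, collect() and the tag-less getSideOutput()
are hypothetical simplifications for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical plain result type: offers transformations, but no side output.
class SingleOutputOperator<T> {
    protected final List<T> main;

    SingleOutputOperator(List<T> main) {
        this.main = main;
    }

    // filter() returns the plain type, so getSideOutput() is unreachable afterwards.
    SingleOutputOperator<T> filter(Predicate<T> predicate) {
        List<T> kept = new ArrayList<>();
        for (T element : main) {
            if (predicate.test(element)) {
                kept.add(element);
            }
        }
        return new SingleOutputOperator<>(kept);
    }

    // Toy helper (assumption): exposes the elements so the result can be inspected.
    List<T> collect() {
        return main;
    }
}

// Hypothetical result type of process(): the only type exposing getSideOutput().
class SingleOutputWithSideOutputOperator<T> extends SingleOutputOperator<T> {
    private final List<T> side;

    SingleOutputWithSideOutputOperator(List<T> main, List<T> side) {
        super(main);
        this.side = side;
    }

    // Simplified: a single untagged side output instead of an OutputTag lookup.
    SingleOutputOperator<T> getSideOutput() {
        return new SingleOutputOperator<>(side);
    }
}

// Toy stand-in for a stream (assumption): just wraps a list of elements.
class ToyStream<T> {
    private final List<T> elements;

    ToyStream(List<T> elements) {
        this.elements = elements;
    }

    // Elements matching toMain go to the main output, all others to the side output.
    SingleOutputWithSideOutputOperator<T> process(Predicate<T> toMain) {
        List<T> mainOut = new ArrayList<>();
        List<T> sideOut = new ArrayList<>();
        for (T element : elements) {
            if (toMain.test(element)) {
                mainOut.add(element);
            } else {
                sideOut.add(element);
            }
        }
        return new SingleOutputWithSideOutputOperator<>(mainOut, sideOut);
    }
}

class SideOutputTypingSketch {
    public static void main(String[] args) {
        ToyStream<String> stream = new ToyStream<>(Arrays.asList("a", "b"));
        SingleOutputWithSideOutputOperator<String> processed = stream.process("a"::equals);

        System.out.println(processed.filter(s -> true).collect());                 // prints [a]
        System.out.println(processed.getSideOutput().filter(s -> true).collect()); // prints [b]

        // processed.filter(s -> true).getSideOutput(); // would not compile: no such method
    }
}
```

With this shape, the mistake from my original example becomes a compile error
instead of a silently empty side output.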

On Mon, Jan 15, 2018 at 3:15 PM, Chesnay Schepler <ches...@apache.org>
wrote:

> Yes, I meant that process() returns the special operator. This would
> definitely deserve a JIRA issue.
>
>
> On 15.01.2018 14:09, Juho Autio wrote:
>
> Thanks for the explanation. Did you mean that process() would return a
> SingleOutputWithSideOutputOperator?
>
> Anyway, that should be enough to avoid the problem that I hit (and it
> also seems like the best & only way).
>
> Maybe the name should be something more generic though, like
> ProcessedSingleOutputOperator or something, I wouldn't know..
>
> Would this deserve an improvement ticket in JIRA?
>
> On Mon, Jan 15, 2018 at 12:43 PM, Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> It would mean that getSideOutput() would return a
>> SingleOutputWithSideOutputOperator which extends SingleOutputOperator
>> offering getSideOutput(). Other transformations would still return a
>> SingleOutputOperator.
>>
>> With this the following code wouldn't compile.
>>
>> stream
>>     .process(...)
>>     .filter(...)
>>     .getSideOutput(...) // compile error
>>
>> You would have to explicitly define the code as below, which makes the
>> behavior unambiguous:
>>
>> processed = stream
>>     .process(...)
>>
>> filtered = processed
>>     .filter(...)
>>
>> filteredSideOutput = processed
>>     .getSideOutput(...)
>>     .filter(...)
>>
>>
>> On 15.01.2018 09:55, Juho Autio wrote:
>>
>> > sideoutput might deserve a separate class which inherits from
>> singleoutput. It might prevent a lot of confusion
>>
>> Thanks, but how could that be done? Do you mean that if one calls
>> .process(), then the stream would change to another class which would only
>> allow calls like .getMainOutput() or .getSideOutput("name")? Of course
>> compile time error would be even better than a runtime error, but I don't
>> see yet how it could be done in practice.
>>
>> On Sun, Jan 14, 2018 at 4:55 AM, Chen Qin <qinnc...@gmail.com> wrote:
>>
>>> Hi Juho,
>>>
>>> I think sideoutput might deserve a separate class which inherits from
>>> singleoutput. It might prevent a lot of confusion. A more generic question
>>> is whether the datastream api can support multiple ins and multiple outs
>>> natively. It's more like a scheduling problem when you go from a
>>> single-process system to a multi-process system: which one should get
>>> resources, and how much, when sharing the same hardware? I guess it will
>>> open the gate to lots of edge cases -> strategies -> more edge cases :)
>>>
>>> Chen
>>>
>>> On Fri, Jan 12, 2018 at 6:34 AM, Juho Autio <juho.au...@rovio.com>
>>> wrote:
>>>
>>>> Maybe I could express it in a slightly different way: if adding the
>>>> .filter() after .process() causes the side output to be somehow totally
>>>> "lost", then I believe .getSideOutput() could be aware that there is
>>>> no such side output to listen to from upstream, and throw an
>>>> exception. I mean, this should be possible when building the DAG; it
>>>> shouldn't require starting the stream to detect it? Thanks..
>>>>
>>>> On Fri, Jan 12, 2018 at 2:48 PM, Tzu-Li (Gordon) Tai <
>>>> tzuli...@apache.org> wrote:
>>>>
>>>>> Hi Juho,
>>>>>
>>>>> Now that I think of it this seems like a bug to me: why does the call
>>>>> to getSideOutput succeed if it doesn't provide _any_ input?
>>>>>
>>>>>
>>>>> With the way side outputs work, I don’t think this is possible (or
>>>>> would make sense). An operator does not know whether or not it would ever
>>>>> emit some element with a given tag.
>>>>> As far as I understand it, calling `getSideOutput` essentially adds a
>>>>> virtual node to the result stream graph that listens to the specified tag
>>>>> from the upstream input.
>>>>>
>>>>> While I’m not aware whether or not your observation is expected
>>>>> behavior, from an API perspective, I can see why it is a bit confusing for
>>>>> you.
>>>>> Aljoscha would be the expert here, maybe he’ll have more insights.
>>>>> I’ve cc’ed him.
>>>>>
>>>>> Cheers,
>>>>> Gordon
>>>>>
>>>>>
>>>>> On 12 January 2018 at 4:05:13 PM, Juho Autio (juho.au...@rovio.com)
>>>>> wrote:
>>>>>
>>>>> When I run the code below (Flink 1.4.0 or 1.3.1), only "a" is printed.
>>>>> If I switch the position of .process() & .filter() (i.e. filter first, then
>>>>> process), both "a" & "b" are printed, as expected.
>>>>>
>>>>> I guess it's a bit hard to say what the side output should include in
>>>>> this case: the stream before filtering or after it?
>>>>>
>>>>> What I would suggest is for Flink to protect against this kind of user
>>>>> error, which is hard to debug. Would it be possible for Flink to throw an
>>>>> exception if one tries to call .getSideOutput() on anything that doesn't
>>>>> actually provide that side output? Now that I think of it this seems like
>>>>> a bug to me: why does the call to getSideOutput succeed if it doesn't
>>>>> provide _any_ input? I would expect it to get the side output data stream
>>>>> _before_ applying .filter().
>>>>>
>>>>> import org.apache.flink.api.common.functions.FilterFunction;
>>>>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>>>>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>>>>> import org.apache.flink.util.Collector;
>>>>> import org.apache.flink.util.OutputTag;
>>>>>
>>>>> public class SideOutputProblem {
>>>>>
>>>>>     public static void main(String[] args) throws Exception {
>>>>>
>>>>>         StreamExecutionEnvironment env =
>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>         DataStreamSource<String> stream = env.fromElements("a", "b");
>>>>>         OutputTag<String> sideOutputTag =
>>>>>                 new OutputTag<String>("side-output"){};
>>>>>
>>>>>         SingleOutputStreamOperator<String> processed = stream
>>>>>
>>>>>                 .process(new ProcessFunction<String, String>() {
>>>>>                     @Override
>>>>>                     public void processElement(String s, Context context,
>>>>>                             Collector<String> collector) throws Exception {
>>>>>                         if ("a".equals(s)) {
>>>>>                             collector.collect(s);
>>>>>                         } else {
>>>>>                             context.output(sideOutputTag, s);
>>>>>                         }
>>>>>                     }
>>>>>                 })
>>>>>
>>>>>                 .filter(new FilterFunction<String>() {
>>>>>                     @Override
>>>>>                     public boolean filter(String s) throws Exception {
>>>>>                         return true;
>>>>>                     }
>>>>>                 });
>>>>>
>>>>>         processed.getSideOutput(sideOutputTag).printToErr();
>>>>>
>>>>>         processed.print();
>>>>>
>>>>>         env.execute();
>>>>>     }
>>>>>
>>>>> }
>>>>>
>>>>> Cheers,
>>>>> Juho
>>>>>
>>>>>
>>>>
>>>
>>
>>
>
