That is right, Timo, as far as I would understand it. On Mon, Mar 9, 2015 at 12:04 PM, Timo Walther <twal...@apache.org> wrote:
> Thanks for the clarification. If I have understood it correctly, forwarded > fields are only interesting for key fields, right? So I will implement that > key information is passed to the analyzer for consideration. > > So if GroupReduce1 is grouped by f1, the result will be > @ForwardedFields("1") in this example and not "*": > > public static class GroupReduce1 implements > GroupReduceFunction<Tuple2<Long, > Long>,Tuple2<Long, Long>> { > @Override > public void reduce(Iterable<Tuple2<Long, Long>> values, > Collector<Tuple2<Long, Long>> out) throws Exception { > out.collect(values.iterator().next()); > } > } > > > On 08.03.2015 23:21, Fabian Hueske wrote: > >> I added you comment and an answer to FLINK-1656: >> >> "Right, that's a good point. >> >> +1 limiting to key fields. That's much easier to reason about for users. >> >> However, I am not sure how it is implemented right now. >> I guess secondary sort info is already removed by the property filtering, >> but I need to verify that." >> >> 2015-03-08 21:53 GMT+01:00 Stephan Ewen <se...@apache.org>: >> >> Any other thoughts in this? >>> >>> On Fri, Mar 6, 2015 at 12:12 PM, Stephan Ewen <se...@apache.org> wrote: >>> >>> I think the order of emitting elements is not part of the forward field >>>> properties, but would rather be a separate one that we do not have right >>>> now. >>>> >>>> At the moment, we would assume that all group operations destroy >>>> >>> secondary >>> >>>> orders. >>>> >>>> In that sense, forward fields in group operations only make sense for >>>> fields where all fields are the same in the group (key fields). >>>> >>>> On Fri, Mar 6, 2015 at 11:25 AM, Fabian Hueske <fhue...@gmail.com> >>>> >>> wrote: >>> >>>> Hi Timo, >>>>> >>>>> there are several restrictions for forwarded fields of operators with >>>>> iterator input. >>>>> 1) forwarded fields must be emitted in the order in which they are >>>>> received >>>>> through the iterator >>>>> 2) all forwarded fields of a record must stick together, i.e., if your >>>>> function builds record from field 0 of the 1st, 3rd, 5th, ... and field >>>>> >>>> 1 >>> >>>> of the 2nd, 4th, ... record coming through the iterator, these are not >>>>> valid forwarded fields. >>>>> 3) it is OK to completely filter out records coming through the >>>>> >>>> iterator. >>> >>>> The reason for these rules is, that the optimizer uses forwarded fields >>>>> >>>> to >>> >>>> reason about physical data properties such as order and grouping. If you >>>>> mix up the order of records or emit records which are composed from >>>>> different input records, you might destroy a (secondary) order or >>>>> grouping. >>>>> >>>>> Considering these rules, your second example is correct as well. >>>>> In case of the TriadBuilder, the information is correct (in the context >>>>> >>>> of >>> >>>> the Program) as well, because field 0 is used as key. It is however >>>>> >>>> true, >>> >>>> that there is a strange dependency between the function and the context >>>>> >>>> in >>> >>>> which it is used within the program. It would be better to remove the >>>>> class >>>>> annotation, and add this information through the >>>>> >>>> .withForwardedFields("0") >>> >>>> method in the program, to make that clear. >>>>> >>>>> It is very good that you raise this point. >>>>> This is currently not reflected in the documentation is should be made >>>>> clear very soon. I will open a JIRA for that. >>>>> >>>>> Thanks, Fabian >>>>> >>>>> >>>>> >>>>> 2015-03-06 10:19 GMT+01:00 Timo Walther <twal...@apache.org>: >>>>> >>>>> Hey all, >>>>>> >>>>>> I'm currently working a lot on the UDF static code analyzer. But I >>>>>> >>>>> have >>> >>>> a >>>>> >>>>>> general question about Semantic Properties which might be also >>>>>> >>>>> interesting >>>>> >>>>>> for other users. >>>>>> >>>>>> How is the ForwardedFields annotation interpreted for UDF functions >>>>>> >>>>> with >>> >>>> Iterables? >>>>>> >>>>>> An example can be found in: org.apache.flink.examples. >>>>>> java.graph.EnumTrianglesBasic.TriadBuilder >>>>>> >>>>>> Does this mean that each call of "collect" must happen in the same >>>>>> >>>>> order >>> >>>> than the call of "next"? But this is not the case in the example >>>>>> >>>>> above. >>> >>>> Or >>>>> >>>>>> does the annotation only refer to the first iterator element? >>>>>> >>>>>> Other examples: >>>>>> >>>>>> @ForwardedFields("*") // CORRECT? >>>>>> public static class GroupReduce1 implements >>>>>> >>>>> GroupReduceFunction<Tuple2<Long, >>>>> >>>>>> Long>,Tuple2<Long, Long>> { >>>>>> @Override >>>>>> public void reduce(Iterable<Tuple2<Long, Long>> values, >>>>>> Collector<Tuple2<Long, Long>> out) throws Exception { >>>>>> out.collect(values.iterator().next()); >>>>>> } >>>>>> } >>>>>> >>>>>> @ForwardedFields("*") // NOT CORRECT? >>>>>> public static class GroupReduce3 implements >>>>>> >>>>> GroupReduceFunction<Tuple2<Long, >>>>> >>>>>> Long>,Tuple2<Long, Long>> { >>>>>> @Override >>>>>> public void reduce(Iterable<Tuple2<Long, Long>> values, >>>>>> Collector<Tuple2<Long, Long>> out) throws Exception { >>>>>> Iterator<Tuple2<Long, Long>> it = values.iterator(); >>>>>> while (it.hasNext()) { >>>>>> Tuple2<Long,Long> t = it.next(); >>>>>> if (t.f0 == 42) { >>>>>> out.collect(t); >>>>>> } >>>>>> } >>>>>> } >>>>>> } >>>>>> >>>>>> Thanks in advance. >>>>>> >>>>>> Regards, >>>>>> Timo >>>>>> >>>>>> >>>> >