I added your comment and an answer to FLINK-1656:

"Right, that's a good point.
+1 for limiting to key fields. That's much easier to reason about for users.
However, I am not sure how it is implemented right now. I guess secondary
sort info is already removed by the property filtering, but I need to
verify that."

2015-03-08 21:53 GMT+01:00 Stephan Ewen <se...@apache.org>:

> Any other thoughts on this?
>
> On Fri, Mar 6, 2015 at 12:12 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > I think the order of emitting elements is not part of the forwarded
> > field properties, but would rather be a separate property that we do
> > not have right now.
> >
> > At the moment, we would assume that all group operations destroy
> > secondary orders.
> >
> > In that sense, forwarded fields in group operations only make sense for
> > fields whose value is the same for all elements of the group (key
> > fields).
> >
> > On Fri, Mar 6, 2015 at 11:25 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> >
> >> Hi Timo,
> >>
> >> there are several restrictions for forwarded fields of operators with
> >> iterator input:
> >> 1) Forwarded fields must be emitted in the order in which they are
> >> received through the iterator.
> >> 2) All forwarded fields of a record must stick together, i.e., if your
> >> function builds records from field 0 of the 1st, 3rd, 5th, ... record
> >> and field 1 of the 2nd, 4th, ... record coming through the iterator,
> >> these are not valid forwarded fields.
> >> 3) It is OK to completely filter out records coming through the
> >> iterator.
> >>
> >> The reason for these rules is that the optimizer uses forwarded fields
> >> to reason about physical data properties such as order and grouping. If
> >> you mix up the order of records or emit records which are composed from
> >> different input records, you might destroy a (secondary) order or
> >> grouping.
> >>
> >> Considering these rules, your second example is correct as well.
> >>
> >> In the case of the TriadBuilder, the information is correct as well (in
> >> the context of the program), because field 0 is used as the key. It is,
> >> however, true that there is a strange dependency between the function
> >> and the context in which it is used within the program. It would be
> >> better to remove the class annotation and add this information through
> >> the .withForwardedFields("0") method in the program, to make that
> >> clear.
> >>
> >> It is very good that you raise this point.
> >> This is currently not reflected in the documentation and should be made
> >> clear very soon. I will open a JIRA for that.
> >>
> >> Thanks, Fabian
> >>
> >> 2015-03-06 10:19 GMT+01:00 Timo Walther <twal...@apache.org>:
> >>
> >> > Hey all,
> >> >
> >> > I'm currently working a lot on the UDF static code analyzer. But I
> >> > have a general question about Semantic Properties which might also be
> >> > interesting for other users.
> >> >
> >> > How is the ForwardedFields annotation interpreted for UDF functions
> >> > with Iterables?
> >> >
> >> > An example can be found in:
> >> > org.apache.flink.examples.java.graph.EnumTrianglesBasic.TriadBuilder
> >> >
> >> > Does this mean that each call of "collect" must happen in the same
> >> > order as the calls of "next"? But this is not the case in the example
> >> > above. Or does the annotation only refer to the first iterator
> >> > element?
> >> >
> >> > Other examples:
> >> >
> >> > @ForwardedFields("*") // CORRECT?
> >> > public static class GroupReduce1 implements
> >> >     GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
> >> >   @Override
> >> >   public void reduce(Iterable<Tuple2<Long, Long>> values,
> >> >       Collector<Tuple2<Long, Long>> out) throws Exception {
> >> >     out.collect(values.iterator().next());
> >> >   }
> >> > }
> >> >
> >> > @ForwardedFields("*") // NOT CORRECT?
> >> > public static class GroupReduce3 implements
> >> >     GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
> >> >   @Override
> >> >   public void reduce(Iterable<Tuple2<Long, Long>> values,
> >> >       Collector<Tuple2<Long, Long>> out) throws Exception {
> >> >     Iterator<Tuple2<Long, Long>> it = values.iterator();
> >> >     while (it.hasNext()) {
> >> >       Tuple2<Long, Long> t = it.next();
> >> >       if (t.f0 == 42) {
> >> >         out.collect(t);
> >> >       }
> >> >     }
> >> >   }
> >> > }
> >> >
> >> > Thanks in advance.
> >> >
> >> > Regards,
> >> > Timo
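To make Fabian's three rules and Stephan's key-field point concrete, here is a minimal, self-contained Java sketch. It is not Flink code and not how the optimizer works internally: records are modeled as long[] instead of Tuple2, forwarded fields as an array of positions, and ForwardedFieldsCheck, isValid and pairwiseSums are hypothetical names. For one group, it checks whether an output could have been produced under the rules (same order, all forwarded fields from a single input record, filtering allowed). It then replays Timo's GroupReduce1 and GroupReduce3, two rule violations, and a TriadBuilder-style pairwise emission. main prints true, true, false, false, true, false.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: a record is a long[] (think
// Tuple2<Long, Long>) and forwarded fields are given as field positions.
public class ForwardedFieldsCheck {

    // Projects a record onto the forwarded field positions.
    static long[] project(long[] record, int[] fields) {
        long[] p = new long[fields.length];
        for (int i = 0; i < fields.length; i++) {
            p[i] = record[fields[i]];
        }
        return p;
    }

    // True if each output's forwarded fields equal those of ONE input record
    // (rule 2), matched inputs never move backwards (rule 1), and skipping
    // inputs is allowed (rule 3). Assumption: reusing the same input for
    // consecutive outputs is allowed; the thread does not discuss this.
    public static boolean isValid(List<long[]> inputs, List<long[]> outputs, int[] fields) {
        int pos = 0; // earliest input position the next output may come from
        for (long[] out : outputs) {
            long[] target = project(out, fields);
            int match = -1;
            for (int j = pos; j < inputs.size(); j++) {
                if (Arrays.equals(project(inputs.get(j), fields), target)) {
                    match = j;
                    break;
                }
            }
            if (match < 0) {
                return false; // reordered, or fields mixed from several inputs
            }
            pos = match; // earliest match leaves the most room for later outputs
        }
        return true;
    }

    // Hypothetical TriadBuilder-style emission: one output per pair i < j,
    // keeping the key in field 0 and computing field 1 from two inputs.
    public static List<long[]> pairwiseSums(List<long[]> group) {
        List<long[]> out = new ArrayList<>();
        for (int j = 1; j < group.size(); j++) {
            for (int i = 0; i < j; i++) {
                out.add(new long[]{group.get(j)[0], group.get(i)[1] + group.get(j)[1]});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[] all = {0, 1}; // like @ForwardedFields("*") on a Tuple2
        int[] key = {0};    // like .withForwardedFields("0")
        List<long[]> a = List.of(new long[]{42, 1}, new long[]{7, 2}, new long[]{42, 3});

        // GroupReduce1: emit only the first record
        System.out.println(isValid(a, List.of(a.get(0)), all));
        // GroupReduce3: keep records with f0 == 42, in arrival order
        System.out.println(isValid(a, List.of(a.get(0), a.get(2)), all));
        // Same records emitted in reverse order (breaks rule 1)
        System.out.println(isValid(a, List.of(a.get(2), a.get(0)), all));
        // f0 of the 1st record combined with f1 of the 2nd (breaks rule 2)
        System.out.println(isValid(a, List.of(new long[]{42, 2}), all));

        // A group keyed on field 0: every record shares the key 5
        List<long[]> b = List.of(new long[]{5, 10}, new long[]{5, 20}, new long[]{5, 30});
        System.out.println(isValid(b, pairwiseSums(b), key));
        System.out.println(isValid(b, pairwiseSums(b), all));
    }
}
```

The last two calls mirror Stephan's remark: because every record of a group carries the same key, forwarding only the key field holds for any emission pattern that keeps the key value, while declaring "*" for the same pairwise output does not.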