Hi Timo,

there are several restrictions for forwarded fields of operators with iterator input:

1) Forwarded fields must be emitted in the order in which they are received through the iterator.
2) All forwarded fields of a record must stick together. For example, if your function builds a record from field 0 of the 1st, 3rd, 5th, ... record and field 1 of the 2nd, 4th, ... record coming through the iterator, these are not valid forwarded fields.
3) It is OK to completely filter out records coming through the iterator.
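To make the three rules concrete, here is a minimal plain-Java sketch. It rests on a few assumptions. Records are modeled as long[] instead of Flink's Tuple2 so the code runs without Flink. The class ForwardedFieldsCheck and all of its methods are hypothetical and not a Flink API. Each input record is assumed to be emitted at most once.

The check works on one concrete group. Every output record must agree on all forwarded fields with a single input record, and these input records must appear at increasing positions. Any input record that is skipped counts as filtered out. The stand-ins for Timo's GroupReduce1 and GroupReduce3 (quoted below) pass with all fields forwarded. A function that mixes fields of different records, or one that reverses the order, fails.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Flink: checks one concrete group against
// the forwarded-field rules for iterator input. Records are modeled as long[].
public class ForwardedFieldsCheck {

    // True if every output record matches ONE input record on all forwarded
    // fields (rule 2), with matched input positions strictly increasing
    // (rule 1). Unmatched input records count as filtered out (rule 3).
    static boolean isValid(List<long[]> input, List<long[]> output, int... fields) {
        int next = 0; // first input position still available for matching
        for (long[] out : output) {
            int match = -1;
            // Greedy: the earliest fitting input record leaves the most room
            // for the remaining output records.
            for (int i = next; i < input.size() && match < 0; i++) {
                if (agrees(input.get(i), out, fields)) {
                    match = i;
                }
            }
            if (match < 0) {
                return false; // no later input record fits: order or field grouping broken
            }
            next = match + 1;
        }
        return true;
    }

    private static boolean agrees(long[] in, long[] out, int[] fields) {
        for (int f : fields) {
            if (in[f] != out[f]) {
                return false;
            }
        }
        return true;
    }

    // Like GroupReduce1: emit only the first record of the group.
    static List<long[]> firstOnly(List<long[]> group) {
        return Collections.singletonList(group.get(0));
    }

    // Like GroupReduce3: emit records whose field 0 equals 42, in input order.
    static List<long[]> filter42(List<long[]> group) {
        List<long[]> out = new ArrayList<>();
        for (long[] t : group) {
            if (t[0] == 42) {
                out.add(t);
            }
        }
        return out;
    }

    // Breaks rule 2: field 0 of the 1st record combined with field 1 of the 2nd.
    static List<long[]> mixed(List<long[]> group) {
        return Collections.singletonList(new long[] {group.get(0)[0], group.get(1)[1]});
    }

    // Breaks rule 1: emits the records in reverse order.
    static List<long[]> reversed(List<long[]> group) {
        List<long[]> out = new ArrayList<>(group);
        Collections.reverse(out);
        return out;
    }

    public static void main(String[] args) {
        List<long[]> group = Arrays.asList(
                new long[] {42, 1}, new long[] {7, 2}, new long[] {42, 3});
        int[] all = {0, 1}; // plays the role of "*" for a two-field record
        System.out.println(isValid(group, firstOnly(group), all)); // true
        System.out.println(isValid(group, filter42(group), all));  // true
        System.out.println(isValid(group, mixed(group), all));     // false
        System.out.println(isValid(group, reversed(group), all));  // false
    }
}
```

The sketch compares values from one concrete group. A passing result is therefore evidence, not proof, because the optimizer relies on the declaration holding for every possible input. If the field values in a group happen to coincide, a violating function can slip through.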
The reason for these rules is that the optimizer uses forwarded fields to reason about physical data properties such as order and grouping. If you mix up the order of records or emit records that are composed from different input records, you might destroy a (secondary) order or grouping that the optimizer assumes to be preserved.

Considering these rules, your second example is correct as well. In the case of the TriadBuilder, the information is also correct (in the context of the program), because field 0 is used as the key. It is, however, true that there is a strange dependency between the function and the context in which it is used within the program. To make that clear, it would be better to remove the class annotation and add this information through the .withForwardedFields("0") method in the program.

It is very good that you raise this point. This is currently not reflected in the documentation and should be made clear very soon. I will open a JIRA for that.

Thanks, Fabian

2015-03-06 10:19 GMT+01:00 Timo Walther <twal...@apache.org>:

> Hey all,
>
> I'm currently working a lot on the UDF static code analyzer. But I have a
> general question about Semantic Properties which might also be interesting
> for other users.
>
> How is the ForwardedFields annotation interpreted for UDF functions with
> Iterables?
>
> An example can be found in:
> org.apache.flink.examples.java.graph.EnumTrianglesBasic.TriadBuilder
>
> Does this mean that each call of "collect" must happen in the same order
> as the calls of "next"? But this is not the case in the example above. Or
> does the annotation only refer to the first iterator element?
>
> Other examples:
>
> @ForwardedFields("*") // CORRECT?
> public static class GroupReduce1 implements
>     GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>   @Override
>   public void reduce(Iterable<Tuple2<Long, Long>> values,
>       Collector<Tuple2<Long, Long>> out) throws Exception {
>     out.collect(values.iterator().next());
>   }
> }
>
> @ForwardedFields("*") // NOT CORRECT?
> public static class GroupReduce3 implements
>     GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>   @Override
>   public void reduce(Iterable<Tuple2<Long, Long>> values,
>       Collector<Tuple2<Long, Long>> out) throws Exception {
>     Iterator<Tuple2<Long, Long>> it = values.iterator();
>     while (it.hasNext()) {
>       Tuple2<Long, Long> t = it.next();
>       if (t.f0 == 42) {
>         out.collect(t);
>       }
>     }
>   }
> }
>
> Thanks in advance.
>
> Regards,
> Timo