In the early days of Flink, when operators were still called PACTs (short for Parallelization Contracts), the system supported so-called "output contracts" via custom annotations that could be attached to a UDF (the ForwardedFields annotation is a descendant of that concept).
Amongst others, if I remember correctly, there was an output contract indicating that a DataSet is hash-partitioned by key, which was used to avoid unnecessary re-partitioning of an input (e.g. for a subsequent reduce or coGroup). I wonder what happened to that, as I can't find it any more - I am looking here:
https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java

Help / suggestions on how to realize the same functionality with the current version of the API are appreciated. As a fallback, I think that "partitionByHash" could maybe do the trick at the expense of one pipelined pass over the data, but I am not sure whether the receiver tasks are scheduled on the same machines as their sender counterparts. In other words, can I assume that the following happens:

machine1: (task[0]) partitionByHash (task[0])
machine2: (task[1]) partitionByHash (task[1])
...
machineN: (task[n]) partitionByHash (task[n])

Cheers,
Alexander
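For context, here is a minimal sketch of why sender/receiver co-location is not guaranteed by hash partitioning alone. This is hypothetical illustration code, not Flink's actual implementation: in a hash partition, each record is routed to the receiver subtask determined by the hash of its key modulo the parallelism, independently of which sender subtask emitted it, so sender task[i] generally fans out to many receivers rather than shipping only to a co-located task[i].

```java
// Hypothetical sketch (NOT Flink internals): hash partitioning
// routes every record with the same key to the same receiver
// subtask, chosen by hash(key) % parallelism. Which sender
// emitted the record plays no role in the routing decision.
public class HashPartitionSketch {

    // Receiver subtask index for a given key (non-negative modulo).
    static int targetSubtask(Object key, int parallelism) {
        return (key.hashCode() & Integer.MAX_VALUE) % parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 4;

        // Two different sender subtasks emitting the same key
        // both route it to the same receiver subtask...
        int fromSender0 = targetSubtask("alpha", parallelism);
        int fromSender3 = targetSubtask("alpha", parallelism);
        System.out.println(fromSender0 == fromSender3); // same key -> same receiver

        // ...and that receiver index depends only on the key,
        // so there is no built-in "task[i] stays on task[i]" mapping.
        System.out.println(targetSubtask("beta", parallelism));
    }
}
```

So even if the data is already hash-partitioned, an explicit partitionByHash would still shuffle records according to the key hash; whether the repartitioning can be skipped is a question for the optimizer, not for task placement.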