Yes, thanks for the clarification.

If I understand correctly, there is no guarantee that after partitionByHash
sender[i] will be co-located with receiver[i] (i.e., scheduled to the same
machine), so we will have to come up with some workaround or tweak the code.

Based on that I think that our options are:

1) extend the scheduler so that the above constraint can somehow be
guaranteed.
2) extend the optimizer so that data source chains that are already
hash-partitioned can be annotated as such, in a way that allows this
partitioning to be reused by a subsequent coGroup / join / reduce /
groupReduce operator.

I think that option (2) should be simpler, as much of the logic is already
in place. Last time I checked, the only problem was that the partitioning
metadata of the data sources was incompatible with the partitioning
requirements of the operators. A rough sketch of what I have in mind
follows below.

Cheers,
Alex


2015-05-21 21:50 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Alex,
>
> did my previous mail answer these questions as well?
>
> Cheers, Fabian
>
> 2015-05-18 22:03 GMT+02:00 Alexander Alexandrov <
> alexander.s.alexand...@gmail.com>:
>
>> In the early days of Flink, when Flink operators were still called PACTs
>> (short for Parallelization Contracts), the system used to support so-called
>> "output contracts" via custom annotations that could be attached to the UDF
>> (the ForwardedFields annotation is a descendant of that concept).
>>
>> Among others, if I remember correctly, there was an output contract
>> indicating that a DataSet is hash-partitioned by key, which was used to
>> avoid unnecessary re-partitioning of an input (e.g. for a subsequent
>> reducer or coGroup). I wonder what happened to that, as I can't find it
>> any more - I am looking here:
>>
>>
>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
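>>
>> For completeness, here is how the surviving descendant of that concept is
>> used today (a rough sketch). As far as I can tell, it can only declare that
>> fields pass through a UDF unchanged, so that an existing partitioning on
>> the key is preserved - it cannot express that a DataSet is freshly
>> hash-partitioned by key, which is what the old output contract did:
>>
>> // imports: org.apache.flink.api.common.functions.MapFunction,
>> //          org.apache.flink.api.java.functions.FunctionAnnotation,
>> //          org.apache.flink.api.java.tuple.Tuple2
>>
>> // Field f0 is forwarded unchanged, so a hash partitioning on the key
>> // survives this map and a later groupBy(0) needs no extra shuffle.
>> @FunctionAnnotation.ForwardedFields("f0")
>> public static class KeepKeyMapper
>>     implements MapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
>>
>>   @Override
>>   public Tuple2<Integer, String> map(Tuple2<Integer, String> value) {
>>     return new Tuple2<>(value.f0, value.f1.toUpperCase());
>>   }
>> }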
>>
>> Help / suggestions on how to realize the same functionality with the
>> current version of the API are appreciated.
>>
>> As a fallback, I think that "partitionByHash" might do the trick at the
>> expense of one pipelined pass over the data, but I am not sure whether
>> the receiver tasks are scheduled on the same machines as their sender
>> counterparts. In other words, can I assume that the following happens:
>>
>>
>> machine1:  (task[0])  partitionByHash  (task[0])
>> machine2:  (task[1])  partitionByHash  (task[1])
>> ...
>> machineN:  (task[n])  partitionByHash  (task[n])
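>>
>> In code, the fallback I have in mind is roughly the following sketch
>> (assuming the optimizer reuses the explicit hash partitioning for the
>> subsequent grouping, so that only one shuffle is paid):
>>
>> // imports: ExecutionEnvironment, DataSet (org.apache.flink.api.java),
>> //          Tuple2 (org.apache.flink.api.java.tuple),
>> //          GroupReduceFunction (org.apache.flink.api.common.functions),
>> //          Collector (org.apache.flink.util)
>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>
>> DataSet<Tuple2<Integer, String>> data = env.fromElements(
>>     new Tuple2<>(1, "a"), new Tuple2<>(2, "b"), new Tuple2<>(1, "c"));
>>
>> data.partitionByHash(0)  // one explicit, pipelined shuffle on field 0
>>     .groupBy(0)          // same key, so ideally no second shuffle
>>     .reduceGroup(new GroupReduceFunction<Tuple2<Integer, String>, String>() {
>>       @Override
>>       public void reduce(Iterable<Tuple2<Integer, String>> values,
>>                          Collector<String> out) {
>>         int count = 0;
>>         for (Tuple2<Integer, String> t : values) {
>>           count++;
>>         }
>>         out.collect("group size = " + count);
>>       }
>>     })
>>     .print();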
>>
>>
>> Cheers,
>> Alexander
>>
>
>
