Hi Ventura!

Sorry for the late response. Here are a few ideas or comments that may help
you:

1) We want to make it possible for a function (such as a MapFunction) to
figure out which TaskManager it is running on. The mechanism would be
something like "getRuntimeContext().getTaskManagerInformation()". That
should help you determine which TaskManager your function instance runs on.

2) When tasks are scheduled, it is not guaranteed that slots 0, 1, 2,
... are on the same TaskManager. The assignment is based on the locality of
the input data stream and the availability of slots.
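
To illustrate point 2) with a rough sketch: this is plain Java, not Flink
code, and the class name, the TaskManager names, and the assignment array are
all made up for the example. It only shows that the set of subtasks on one
TaskManager has to be looked up from the actual assignment and cannot be
derived from a slot count alone.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink.
class LocalChannels {

    /**
     * Groups subtask (channel) indices by the TaskManager that hosts them.
     * taskManagerOfSubtask[i] is the (made-up) id of the TaskManager
     * running subtask i.
     */
    static Map<String, List<Integer>> groupByTaskManager(String[] taskManagerOfSubtask) {
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        for (int i = 0; i < taskManagerOfSubtask.length; i++) {
            groups.computeIfAbsent(taskManagerOfSubtask[i], k -> new ArrayList<>()).add(i);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Assumed assignment: TM1 hosts subtasks 0 and 3, not 0 and 1.
        String[] assignment = {"TM1", "TM2", "TM2", "TM1", "TM3", "TM2"};
        System.out.println(groupByTaskManager(assignment));
    }
}
```

With an assignment like the one in main, the subtasks of a TaskManager do not
form a contiguous id range, so a range such as [4,9] cannot be assumed.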


Can you explain a bit more what the feature you want to add actually tries
to achieve? Then I may be able to give you more pointers.

When you say that you need local re-distribution, do you mean something
like the picture below, where a change of parallelism between operators
repartitions the data only locally (not across the boundaries of TaskManagers)?


 (map) (map)  (map) (map)
   \     /      \    /
    \   /        \  /
   (reduce)    (reduce)
      ^ ^        ^ ^
      | \        / |
      |  +------+  |
      | /        \ |
   (source)     (source)
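
If that is what you mean, the channel selection on the sending side could
look roughly like the sketch below. This is plain Java and not the Flink
runtime: the class LocalRoundRobin, its constructor arguments, and the
TaskManager names are assumptions for illustration, and it presumes the sender
somehow knows which TaskManager hosts each target channel. Round-robin stands
in for the random choice here to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical channel selector, not part of Flink.
class LocalRoundRobin {
    private final int[] localTargets; // target channels on the sender's TaskManager
    private int next = 0;

    /**
     * taskManagerOfTarget[i] is the (made-up) id of the TaskManager hosting
     * target channel i; myTaskManager is the id of the sender's TaskManager.
     */
    LocalRoundRobin(String[] taskManagerOfTarget, String myTaskManager) {
        List<Integer> local = new ArrayList<>();
        for (int i = 0; i < taskManagerOfTarget.length; i++) {
            if (taskManagerOfTarget[i].equals(myTaskManager)) {
                local.add(i);
            }
        }
        if (local.isEmpty()) {
            throw new IllegalArgumentException("no target channel on " + myTaskManager);
        }
        localTargets = local.stream().mapToInt(Integer::intValue).toArray();
    }

    /** Returns the next target channel, cycling through the local ones only. */
    int selectChannel() {
        int channel = localTargets[next];
        next = (next + 1) % localTargets.length;
        return channel;
    }

    public static void main(String[] args) {
        // Four (map) targets, two per TaskManager, as in the picture above.
        String[] targets = {"TM1", "TM1", "TM2", "TM2"};
        LocalRoundRobin selector = new LocalRoundRobin(targets, "TM2");
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.selectChannel());
        }
    }
}
```

A sender on TM2 then only ever emits to the two (map) channels on TM2, which
matches the (reduce) to (map) edges in the picture.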



Greetings,
Stephan



On Fri, May 22, 2015 at 10:58 AM, Ventura Del Monte <
venturadelmo...@gmail.com> wrote:

> Hello,
>
> I am trying to introduce a new feature in my Flink project: I would like
> to shuffle (randomly repartition) my dataset only locally within a task
> manager, so that each internal worker gets a different set of objects to
> work on. I have looked into Flink's internal mechanisms, and I know (I hope)
> how it handles partitions. I think there are two ways to do it:
>
> a) Using a mapPartition, which for each input object X outputs a
> tuple (X, destinationChannel), where destinationChannel is the id of
> the new worker that will receive X. The main problem with this solution is
> determining the correct destinationChannel in the mapPartition task. I think
> every operation in Flink is unaware of the task manager on which it is
> executed, so I would need to read the taskmanager config to get the
> number of slots available on the current TM. But then how should I relate
> this number to the total channel count, given that I could have a situation
> like this:
>
> +----+----+----+----+----+----+----+----+----+---+---+---+---+----+
> |    |    |    |    |    |    |    |    |    |   |   |   |   |    |
> | 0  | 1  | 2  | 3  | 4  | 5  |  6 |  7 |  8 | 9 | 10| 11| 12| 13 |
> +----+----+----+---------+----+----+----+----+--------------------+
> |                   |                            |                |
> |      TM1          |            TM2             |       TM3      |
> +-------------------+----------------------------+----------------+
>
> So even if I knew TM2 had 6 slots, I would not be able to know their id
> range -> [4,9]
>
> b) Destination channels are chosen in RegularPactTask.getOutputCollector,
> so some modifications to this method would make the local repartitioning
> possible using either a range or a custom partitioning, in order to make
> them taskmanager-aware. Yet this would involve some edits to the Flink
> runtime.
>
> To be honest, I would like to avoid b), but I think I am at a dead end and
> will have to edit the runtime.
>
> Do you have better suggestions? Thank you in advance.
>
