Hello,

I am trying to add a new feature to my Flink project: I would like to
shuffle (randomly repartition) my dataset only locally within each task
manager, so that each worker on that task manager gets a different set of
objects to work on. I have looked into Flink's internal mechanisms, and I
think (I hope) I understand how it handles partitions. I see two ways to do it:

a) Use a mapPartition that, for each input object X, outputs a tuple
(X, destinationChannel), where destinationChannel is the id of the worker
that will receive X. The main problem with this solution is determining the
correct destinationChannel inside the mapPartition task. As far as I know,
operations in Flink are unaware of the task manager they run on, so I would
need to read the task manager configuration to get the number of slots
available on the current TM. But then how do I relate this number to the
total channel count, given that I could have a situation like this:

+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
| 0  | 1  | 2  | 3  | 4  | 5  | 6  | 7  | 8  | 9  | 10 | 11 | 12 | 13 |
+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|        TM1        |             TM2             |        TM3        |
+-------------------+-----------------------------+-------------------+

So even if I knew that TM2 has 6 slots, I would have no way to know their
id range, [4, 9].
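For illustration, if the slot count of every TM and the order in which TMs own channel ids were both known, the range would just be a prefix sum over the slots of the earlier TMs, and the mapPartition in a) could tag each record with a random channel from that range. This is a minimal sketch under exactly that assumption (contiguous channel ids per TM, TM1 first), which is the information Flink does not expose here; `LocalChannelRanges`, `rangeFor`, `tagLocally` and `Tagged` are hypothetical names, not Flink API, and the code is plain Java without Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Sketch only: ASSUMES channel ids are assigned contiguously per task manager,
// in TM order (TM1 first). Flink does not promise this layout.
class LocalChannelRanges {

    // One (X, destinationChannel) pair, as the mapPartition in a) would emit it.
    static final class Tagged<T> {
        final T value;
        final int channel;

        Tagged(T value, int channel) {
            this.value = value;
            this.channel = channel;
        }
    }

    // Returns {first, last}, the inclusive channel id range of TM number tmIndex.
    static int[] rangeFor(int[] slotsPerTm, int tmIndex) {
        int first = 0;
        for (int i = 0; i < tmIndex; i++) {
            first += slotsPerTm[i]; // slots of all earlier TMs come first
        }
        return new int[] {first, first + slotsPerTm[tmIndex] - 1};
    }

    // Tags every record with a uniformly random channel inside the given range.
    static <T> List<Tagged<T>> tagLocally(List<T> records, int[] range, Random rnd) {
        int width = range[1] - range[0] + 1;
        List<Tagged<T>> out = new ArrayList<>();
        for (T x : records) {
            out.add(new Tagged<>(x, range[0] + rnd.nextInt(width)));
        }
        return out;
    }

    public static void main(String[] args) {
        int[] slots = {4, 6, 4}; // TM1, TM2, TM3 as in the diagram
        int[] tm2 = rangeFor(slots, 1);
        System.out.println("TM2 -> [" + tm2[0] + ", " + tm2[1] + "]"); // TM2 -> [4, 9]
        for (Tagged<String> t : tagLocally(Arrays.asList("a", "b", "c"), tm2, new Random(42))) {
            System.out.println(t.value + " -> channel " + t.channel);
        }
    }
}
```

The tagged field would then still need a custom partitioner that simply returns the tag as the target partition (for example via `DataSet.partitionCustom`, assuming the partition count equals the total channel count), so the sketch only helps once the per-TM layout is actually known.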

b) Destination channels are chosen in RegularPactTask.getOutputCollector, so
modifying this method to make range or custom partitioning
taskmanager-aware would make the local repartition possible. However, this
involves editing the Flink runtime.
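Whatever a modified output collector ends up looking like, the decision it would have to make per record is small: given the sender's own channel index, pick a random target channel on the same TM. A minimal sketch, assuming the same contiguous layout as in the diagram and that the sender knows its own channel index; `LocalShuffleSelector` and `selectChannel` are hypothetical names, not part of Flink's runtime API.

```java
import java.util.Random;

// Hypothetical selector, NOT a Flink class. ASSUMES channels are numbered
// contiguously per task manager, in TM order, as in the diagram.
class LocalShuffleSelector {
    private final int[] slotsPerTm;
    private final Random rnd;

    LocalShuffleSelector(int[] slotsPerTm, long seed) {
        this.slotsPerTm = slotsPerTm.clone();
        this.rnd = new Random(seed);
    }

    // Returns a random target channel owned by the same TM as senderChannel.
    int selectChannel(int senderChannel) {
        if (senderChannel < 0) {
            throw new IllegalArgumentException("negative channel " + senderChannel);
        }
        int first = 0;
        for (int slots : slotsPerTm) {
            if (senderChannel < first + slots) {
                return first + rnd.nextInt(slots); // stays within [first, first + slots - 1]
            }
            first += slots;
        }
        throw new IllegalArgumentException("channel " + senderChannel + " is outside all TMs");
    }

    public static void main(String[] args) {
        LocalShuffleSelector sel = new LocalShuffleSelector(new int[] {4, 6, 4}, 7L);
        // Sender 5 lives on TM2, so every target falls in [4, 9].
        for (int i = 0; i < 5; i++) {
            System.out.println("5 -> " + sel.selectChannel(5));
        }
    }
}
```

The lookup is linear in the number of TMs, which is cheap for a handful of TMs; the real difficulty is still obtaining `slotsPerTm` and the channel-to-TM ordering inside the runtime.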

To be honest, I would like to avoid option b), but I think I am at a dead
end and will have to edit the runtime.

Do you have better suggestions? Thank you in advance.
