Hello, I am trying to introduce a new feature in my Flink project: I would like to shuffle (randomly repartition) my dataset only locally within a task manager, so that each of its internal workers gets a different set of objects to work on. I have looked into Flink's internal mechanisms, and I know (I hope) how it handles partitions. I think there are two ways to do it:
a) Using a mapPartition which, for each input object X, outputs a tuple (X, destinationChannel), where destinationChannel is the id of the new worker that will receive X. The main problem with this solution is determining the correct destinationChannel inside the mapPartition task. I think every operation in Flink is unaware of the task manager on which it is executed, so I would need to read the task manager config to get the number of slots available on the current TM. But then how should I relate this number to the total channel count? I could have a situation like this:

+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |  8 |  9 | 10 | 11 | 12 | 13 |
+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|        TM1        |             TM2             |        TM3        |
+-------------------+-----------------------------+-------------------+

So even if I knew that TM2 had 6 slots, I would not be able to know their id range -> [4,9].

b) Destination channels are chosen in RegularPactTask.getOutputCollector, so some modifications to this method would make the local repartition possible, using either a range or a custom partitioner made task-manager-aware. However, this would involve editing the Flink runtime.

To be honest, I would like to avoid (b), but I think I am at a dead end and will have to edit the runtime. Do you have any better suggestions?

Thank you in advance.
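
P.S. To make the problem in (a) concrete, here is a minimal Java sketch of the computation I would need inside the mapPartition. It is not Flink API: the class, the method names and the slotsPerTm array are hypothetical, and it assumes that channel ids are assigned contiguously in task manager order, which I have not verified. The catch is that the offset depends on the slot counts of all preceding task managers, which a single task does not know.

```java
import java.util.Random;

// Hypothetical helper, not part of Flink. All names are made up for illustration.
final class LocalShuffleSketch {

    // Returns {first, last} (inclusive) global channel ids owned by task manager tmIndex.
    // ASSUMPTION: channels are numbered contiguously, task manager by task manager.
    static int[] localChannelRange(int[] slotsPerTm, int tmIndex) {
        int first = 0;
        // The offset is the sum of the slots of all preceding task managers.
        for (int i = 0; i < tmIndex; i++) {
            first += slotsPerTm[i];
        }
        return new int[] {first, first + slotsPerTm[tmIndex] - 1};
    }

    // Picks a random destination channel among the channels local to tmIndex.
    static int randomLocalChannel(int[] slotsPerTm, int tmIndex, Random rnd) {
        int[] range = localChannelRange(slotsPerTm, tmIndex);
        return range[0] + rnd.nextInt(range[1] - range[0] + 1);
    }
}
```

With the layout from the diagram (slot counts 4, 6, 4), localChannelRange(new int[] {4, 6, 4}, 1) gives [4, 9] for TM2, but only because the slot count of TM1 was passed in as well.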