Hi Ventura!

Concerning (1):

What would be good is to make the "org.apache.flink.runtime.instance.InstanceConnectionInfo"
available in the RuntimeContext returned by getRuntimeContext(). In order to do that, we would
need to move that class into the flink-core package. We could also rename it simply to
"ConnectionInfo".
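
From a user function, that could then look roughly like the sketch below (the accessor and the
methods on "ConnectionInfo" are made up for illustration, none of this exists yet):

    import org.apache.flink.api.common.functions.RichMapFunction;

    public class HostTaggingMapper extends RichMapFunction<String, String> {

        @Override
        public String map(String value) throws Exception {
            // hypothetical accessor exposing the TaskManager's connection info
            // through the RuntimeContext (name not decided yet)
            ConnectionInfo info = getRuntimeContext().getTaskManagerConnectionInfo();
            // hypothetical getter on the renamed InstanceConnectionInfo
            return info.getHostname() + " -> " + value;
        }
    }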

Concerning (2): I think this may be a bit harder to add. I am curious what your results are
without this optimization.

Stephan

On Mon, Jun 8, 2015 at 4:49 PM, Ventura Del Monte <venturadelmo...@gmail.com> wrote:

> Hi Stephan,
>
> Many thanks for your reply!
>
> 1) This would be a nice feature. I have already done something similar. If
> you tell me which information you would like to expose in the runtime
> context, I can add it to my code, update the unit tests and share them.
>
> 2) Yes, I have figured that out. However, I needed this kind of local
> repartition since I was working on a dataset sampler based on the filter
> operator (this is the first step of the iterative pipeline I am
> developing). To be honest, this repartition is just a plus, because I have
> already achieved good results (even if a sampler like the one offered by
> Spark when the ratio is low would be a good feature). The main drawback of
> this filter operation is that it always takes the same partition as input,
> so, if the partition is big enough, the probability of sampling different
> items in consecutive filtering operations should be high (of course, using
> a good sampling factor and a correctly seeded RNG). Yet if it were possible
> to shuffle the partitions on the same task manager, the following sampling
> operation would benefit, in my opinion, as the produced partition would
> contain different items with an even higher probability. Of course, I think
> this shuffle operation (being local to each TM) should involve neither a
> network nor a disk transfer, otherwise the game is not worth the candle.
>
> About the change of parallelism, I read that it triggers a sort of local
> re-distribution, but I do not think that is my case. Anyway, do you think
> this kind of shuffling/sampling can be achieved in Flink? Does it make
> sense in your opinion?
>
> Best Regards,
> Ventura
>
> 2015-06-03 14:57 GMT+02:00 Stephan Ewen <se...@apache.org>:
>
>> Hi Ventura!
>>
>> Sorry for the late response. Here are a few ideas or comments that may
>> help you:
>>
>> 1) We want to make it possible for a function (such as a MapFunction) to
>> figure out on which TaskManager it is running. The mechanism would be
>> something like "getRuntimeContext().getTaskManagerInformation()". That
>> should help you determine which TaskManager you are on.
>>
>> 2) When tasks are scheduled, it is not guaranteed that slots 0, 1, 2, ...
>> are on the same TaskManager. The assignment is based on the locality of
>> the input data stream and the availability of slots.
>>
>> Can you explain a bit more what the feature you want to add actually
>> tries to achieve? Then I may be able to give you more pointers.
>>
>> When you say that you need local re-distribution, does it imply something
>> like below, where a change of parallelism between operators means that
>> they only repartition locally (not across the boundaries of TaskManagers)?
>>
>>    (map) (map)     (map) (map)
>>       \   /           \   /
>>        \ /             \ /
>>     (reduce)        (reduce)
>>      ^    ^          ^    ^
>>      |     \        /     |
>>      |      +------+      |
>>      |     /        \     |
>>     (source)        (source)
>>
>> Greetings,
>> Stephan
>>
>> On Fri, May 22, 2015 at 10:58 AM, Ventura Del Monte <
>> venturadelmo...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I am trying to introduce a new feature in my Flink project: I would like
>>> to shuffle (randomly repartition) my dataset only locally to a task
>>> manager, so that each internal worker will have a different set of
>>> objects to work on. I have looked into Flink's internal mechanisms, and
>>> I know (I hope) how it handles partitions. I think there are two ways to
>>> do it:
>>>
>>> a) using a mapPartition, which for each input object X should output a
>>> tuple (X, destinationChannel), where destinationChannel is the id of the
>>> new worker that will receive X. The main problem of this solution is to
>>> determine the correct destinationChannel in the mapPartition task. I
>>> think every operation in Flink is unaware of the TaskManager on which it
>>> is executed, so I would need to read the TaskManager config in order to
>>> get the number of slots available on the current TM, but then how should
>>> I relate this number to the total channel count, since I could have a
>>> situation like this:
>>>
>>> +----+----+----+----+----+----+----+----+----+---+---+---+---+----+
>>> |    |    |    |    |    |    |    |    |    |   |   |   |   |    |
>>> | 0  | 1  | 2  | 3  | 4  | 5  | 6  | 7  | 8  | 9 | 10| 11| 12| 13 |
>>> +----+----+----+---------+----+----+----+----+--------------------+
>>> |                   |                            |                |
>>> |        TM1        |            TM2             |      TM3       |
>>> +-------------------+----------------------------+----------------+
>>>
>>> So even if I knew TM2 had 6 slots, I would not be able to know their id
>>> range -> [4,9]
>>>
>>> b) Destination channels are chosen in RegularPactTask.getOutputCollector,
>>> so some modifications of this method would make the local repartition
>>> possible, using either a range or a custom partitioner, in order to make
>>> them TaskManager-aware. Yet this would involve some edits to the Flink
>>> runtime.
>>>
>>> Tbh, I would like to avoid (b), but I think I am at a dead end, and I
>>> will have to edit it.
>>>
>>> Do you have better suggestions? Thank you in advance.
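
PS: For the sampling step itself, something like the seeded filter below is a possible starting
point; every parallel instance samples its own partition independently (a rough sketch only, the
fraction and the per-subtask seeding are just an example):

    import java.util.Random;

    import org.apache.flink.api.common.functions.RichFilterFunction;
    import org.apache.flink.configuration.Configuration;

    public class BernoulliSampleFilter<T> extends RichFilterFunction<T> {

        private final double fraction;  // e.g. 0.1 keeps roughly 10% of the records
        private final long seed;
        private transient Random rng;

        public BernoulliSampleFilter(double fraction, long seed) {
            this.fraction = fraction;
            this.seed = seed;
        }

        @Override
        public void open(Configuration parameters) {
            // give every parallel subtask its own random sequence
            rng = new Random(seed + getRuntimeContext().getIndexOfThisSubtask());
        }

        @Override
        public boolean filter(T value) {
            // keep each record independently with probability 'fraction'
            return rng.nextDouble() < fraction;
        }
    }

Varying the seed between consecutive runs would give you different items from the same
partition, which may already cover part of what you want from the local shuffle.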