Of course!

On 21 September 2015 at 19:10, Fabian Hueske <fhue...@gmail.com> wrote:
> The custom partitioner does not know its task id, but the mapper that
> assigns the partition ids knows its subtask id.
>
> So if the mapper with subtask id 2 assigns partition ids 2 and 7, only 7
> will be sent over the network.
>
> On Sep 21, 2015 6:56 PM, "Stefan Bunk" <stefan.b...@googlemail.com> wrote:
>
>> Hi Fabian,
>>
>> that sounds good, thank you.
>>
>> One final question: As I said earlier, this also distributes data in some
>> unnecessary cases, say ID 4 sends data to ID 3.
>> Is there no way to find out the ID of the current node? I guess that
>> number is already available on the node and just needs to be exposed
>> somehow, right?
>>
>> Cheers
>> Stefan
>>
>> On 17 September 2015 at 18:39, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Stefan,
>>>
>>> I think I have a solution for your problem :-)
>>>
>>> 1) Distribute both parts of the small data to each machine (you have
>>> done that).
>>> 2) Your mapper should have a parallelism of 10; the tasks with IDs 0 to 4
>>> (get the ID via RichFunction.getRuntimeContext().getIndexOfThisSubtask())
>>> read the first half, and tasks 5 to 9 read the second half.
>>> 3) Feed the large input into a FlatMapper which emits two records for
>>> each incoming record and assigns the first outgoing record a task ID in
>>> the range 0 to 4 and the second outgoing record an ID in the range 5 to 9.
>>> 4) Add a custom partitioner (DataSet.partitionCustom()) after the
>>> duplicating mapper, which partitions the records based on the assigned
>>> task ID before they go into the mapper with the other, smaller data set.
>>> A record with assigned task ID 0 will be sent to the mapper task with
>>> subtask index 0.
>>>
>>> This setup is not very nice, but it should work.
>>>
>>> Let me know if you need more detail.
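[Editor's note: the id assignment behind steps 2-4 and the "2 and 7" example can be sketched without any Flink dependency. The class and method names below are illustrative, not Flink API; in a real job the subtask index would come from getIndexOfThisSubtask() and the routing would be done by DataSet.partitionCustom().]

```java
// Sketch (plain Java, no Flink): how the duplicating mapper could assign
// partition ids so each record reaches both halves of the small data set.
// All names here are illustrative, not Flink classes or methods.
public class PartitionIdSketch {
    static final int PARALLELISM = 10;
    static final int HALF = PARALLELISM / 2;

    // A mapper running as subtask `subtask` (0..9) emits two target ids:
    // one in 0..4 (first half) and one in 5..9 (second half).
    static int[] assignTargets(int subtask) {
        int first = subtask % HALF;   // lands in 0..4
        int second = first + HALF;    // lands in 5..9
        return new int[] { first, second };
    }

    // A record whose target equals the local subtask id stays local;
    // the other copy is shipped over the network by the custom partitioner.
    static boolean goesOverNetwork(int subtask, int target) {
        return target != subtask;
    }

    public static void main(String[] args) {
        int[] t = assignTargets(2);                   // mapper subtask 2
        System.out.println(t[0] + " " + t[1]);        // 2 7
        System.out.println(goesOverNetwork(2, t[0])); // false: stays local
        System.out.println(goesOverNetwork(2, t[1])); // true: shipped to 7
    }
}
```

This matches Fabian's example: subtask 2 assigns ids 2 and 7, and only the copy tagged 7 crosses the network.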
>>>
>>> Cheers, Fabian
>>>
>>> 2015-09-16 21:44 GMT+02:00 Stefan Bunk <stefan.b...@googlemail.com>:
>>>
>>>> Hi Fabian,
>>>>
>>>> the local file problem would however not exist if I just copy both
>>>> halves to all nodes, right?
>>>>
>>>> Let's say I have a file `1st` and a file `2nd`, which I copy to all
>>>> nodes. Now, with your approach from above, I do:
>>>>
>>>> // helper broadcast datasets to know on which half to operate
>>>> val data1stHalf = env.fromCollection("1st")
>>>> val data2ndHalf = env.fromCollection("2nd")
>>>>
>>>> val mapped1 = data.flatMap(yourMap)
>>>>   .withBroadcastSet(data1stHalf, "fileName").setParallelism(5)
>>>> val mapped2 = data.flatMap(yourMap)
>>>>   .withBroadcastSet(data2ndHalf, "fileName").setParallelism(5)
>>>> val result = mapped1.union(mapped2)
>>>>
>>>> Then, in my custom operator implementation of flatMap, I check the
>>>> helper broadcast data to know which file to load:
>>>>
>>>> override def open(params: Configuration): Unit = {
>>>>   val fileName =
>>>>     getRuntimeContext.getBroadcastVariable[String]("fileName")(0)
>>>>   // read the file from the local filesystem which I copied there earlier
>>>>   this.data = loadFromFileIntoDatastructure("/home/data/" + fileName)
>>>> }
>>>>
>>>> override def flatMap(document: Input, out: Collector[Output]): Unit = {
>>>>   // do sth. with this.data and the input
>>>>   out.collect(this.data.process(document))
>>>> }
>>>>
>>>> I think this should work, or do you see another problem here?
>>>>
>>>> Which brings us to the other question:
>>>> Both halves are so large that one half of the data fits in the
>>>> user-remaining memory on a node, but not both halves. So my program
>>>> would probably crash with an out-of-memory error if the scheduler
>>>> trusts one node so much that it wants to execute two flatMaps there ;-).
>>>>
>>>> You are saying that it is not guaranteed that all 10 nodes are used,
>>>> but how likely is it that one node gets two flatMaps while another one
>>>> is basically idling?
>>>> I have no idea of the internals, but I guess there is some heuristic
>>>> inside which decides how to distribute. In the normal setup, where all
>>>> 10 nodes are up, the connection is good, all nodes have the same
>>>> resources available, and the input data is evenly distributed in HDFS,
>>>> the default should be to distribute to all 10 nodes, right?
>>>>
>>>> I am not running in production, so for me it would be OK if this
>>>> usually works out.
>>>>
>>>> Cheers
>>>> Stefan
>>>>
>>>> On 15 September 2015 at 23:40, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Hi Stefan,
>>>>>
>>>>> the problem is that you cannot directly influence the scheduling of
>>>>> tasks to nodes to ensure that you can read the data that you put in
>>>>> the local filesystems of your nodes. HDFS provides a shared file
>>>>> system, which means that each node can read data from anywhere in
>>>>> the cluster.
>>>>> I assumed the data was small enough to broadcast because you want to
>>>>> keep it in memory.
>>>>>
>>>>> Regarding your question: it is not guaranteed that two different
>>>>> tasks, each with parallelism 5, will be distributed to all 10 nodes
>>>>> (even if you have only 10 processing slots).
>>>>> What would work is to have one map task with parallelism 10 and a
>>>>> Flink setup with 10 task managers on 10 machines, with only one
>>>>> processing slot per TM. However, you won't be able to replicate the
>>>>> data to both sets of mappers, because you cannot know which task
>>>>> instance will be executed on which machine (you cannot distinguish
>>>>> the tasks of the two task sets).
>>>>>
>>>>> As I said, reading from the local file system in a cluster and
>>>>> forcing task scheduling to specific nodes is quite tricky.
>>>>>
>>>>> Cheers, Fabian
>>>>>
>>>>> 2015-09-15 23:15 GMT+02:00 Stefan Bunk <stefan.b...@googlemail.com>:
>>>>>
>>>>>> Hi Fabian,
>>>>>>
>>>>>> I think we might have a misunderstanding here.
>>>>>> I have already copied the first file to five nodes, and the second
>>>>>> file to five other nodes, outside of Flink. In the open() method of
>>>>>> the operator, I just read that file via normal Java means. I do not
>>>>>> see why this is tricky or how HDFS should help here.
>>>>>> Then, I have a normal Flink DataSet, which I want to run through the
>>>>>> operator (using the previously read data in the flatMap
>>>>>> implementation). As I run the program several times, I do not want
>>>>>> to broadcast the data every time, but rather just copy it onto the
>>>>>> nodes once and be done with it.
>>>>>>
>>>>>> Can you answer my question from above? If the setParallelism method
>>>>>> works and selects five nodes for the first flatMap and five _other_
>>>>>> nodes for the second flatMap, then that would be fine for me, if
>>>>>> there is no other easy solution.
>>>>>>
>>>>>> Thanks for your help!
>>>>>> Best
>>>>>> Stefan
>>>>>>
>>>>>> On 14 September 2015 at 22:28, Fabian Hueske <fhue...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Stefan,
>>>>>>>
>>>>>>> forcing the scheduling of tasks to certain nodes and reading files
>>>>>>> from the local file system in a multi-node setup is actually quite
>>>>>>> tricky and requires a bit of understanding of the internals.
>>>>>>> It is possible, and I can help you with that, but I would recommend
>>>>>>> using a shared filesystem such as HDFS if that is possible.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2015-09-14 19:16 GMT+02:00 Stefan Bunk <stefan.b...@googlemail.com>:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> actually, I am distributing my data before the program starts,
>>>>>>>> without using broadcast sets.
>>>>>>>>
>>>>>>>> However, the approach should still work, under one condition:
>>>>>>>>
>>>>>>>>> DataSet mapped1 = data.flatMap(yourMap)
>>>>>>>>>     .withBroadcastSet(smallData1, "data").setParallelism(5);
>>>>>>>>> DataSet mapped2 = data.flatMap(yourMap)
>>>>>>>>>     .withBroadcastSet(smallData2, "data").setParallelism(5);
>>>>>>>>
>>>>>>>> Is it guaranteed that this selects a disjoint set of nodes, i.e.
>>>>>>>> five nodes for mapped1 and five other nodes for mapped2?
>>>>>>>>
>>>>>>>> Is there any way of selecting the five nodes concretely? Currently,
>>>>>>>> I have stored the first half of the data on nodes 1-5 and the
>>>>>>>> second half on nodes 6-10. With this approach, I guess, nodes are
>>>>>>>> selected randomly, so I would have to copy both halves to all of
>>>>>>>> the nodes.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Stefan
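[Editor's note: the pattern Stefan describes throughout the thread — pre-copying a file to each node and reading it once in open() via normal Java means — can be sketched without Flink. The class below only mimics the RichFlatMapFunction lifecycle (open once, then flatMap per record); the class name, the temp file standing in for "/home/data/...", and the String record type are illustrative assumptions, not actual Flink API.]

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Sketch: load a node-local file once, then reuse it for every record,
// mirroring the open()/flatMap() lifecycle of a Flink rich function.
public class LocalFileOperatorSketch {
    private List<String> data;   // loaded once in open(), reused per record

    // Analogue of open(): read the node-local file whose name would
    // arrive via the "fileName" broadcast variable in the real job.
    void open(Path fileName) throws IOException {
        this.data = Files.readAllLines(fileName);
    }

    // Analogue of flatMap(): combine the preloaded data with one record.
    String flatMap(String document) {
        return document + ":" + data.size();
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a file copied to the node before the job starts.
        Path tmp = Files.createTempFile("half", ".txt");
        Files.write(tmp, List.of("a", "b", "c"));

        LocalFileOperatorSketch op = new LocalFileOperatorSketch();
        op.open(tmp);                          // once, before processing
        System.out.println(op.flatMap("doc")); // doc:3
    }
}
```

The point of the pattern is that the file is read exactly once per task instance, not once per record, so repeated runs avoid re-broadcasting the data.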