The custom partitioner does not know its own task ID, but the mapper that
assigns the partition IDs knows its subtask ID.

So if the mapper with subtask ID 2 assigns partition IDs 2 and 7, only the
record with ID 7 will be sent over the network.
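
For illustration, here is a rough sketch of that duplicating mapper in Scala
(the Record type is a placeholder for the actual record type): the copy tagged
with the mapper's own subtask index is routed back to the same subtask by the
custom partitioner and therefore never leaves the machine.

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.util.Collector

class DuplicatingMapper extends RichFlatMapFunction[Record, (Int, Record)] {
  override def flatMap(r: Record, out: Collector[(Int, Record)]): Unit = {
    val own = getRuntimeContext.getIndexOfThisSubtask  // e.g. 2
    val other = if (own < 5) own + 5 else own - 5      // e.g. 7
    out.collect((own, r))    // stays local after partitionCustom
    out.collect((other, r))  // the only copy sent over the network
  }
}
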
On Sep 21, 2015 6:56 PM, "Stefan Bunk" <stefan.b...@googlemail.com> wrote:

> Hi Fabian,
>
> that sounds good, thank you.
>
> One final question: as I said earlier, this approach still distributes data
> in some unnecessary cases, say when subtask 4 sends data to subtask 3.
> Is there no way to find out the ID of the current node? I guess that
> number is already available on the node and just needs to be exposed
> somehow, right?
>
> Cheers
> Stefan
>
>
>
> On 17 September 2015 at 18:39, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Stefan,
>>
>> I think I have a solution for your problem :-)
>>
>> 1) Distribute both parts of the small data to each machine (you have done
>> that)
>> 2) Your mapper should have a parallelism of 10; the tasks with IDs 0 to 4
>> (obtained via RichFunction.getRuntimeContext().getIndexOfThisSubtask()) read
>> the first half, and tasks 5 to 9 read the second half.
>> 3) Feed the large input into a FlatMapper that emits two records for each
>> incoming record, assigning the first outgoing record a task ID in the range
>> 0 to 4 and the second an ID in the range 5 to 9.
>> 4) Add a custom partitioner (DataSet.partitionCustom()) after the
>> duplicating mapper, which partitions the records based on the assigned task
>> ID before they go into the mapper with the other, smaller data set. A record
>> with assigned task ID 0 will be sent to the mapper task with subtask index 0
>> (see the sketch below).
>>
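>> For steps 2-4, here is a rough sketch in Scala. The types Record, Output,
>> and SmallData (with its process() method), the loadHalf() helper, and the
>> largeInput data set are placeholders for your own code, not Flink APIs:
>>
>> import org.apache.flink.api.common.functions.{Partitioner, RichFlatMapFunction}
>> import org.apache.flink.api.scala._
>> import org.apache.flink.configuration.Configuration
>> import org.apache.flink.util.Collector
>>
>> // Step 3: emit two tagged copies of each record, one for tasks 0-4
>> // (first half) and one for tasks 5-9 (second half).
>> val duplicated: DataSet[(Int, Record)] = largeInput.flatMap {
>>   (r: Record, out: Collector[(Int, Record)]) =>
>>     val target = math.abs(r.hashCode) % 5
>>     out.collect((target, r))      // goes to one of tasks 0-4
>>     out.collect((target + 5, r))  // goes to one of tasks 5-9
>> }
>>
>> // Step 4: the assigned task ID in tuple field 0 is used directly as the
>> // target subtask index.
>> val partitioned = duplicated.partitionCustom(
>>   new Partitioner[Int] {
>>     override def partition(key: Int, numPartitions: Int): Int = key
>>   }, 0)
>>
>> // Step 2: each of the 10 subtasks loads the half that matches its index.
>> val result = partitioned.flatMap(new RichFlatMapFunction[(Int, Record), Output] {
>>   var half: SmallData = _
>>   override def open(conf: Configuration): Unit = {
>>     val idx = getRuntimeContext.getIndexOfThisSubtask
>>     half = loadHalf(if (idx < 5) "1st" else "2nd")  // your local loader
>>   }
>>   override def flatMap(t: (Int, Record), out: Collector[Output]): Unit =
>>     out.collect(half.process(t._2))
>> }).setParallelism(10)
>>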
>> This setup is not very nice, but should work.
>>
>> Let me know if you need more detail.
>>
>> Cheers, Fabian
>>
>> 2015-09-16 21:44 GMT+02:00 Stefan Bunk <stefan.b...@googlemail.com>:
>>
>>> Hi Fabian,
>>>
>>> the local file problem would, however, not exist if I just copied both
>>> halves to all nodes, right?
>>>
>>> Let's say I have a file `1st` and a file `2nd`, which I copy to all nodes.
>>> Now, with your approach from above, I do:
>>>
>>> // helper broadcast data sets that tell each mapper which half to operate on
>>> val data1stHalf = env.fromElements("1st")
>>> val data2ndHalf = env.fromElements("2nd")
>>>
>>> val mapped1 = data.flatMap(yourMap)
>>>   .withBroadcastSet(data1stHalf, "fileName").setParallelism(5)
>>> val mapped2 = data.flatMap(yourMap)
>>>   .withBroadcastSet(data2ndHalf, "fileName").setParallelism(5)
>>> val result = mapped1.union(mapped2)
>>>
>>> Then, in my custom flatMap operator implementation, I check the helper
>>> broadcast data to know which file to load:
>>>
>>> override def open(params: Configuration): Unit = {
>>>   val fileName =
>>>     getRuntimeContext.getBroadcastVariable[String]("fileName").get(0)
>>>   // read the file from the local filesystem, which I copied there earlier
>>>   this.data = loadFromFileIntoDatastructure("/home/data/" + fileName)
>>> }
>>>
>>> override def flatMap(document: Input, out: Collector[Output]): Unit = {
>>>   // do something with this.data and the input document
>>>   out.collect(this.data.process(document))
>>> }
>>>
>>> I think this should work, or do you see another problem here?
>>>
>>> Which brings us to the other question:
>>> Both halves are so large that one half of the data fits into the remaining
>>> user memory on a node, but not both. So my program would probably crash
>>> with an out-of-memory error if the scheduler trusted one node so much that
>>> it executed two flatMaps there ;-).
>>>
>>> You are saying that it is not guaranteed that all 10 nodes are used, but
>>> how likely is it that one node is given two flatMaps while another is
>>> basically idling? I have no idea about the internals, but I guess there is
>>> some heuristic inside that decides how to distribute. In the normal setup,
>>> where all 10 nodes are up, the connections are good, all nodes have the
>>> same resources available, and the input data is evenly distributed in HDFS,
>>> the default should be to distribute to all 10 nodes, right?
>>>
>>> I am not running in production, so for me it would be OK if this usually
>>> works out.
>>>
>>> Cheers
>>> Stefan
>>>
>>>
>>> On 15 September 2015 at 23:40, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Stefan,
>>>>
>>>> the problem is that you cannot directly influence the scheduling of
>>>> tasks to nodes, so you cannot ensure that a task can read the data you put
>>>> in the local file system of a particular node. HDFS provides a shared file
>>>> system, which means that each node can read data from anywhere in the
>>>> cluster.
>>>> I assumed the data was small enough to broadcast because you want to
>>>> keep it in memory.
>>>>
>>>> Regarding your question: it is not guaranteed that two different tasks,
>>>> each with a parallelism of 5, will be distributed across all 10 nodes
>>>> (even if you have only 10 processing slots).
>>>> What would work is to have one map task with parallelism 10 and a Flink
>>>> setup with 10 task managers on 10 machines with only one processing slot
>>>> per TM. However, you won't be able to replicate the data to both sets of
>>>> mappers, because you cannot know which task instance will be executed on
>>>> which machine (you cannot distinguish the tasks of the two task sets).
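>>>>
>>>> As a sketch of the single-map setup (assuming flink-conf.yaml on each
>>>> machine sets taskmanager.numberOfTaskSlots: 1; MyMapper is a placeholder):
>>>>
>>>> // With 10 TMs and one slot each, the 10 subtasks of this single mapper
>>>> // necessarily land on 10 distinct machines.
>>>> val mapped = data.flatMap(new MyMapper).setParallelism(10)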
>>>>
>>>> As I said, reading from the local file system in a cluster and forcing
>>>> task scheduling onto specific nodes is quite tricky.
>>>> Cheers, Fabian
>>>>
>>>> 2015-09-15 23:15 GMT+02:00 Stefan Bunk <stefan.b...@googlemail.com>:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> I think we might have a misunderstanding here. I have already copied
>>>>> the first file to five nodes and the second file to five other nodes,
>>>>> outside of Flink. In the open() method of the operator, I just read that
>>>>> file via normal Java means. I do not see why this is tricky or how HDFS
>>>>> would help here.
>>>>> Then I have a normal Flink DataSet, which I want to run through the
>>>>> operator (using the previously read data in the flatMap implementation).
>>>>> Since I run the program several times, I do not want to broadcast the
>>>>> data every time, but rather just copy it to the nodes once and be done
>>>>> with it.
>>>>>
>>>>> Can you answer my question from above? If the setParallelism() method
>>>>> works as hoped and selects five nodes for the first flatMap and five
>>>>> _other_ nodes for the second flatMap, that would be fine for me if there
>>>>> is no other easy solution.
>>>>>
>>>>> Thanks for your help!
>>>>> Best
>>>>> Stefan
>>>>>
>>>>>
>>>>> On 14 September 2015 at 22:28, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Stefan,
>>>>>>
>>>>>> forcing the scheduling of tasks to certain nodes and reading files
>>>>>> from the local file system in a multi-node setup is actually quite
>>>>>> tricky and requires a bit of understanding of the internals.
>>>>>> It is possible, and I can help you with that, but I would recommend
>>>>>> using a shared file system such as HDFS if that is possible.
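>>>>>>
>>>>>> With HDFS in place, for example, every node can read the same path
>>>>>> (the path below is hypothetical):
>>>>>>
>>>>>> val small = env.readTextFile("hdfs:///data/small-input")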
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> 2015-09-14 19:16 GMT+02:00 Stefan Bunk <stefan.b...@googlemail.com>:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> actually, I am distributing my data before the program starts,
>>>>>>> without using broadcast sets.
>>>>>>>
>>>>>>> However, the approach should still work, under one condition:
>>>>>>>
>>>>>>>> DataSet mapped1 = data.flatMap(yourMap)
>>>>>>>>     .withBroadcastSet(smallData1, "data").setParallelism(5);
>>>>>>>> DataSet mapped2 = data.flatMap(yourMap)
>>>>>>>>     .withBroadcastSet(smallData2, "data").setParallelism(5);
>>>>>>>>
>>>>>>> Is it guaranteed that this selects disjoint sets of nodes, i.e.,
>>>>>>> five nodes for mapped1 and five other nodes for mapped2?
>>>>>>>
>>>>>>> Is there any way of selecting the five nodes concretely? Currently,
>>>>>>> I have stored the first half of the data on nodes 1-5 and the second
>>>>>>> half on nodes 6-10. With this approach, I guess, nodes are selected
>>>>>>> randomly, so I would have to copy both halves to all of the nodes.
>>>>>>>
>>>>>>> Best,
>>>>>>> Stefan
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
