Hi Fabian,
I implemented your approach from above. However, the runtime decides to run
two subtasks on the same node, resulting in an out of memory error.
I thought partitioning really does partition the data across nodes, but now it
seems like it's partitioning across tasks, and tasks can be on the same node.
Of course!
On 21 September 2015 at 19:10, Fabian Hueske wrote:
The custom partitioner does not know its task id, but the mapper that
assigns the partition ids knows its subtask id.
So if the mapper with subtask id 2 assigns partition ids 2 and 7, only 7
will be sent over the network.
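The id arithmetic Fabian describes can be sketched standalone, outside of Flink. This is not Flink API: `partitionIdsFor` and `partitionFor` are hypothetical helper names, and the split into two halves of five subtasks is assumed from the rest of the thread.

```java
import java.util.Arrays;
import java.util.List;

public class PartitionSketch {
    static final int PARALLELISM = 10;

    // Subtask s tags each record with two partition ids: its own id
    // (that copy stays local) and the id of the matching subtask in
    // the other half, s + 5 (that copy crosses the network).
    static List<Integer> partitionIdsFor(int subtaskId) {
        int opposite = (subtaskId + PARALLELISM / 2) % PARALLELISM;
        return Arrays.asList(subtaskId, opposite);
    }

    // Identity partitioner: the partition id *is* the target channel,
    // so a custom partitioner would route partition id p to subtask p.
    static int partitionFor(int partitionId, int numPartitions) {
        return partitionId % numPartitions;
    }

    public static void main(String[] args) {
        // Subtask 2 assigns ids 2 and 7; only 7 travels over the network.
        System.out.println(partitionIdsFor(2)); // prints [2, 7]
    }
}
```

With this identity routing, half of the duplicated records never leave their node, which is the saving Fabian points out.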
On Sep 21, 2015 6:56 PM, "Stefan Bunk" wrote:
Hi Fabian,
that sounds good, thank you.
One final question: As I said earlier, this also distributes data in some
unnecessary cases, say ID 4 sends data to ID 3.
Is there no way to find out the ID of the current node? I guess that number
is already available on the node and just needs to be exposed.
Hi Stefan,
I think I have a solution for your problem :-)
1) Distribute both parts of the small data to each machine (you have done
that)
2) Your mapper should have a parallelism of 10; the tasks with ID 0 to 4
(get the ID via RichFunction.getRuntimeContext().getIndexOfThisSubtask()) read
the first half of the small data, and the tasks with ID 5 to 9 read the second half.
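The half-selection logic is simple enough to sketch standalone. In real Flink code the index would come from `getRuntimeContext().getIndexOfThisSubtask()` inside `open()`; here it is a plain parameter so the sketch runs without Flink, and the file names `1st`/`2nd` are taken from Stefan's earlier message.

```java
public class HalfChooser {
    // Tasks 0-4 read the first half of the small data, tasks 5-9 the
    // second half. In an actual RichFlatMapFunction, this choice would
    // be made in open() using the subtask index.
    static String fileForSubtask(int subtaskIndex) {
        return subtaskIndex < 5 ? "1st" : "2nd";
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            System.out.println("subtask " + i + " reads file " + fileForSubtask(i));
        }
    }
}
```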
Hi Fabian,
the local file problem would, however, not exist if I just copy both halves
to all nodes, right?
Let's say I have a file `1st` and a file `2nd`, which I copy to all nodes.
Now with your approach from above, I do:
// helper broadcast datasets to know on which half to operate
val data1stH
Hi Stefan,
the problem is that you cannot directly influence the scheduling of tasks
to nodes to ensure that you can read the data that you put in the local
filesystems of your nodes. HDFS provides a shared file system, which means
that each node can read data from anywhere in the cluster.
I assumed t
Hi Fabian,
I think we might have a misunderstanding here. I have already copied the
first file to five nodes, and the second file to five other nodes, outside
of Flink. In the open() method of the operator, I just read that file via
normal Java means. I do not see why this is tricky or how HDFS s
Hi Stefan,
forcing the scheduling of tasks to certain nodes and reading files from the
local file system in a multi-node setup is actually quite tricky and
requires a bit of understanding of the internals.
It is possible and I can help you with that, but I would recommend using a
shared filesystem such as HDFS.
Hi,
actually, I am distributing my data before the program starts, without
using broadcast sets.
However, the approach should still work, under one condition:
> DataSet mapped1 =
> data.flatMap(yourMap).withBroadcastSet(smallData1,"data").setParallelism(5);
> DataSet mapped2 =
> data.flatMap(yourMap).withBroadcastSet(smallData2,"data").setParallelism(5);
Hi Stefan,
I agree with Sachin's approach. That should be the easiest solution and
would look like:
env.setParallelism(10); // default is 10
DataSet data = env.read(...) // large data set
DataSet smallData1 = env.read(...) // read first part of small data
DataSet smallData2 = env.read(...) // read second part of small data
Of course, someone else might have better ideas regarding the partitioner. :)
On Sep 14, 2015 1:12 AM, "Sachin Goel" wrote:
Hi Stefan
Just a clarification: The output corresponding to an element based on the
whole data will be a union of the outputs based on the two halves. Is this
what you're trying to achieve? [It appears like that since every flatMap
task will independently produce outputs.]
In that case, one solu
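Sachin's union observation can be checked with a small standalone example. The `flatMapAgainst` matching logic below is entirely hypothetical, a stand-in for whatever per-element work the real flatMap does; the point is only that running against each half and concatenating gives the same results as running against the whole small data set, whenever each output depends on a single small-data entry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UnionSketch {
    // Hypothetical per-element work: emit a marker for every element
    // that appears in the given portion of the small data.
    static List<String> flatMapAgainst(List<String> elements, Set<String> part) {
        List<String> out = new ArrayList<>();
        for (String e : elements) {
            if (part.contains(e)) out.add(e + ":hit");
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c");
        Set<String> half1 = new HashSet<>(Arrays.asList("a"));
        Set<String> half2 = new HashSet<>(Arrays.asList("c"));
        Set<String> whole = new HashSet<>(Arrays.asList("a", "c"));

        // Union of the outputs against the two halves...
        List<String> unioned = new ArrayList<>(flatMapAgainst(data, half1));
        unioned.addAll(flatMapAgainst(data, half2));

        // ...matches the output against the whole small data set.
        System.out.println(unioned.equals(flatMapAgainst(data, whole))); // prints true
    }
}
```

This is the property that makes the split-into-halves plan valid at all; if the flatMap needed to see both halves at once for a single element, the union would not be equivalent.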
Hi!
Following problem: I have 10 nodes on which I want to execute a flatMap
operator on a DataSet. In the open method of the operator, some data is
read from disk and preprocessed, which is necessary for the operator.
The problem is that the data does not fit in memory on one node; however,
half of the da