Hi Stefan,

I agree with Sachin's approach. That should be the easiest solution and
would look like:

env.setParallelism(10); // set the default parallelism to 10

DataSet data = env.read(...) // large data set
DataSet smallData1 = env.read(...) // read first part of small data
DataSet smallData2 = env.read(...) // read second part of small data

DataSet mapped1 = data.flatMap(yourMap)
    .withBroadcastSet(smallData1, "data")
    .setParallelism(5);
DataSet mapped2 = data.flatMap(yourMap)
    .withBroadcastSet(smallData2, "data")
    .setParallelism(5);
DataSet result = mapped1.union(mapped2);

Unless you need to split your data into too many partitions, this should
work pretty well.
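
For reference, here is a compilable sketch of that approach. Class names,
paths, and element types are placeholders, and I am assuming the small data
can be loaded with readTextFile and accessed as a broadcast variable in
open():

import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class BroadcastHalvesJob {

  // Placeholder UDF: receives its half of the small data as the broadcast
  // variable "data" instead of reading it from disk in open().
  public static class YourMap extends RichFlatMapFunction<String, String> {
    private List<String> smallData;

    @Override
    public void open(Configuration parameters) {
      smallData = getRuntimeContext().getBroadcastVariable("data");
    }

    @Override
    public void flatMap(String value, Collector<String> out) {
      // match 'value' against 'smallData' and emit results here
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(10);

    DataSet<String> data = env.readTextFile("hdfs:///path/to/large");
    DataSet<String> smallData1 = env.readTextFile("hdfs:///path/to/small1");
    DataSet<String> smallData2 = env.readTextFile("hdfs:///path/to/small2");

    DataSet<String> mapped1 = data.flatMap(new YourMap())
        .withBroadcastSet(smallData1, "data")
        .setParallelism(5);
    DataSet<String> mapped2 = data.flatMap(new YourMap())
        .withBroadcastSet(smallData2, "data")
        .setParallelism(5);

    mapped1.union(mapped2).print();
  }
}
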
However, I agree that the custom partitioning function is a bit limited.
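
To illustrate the limitation: a custom partitioner only sees the key and the
number of partitions, so there is no way to learn which physical node holds
which half of the preloaded data. A sketch, where keyedData is a hypothetical
DataSet<Tuple2<Integer, String>> and Partitioner is
org.apache.flink.api.common.functions.Partitioner:

DataSet<Tuple2<Integer, String>> partitioned = keyedData
    .partitionCustom(new Partitioner<Integer>() {
      @Override
      public int partition(Integer key, int numPartitions) {
        // only 'key' and 'numPartitions' are available here; the mapping
        // from partition index to a physical node (and the half of the
        // data it stores) is opaque
        return key % numPartitions; // assumes non-negative keys
      }
    }, 0);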

Best, Fabian


2015-09-13 21:51 GMT+02:00 Sachin Goel <sachingoel0...@gmail.com>:

> Of course, someone else might have better ideas in re the partitioner. :)
> On Sep 14, 2015 1:12 AM, "Sachin Goel" <sachingoel0...@gmail.com> wrote:
>
>> Hi Stefan
>> Just a clarification: the output corresponding to an element based on
>> the whole data will be a union of the outputs based on the two halves. Is
>> this what you're trying to achieve? [It appears so, since every flatMap
>> task will independently produce outputs.]
>>
>> In that case, one solution could be to simply have two flatMap operations
>> based on parts of the *broadcast* data set, and take a union.
>>
>> Cheers
>> Sachin
>> On Sep 13, 2015 7:04 PM, "Stefan Bunk" <stefan.b...@googlemail.com>
>> wrote:
>>
>>> Hi!
>>>
>>> My problem is the following: I have 10 nodes on which I want to execute
>>> a flatMap operator on a DataSet. In the open method of the operator,
>>> some data is read from disk and preprocessed, which is necessary for the
>>> operator. The problem is that the data does not fit into memory on one
>>> node; however, half of it does. So I stored one half of the data, to be
>>> read in the open method, on five of the ten nodes, and the other half on
>>> the other five nodes.
>>>
>>> Now my question: how can I distribute my DataSet so that each element
>>> is sent once to a node holding the first half of my data and once to a
>>> node holding the other half?
>>>
>>> I looked at implementing a custom partitioner, but I ran into two
>>> problems:
>>> (i) I have no mapping from the partition index I am supposed to return
>>> to the data stored on the nodes. How do I know that index 5 holds one
>>> half of the data and index 6 the other?
>>> (ii) I do not know the current index. Obviously, I want to send each
>>> DataSet element only once over the network.
>>>
>>> Best,
>>> Stefan
>>>
>>
