Hi Stefan, I agree with Sachin's approach. That should be the easiest solution and would look like this:
    env.setParallelism(10); // default parallelism is 10

    DataSet data = env.read(...);       // large data set
    DataSet smallData1 = env.read(...); // read first part of small data
    DataSet smallData2 = env.read(...); // read second part of small data

    DataSet mapped1 = data.flatMap(yourMap)
        .withBroadcastSet(smallData1, "data")
        .setParallelism(5);
    DataSet mapped2 = data.flatMap(yourMap)
        .withBroadcastSet(smallData2, "data")
        .setParallelism(5);

    DataSet result = mapped1.union(mapped2);

Unless you need to split your data into too many partitions, this should work pretty well. However, I agree that the custom partitioning function is a bit limited.

Best,
Fabian

2015-09-13 21:51 GMT+02:00 Sachin Goel <sachingoel0...@gmail.com>:

> Of course, someone else might have better ideas in re the partitioner. :)
> On Sep 14, 2015 1:12 AM, "Sachin Goel" <sachingoel0...@gmail.com> wrote:
>
>> Hi Stefan,
>> Just a clarification: the output corresponding to an element, computed on
>> the whole data, will be the union of the outputs computed on the two
>> halves. Is this what you're trying to achieve? [It appears so, since
>> every flatMap task will independently produce its outputs.]
>>
>> In that case, one solution could be to simply have two flatMap
>> operations, each based on one part of the *broadcast* data set, and take
>> a union.
>>
>> Cheers,
>> Sachin
>> On Sep 13, 2015 7:04 PM, "Stefan Bunk" <stefan.b...@googlemail.com>
>> wrote:
>>
>>> Hi!
>>>
>>> Following problem: I have 10 nodes on which I want to execute a flatMap
>>> operator on a DataSet. In the open method of the operator, some data is
>>> read from disk and preprocessed, which the operator needs. The problem
>>> is that the data does not fit in memory on one node; however, half of
>>> the data does.
>>> So on five of the ten nodes I stored one half of the data to be read in
>>> the open method, and on the other five nodes the other half.
>>>
>>> Now my question: how can I distribute my DataSet so that each element
>>> is sent once to a node holding the first half of my data and once to a
>>> node holding the other half?
>>>
>>> I looked at implementing a custom partitioner, but I ran into two
>>> problems:
>>> (i) I have no mapping from the partition index I am supposed to return
>>> to the nodes holding the data. How do I know that index 5 corresponds
>>> to one half of the data and index 6 to the other?
>>> (ii) I do not know the current index. Obviously, I want to send each
>>> DataSet element only once over the network.
>>>
>>> Best,
>>> Stefan
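
For context on why the custom-partitioner route is limited here: Flink's Partitioner interface maps a key to a single channel index, and the mapping from index to physical node is not exposed. A minimal sketch (the Long key type and the modulo scheme are placeholders, not from the thread):

    import org.apache.flink.api.common.functions.Partitioner;

    // Sketch of a custom partitioner. It returns exactly one channel
    // index per key, so an element cannot be routed to two nodes, and
    // nothing tells you which half of the preloaded data lives behind
    // a given index -- Stefan's problems (i) and (ii).
    public class HalfPartitioner implements Partitioner<Long> {
        @Override
        public int partition(Long key, int numPartitions) {
            return (int) (key % numPartitions);
        }
    }

It would be applied via data.partitionCustom(new HalfPartitioner(), keySelector), which only ever sends an element to one partition. That is why the broadcast-and-union approach above is the simpler fit.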
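
For completeness, a minimal sketch of what the yourMap function in Fabian's snippet could look like, assuming String elements and join-style output purely for illustration: the broadcast set registered under the name "data" is fetched in open() via the runtime context.

    import java.util.List;

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Sketch only: the element types and the per-element logic are
    // placeholders, not from the thread.
    public class YourMap extends RichFlatMapFunction<String, String> {

        private List<String> smallData;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Fetch the half of the small data that was broadcast to this
            // subtask under the name "data" (see withBroadcastSet above).
            smallData = getRuntimeContext().getBroadcastVariable("data");
        }

        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            // Process the element against this half of the small data;
            // the union of mapped1 and mapped2 then covers both halves.
            for (String s : smallData) {
                out.collect(value + "," + s);
            }
        }
    }

Since each of the two flatMap operators runs with parallelism 5, each half of the small data is only materialized on that operator's five subtasks, so no node ever has to hold more than half of it and no files need to be placed on specific nodes by hand.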