Hi Andrew,

I might be wrong, but I think this problem is caused by an assumption about how Flink reads input data. In Flink, an InputSplit is not read by a dedicated task, and a split does not correspond to a partition. This is different from how Hadoop MapReduce and Spark handle InputSplits.
Instead, Flink creates as many DataSource tasks as the configured task parallelism and lazily assigns InputSplits to their subtasks. Idle DataSource subtasks request InputSplits from the JobManager, and the assignment happens first-come, first-served. Hence, the subtask ID (or partition ID) that ends up reading a given InputSplit is not deterministic, and a DataSource subtask might read more than one split or no split at all (as in your case).

If you need the split ID in your program, you can implement an InputFormat that wraps another InputFormat and attaches the ID of the current InputSplit to every record it reads, i.e., converts the data type from T to Tuple2[Int, T]. (Rough sketches of such a wrapper and of Till's block-index approach are at the bottom of this mail, below the quoted messages.)

Hope this helps,
Fabian

2016-04-25 11:27 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:

> Hi Andrew,
>
> I think the problem is that you assume that both matrices have the same
> partitioning. If you guarantee that this is the case, then you can use the
> subtask index as the block index. But in the general case this is not true,
> and then you have to calculate the blocks by first assigning a block index
> (e.g. rows with index 0-9 are assigned to block 0, rows with index 10-19
> are assigned to block 1, etc.) and then create the blocks by reducing on
> this block index. That's because the distribution of the individual rows
> in the cluster is not necessarily the same between two matrices.
>
> Cheers,
> Till
>
> On Mon, Apr 25, 2016 at 1:40 AM, Andrew Palumbo <ap....@outlook.com>
> wrote:
>
> > Hi All,
> >
> > I've run into a problem with empty partitions when the number of
> > elements in a DataSet is less than the degree of parallelism. I've
> > created a gist here to describe it:
> >
> > https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3
> >
> > I have two 2x2 matrices, Matrix A and Matrix B, and an execution
> > environment where the degree of parallelism is 4. Both matrices are
> > blockified into 2 different DataSets. In this case (2x2 matrices with
> > 4 partitions) each row goes into its own partition, leaving 2 empty
> > partitions. In Matrix A, the rows go into partitions 0 and 1. However,
> > the rows of Matrix B end up in partitions 1 and 2. I assign the ordinal
> > index of the blockified matrix's partition to its block, and then join
> > on that index.
> >
> > However, in this case, with differently partitioned matrices of the
> > same geometry, the intersection of the blockified matrices' indices is
> > 1, and partitions 0 and 2 are dropped.
> >
> > I've tried explicitly defining the dop for Matrix B using the count of
> > non-empty partitions in Matrix A, however this changes the order of the
> > DataSet, placing partition 2 into partition 0.
> >
> > Is there a way to make sure that these datasets are partitioned in the
> > same way?
> >
> > Thank you,
> >
> > Andy
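PS: Here is a rough, untested sketch of what such a wrapping InputFormat could look like. This class does not exist in Flink; the name SplitIdInputFormat and the tuple output type are just for illustration. It simply delegates everything to the wrapped format and remembers the number of the split it is currently reading:

import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.common.io.statistics.BaseStatistics
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.io.{InputSplit, InputSplitAssigner}

// Sketch only: wraps another InputFormat and tags every record with the
// number of the InputSplit it was read from, turning T into (Int, T).
class SplitIdInputFormat[T, S <: InputSplit](val wrapped: InputFormat[T, S])
  extends InputFormat[(Int, T), S] {

  // ID of the split this subtask is currently reading
  private var currentSplitId: Int = -1

  override def configure(parameters: Configuration): Unit =
    wrapped.configure(parameters)

  override def getStatistics(cachedStatistics: BaseStatistics): BaseStatistics =
    wrapped.getStatistics(cachedStatistics)

  override def createInputSplits(minNumSplits: Int): Array[S] =
    wrapped.createInputSplits(minNumSplits)

  override def getInputSplitAssigner(inputSplits: Array[S]): InputSplitAssigner =
    wrapped.getInputSplitAssigner(inputSplits)

  override def open(split: S): Unit = {
    currentSplitId = split.getSplitNumber // remember which split we are reading
    wrapped.open(split)
  }

  override def reachedEnd(): Boolean = wrapped.reachedEnd()

  override def nextRecord(reuse: (Int, T)): (Int, T) = {
    val rec = wrapped.nextRecord(if (reuse != null) reuse._2 else null.asInstanceOf[T])
    if (rec == null) null else (currentSplitId, rec)
  }

  override def close(): Unit = wrapped.close()
}

You would then create the source with env.createInput(new SplitIdInputFormat(yourFormat)); with the Scala API you need the usual import org.apache.flink.api.scala._ so that the TypeInformation for the (Int, T) tuples can be generated.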
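And to illustrate Till's point about deriving the block index from the row index and reducing on it, a minimal, untested sketch could look like the following. The (rowIndex, rowValues) layout, the blockSize, and the two hard-coded 2x2 matrices are just assumptions for the example:

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._

object BlockifySketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)

    // The two 2x2 matrices from your example, as (rowIndex, rowValues) pairs.
    val rowsA = env.fromElements((0L, Array(1.0, 2.0)), (1L, Array(3.0, 4.0)))
    val rowsB = env.fromElements((0L, Array(5.0, 6.0)), (1L, Array(7.0, 8.0)))

    val blockSize = 2 // rows 0-1 -> block 0, rows 2-3 -> block 1, ...

    // Derive a deterministic block index from the row index and build the
    // blocks with groupBy/reduceGroup, independent of the physical partitioning.
    def blockify(rows: DataSet[(Long, Array[Double])]): DataSet[(Int, Array[Array[Double]])] =
      rows
        .map(r => ((r._1 / blockSize).toInt, r._1, r._2))
        .groupBy(0)                        // group on the block index, not the partition
        .sortGroup(1, Order.ASCENDING)     // keep the rows ordered inside a block
        .reduceGroup { it =>
          val block = it.toArray
          (block.head._1, block.map(_._3)) // (block index, rows of this block)
        }

    // Both matrices now carry the same block indices, so the join sees all blocks
    // even when some subtasks read no split at all.
    blockify(rowsA).join(blockify(rowsB)).where(0).equalTo(0).print()
  }
}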