Thank you Fabian and Till for answering,

 I think my explanation of the problem was a bit oversimplified (I am 
trying to implement an operator that will pass our tests, and didn't want to 
throw too much code at you).  I realize that this is an odd case, a 2x2 matrix 
in a distributed context, but it is for a specific unit test that we enforce.

So we have two different distributed matrix representations: Row-Wise and 
Blockified.  The Row-Wise representation is a `DataSet[(K, Vector)]`, where K is 
e.g. an Int key and Vector is a row of the matrix.  The Blockified 
representation is a `DataSet[(Array[K], Matrix)]`.  In the gist that I posted, I 
was working with a Blockified distributed DataSet.  Since it was a 2x2 matrix 
that was blockified into 4 partitions, the non-empty partitions actually 
contain a 1x2 Matrix (rather than a (Vector) "row", as I think it currently 
reads); I will update the gist to make that clearer.

@Fabian In this case, I am using `ExecutionEnvironment.fromCollection` to create 
the original Row-Wise matrix DataSets.  (There are other cases in which we read 
from HDFS directly.)  But for this problem I am doing something like:

     val inCoreA = dense((1, 2), (3, 4))
     val inCoreB = dense((3, 5), (4, 6))

     val rowsA = (0 until inCoreA.nrow).map(i => (i, inCoreA(i, ::)))
     val drmA = env.fromCollection(rowsA).partitionByRange(0)

     val rowsB = (0 until inCoreB.nrow).map(i => (i, inCoreB(i, ::)))
     val drmB = env.fromCollection(rowsB).partitionByRange(0)

>If you need the split ID in your program, you can implement an InputFormat,
>which wraps another IF and assigns the ID of the current InputSplit to the
>read data, i.e., converts the DataType from T to Tuple2[Int, T].

I'm not sure whether the partitioning of the row-wise matrices matters at this 
point?  (I next map these into Blockified matrices.)
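
In case the split ID does turn out to matter: just so I understand your 
suggestion, is a wrapping InputFormat along these lines what you mean? 
(Untested sketch; the class name and the (Int, T) output layout are just my 
own placeholders.)

    import org.apache.flink.api.common.io.InputFormat
    import org.apache.flink.api.common.io.statistics.BaseStatistics
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.core.io.{InputSplit, InputSplitAssigner}

    // Delegates everything to the wrapped format and tags each record with
    // the number of the split it was read from.
    class SplitIdInputFormat[T, S <: InputSplit](wrapped: InputFormat[T, S])
      extends InputFormat[(Int, T), S] {

      private var currentSplitId: Int = -1

      override def configure(parameters: Configuration): Unit =
        wrapped.configure(parameters)

      override def getStatistics(cached: BaseStatistics): BaseStatistics =
        wrapped.getStatistics(cached)

      override def createInputSplits(minNumSplits: Int): Array[S] =
        wrapped.createInputSplits(minNumSplits)

      override def getInputSplitAssigner(splits: Array[S]): InputSplitAssigner =
        wrapped.getInputSplitAssigner(splits)

      override def open(split: S): Unit = {
        // remember which split this subtask is currently reading
        currentSplitId = split.getSplitNumber
        wrapped.open(split)
      }

      override def reachedEnd(): Boolean = wrapped.reachedEnd()

      override def nextRecord(reuse: (Int, T)): (Int, T) = {
        val rec = wrapped.nextRecord(if (reuse == null) null.asInstanceOf[T] else reuse._2)
        if (rec == null) null else (currentSplitId, rec)
      }

      override def close(): Unit = wrapped.close()
    }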

@Till I think you're right that my assumption of identical partitioning is the 
problem.

The above matrices are then mapped into Blockified matrices, currently using a 
method something like the following:

    val blocksA = drmA.mapPartition {
      values =>
        val (keys, vectors) = values.toIterable.unzip

        if (vectors.nonEmpty) {
          val vector = vectors.head
          val matrix: Matrix = if (vector.isDense) {
            val matrix = new DenseMatrix(vectors.size, ncolLocal)
            vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx, ::) := vec }
            matrix
          } else {
            new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
          }
          Seq((keys.toArray(classTag), matrix))
        } else {
          Seq()
        }
    }

And the same for Matrix B.

This is where the partition index assignment begins in the gist: 
https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3


> I think the problem is that you assume that both matrices have the same
> partitioning. If you guarantee that this is the case, then you can use the
> subtask index as the block index. 

Yes, I think this is the problem. I'd assumed that when mapping into partitions, 
partition 0 would be used first, then partition 1, and so on...  I understand 
what you're saying now though re: lazy assignment via task ID.  So essentially 
the partition that the data ends up in is arbitrary, based on which task 
happens to read it.

> But in the general case this is not true,
> and then you have to calculate the blocks by first assigning a block index
> (e.g. rows with 0-9 index are assigned to block 0, rows with 10-19 assigned
> to block 1, etc.) and then create the blocks by reducing on this block
> index. That's because the distribution of the individual rows in the
> cluster is not necessarily the same between two matrices.

I suppose this would have to be done when blockifying in the method above.  The 
row-wise matrix may be 2x2 or 20000000x2 and read directly from HDFS.  I'm not 
sure how to divide the data and partition it myself when mapping a row-wise 
matrix into blocks.  E.g., can I know the size of the DataSet before the 
computation is triggered by env.execute()?  I guess what you are saying is to 
hand-partition the data in the above `.asBlockified()` method.
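
Just to check that I understand the approach, is something like the following 
sketch what you mean?  Derive the block index from the row key (here with a 
hypothetical fixed blockSize), group on it, and build each block in a 
reduceGroup, so the result no longer depends on which subtask happened to read 
which rows.  (The Array-of-rows "block" and all names are placeholders for the 
real Vector/Matrix types; untested.)

    import org.apache.flink.api.scala._

    object BlockifyByRowKey {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // Toy row-wise matrix: (row key, row values); Array[Double] stands in
        // for a real Vector here.
        val rows: DataSet[(Int, Array[Double])] = env.fromCollection(Seq(
          (0, Array(1.0, 2.0)),
          (1, Array(3.0, 4.0))))

        val blockSize = 10 // rows per block, fixed up front

        val blocks = rows
          // block index is a pure function of the row key, not of the subtask
          .map(r => (r._1 / blockSize, r._1, r._2))
          .groupBy(0)
          .reduceGroup { it =>
            val rowsInBlock = it.toSeq.sortBy(_._2)
            val blockIndex = rowsInBlock.head._1
            // here one would assemble a DenseMatrix / SparseRowMatrix from
            // rowsInBlock; an array of rows stands in for the block
            (blockIndex, rowsInBlock.map(_._3).toArray)
          }

        blocks.print()
      }
    }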


As well, is it not still possible that I may end up with the same problem when 
the # of matrix blocks is < the degree of parallelism?



In the end, what I really need to do is be able to join the two Blockified 
DataSets (of any size) in the correct order... so maybe there is another way to 
do this?
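
My current understanding is that once both Blockified DataSets carry a block 
index derived from the row keys (as sketched above), a plain join on that index 
should line the blocks up regardless of the physical partitioning.  A toy, 
untested sketch with placeholder data:

    import org.apache.flink.api.scala._

    object JoinBlockifiedSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // Toy blockified matrices: (blockIndex, rows of the block).
        val blocksA = env.fromElements(
          (0, Array(Array(1.0, 2.0))),
          (1, Array(Array(3.0, 4.0))))
        val blocksB = env.fromElements(
          (0, Array(Array(3.0, 5.0))),
          (1, Array(Array(4.0, 6.0))))

        // Join on the block index; block 0 pairs with block 0, block 1 with
        // block 1, independent of empty partitions or subtask ordering.
        val joined = blocksA.join(blocksB).where(0).equalTo(0) {
          (a, b) => (a._1, a._2, b._2)
        }

        joined.print()
      }
    }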


Thanks again for your time.

Andy
________________________________________
From: Fabian Hueske <fhue...@gmail.com>
Sent: Monday, April 25, 2016 6:09 AM
To: dev@flink.apache.org
Subject: Re: Partition problem

Hi Andrew,

I might be wrong, but I think this problem is caused by an assumption of
how Flink reads input data.
In Flink, each InputSplit is not read by a new task and a split does not
correspond to a partition. This is different from how Hadoop MR and Spark
handle InputSplits.

Instead, Flink creates as many DataSource tasks as specified by the task
parallelism and lazily assigns InputSplits to its subtasks. Idle DataSource
subtasks request InputSplits from the JobManager and the assignment happens
first-come-first-serve.
Hence, the subtask ID (or partition ID) of an InputSplit is not
deterministic and a DataSource might read more than one or also no split at
all (such as in your case).

If you need the split ID in your program, you can implement an InputFormat,
which wraps another IF and assigns the ID of the current InputSplit to the
read data, i.e., converts the DataType from T to Tuple2[Int, T].

Hope this helps,
Fabian


2016-04-25 11:27 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:

> Hi Andrew,
>
> I think the problem is that you assume that both matrices have the same
> partitioning. If you guarantee that this is the case, then you can use the
> subtask index as the block index. But in the general case this is not true,
> and then you have to calculate the blocks by first assigning a block index
> (e.g. rows with 0-9 index are assigned to block 0, rows with 10-19 assigned
> to block 1, etc.) and then create the blocks by reducing on this block
> index. That's because the distribution of the individual rows in the
> cluster is not necessarily the same between two matrices.
>
> Cheers,
> Till
>
> On Mon, Apr 25, 2016 at 1:40 AM, Andrew Palumbo <ap....@outlook.com>
> wrote:
>
> > Hi All,
> >
> >
> > I've run into a problem with empty partitions when the number of elements
> > in a DataSet is less than the Degree of Parallelism.  I've created a gist
> > here to describe it:
> >
> >
> > https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3
> >
> >
> > I have two 2x2 matrices, Matrix A and Matrix B and an execution
> > environment where the degree of parallelism is 4. Both matrices   are
> > blockified in  2 different DataSet s . In this case (the case of a 2x2
> > matrices with 4 partitions) this means that each row goes into a
> partition
> > leaving 2 empty partitions. In Matrix A, the rows go into partitions 0,
> 1.
> > However the rows of Matrix B end up in partitions 1, 2. I assign the
> > ordinal index of the blockified matrix's partition to its block, and then
> > join on that index.
> >
> >
> > However in this case, with differently partitioned matrices of the same
> > geometry, the intersection of the blockified matrices' indices is 1, and
> > partitions 0 and 2 are dropped.
> >
> >
> > I've tried explicitly defining the dop for Matrix B using the count of
> > non-empty partitions in Matrix A, however this changes the order of the
> > DataSet, placing partition 2 into partition 0.
> >
> >
> > Is there a way to make sure that these datasets are partitioned in the
> > same way?
> >
> >
> > Thank you,
> >
> >
> > Andy
> >
> >
> >
>
