Hi Lydia,

Spark and Flink are not identical, so you'll find concepts in each system that have no direct counterpart in the other. For example, rows.context.broadcast(v) broadcasts the value v so that you can use it on all Executors. It does not put v into rows: in the original Spark code, rows.context is simply the SparkContext of the RDD, and the returned handle vbr is what the seqOp later reads v from. Flink takes a slightly different approach to broadcasting: in Flink you always broadcast the contents of a DataSet. That way you avoid collecting the result on some central node from which it would then be broadcast.
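A minimal sketch of what this looks like in Flink, assuming the Flink 1.x DataSet Scala API and using plain Array[Double] in place of Breeze vectors: v is wrapped in a one-element DataSet, attached to the map operator with withBroadcastSet, and read once per parallel instance in open() via getBroadcastVariable. The names BroadcastSketch, dotWithBroadcast and the broadcast name "v" are made up for illustration.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._

// Hypothetical helper: computes r . v for every row r, with v broadcast as a DataSet.
// Array[Double] stands in for Spark's Vector / Breeze vectors (assumption).
object BroadcastSketch {
  def dotWithBroadcast(rows: DataSet[Array[Double]], v: DataSet[Array[Double]]): DataSet[Double] =
    rows
      .map(new RichMapFunction[Array[Double], Double] {
        private var vLocal: Array[Double] = _

        // Each parallel instance receives the full contents of the broadcast DataSet "v";
        // since v holds a single element, we take its head.
        override def open(parameters: Configuration): Unit =
          vLocal = getRuntimeContext.getBroadcastVariable[Array[Double]]("v").asScala.head

        // Dot product of the current row with the broadcast vector
        override def map(r: Array[Double]): Double =
          r.indices.map(i => r(i) * vLocal(i)).sum
      })
      .withBroadcastSet(v, "v") // attach the DataSet v under the name "v"
}
```

For example, with rows (1, 2) and (3, 4) and v = (1, 1), collecting the result yields the dot products 3.0 and 7.0 (in no guaranteed order). Note that nothing is collected on the client before broadcasting; Flink ships the DataSet v directly to the map tasks.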
treeAggregate is an aggregation that is largely executed on the cluster: Spark folds the elements of each partition into a partial result (seqOp), merges those partial results in a multi-level tree (combOp), and performs only the last merge on the driver. It is similar to a combinable reduce operation in Flink. However, you can choose an arbitrary result type (similar to a fold operation compared to a reduce operation). You can do the same in Flink if you first apply a combineGroup function to the DataSet, which turns the elements of each partition into partial results of the desired type, and then a reduce function, which merges these partial results.

Cheers,
Till

On Thu, Feb 4, 2016 at 3:13 PM, Lydia Ickler <ickle...@googlemail.com> wrote:

> Hi all,
>
> as mentioned before I am trying to import the RowMatrix from Spark to
> Flink…
>
> In the code I already ran into a dead end… In the function
> multiplyGramianMatrixBy()
> (see end of mail) there is the line:
> rows.context.broadcast(v) (rows is a DataSet[Vector]
> What exactly is this line doing? Does it fill the „content“ of v into the
> variable *rows*?
> And another question:
> What is the function treeAggregate doing ? And how would you tackle
> a „copy“ of that in Flink?
>
> Thanks in advance!
> Best regards,
> Lydia
>
>
> private[flink] def multiplyGramianMatrixBy(v: DenseVector[Double]): DenseVector[Double] = {
>   val n = numCols().toInt
>
>   val vbr = rows.context.broadcast(v)
>
>   rows.treeAggregate(BDV.zeros[Double](n))(
>     seqOp = (U, r) => {
>       val rBrz = r.toBreeze
>       val a = rBrz.dot(vbr.data)
>       rBrz match {
>         // use specialized axpy for better performance
>         case _: BDV[_] => brzAxpy(a, rBrz.asInstanceOf[BDV[Double]], U)
>         case _: BSV[_] => brzAxpy(a, rBrz.asInstanceOf[BSV[Double]], U)
>         case _ => throw new UnsupportedOperationException(
>           s"Do not support vector operation from type ${rBrz.getClass.getName}.")
>       }
>       U
>     }, combOp = (U1, U2) => U1 += U2)
> }
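As a rough sketch of the combineGroup-then-reduce approach, here is one way the treeAggregate call in the quoted multiplyGramianMatrixBy could be expressed. It assumes the Flink 1.x DataSet Scala API, dense Array[Double] rows instead of Breeze vectors (so the dense/sparse axpy specialization is dropped), and v passed as a one-element DataSet that is broadcast with withBroadcastSet. GramianSketch and multiplyGramianBy are hypothetical names, not part of Flink or of the original RowMatrix code.

```scala
import org.apache.flink.api.common.functions.RichGroupCombineFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

// Hypothetical port: computes A^T A v, where the rows of A are the elements of `rows`.
// Array[Double] replaces Breeze vectors (assumption); n is the number of columns.
object GramianSketch {
  def multiplyGramianBy(rows: DataSet[Array[Double]],
                        v: DataSet[Array[Double]],
                        n: Int): DataSet[Array[Double]] =
    rows
      .combineGroup(new RichGroupCombineFunction[Array[Double], Array[Double]] {
        private var vLocal: Array[Double] = _

        // Read the broadcast vector once per parallel instance
        override def open(parameters: Configuration): Unit =
          vLocal = getRuntimeContext.getBroadcastVariable[Array[Double]]("v").asScala.head

        // Plays the role of Spark's seqOp: fold the rows handed to this combiner
        // into a fresh partial sum U, adding (r . v) * r for each row r
        override def combine(values: java.lang.Iterable[Array[Double]],
                             out: Collector[Array[Double]]): Unit = {
          val u = new Array[Double](n)
          for (r <- values.asScala) {
            val a = r.indices.map(i => r(i) * vLocal(i)).sum
            for (i <- 0 until n) u(i) += a * r(i)
          }
          out.collect(u)
        }
      })
      .withBroadcastSet(v, "v")
      // Plays the role of Spark's combOp: element-wise sum of the partial results
      .reduce((u1: Array[Double], u2: Array[Double]) => u1.indices.map(i => u1(i) + u2(i)).toArray)
}
```

With rows (1, 2) and (3, 4) and v = (1, 1), the collected result should be the single vector (24.0, 34.0), i.e. A^T A v. One difference from Spark: treeAggregate returns its zero value for an empty RDD, whereas a Flink reduce over an empty DataSet should simply produce no element, so an empty input would need to be handled separately.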