Hi all,
As mentioned before, I am trying to port the RowMatrix from Spark to Flink…
I have already run into a dead end in the code… In the function
multiplyGramianMatrixBy() (see the end of this mail) there is the line
rows.context.broadcast(v) (where rows is a DataSet[Vector]).
What exactly does this line do? Does it put the "content" of v into the
variable rows?
And another question:
What does the function treeAggregate do? And how would you build a "copy" of
it in Flink?
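My current understanding of treeAggregate, written as a simplified local sketch (no Spark or Flink involved; the object TreeAggregateSketch and the modelling of partitions as Seq[Seq[T]] are my own assumptions): every partition is folded with seqOp, starting from its own fresh copy of the zero value, and the partial results are then merged with combOp in a tree instead of all at once on the driver. Spark's real implementation takes a depth parameter (default 2) and arranges the reduction differently, so the binary tree below is only an approximation.

```scala
object TreeAggregateSketch {
  // Hypothetical local stand-in for RDD.treeAggregate: each inner Seq plays
  // the role of one partition. `zero` is by-name so every partition gets a
  // fresh value, which matters when U is mutable (e.g. BDV.zeros[Double](n)).
  def treeAggregate[T, U](partitions: Seq[Seq[T]], zero: => U)(
      seqOp: (U, T) => U,
      combOp: (U, U) => U): U = {
    // Step 1: fold each partition independently with seqOp
    var level: Seq[U] = partitions.map(p => p.foldLeft(zero)(seqOp)).toVector
    // Step 2: merge partial results pairwise, level by level (binary tree)
    while (level.size > 1) {
      level = level.grouped(2).map {
        case Seq(a, b) => combOp(a, b)
        case Seq(a)    => a // odd element moves up to the next level unchanged
      }.toVector
    }
    // No partitions at all: fall back to the zero value
    level.headOption.getOrElse(zero)
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq(1, 2), Seq(3), Seq(4, 5, 6))
    println(treeAggregate(parts, 0)(_ + _, _ + _)) // prints 21
  }
}
```

If this is right, a Flink version might use mapPartition for the seqOp step and reduce for the combOp step, since the DataSet API has no treeAggregate; I have not tried this yet.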
Thanks in advance!
Best regards,
Lydia
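import breeze.linalg.DenseVector
import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, axpy => brzAxpy}

// Computes (A^T A) v, where the rows of A are the elements of `rows`
private[flink] def multiplyGramianMatrixBy(v: DenseVector[Double]): DenseVector[Double] = {
  val n = numCols().toInt
  // Make v available read-only on every worker
  val vbr = rows.context.broadcast(v)
  rows.treeAggregate(BDV.zeros[Double](n))(
    // seqOp: within one partition, accumulate (r . v) * r into U
    seqOp = (U, r) => {
      val rBrz = r.toBreeze
      // a Spark Broadcast exposes its payload via .value
      val a = rBrz.dot(vbr.value)
      rBrz match {
        // use specialized axpy for better performance
        case _: BDV[_] => brzAxpy(a, rBrz.asInstanceOf[BDV[Double]], U)
        case _: BSV[_] => brzAxpy(a, rBrz.asInstanceOf[BSV[Double]], U)
        case _ => throw new UnsupportedOperationException(
          s"Do not support vector operation from type ${rBrz.getClass.getName}.")
      }
      U
    },
    // combOp: add up the partial sums of different partitions
    combOp = (U1, U2) => U1 += U2)
}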
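For reference, here is what I believe the method computes, as a dependency-free local sketch: for a matrix A with rows r_i, (A^T A) v = Σ_i (r_i · v) r_i, so a single pass over the rows is enough, and every row needs access to v (which, I assume, is why v is broadcast). GramianSketch and multiplyGramianBy are hypothetical names; rows are plain Array[Double] instead of Breeze or Flink vectors, and the loop stands in for rBrz.dot and brzAxpy.

```scala
object GramianSketch {
  // Hypothetical local stand-in for multiplyGramianMatrixBy:
  // returns (A^T A) v without ever forming A^T A explicitly.
  def multiplyGramianBy(rows: Seq[Array[Double]], v: Array[Double]): Array[Double] = {
    val n = v.length
    rows.foldLeft(new Array[Double](n)) { (u, r) =>
      // a = r . v (the scalar computed by rBrz.dot in the original)
      val a = r.indices.map(i => r(i) * v(i)).sum
      // u += a * r, updated in place (what brzAxpy(a, r, u) does)
      var i = 0
      while (i < n) { u(i) += a * r(i); i += 1 }
      u
    }
  }

  def main(args: Array[String]): Unit = {
    // A = [[1, 2], [3, 4]], so A^T A = [[10, 14], [14, 20]]
    val a = Seq(Array(1.0, 2.0), Array(3.0, 4.0))
    println(multiplyGramianBy(a, Array(1.0, 1.0)).mkString(", ")) // prints 24.0, 34.0
  }
}
```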