[ https://issues.apache.org/jira/browse/FLINK-4613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15654378#comment-15654378 ]
ASF GitHub Bot commented on FLINK-4613:
---------------------------------------

Github user thvasilo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2542#discussion_r87421513

    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala ---
    @@ -675,7 +756,69 @@ object ALS {
               collector.collect((blockID, array))
             }
           }
    -    }.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
    +    }
    +
    +    // broadcast the XtX matrix in the implicit case
    +    val updatedFactorMatrix = if (implicitPrefs) {
    +      newMatrix.withBroadcastSet(XtXtoBroadcast.get, "XtX")
    +    } else {
    +      newMatrix
    +    }
    +
    +    updatedFactorMatrix.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
    +  }
    +
    +  /**
    +   * Computes the XtX matrix for the implicit version before updating the factors.
    +   * This matrix is intended to be broadcast, but as we cannot use a sink inside a Flink
    +   * iteration, we represent it as a [[DataSet]] with a single element containing the matrix.
    +   *
    +   * The algorithm computes `X_i^T * X_i` for every block `X_i` of `X`,
    +   * then sums all these computed matrices to get `X^T * X`.
    +   */
    +  private[recommendation] def computeXtX(x: DataSet[(Int, Array[Array[Double]])], factors: Int):
    +  DataSet[Array[Double]] = {
    +    val triangleSize = factors * (factors - 1) / 2 + factors
    +
    +    type MtxBlock = (Int, Array[Array[Double]])
    +    // construct XtX for all blocks
    +    val xtx = x
    +      .mapPartition(new MapPartitionFunction[MtxBlock, Array[Double]]() {
    +        var xtxForBlock: Array[Double] = null
    +
    +        override def mapPartition(blocks: Iterable[(Int, Array[Array[Double]])],
    +                                  out: Collector[Array[Double]]): Unit = {
    +
    +          if (xtxForBlock == null) {
    +            // create the matrix if it has not been created yet
    +            xtxForBlock = Array.fill(triangleSize)(0.0)
    +          } else {
    +            // erase the matrix
    +            var i = 0
    +            while (i < xtxForBlock.length) {
    --- End diff --

    I don't imagine this making a major difference in performance, so let's just go with the cleaner-code angle and use `fill` (a sketch of that variant follows after the quoted issue below). I wish we had an easy-to-use, integrated way to do proper profiling, so that such decisions would be easier (i.e. if this is 0.5% of the CPU cost, then optimizing it is pointless, but right now we don't know).

> Extend ALS to handle implicit feedback datasets
> -----------------------------------------------
>
>                 Key: FLINK-4613
>                 URL: https://issues.apache.org/jira/browse/FLINK-4613
>             Project: Flink
>          Issue Type: New Feature
>          Components: Machine Learning Library
>            Reporter: Gábor Hermann
>            Assignee: Gábor Hermann
>
> The Alternating Least Squares implementation should be extended to handle
> _implicit feedback_ datasets. These datasets do not contain explicit ratings
> given by users; instead, they are built by collecting user behavior (e.g. a user
> listened to artist X for Y minutes), and they require a slightly different
> optimization objective. See the details in [Hu et
> al.|http://dx.doi.org/10.1109/ICDM.2008.22].
> We do not need to modify much of the original ALS algorithm. See the [Spark ALS
> implementation|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala],
> which could serve as a basis for this extension. Only the factor-update step is
> modified, and most of the changes are in the local parts of the algorithm
> (i.e. the UDFs). In fact, the only modification that is not local is
> precomputing the matrix product Y^T * Y and broadcasting it to all the nodes,
> which we can do with broadcast DataSets (a sketch of this pattern is included
> below).
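
For reference, a minimal sketch of the `fill` variant suggested in the review comment above. The names `xtxForBlock` and `triangleSize` mirror the diff; this is an illustration, not the committed code. `java.util.Arrays.fill` zeroes the existing buffer in place, so the buffer is still reused across invocations instead of being reallocated:

    import java.util.Arrays

    // Lazily allocate the packed XtX buffer; on reuse, erase it in place.
    def resetXtXBuffer(xtxForBlock: Array[Double], triangleSize: Int): Array[Double] =
      if (xtxForBlock == null) {
        Array.fill(triangleSize)(0.0)  // first call: allocate a zero-filled buffer
      } else {
        Arrays.fill(xtxForBlock, 0.0)  // later calls: zero the buffer without reallocating
        xtxForBlock
      }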
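
For context on why `X^T * X` is needed at all: in the implicit-feedback formulation of Hu et al., each observation r_ui is turned into a binary preference p_ui and a confidence c_ui, and the objective becomes

    \min_{x_*, y_*} \sum_{u,i} c_{ui} \left( p_{ui} - x_u^T y_i \right)^2
      + \lambda \left( \sum_u \|x_u\|^2 + \sum_i \|y_i\|^2 \right),
    \qquad p_{ui} = \begin{cases} 1 & \text{if } r_{ui} > 0 \\ 0 & \text{otherwise} \end{cases},
    \qquad c_{ui} = 1 + \alpha r_{ui}.

The per-user normal equation then involves Y^T C^u Y + lambda * I, where C^u is the diagonal confidence matrix of user u. Hu et al. rewrite this as Y^T Y + Y^T (C^u - I) Y: the first term is identical for every user, so it can be precomputed once per sweep and broadcast, while the second term only touches the items the user actually interacted with.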
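
On the `triangleSize` computation in `computeXtX`: since `X^T * X` is symmetric, only its upper triangle (including the diagonal) needs to be stored and summed, which takes factors * (factors - 1) / 2 + factors = factors * (factors + 1) / 2 doubles (for factors = 3 that is 6 entries). A hypothetical helper illustrating one possible row-major packed layout; the index formula is an assumption for illustration, not taken from the patch:

    // Entries needed for the upper triangle of a symmetric factors x factors matrix.
    def triangleSize(factors: Int): Int = factors * (factors - 1) / 2 + factors

    // Packed index of element (i, j) with 0 <= i <= j < factors, assuming the rows
    // of the upper triangle are laid out one after another.
    def packedIndex(i: Int, j: Int, factors: Int): Int =
      i * factors - i * (i - 1) / 2 + (j - i)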
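
Finally, a hedged sketch of the broadcast-DataSet pattern mentioned at the end of the issue description. The function and variable names below are placeholders rather than the PR's actual API; only the "XtX" broadcast name matches the diff. The single-element DataSet holding the packed matrix is attached to the updating operator with withBroadcastSet, and every task reads it back in open():

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration

    type Blocks = DataSet[(Int, Array[Array[Double]])]

    // Attach the single-element XtX DataSet to the factor-update operator.
    def updateWithBroadcastXtX(factorBlocks: Blocks, xtx: DataSet[Array[Double]]): Blocks =
      factorBlocks.map(
        new RichMapFunction[(Int, Array[Array[Double]]), (Int, Array[Array[Double]])] {
          private var fullXtX: Array[Double] = _

          override def open(parameters: Configuration): Unit = {
            // the broadcast variable arrives as a one-element list on every node
            fullXtX = getRuntimeContext.getBroadcastVariable[Array[Double]]("XtX").get(0)
          }

          override def map(block: (Int, Array[Array[Double]])): (Int, Array[Array[Double]]) = {
            // ... solve the implicit-feedback normal equations using fullXtX ...
            block // placeholder: this sketch returns the block unchanged
          }
        }).withBroadcastSet(xtx, "XtX")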