[ https://issues.apache.org/jira/browse/FLINK-4613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15528812#comment-15528812 ]
ASF GitHub Bot commented on FLINK-4613: --------------------------------------- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/2542#discussion_r80862241 --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ImplicitALSTest.scala --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.recommendation + +import org.apache.flink.ml.util.FlinkTestBase +import org.scalatest._ + +import scala.language.postfixOps +import org.apache.flink.api.scala._ +import org.apache.flink.core.testutils.CommonTestUtils + +class ImplicitALSTest + extends FlatSpec + with Matchers + with FlinkTestBase { + + override val parallelism = 2 + + behavior of "The modification of the alternating least squares (ALS) implementation" + + "for implicit feedback datasets." 
+ + it should "properly compute Y^T * Y, and factorize matrix" in { + import ExampleMatrix._ + + val rand = scala.util.Random + val numBlocks = 3 + // randomly split matrix to blocks + val blocksY = Y + // add a random block id to every row + .map { row => + (rand.nextInt(numBlocks), row) + } + // get the block via grouping + .groupBy(_._1).values + // add a block id (-1) to each block + .map(b => (-1, b.map(_._2))) + .toSeq + + // use Flink to compute YtY + val env = ExecutionEnvironment.getExecutionEnvironment + + val distribBlocksY = env.fromCollection(blocksY) + + val YtY = ALS + .computeXtX(distribBlocksY, factors) + .collect().head + + // check YtY size + YtY.length should be (factors * (factors - 1) / 2 + factors) + + // check result is as expected + expectedUpperTriangleYtY + .zip(YtY) + .foreach { case (expected, result) => + result should be (expected +- 0.1) + } + + // temporary directory to avoid too few memory segments + val tempDir = CommonTestUtils.getTempDir + "/" + + // factorize matrix with implicit ALS + val als = ALS() + .setIterations(iterations) + .setLambda(lambda) + .setBlocks(blocks) + .setNumFactors(factors) + .setImplicit(true) + .setAlpha(alpha) + .setSeed(seed) + .setTemporaryPath(tempDir) + + val inputDS = env.fromCollection(implicitRatings) + + als.fit(inputDS) + + // check predictions on some user-item pairs + val testData = env.fromCollection(expectedResult.map{ + case (userID, itemID, rating) => (userID, itemID) + }) + + val predictions = als.predict(testData).collect() + + predictions.length should equal(expectedResult.length) + + val resultMap = expectedResult map { + case (uID, iID, value) => (uID, iID) -> value + } toMap + + predictions foreach { + case (uID, iID, value) => { + resultMap.isDefinedAt((uID, iID)) should be(true) + + value should be(resultMap((uID, iID)) +- 1e-5) + } + } + + } + +} + +object ExampleMatrix { --- End diff -- Data should go to the `Recommendation.scala` file, as with the plain ALS matrix. 
> Extend ALS to handle implicit feedback datasets > ----------------------------------------------- > > Key: FLINK-4613 > URL: https://issues.apache.org/jira/browse/FLINK-4613 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library > Reporter: Gábor Hermann > Assignee: Gábor Hermann > > The Alternating Least Squares implementation should be extended to handle > _implicit feedback_ datasets. These datasets do not contain explicit ratings > by users; rather, they are built by collecting user behavior (e.g. a user > listened to artist X for Y minutes), and they require a slightly different > optimization objective. See details in [Hu et > al.|http://dx.doi.org/10.1109/ICDM.2008.22]. > We do not need to modify much in the original ALS algorithm. See [Spark ALS > implementation|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala], > which could be a basis for this extension. Only the factor-update step is > modified, and most of the changes are in the local parts of the algorithm > (i.e. UDFs). In fact, the only non-local modification is > precomputing the matrix product Y^T * Y and broadcasting it to all the nodes, > which we can do with broadcast DataSets. -- This message was sent by Atlassian JIRA (v6.3.4#6332)