[ 
https://issues.apache.org/jira/browse/FLINK-4613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15528812#comment-15528812
 ] 

ASF GitHub Bot commented on FLINK-4613:
---------------------------------------

Github user thvasilo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2542#discussion_r80862241
  
    --- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ImplicitALSTest.scala
 ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.recommendation
    +
    +import org.apache.flink.ml.util.FlinkTestBase
    +import org.scalatest._
    +
    +import scala.language.postfixOps
    +import org.apache.flink.api.scala._
    +import org.apache.flink.core.testutils.CommonTestUtils
    +
    +class ImplicitALSTest
    +  extends FlatSpec
    +    with Matchers
    +    with FlinkTestBase {
    +
    +  override val parallelism = 2
    +
    +  behavior of "The modification of the alternating least squares (ALS) 
implementation" +
    +    "for implicit feedback datasets."
    +
    +  it should "properly compute Y^T * Y, and factorize matrix" in {
    +    import ExampleMatrix._
    +
    +    val rand = scala.util.Random
    +    val numBlocks = 3
    +    // randomly split matrix to blocks
    +    val blocksY = Y
    +      // add a random block id to every row
    +      .map { row =>
    +        (rand.nextInt(numBlocks), row)
    +      }
    +      // get the block via grouping
    +      .groupBy(_._1).values
    +      // add a block id (-1) to each block
    +      .map(b => (-1, b.map(_._2)))
    +      .toSeq
    +
    +    // use Flink to compute YtY
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +    val distribBlocksY = env.fromCollection(blocksY)
    +
    +    val YtY = ALS
    +      .computeXtX(distribBlocksY, factors)
    +      .collect().head
    +
    +    // check YtY size
    +    YtY.length should be (factors * (factors - 1) / 2 + factors)
    +
    +    // check result is as expected
    +    expectedUpperTriangleYtY
    +      .zip(YtY)
    +      .foreach { case (expected, result) =>
    +        result should be (expected +- 0.1)
    +      }
    +
    +    // temporary directory to avoid too few memory segments
    +    val tempDir = CommonTestUtils.getTempDir + "/"
    +
    +    // factorize matrix with implicit ALS
    +    val als = ALS()
    +      .setIterations(iterations)
    +      .setLambda(lambda)
    +      .setBlocks(blocks)
    +      .setNumFactors(factors)
    +      .setImplicit(true)
    +      .setAlpha(alpha)
    +      .setSeed(seed)
    +      .setTemporaryPath(tempDir)
    +
    +    val inputDS = env.fromCollection(implicitRatings)
    +
    +    als.fit(inputDS)
    +
    +    // check predictions on some user-item pairs
    +    val testData = env.fromCollection(expectedResult.map{
    +      case (userID, itemID, rating) => (userID, itemID)
    +    })
    +
    +    val predictions = als.predict(testData).collect()
    +
    +    predictions.length should equal(expectedResult.length)
    +
    +    val resultMap = expectedResult map {
    +      case (uID, iID, value) => (uID, iID) -> value
    +    } toMap
    +
    +    predictions foreach {
    +      case (uID, iID, value) => {
    +        resultMap.isDefinedAt((uID, iID)) should be(true)
    +
    +        value should be(resultMap((uID, iID)) +- 1e-5)
    +      }
    +    }
    +
    +  }
    +
    +}
    +
    +object ExampleMatrix {
    --- End diff --
    
    Data should go to the `Recommendation.scala` file, as with the plain ALS 
matrix.


> Extend ALS to handle implicit feedback datasets
> -----------------------------------------------
>
>                 Key: FLINK-4613
>                 URL: https://issues.apache.org/jira/browse/FLINK-4613
>             Project: Flink
>          Issue Type: New Feature
>          Components: Machine Learning Library
>            Reporter: Gábor Hermann
>            Assignee: Gábor Hermann
>
> The Alternating Least Squares implementation should be extended to handle 
> _implicit feedback_ datasets. These datasets do not contain explicit ratings 
> by users, they are rather built by collecting user behavior (e.g. user 
> listened to artist X for Y minutes), and they require a slightly different 
> optimization objective. See details by [Hu et 
> al|http://dx.doi.org/10.1109/ICDM.2008.22].
> We do not need to modify much in the original ALS algorithm. See [Spark ALS 
> implementation|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala],
>  which could be a basis for this extension. Only the updating factor part is 
> modified, and most of the changes are in the local parts of the algorithm 
> (i.e. UDFs). In fact, the only modification that is not local, is 
> precomputing a matrix product Y^T * Y and broadcasting it to all the nodes, 
> which we can do with broadcast DataSets. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to