Github user gaborhermann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2838#discussion_r89489117
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala ---
    @@ -72,14 +77,142 @@ trait Predictor[Self] extends Estimator[Self] with WithParameters {
         */
       def evaluate[Testing, PredictionValue](
           testing: DataSet[Testing],
    -      evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
    -      evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue])
    +      evaluateParameters: ParameterMap = ParameterMap.Empty)
     +      (implicit evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue])
         : DataSet[(PredictionValue, PredictionValue)] = {
         FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment)
         evaluator.evaluateDataSet(this, evaluateParameters, testing)
       }
     }
     
    +trait RankingPredictor[Self] extends Estimator[Self] with WithParameters {
    +  that: Self =>
    +
    +  def predictRankings(
    +    k: Int,
    +    users: DataSet[Int],
    +    predictParameters: ParameterMap = ParameterMap.Empty)(implicit
    +    rankingPredictOperation : RankingPredictOperation[Self])
    +  : DataSet[(Int,Int,Int)] =
     +    rankingPredictOperation.predictRankings(this, k, users, predictParameters)
    +
    +  def evaluateRankings(
    +    testing: DataSet[(Int,Int,Double)],
    +    evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
    +    rankingPredictOperation : RankingPredictOperation[Self])
    +  : DataSet[(Int,Int,Int)] = {
    +    // todo: do not burn 100 topK into code
    +    predictRankings(100, testing.map(_._1).distinct(), evaluateParameters)
    +  }
    +}
    +
    +trait RankingPredictOperation[Instance] {
    +  def predictRankings(
    +    instance: Instance,
    +    k: Int,
    +    users: DataSet[Int],
    +    predictParameters: ParameterMap = ParameterMap.Empty)
    +  : DataSet[(Int, Int, Int)]
    +}
    +
    +/**
    +  * Trait for providing auxiliary data for ranking evaluations.
    +  *
     +  * They are useful e.g. for excluding items found in the training [[DataSet]]
    +  * from the recommended top K items.
    +  */
    +trait TrainingRatingsProvider {
    +
    +  def getTrainingData: DataSet[(Int, Int, Double)]
    +
    +  /**
     +    * Retrieves the training items.
     +    * Although this can be calculated from the training data, it requires a costly
     +    * [[DataSet.distinct]] operation, while in matrix factorization models the set of
     +    * items could be given more efficiently from the item factors.
    +    */
    +  def getTrainingItems: DataSet[Int] = {
    +    getTrainingData.map(_._2).distinct()
    +  }
    +}
    +
    +/**
    +  * Ranking predictions for the most common case.
    +  * If we can predict ratings, we can compute top K lists by sorting the 
predicted ratings.
    +  */
     +class RankingFromRatingPredictOperation[Instance <: TrainingRatingsProvider]
     +(val ratingPredictor: PredictDataSetOperation[Instance, (Int, Int), (Int, Int, Double)])
    +  extends RankingPredictOperation[Instance] {
    +
     +  private def getUserItemPairs(users: DataSet[Int], items: DataSet[Int], exclude: DataSet[(Int, Int)])
    +  : DataSet[(Int, Int)] = {
    +    users.cross(items)
    --- End diff --
    
    You're right. Although there's not much we can do in general to avoid
    this, we might be able to optimize for matrix factorization. This
    solution works for *every* predictor that predicts ratings, and we
    currently use it in ALS
    ([here](https://github.com/apache/flink/pull/2838/files/45c98a97ef82d1012062dbcf6ade85a8d566062d#diff-80639a21b8fd166b5f7df5280cd609a9R467)).
    With a matrix factorization model *specifically*, we can avoid
    materializing all user-item pairs as tuples and compute the rankings
    more directly, which might be more efficient. So we could use a more
    specific `RankingPredictor` implementation in `ALS`. But even in that
    case, we still need to go through all the items for a particular user
    to calculate the top k items for that user.
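
    To make that concrete, here's a rough, untested sketch (not part of this
    PR) of how an MF-specific ranking could avoid the cross product: broadcast
    the item factors and score each user against them locally. The name
    `rankFromFactors` and the `(id, factors)` layout are assumptions for
    illustration; note that it still scans every item per user.

    ```scala
    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.util.Collector

    import scala.collection.JavaConverters._

    object RankingFromFactorsSketch {

      // Hypothetical helper, not in this PR: (user, item, rank) triples
      // computed directly from the factor matrices of an MF model.
      def rankFromFactors(
          userFactors: DataSet[(Int, Array[Double])],
          itemFactors: DataSet[(Int, Array[Double])],
          k: Int): DataSet[(Int, Int, Int)] = {
        userFactors
          .flatMap(new RichFlatMapFunction[(Int, Array[Double]), (Int, Int, Int)] {
            private var items: Seq[(Int, Array[Double])] = _

            override def open(config: Configuration): Unit = {
              // Item factors arrive as a broadcast variable, so we never
              // materialize a user-item pair DataSet.
              items = getRuntimeContext
                .getBroadcastVariable[(Int, Array[Double])]("itemFactors")
                .asScala
            }

            override def flatMap(
                user: (Int, Array[Double]),
                out: Collector[(Int, Int, Int)]): Unit = {
              val (userId, userVec) = user
              // Score every item for this user (dot product), keep the top k.
              items
                .map { case (itemId, itemVec) =>
                  (itemId, userVec.zip(itemVec).map { case (u, i) => u * i }.sum)
                }
                .sortBy(-_._2)
                .take(k)
                .zipWithIndex
                .foreach { case ((itemId, _), idx) =>
                  out.collect((userId, itemId, idx + 1))
                }
            }
          })
          .withBroadcastSet(itemFactors, "itemFactors")
      }
    }
    ```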
    
    Also, this is only calculated for the users we'd like to give rankings
    to, e.g. in a testing scenario, for the users in the test data, which
    might be significantly fewer than the users in the training data.
    
    I suggest keeping this anyway, as it's general. We might come up with a
    solution that's slightly more efficient in most cases for MF models.
    Should we put effort into working on it? What do you think?

