The core of ALS in Spark 1.0.0 is the following code.
When running ALS iterations, there should be both flatMap and groupByKey tasks, right?
But when I run the ALS iterations, I see ONLY flatMap tasks...
Does anyone know why?

  private def updateFeatures(
      products: RDD[(Int, Array[Array[Double]])],
      productOutLinks: RDD[(Int, OutLinkBlock)],
      userInLinks: RDD[(Int, InLinkBlock)],
      partitioner: Partitioner,
      rank: Int,
      lambda: Double,
      alpha: Double,
      YtY: Option[Broadcast[DoubleMatrix]]): RDD[(Int, Array[Array[Double]])] = {
    val numBlocks = products.partitions.size
    productOutLinks.join(products).flatMap { case (bid, (outLinkBlock, factors)) =>
      val toSend = Array.fill(numBlocks)(new ArrayBuffer[Array[Double]])
      for (p <- 0 until outLinkBlock.elementIds.length; userBlock <- 0 until numBlocks) {
        if (outLinkBlock.shouldSend(p)(userBlock)) {
          toSend(userBlock) += factors(p)
        }
      }
      toSend.zipWithIndex.map { case (buf, idx) => (idx, (bid, buf.toArray)) }
    }.groupByKey(new HashPartitioner(numBlocks)) // The ALS code in 1.0.0 has a bug here:
      // that version used the passed-in partitioner, which doesn't take effect
      // and causes data skew.
      .join(userInLinks)
      .mapValues { case (messages, inLinkBlock) =>
        updateBlock(messages, inLinkBlock, rank, lambda, alpha, YtY)
      }
  }
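To make the message-exchange step above concrete, here is a minimal pure-Scala sketch (no Spark; the object name `AlsMessageSketch` and the sample inputs are mine, for illustration only). It reproduces the same logic the flatMap + groupByKey express over RDDs: each product block builds one buffer of factor vectors per destination user block, emits a `(destinationBlock, (sourceBlock, factors))` message per buffer, and the messages are then gathered by destination block.

```scala
import scala.collection.mutable.ArrayBuffer

object AlsMessageSketch {
  // Build the messages one product block would emit: for every product p and
  // destination user block, copy p's factor vector into that block's buffer.
  def messagesFrom(
      bid: Int,
      numBlocks: Int,
      factors: Array[Array[Double]],
      shouldSend: Array[Array[Boolean]]): Seq[(Int, (Int, Array[Array[Double]]))] = {
    val toSend = Array.fill(numBlocks)(new ArrayBuffer[Array[Double]])
    for (p <- factors.indices; userBlock <- 0 until numBlocks) {
      if (shouldSend(p)(userBlock)) toSend(userBlock) += factors(p)
    }
    // One (destinationBlock, (sourceBlock, factors)) message per destination.
    toSend.zipWithIndex.map { case (buf, idx) => (idx, (bid, buf.toArray)) }.toSeq
  }

  def main(args: Array[String]): Unit = {
    val factors = Array(Array(1.0, 2.0), Array(3.0, 4.0))
    // Product 0 is needed only by user block 0; product 1 by both blocks.
    val shouldSend = Array(Array(true, false), Array(true, true))
    // Local stand-in for groupByKey: gather messages by destination block id.
    val grouped = messagesFrom(0, 2, factors, shouldSend).groupBy(_._1)
    println(grouped(0).head._2._2.length) // user block 0 receives 2 factor vectors
    println(grouped(1).head._2._2.length) // user block 1 receives 1 factor vector
  }
}
```

In the real RDD version, the groupByKey on the flatMap output is what triggers the shuffle between product blocks and user blocks.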




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/what-are-the-types-of-tasks-when-running-ALS-iterations-tp21966.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
