you see, the core of ALS 1.0.0 is the following code: there should be flatMap and groupByKey when running ALS iterations , right? but when I run als iteration, there are ONLY flatMap tasks... do you know why?
private def updateFeatures( products: RDD[(Int, Array[Array[Double]])], productOutLinks: RDD[(Int, OutLinkBlock)], userInLinks: RDD[(Int, InLinkBlock)], partitioner: Partitioner, rank: Int, lambda: Double, alpha: Double, YtY: Option[Broadcast[DoubleMatrix]]) : RDD[(Int, Array[Array[Double]])] = { val numBlocks = products.partitions.size productOutLinks.join(products).flatMap { case (bid, (outLinkBlock, factors)) => val toSend = Array.fill(numBlocks)(new ArrayBuffer[Array[Double]]) for (p <- 0 until outLinkBlock.elementIds.length; userBlock <- 0 until numBlocks) { if (outLinkBlock.shouldSend(p)(userBlock)) { toSend(userBlock) += factors(p) } } toSend.zipWithIndex.map{ case (buf, idx) => (idx, (bid, buf.toArray)) } }.groupByKey(new HashPartitioner(numBlocks)) //这里1.0.0 的 als代码有bug,那个版本用的是传入的partitioner,起不到作用,会导致data skew .join(userInLinks) .mapValues{ case (messages, inLinkBlock) => updateBlock(messages, inLinkBlock, rank, lambda, alpha, YtY) } } -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/what-are-the-types-of-tasks-when-running-ALS-iterations-tp21966.html Sent from the Apache Spark User List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org