Hi,

I'm trying to transform one RDD twice. I'm using foreachPartition, and inside
it I call map on the partition two times. The first map works fine and I get
results, but the second map behaves as if the RDD had no elements.
This is my code:

    val credentialsIdsScala: Seq[java.lang.Long] = credentialsIds.asScala.toSeq
    println("ALL CREDENTIALS:" + credentialsIdsScala.mkString(","))


    val credentialsRDD: RDD[Long] = sc.parallelize(credentialsIdsScala.map { Long2long })
    val connector = CassandraConnector(sc.getConf)
    credentialsRDD.foreachPartition {
      credentials => {
        val userCourseKMeansProfiles: Iterator[Iterable[Tuple5[Long, String, Long, Long, String]]] =
          credentials.map { credentialid =>
            println("RUNNING USER PROFILE CLUSTERING FOR CREDENTIAL:" + credentialid)
            val userCourseProfile: Iterable[Tuple5[Long, String, Long, Long, String]] =
              runPeriodicalKMeansClustering(dbName, days, numClusters, numFeatures, credentialid)
            userCourseProfile
          }
        userCourseKMeansProfiles.foreach(userProfile => {
          val query = "INSERT INTO " + dbName + "." +
            TablesNames.PROFILE_USERQUARTILE_FEATURES_BYPROFILE +
            "(course, profile,date, userid, sequence) VALUES (?, ?, ?,?,?) ";
          connector.withSessionDo {
            session => {
              userProfile.foreach(record => {
                println("USER PROFILE RECORD:" + record._1 + " " + record._2 +
                  " " + record._3 + " " + record._4 + " " + record._5)
                session.execute(query,
                  record._1.asInstanceOf[java.lang.Long],
                  record._2.asInstanceOf[String],
                  record._3.asInstanceOf[java.lang.Long],
                  record._4.asInstanceOf[java.lang.Long],
                  record._5.asInstanceOf[String])
              })
            }
          }
        })
        val secondMapping = credentials.map {
          credentialid =>
            println("credential id:" + credentialid)
            credentialid
        }
        secondMapping.foreach(cid=>println("credentialid:"+cid))
        println("Second mapping:" + secondMapping.length)
      }
    }
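
The pattern can be isolated from Spark and Cassandra. The sketch below is an
illustration only, not the job above: it assumes that the `credentials`
argument is a plain Scala Iterator (which is the type foreachPartition passes
to its function), and the object name, method names and id values are made up.
It runs two passes over the same iterator, and then the same two passes after
copying the elements into a List.

```scala
// Minimal sketch without Spark or Cassandra. All names and values here are
// hypothetical and exist only for illustration.
object IteratorReuseSketch {

  // Mirrors the structure of the partition function: two passes over the
  // same Iterator. Returns how many elements each pass saw.
  def twoPassesOverIterator(credentials: Iterator[Long]): (Int, Int) = {
    val first = credentials.map(id => id * 2).toList // reads every element
    val second = credentials.map(id => id).toList    // same iterator, second pass
    (first.size, second.size)
  }

  // The same two passes, but the elements are copied into a List first,
  // so each pass starts again from the beginning.
  def twoPassesOverList(credentials: Iterator[Long]): (Int, Int) = {
    val ids = credentials.toList
    val first = ids.map(id => id * 2)
    val second = ids.map(id => id)
    (first.size, second.size)
  }

  def main(args: Array[String]): Unit = {
    println(twoPassesOverIterator(Iterator(1L, 2L, 3L))) // prints (3,0)
    println(twoPassesOverList(Iterator(1L, 2L, 3L)))     // prints (3,3)
  }
}
```

Copying into a List holds the whole partition in memory, which is only
reasonable when a partition is small, as a list of credential ids is likely
to be.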

Could someone explain to me what is wrong with my code and how to fix it?

Thanks,
Zoran



