---------------------------------------------- Code ----------------------------------------------

scala> import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext._
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.SchemaRDD

scala> import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.HiveContext

scala> import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext}

scala> val hiveContext: HiveContext = new HiveContext(sc)
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@2de76244

scala> val numDays = 2
numDays: Int = 2

scala> case class Click( /* about 20 fields of type STRING */ )
defined class Click

scala> val inputRDD = new Array[SchemaRDD](numDays)
inputRDD: Array[org.apache.spark.sql.SchemaRDD] = Array(null, null)

scala> // Load one Parquet file per day. The two branches are identical here
scala> // only because the HDFS paths are elided; presumably they differ in
scala> // how the day number is rendered in the path.
scala> for (i <- 1 to numDays) {
     |   if (i < 10) {
     |     inputRDD(i - 1) = hiveContext.parquetFile("hdfs://................" + i)
     |   } else {
     |     inputRDD(i - 1) = hiveContext.parquetFile("hdfs://................" + i)
     |   }
     | }
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

scala> // Union the daily SchemaRDDs. Note this starts from inputRDD(0); the
scala> // posted session started from inputRDD(1), which dropped day 1 and
scala> // unioned day 2 with itself.
scala> var unionRDD = inputRDD(0)
unionRDD: org.apache.spark.sql.SchemaRDD = SchemaRDD[1] at RDD at SchemaRDD.scala:104

scala> for (i <- 1 to inputRDD.length - 1) {
     |   unionRDD = unionRDD.unionAll(inputRDD(i))
     | }

scala> val inputRDD = unionRDD
inputRDD: org.apache.spark.sql.SchemaRDD = SchemaRDD[2] at RDD at SchemaRDD.scala:104

scala> inputRDD.registerTempTable("urlInfo")

scala> // Keep only rows with a well-formed 36-character GUID, a plausible
scala> // caller id, and a two-letter country code.
scala> val clickstreamRDD = hiveContext.sql("select * from urlInfo " +
     |   "where guid regexp '^[0-9a-f-]{36}$' " +
     |   "AND ((callerid > 3 AND callerid < 10000) OR callerid > 100000 " +
     |   "OR (callerid = 3 AND browsertype = 'IE')) " +
     |   "AND countrycode regexp '^[A-Z]{2}$'")
clickstreamRDD: org.apache.spark.sql.SchemaRDD = SchemaRDD[3] at RDD at SchemaRDD.scala:104

scala> clickstreamRDD.registerTempTable("clickstream")

scala> clickstreamRDD.cache()
res4: clickstreamRDD.type = SchemaRDD[3] at RDD at SchemaRDD.scala:104

scala> // Key each row by the GUID in column 7 and repack the row as a Click
scala> // (the redundant { val value = ...; value } wrapper is dropped).
scala> val guidClickRDD = clickstreamRDD.map(row => (row(7).asInstanceOf[String],
     |   Click(row(0).asInstanceOf[String],
     |     row(1).asInstanceOf[String], row(2).asInstanceOf[String],
     |     row(3).asInstanceOf[String], row(4).asInstanceOf[String],
     |     row(5).asInstanceOf[String], row(6).asInstanceOf[String],
     |     row(7).asInstanceOf[String], row(8).asInstanceOf[String],
     |     row(9).asInstanceOf[String], row(10).asInstanceOf[String],
     |     row(11).asInstanceOf[String], row(12).asInstanceOf[String],
     |     row(13).asInstanceOf[String], row(14).asInstanceOf[String],
     |     row(15).asInstanceOf[String], row(16).asInstanceOf[String],
     |     row(17).asInstanceOf[String], row(18).asInstanceOf[String],
     |     row(19).asInstanceOf[String])))
guidClickRDD: org.apache.spark.rdd.RDD[(String, Click)] = MappedRDD[14] at map at <console>:25

scala> // Blacklist GUIDs that occur exactly once (row._1 is already a String,
scala> // so the cast on it is dropped).
scala> val blackList: RDD[(String, Click)] = guidClickRDD.groupByKey()
     |   .filter(row => row._2.size == 1)
     |   .map(row => (row._1,
     |     Click("", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "")))
blackList: org.apache.spark.rdd.RDD[(String, Click)] = MappedRDD[27] at map at <console>:27

scala> val guidClickFRDD = guidClickRDD.subtractByKey(blackList)
guidClickFRDD: org.apache.spark.rdd.RDD[(String, Click)] = SubtractedRDD[28] at subtractByKey at <console>:29
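An aside on the blackList step: groupByKey() ships and materializes every Click for a GUID in memory just to test row._2.size == 1, which is a common way to lose executors to memory pressure on large or skewed keys. A sketch of a lighter formulation that finds the same single-occurrence GUIDs by counting (it reuses guidClickRDD from above; this is an alternative, not what the posted session ran):

    // Count clicks per GUID; reduceByKey combines map-side, so only
    // (guid, count) pairs cross the shuffle instead of whole groups.
    val singletonGuids = guidClickRDD.mapValues(_ => 1)
                                     .reduceByKey(_ + _)
                                     .filter(_._2 == 1)

    // Same subtraction as before: drop GUIDs seen exactly once.
    val guidClickFRDD = guidClickRDD.subtractByKey(singletonGuids)

subtractByKey only inspects keys, so the counts stand in for the dummy Click("", ..., "") values with no change in the result.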
"US", "US", "US", "US", "US", "US", "US", "US", "US", "US", "US", "US", "US", "US", "US") | }).take(200).foreach(println) ---------------------------------------------- EXPECTED OUTPUT ---------------------------------------------- (Key_A,Click(US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US)) (Key_B,Click(US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US)) (Key_C,Click(US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US)) (Key_D,Click(US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US)) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ERROR-YarnClientClusterScheduler-Lost-executor-Akka-client-disassociated-tp20625p20626.html Sent from the Apache Spark User List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org