----------------------------------------------
Code
----------------------------------------------
scala> import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext._

scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.SchemaRDD

scala> import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.HiveContext

scala> import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext}

scala> val hiveContext: HiveContext = new HiveContext(sc)
hiveContext: org.apache.spark.sql.hive.HiveContext =
org.apache.spark.sql.hive.HiveContext@2de76244

scala> val numDays = 2
numDays: Int = 2

scala> case class Click(
     |   /* about 20 fields of type String */
     | )
defined class Click

scala> val inputRDD = new Array[SchemaRDD](numDays)
inputRDD: Array[org.apache.spark.sql.SchemaRDD] = Array(null, null)

scala> for (i <- 1 to numDays) {
     |   if (i < 10) {
     |     inputRDD(i - 1) = hiveContext.parquetFile("hdfs://................" + i)
     |   } else {
     |     inputRDD(i - 1) = hiveContext.parquetFile("hdfs://................" + i)
     |   }
     | }
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
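
Aside: the two branches of the if/else above are identical as posted, so the
conditional currently does nothing. Presumably the unredacted code zero-padded
single-digit day numbers in the HDFS path. A minimal sketch of that idea, where
basePath and the date layout are assumptions rather than the real (redacted) path:

// Sketch only: basePath is hypothetical; the actual path is redacted above.
val basePath = "hdfs://namenode/data/clicks/day="
for (i <- 1 to numDays) {
  val day = if (i < 10) "0" + i else i.toString  // zero-pad days 1-9
  inputRDD(i - 1) = hiveContext.parquetFile(basePath + day)
}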

scala> var unionRDD = inputRDD(0)
unionRDD: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[1] at RDD at SchemaRDD.scala:104

scala> for (i <- 1 to inputRDD.length - 1) {
     |   unionRDD = unionRDD.unionAll(inputRDD(i))
     | }
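
As an aside, the var-plus-loop union can be collapsed into a single fold over
the per-day array; an equivalent sketch (same result, written against the
Array[SchemaRDD] built above):

// Fold all per-day SchemaRDDs into one SchemaRDD.
val unionRDD = inputRDD.reduce((a, b) => a.unionAll(b))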

scala> val inputRDD = unionRDD
inputRDD: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[2] at RDD at SchemaRDD.scala:104

scala> inputRDD.registerTempTable("urlInfo")

scala> val clickstreamRDD = hiveContext.sql("select * from urlInfo " +
     |   "where guid regexp '^[0-9a-f-]{36}$' " +
     |   "AND ((callerid > 3 AND callerid < 10000) OR callerid > 100000 " +
     |   "OR (callerid = 3 AND browsertype = 'IE')) " +
     |   "AND countrycode regexp '^[A-Z]{2}$'")
clickstreamRDD: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[3] at RDD at SchemaRDD.scala:104

scala> clickstreamRDD.registerTempTable("clickstream")

scala> clickstreamRDD.cache()
res4: clickstreamRDD.type = 
SchemaRDD[3] at RDD at SchemaRDD.scala:104
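
Note that cache() is lazy: the SchemaRDD is only materialized the first time an
action touches it. To warm the cache eagerly (optional; just a sketch):

// count() is an action, so it forces the cached data to materialize now.
clickstreamRDD.count()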

scala> val guidClickRDD = clickstreamRDD.map(row => (row(7).asInstanceOf[String], {
     |   val value = Click(row(0).asInstanceOf[String],
     |     row(1).asInstanceOf[String], row(2).asInstanceOf[String],
     |     row(3).asInstanceOf[String], row(4).asInstanceOf[String],
     |     row(5).asInstanceOf[String], row(6).asInstanceOf[String],
     |     row(7).asInstanceOf[String], row(8).asInstanceOf[String],
     |     row(9).asInstanceOf[String], row(10).asInstanceOf[String],
     |     row(11).asInstanceOf[String], row(12).asInstanceOf[String],
     |     row(13).asInstanceOf[String], row(14).asInstanceOf[String],
     |     row(15).asInstanceOf[String], row(16).asInstanceOf[String],
     |     row(17).asInstanceOf[String], row(18).asInstanceOf[String],
     |     row(19).asInstanceOf[String])
     |   value
     | }))
guidClickRDD: org.apache.spark.rdd.RDD[(String, Click)] = MappedRDD[14] at map at <console>:25

scala> val blackList: RDD[(String, Click)] = guidClickRDD.groupByKey()
     |   .filter(row => row._2.size == 1)
     |   .map(row => (row._1, Click("", "", "", "", "", "", "", "", "", "",
     |     "", "", "", "", "", "", "", "", "", "")))
blackList: org.apache.spark.rdd.RDD[(String, Click)] = MappedRDD[27] at map at <console>:27

scala> val guidClickFRDD = guidClickRDD.subtractByKey(blackList)
guidClickFRDD: org.apache.spark.rdd.RDD[(String, Click)] = SubtractedRDD[28] at subtractByKey at <console>:29
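
A cautionary note on the blacklist step: groupByKey() buffers every Click for a
GUID in memory, which can blow up on hot keys. Since subtractByKey only looks
at keys, the dummy Click payload in blackList is unnecessary, and singleton
GUIDs can be found with a cheap per-key count instead. A sketch of that
alternative:

// Count clicks per GUID without buffering Click objects per key.
val singletonKeys = guidClickRDD
  .mapValues(_ => 1L)
  .reduceByKey(_ + _)
  .filter(_._2 == 1)
// subtractByKey ignores the other RDD's values, so (String, Long) is fine.
val guidClickFRDD = guidClickRDD.subtractByKey(singletonKeys)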

scala> guidClickFRDD.reduceByKey((x, y) => {
     |   /* commutative and associative function */
     |   Click("US", "US", "US", "US", "US", "US", "US", "US", "US", "US",
     |     "US", "US", "US", "US", "US", "US", "US", "US", "US", "US")
     | }).take(200).foreach(println)
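
The combiner body is redacted above. Whatever it computes, reduceByKey applies
it map-side before the shuffle, so it must be commutative and associative. A
hypothetical stand-in (not the actual function) that satisfies both:

// Hypothetical: field-wise "min" of two Clicks (by their toString ordering) is
// commutative and associative; something like (x, y) => x would not be, since
// the result would depend on partitioning order.
def combine(x: Click, y: Click): Click =
  if (x.toString <= y.toString) x else y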

----------------------------------------------
EXPECTED OUTPUT
----------------------------------------------

(Key_A,Click(US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US))
(Key_B,Click(US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US))
(Key_C,Click(US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US))
(Key_D,Click(US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US))


