Hi,
I am using Spark 1.2.0 with Hadoop 2.2. I have 2 CSV files, each with 8
fields. I know that the first field in both files is an ID. I want to find all
the IDs that exist in the first file, but NOT in the 2nd file.
I came up with the following code in spark-shell:
case class origAsLeft(id: String)
case class newAsRight(id: String)

val OrigData = sc.textFile("hdfs://firstfile").map(_.split(",")).map(r => (r(0), origAsLeft(r(0))))
val NewData = sc.textFile("hdfs://secondfile").map(_.split(",")).map(r => (r(0), newAsRight(r(0))))
val output = OrigData.leftOuterJoin(NewData).filter { case (k, v) => v._2 == null }
From what I understand, after OrigData is left outer joined with NewData, Spark
will use the id as the key, with a tuple of (leftObject, rightObject) as the value.
Since I want to find all the IDs that exist in the first file but not in the 2nd one,
the output RDD should be the one I want, as the filter keeps only the rows where there
is no newAsRight object from NewData.
Then I run
output.first
Spark does start to run, but gives me the following error message:

15/02/12 16:43:38 INFO scheduler.DAGScheduler: Job 4 finished: first at <console>:21, took 78.303549 s
java.lang.UnsupportedOperationException: empty collection
    at org.apache.spark.rdd.RDD.first(RDD.scala:1095)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:21)
    at $iwC$$iwC$$iwC.<init>(<console>:26)
    at $iwC$$iwC.<init>(<console>:28)
    at $iwC.<init>(<console>:30)
    at <init>(<console>:32)
    at .<init>(<console>:36)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:94)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
    at java.lang.reflect.Method.invoke(Method.java:619)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:94)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
    at java.lang.reflect.Method.invoke(Method.java:619)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Did I do anything wrong? What is the right way to find all the IDs that are in
the first file but not in the 2nd file?
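One thing I noticed while re-reading the API docs: leftOuterJoin returns
RDD[(K, (V, Option[W]))], so the right-hand side is an Option rather than a nullable
object. If that is the case, maybe my filter should test for None instead of null.
A sketch of what I mean (not yet verified on my data):

val missing = OrigData.leftOuterJoin(NewData)
  .filter { case (k, (left, right)) => right.isEmpty }  // keep IDs with no match in NewData
  .map { case (k, (left, right)) => left }              // keep only the origAsLeft objects

Or would subtractByKey be a simpler way to get the same result?

val missingIds = OrigData.subtractByKey(NewData)  // pairs whose key is absent from NewData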
My second question is how I can use the fields of these objects to do the
comparison in this case. For example, if I define:
case class origAsLeft(id: String, name: String)
case class newAsRight(id: String, name: String)

val OrigData = sc.textFile("hdfs://firstfile").map(_.split(",")).map(r => (r(0), origAsLeft(r(0), r(1))))
val NewData = sc.textFile("hdfs://secondfile").map(_.split(",")).map(r => (r(0), newAsRight(r(0), r(1))))

// In this case, I want to list all the rows in the first file that have the same ID
// as in the 2nd file but a different value in name, so I want to do something like:
val output = OrigData.join(NewData).filter { case (k, v) => v._1.name != v._2.name }
But what is the correct syntax to refer to the fields of the case classes I defined?
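I am guessing that a pattern match in the filter would let me name the two joined
objects and then access their fields directly, something like this sketch (not tested):

val output = OrigData.join(NewData).filter {
  case (id, (left, right)) => left.name != right.name  // left: origAsLeft, right: newAsRight
}

Is that the idiomatic way, or is there a better approach?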
Thanks
Yong