Nope. All of them are registered from the driver program.
However, I think we've found the culprit. If the join column does not sit at
the same position in both tables' case classes, it triggers what appears to
be a bug. For example, this program fails:
import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.catalyst.types._

// Note the field order: the join key is the second field here...
case class Record(value: String, key: Int)
// ...but the first field here.
case class Record2(key: Int, value: String)

object TestJob {

  def main(args: Array[String]) {
    run()
  }

  private def run() {
    val sparkConf = new SparkConf()
    sparkConf.setAppName("TestJob")
    sparkConf.set("spark.cores.max", "8")
    sparkConf.set("spark.storage.memoryFraction", "0.1")
    sparkConf.set("spark.shuffle.memoryFraction", "0.2")
    sparkConf.set("spark.executor.memory", "2g")
    sparkConf.setJars(List("target/scala-2.10/spark-test-assembly-1.0.jar"))
    sparkConf.setMaster("spark://dev1.dev.pulse.io:7077")
    sparkConf.setSparkHome("/home/pulseio/spark/current")
    val sc = new SparkContext(sparkConf)

    val sqlContext = new SQLContext(sc)
    import sqlContext._

    val rdd1 = sc.parallelize((1 to 100).map(i => Record(s"val_$i", i)))
    val rdd2 = sc.parallelize((1 to 100).map(i => Record2(i, s"rdd_$i")))
    rdd1.registerAsTable("rdd1")
    rdd2.registerAsTable("rdd2")

    // Reading a single table works fine...
    sql("SELECT * FROM rdd1").collect.foreach { row => println(row) }

    // ...but the join blows up with NoClassDefFoundError.
    sql("SELECT rdd1.key, rdd1.value, rdd2.value FROM rdd1 " +
        "join rdd2 on rdd1.key = rdd2.key order by rdd1.key")
      .collect.foreach { row => println(row) }

    sc.stop()
  }
}
If you change the definition of Record and Record2 to the following, it
succeeds:
case class Record(key: Int, value: String)
case class Record2(key: Int, value: String)
as does:
case class Record(value: String, key: Int)
case class Record2(value: String, key: Int)
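If changing the case class definitions isn't an option, here's a rough
workaround sketch (untested against the bug; AlignedRecord is just an
illustrative name): remap one RDD so the field order matches before
registering it.

// Hypothetical stand-in that mirrors Record2's field order.
case class AlignedRecord(key: Int, value: String)

// Inside run(), register the remapped RDD in place of rdd1 above.
val rdd1Aligned = rdd1.map(r => AlignedRecord(r.key, r.value))
rdd1Aligned.registerAsTable("rdd1")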
Let me know if you need any more details.
On Tue, Jul 15, 2014 at 11:14 AM, Michael Armbrust <[email protected]>
wrote:
> Are you registering multiple RDDs of case classes as tables concurrently?
> You are possibly hitting SPARK-2178
> <https://issues.apache.org/jira/browse/SPARK-2178> which is caused by
> SI-6240 <https://issues.scala-lang.org/browse/SI-6240>.
>
>
> On Tue, Jul 15, 2014 at 10:49 AM, Keith Simmons <[email protected]>
> wrote:
>
>> HI folks,
>>
>> I'm running into the following error when trying to perform a join in my
>> code:
>>
>> java.lang.NoClassDefFoundError: Could not initialize class
>> org.apache.spark.sql.catalyst.types.LongType$
>>
>> I see similar errors for StringType$ and also:
>>
>> scala.reflect.runtime.ReflectError: value apache is not a package.
>>
>> Strangely, if I just work with a single table, everything is fine. I can
>> iterate through the records in both tables and print them out without a
>> problem.
>>
>> Furthermore, this code worked without an exception in Spark 1.0.0
>> (though the join caused some field corruption, possibly related to
>> https://issues.apache.org/jira/browse/SPARK-1994).
>> The data is coming from a custom protocol buffer based format on hdfs that
>> is being mapped into the individual record types without a problem.
>>
>> The immediate cause seems to be a task trying to deserialize one or more
>> SQL case classes before loading the spark uber jar, but I have no idea why
>> this is happening, or why it only happens when I do a join. Ideas?
>>
>> Keith
>>
>> P.S. If it's relevant, we're using the Kryo serializer.
>>
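Since Kryo came up in the P.S. above: our setup is along the lines of this
minimal sketch (an assumption about the usual spark.serializer /
spark.kryo.registrator wiring; TestRegistrator is a hypothetical stand-in
for our real registrator, which registers the protobuf record types):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Illustrative registrator; register whatever record classes you ship.
class TestRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Record])
    kryo.register(classOf[Record2])
  }
}

// Enable Kryo and point Spark at the registrator by class name.
val sparkConf = new SparkConf()
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.kryo.registrator", "TestRegistrator")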