Hi, Sparkers:
I wonder if I can convert an RDD of my own Java class into a DataFrame in Spark
1.3.
Here is what I am trying to achieve: I want to load the data from Cassandra and
store it into HDFS in either Avro or Parquet format, and I want to test whether
I can do this in Spark.
I am using Spark 1.3.1 with Cassandra Spark Connector 1.3. If I create a
DataFrame directly using Cassandra Spark Connector 1.3, I have a problem
handling the Cassandra UUID type in Spark.
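For reference, the direct DataFrame path I tried looks roughly like the sketch below. This is a minimal sketch from memory; I am assuming the CassandraSQLContext that ships with the connector, and keyspace_name/tableName are placeholders for my real names. The uuid column is where I run into trouble:

scala> import org.apache.spark.sql.cassandra.CassandraSQLContext
scala> val csc = new CassandraSQLContext(sc)  // Cassandra-aware SQLContext from the connector
scala> val df = csc.sql("SELECT * FROM keyspace_name.tableName")  // the uuid column is the part I cannot handle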
So instead I tried to create an RDD with Cassandra Spark Connector 1.3 and
store the data in a Java object generated from the Avro schema, but I have a
problem converting that RDD into a DataFrame.
If I use a case class, it works fine for me, as below:
scala> val rdd = sc.cassandraTable("keyspace_name", "tableName")
rdd: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15

scala> case class Output(id1: Long, id2: String)

scala> val outputRdd = rdd.map(row => Output(row.getLong("id1"), row.getUUID("id2").toString))

scala> import sqlContext.implicits._

scala> val df = outputRdd.toDF
df: org.apache.spark.sql.DataFrame = [id1: bigint, id2: string]
So the above code works fine for a simple case class.
But the table in Cassandra is more complex than this, so I want to reuse a
Java object generated from an Avro schema that matches the Cassandra table.
Let's say there is already a Java class named "Coupon", which is in fact
generated from the Avro schema, but the following code does not work:
scala> val rdd = sc.cassandraTable("keyspace_name", "tableName")
rdd: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15

scala> val outputRdd = rdd.map(row => new Coupon(row.getLong("id1"), row.getUUID("id2").toString))
outputRdd: org.apache.spark.rdd.RDD[Coupon] = MapPartitionsRDD[4] at map at <console>:30

scala> import sqlContext.implicits._

scala> val df = outputRdd.toDF
<console>:32: error: value toDF is not a member of org.apache.spark.rdd.RDD[Coupon]
       val df = outputRdd.toDF
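From what I read, the toDF brought in by sqlContext.implicits._ is only provided for RDDs of Scala Products (case classes and tuples). For a Java bean there is the SQLContext.createDataFrame(rdd, beanClass) overload; below is a minimal sketch of what I might try, assuming the Avro-generated Coupon exposes JavaBean-style getters (I have not verified that it plays well with Spark's bean reflection):

scala> // use bean-based reflection instead of the Scala Product implicits
scala> val df = sqlContext.createDataFrame(outputRdd, classOf[Coupon])

I am not sure this copes with nested Avro records, though, which relates to question 4 below.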
So my questions are:
1) Why does a case class work above, but not a custom Java class? Does toDF
only work with Scala classes?
2) I have to use a DataFrame, as I want to output in Avro format, which is only
doable from a DataFrame, not an RDD. But I need the option to convert the UUID
with toString, so that type can work. What are my options? (See the save sketch
after this list.)
3) I know there is the SQLContext.createDataFrame method, but I only have an
Avro schema, not a DataFrame schema. If I have to use this method instead of
toDF(), is there an easy way to get the DataFrame schema from an Avro schema?
(A sketch follows this list.)
4) The real schema of "Coupon" has quite a few structs, and even nested
structures, so I don't want to create a case class in this case. I want to
reuse my Avro class; can I do that?
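On questions 2 and 3, here is the direction I am considering. This is only a sketch under assumptions I have not verified: that the spark-avro library is on the classpath, that its SchemaConverters.toSqlType is publicly callable, and that Coupon.getClassSchema is the usual static schema accessor Avro codegen produces:

scala> import com.databricks.spark.avro.SchemaConverters
scala> import org.apache.spark.sql.types.StructType
scala> import org.apache.spark.sql.Row

scala> // translate the Avro schema into a Spark SQL StructType
scala> val schema = SchemaConverters.toSqlType(Coupon.getClassSchema).dataType.asInstanceOf[StructType]

scala> // build Rows by hand so the UUID can be converted with toString
scala> val rowRdd = rdd.map(r => Row(r.getLong("id1"), r.getUUID("id2").toString))
scala> val df = sqlContext.createDataFrame(rowRdd, schema)

scala> // Spark 1.3 style output: Avro via the spark-avro data source, Parquet built in
scala> df.save("/hdfs/path/coupons.avro", "com.databricks.spark.avro")
scala> df.saveAsParquetFile("/hdfs/path/coupons.parquet")

If someone can confirm whether this is the right way, or whether the Avro class can be used directly, that would help a lot.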
Thanks
Yong