Hi, Sparkers:
I wonder if I can convert an RDD of my own Java class into a DataFrame in Spark 
1.3.
Here is what I am trying to achieve: I want to load the data from Cassandra and 
store it into HDFS in either Avro or Parquet format, and I want to test whether 
I can do this in Spark.
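For context, the write side I have in mind looks roughly like this (a sketch, not tested; the HDFS paths are placeholders, and I am assuming the Databricks spark-avro package is on the classpath):

```scala
// Assuming `df` is the DataFrame built from the Cassandra table,
// and spark-avro (com.databricks:spark-avro) is available:
df.save("hdfs:///tmp/coupons_avro", "com.databricks.spark.avro")  // Avro output
df.saveAsParquetFile("hdfs:///tmp/coupons_parquet")               // Parquet output
```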
I am using Spark 1.3.1 with Cassandra Spark Connector 1.3. If I create a 
DataFrame directly using Cassandra Spark Connector 1.3, I have a problem 
handling Cassandra's UUID type in Spark.
So instead I will create an RDD with Cassandra Spark Connector 1.3 and save the 
data into a Java object generated from the Avro schema, but I have a problem 
converting that RDD to a DataFrame.
If I use a case class, it works fine for me, as below:
scala> val rdd = sc.cassandraTable("keyspace_name", "tableName")
rdd: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15

scala> case class Output(id1: Long, id2: String)

scala> val outputRdd = rdd.map(row => Output(row.getLong("id1"), row.getUUID("id2").toString))

scala> import sqlContext.implicits._

scala> val df = outputRdd.toDF
df: org.apache.spark.sql.DataFrame = [id1: bigint, id2: string]
So the above code works fine for a simple case class.
But the table in Cassandra is more complex than this, so I want to reuse a 
Java object generated from an Avro schema that matches the Cassandra table.
Let's say there is already a Java class named "Coupon", which is in fact a Java 
class generated from the Avro schema. The following code is not working:
scala> val rdd = sc.cassandraTable("keyspace_name", "tableName")
rdd: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15

scala> val outputRdd = rdd.map(row => new Coupon(row.getLong("id1"), row.getUUID("id2").toString))
outputRdd: org.apache.spark.rdd.RDD[Coupon] = MapPartitionsRDD[4] at map at <console>:30

scala> import sqlContext.implicits._

scala> val df = outputRdd.toDF
<console>:32: error: value toDF is not a member of org.apache.spark.rdd.RDD[Coupon]
       val df = outputRdd.toDF
So my questions are:

1) Why does a case class work above, but not a custom Java class? Does toDF 
ONLY work with a Scala class?

2) I have to use a DataFrame, as I want to output in Avro format, which is only 
doable for a DataFrame, not an RDD. But I need the option to convert the UUID 
with toString so this type can work. What are my options?

3) I know there is the SQLContext.createDataFrame method, but I only have an 
Avro schema, not a DataFrame schema. If I have to use this method instead of 
toDF(), is there any easy way to get a DataFrame schema from an Avro schema?

4) The real schema of "Coupon" has quite a few structs, and even nested 
structures, so I don't want to create a case class in this case. I want to 
reuse my Avro class; can I do that?
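For question 3, what I am picturing is something like this (just a sketch, assuming a flat two-column schema; the column names are placeholders, and for the real Coupon schema I would have to mirror all the Avro fields by hand, which is what I am hoping to avoid):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType, StringType}

// Hand-built schema mirroring the Avro fields (placeholder column names)
val schema = StructType(Seq(
  StructField("id1", LongType, nullable = false),
  StructField("id2", StringType, nullable = true)))

// Map each Cassandra row to a generic Row, converting the UUID with toString
val rowRdd = rdd.map(row => Row(row.getLong("id1"), row.getUUID("id2").toString))

val df = sqlContext.createDataFrame(rowRdd, schema)
```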
Thanks
Yong