Hi All,
I am having some trouble writing generic code that uses sqlContext and
RDDs. Can you suggest what might be wrong?
class SparkTable[T: ClassTag](val sqlContext: SQLContext, val extractor: String => T) {

  private[this] var location: Option[String] = None
  private[this] var name: Option[String] = None
  private[this] val sc = sqlContext.sparkContext

  ...

  def makeRDD(sqlQuery: String): SchemaRDD = {
    require(this.location != None)
    require(this.name != None)
    import sqlContext._
    val rdd: RDD[String] = sc.textFile(this.location.get)
    val rddT: RDD[T] = rdd.map(extractor)
    val schemaRDD: SchemaRDD = createSchemaRDD(rddT)
    schemaRDD.registerAsTable(name.get)
    val all = sqlContext.sql(sqlQuery)
    all
  }
}
I use it as follows:
def extractor(line: String): POJO = {
  val splits = line.split(pattern).toList
  POJO(splits(0), splits(1), splits(2), splits(3))
}
val pojoTable: SparkTable[POJO] = new SparkTable[POJO](sqlContext, extractor)

val identityData: SchemaRDD =
  pojoTable.atLocation("hdfs://location/table")
    .withName("pojo")
    .makeRDD("SELECT * FROM pojo")
I get a compilation failure:
inferred type arguments [T] do not conform to method createSchemaRDD's
type parameter bounds [A <: Product]
[error]     val schemaRDD: SchemaRDD = createSchemaRDD(rddT)
[error]                                ^
[error] SparkTable.scala:37: type mismatch;
[error]  found   : org.apache.spark.rdd.RDD[T]
[error]  required: org.apache.spark.rdd.RDD[A]
[error]     val schemaRDD: SchemaRDD = createSchemaRDD(rddT)
[error]                                ^
[error] two errors found
I am probably missing something basic, either in Scala reflection/types or
in implicits?
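Reading the error, my best guess is that T needs the same bound that
createSchemaRDD puts on its parameter A, plus a TypeTag so the implicit
conversion on SQLContext can kick in (the TypeTag part is my assumption
about its signature). An untested sketch of what I mean:

import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.SQLContext

// Untested: bound T by Product so it conforms to [A <: Product], and
// carry a TypeTag so createSchemaRDD's implicit can materialize a schema.
class SparkTable[T <: Product : ClassTag : TypeTag](
    val sqlContext: SQLContext,
    val extractor: String => T) {
  // ... body as before ...
}

Since POJO is a case class it would satisfy T <: Product, so the call site
should not need to change. Does that look like the right direction?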
Any hints would be appreciated.
Thanks
Amit