Hi,

On Thu, Mar 12, 2015 at 12:08 AM, Huang, Jie <jie.hu...@intel.com> wrote:
>
> According to my understanding, your approach is to register a series of
> tables by using transformWith, right? And then, you can get a new Dstream
> (i.e., SchemaDstream), which consists of lots of SchemaRDDs.
>
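
Yep, it's basically the following:

    // Imports assume the Spark 1.2-era API (SchemaRDD, applySchema).
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext, SchemaRDD, StructType}
    import org.apache.spark.streaming.dstream.DStream

    case class SchemaDStream(sqlc: SQLContext,
                             dataStream: DStream[Row],
                             schemaStream: DStream[StructType]) {
      // Register each batch under the same table name.
      def registerStreamAsTable(name: String): Unit = {
        foreachRDD(_.registerTempTable(name))
      }

      def foreachRDD(func: SchemaRDD => Unit): Unit = {
        def executeFunction(dataRDD: RDD[Row],
                            schemaRDD: RDD[StructType]): RDD[Unit] = {
          // The schema stream carries one StructType per batch.
          val schema: StructType = schemaRDD.collect.head
          val dataWithSchema: SchemaRDD = sqlc.applySchema(dataRDD, schema)
          val result = func(dataWithSchema)
          schemaRDD.map(x => result) // return RDD[Unit]
        }
        // transformWith is lazy; the trailing output operation triggers it.
        dataStream.transformWith(schemaStream, executeFunction _).foreachRDD(_.count())
      }
    }

In a similar way you could add a `transform(func: SchemaRDD => SchemaRDD)`
method. But as I said, I am not sure about performance.

Tobias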
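
For illustration, a `transform` along those lines might look roughly like the sketch below. It is untested and rests on several assumptions: the Spark 1.2-era API (`SchemaRDD`, `SQLContext.applySchema`), the `SchemaDStream` case class from the message above, and a hypothetical helper object named `SchemaDStreamTransform` that is not part of Spark.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SchemaRDD, StructType}

// Hypothetical helper, not part of Spark. Assumes the SchemaDStream case
// class from the message above is in scope.
object SchemaDStreamTransform {
  def transform(in: SchemaDStream)(func: SchemaRDD => SchemaRDD): SchemaDStream = {
    // Re-attach the batch's schema and apply the user function.
    def applyFunc(dataRDD: RDD[Row], schemaRDD: RDD[StructType]): SchemaRDD = {
      val schema: StructType = schemaRDD.collect.head
      func(in.sqlc.applySchema(dataRDD, schema))
    }

    // A SchemaRDD is an RDD[Row], so the result can feed the new data stream.
    def newData(dataRDD: RDD[Row], schemaRDD: RDD[StructType]): RDD[Row] =
      applyFunc(dataRDD, schemaRDD)

    // The result's schema becomes a one-element RDD in the new schema stream.
    def newSchema(dataRDD: RDD[Row], schemaRDD: RDD[StructType]): RDD[StructType] =
      dataRDD.sparkContext.parallelize(Seq(applyFunc(dataRDD, schemaRDD).schema), 1)

    SchemaDStream(
      in.sqlc,
      in.dataStream.transformWith(in.schemaStream, newData _),
      in.dataStream.transformWith(in.schemaStream, newSchema _))
  }
}
```

Note that this version builds the transformed `SchemaRDD` twice per batch (once for the rows, once for the schema) and collects the schema RDD each time, so the performance caveat applies here as well.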