Hi,

On Thu, Mar 12, 2015 at 12:08 AM, Huang, Jie <jie.hu...@intel.com> wrote:
>
> According to my understanding, your approach is to register a series of
> tables by using transformWith, right? And then, you can get a new DStream
> (i.e., SchemaDStream), which consists of lots of SchemaRDDs.
>

Yep, it's basically the following:

case class SchemaDStream(sqlc: SQLContext,
                         dataStream: DStream[Row],
                         schemaStream: DStream[StructType]) {
  def registerStreamAsTable(name: String): Unit = {
    foreachRDD(_.registerTempTable(name))
  }

  def foreachRDD(func: SchemaRDD => Unit): Unit = {
    def executeFunction(dataRDD: RDD[Row], schemaRDD: RDD[StructType]): RDD[Unit] = {
      val schema: StructType = schemaRDD.collect.head
      val dataWithSchema: SchemaRDD = sqlc.applySchema(dataRDD, schema)
      val result = func(dataWithSchema)
      schemaRDD.map(x => result) // return RDD[Unit]
    }
    dataStream.transformWith(schemaStream, executeFunction _).foreachRDD(_.count())
  }

}

In a similar way you could add a `transform(func: SchemaRDD => SchemaRDD)`
method. But as I said, I am not sure about performance.
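
A rough, untested sketch of what such a `transform` could look like, written as a
standalone helper so it does not change the class above. It assumes Spark 1.2.x
(where `SchemaRDD` extends `RDD[Row]` and `StructType` is visible from
`org.apache.spark.sql`); the names `SchemaDStreamOps`, `applyFunc`, `newData` and
`newSchema` are made up for illustration:

```scala
// Assumption: Spark 1.2.x on the classpath and the SchemaDStream case class
// from this mail in scope. In Spark 1.3+ StructType lives in
// org.apache.spark.sql.types and SchemaRDD was replaced by DataFrame.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SchemaRDD, StructType}

object SchemaDStreamOps {  // hypothetical helper object
  def transform(in: SchemaDStream)(func: SchemaRDD => SchemaRDD): SchemaDStream = {
    // Rebuild the SchemaRDD for one batch and apply the user function to it.
    def applyFunc(dataRDD: RDD[Row], schemaRDD: RDD[StructType]): SchemaRDD = {
      val schema: StructType = schemaRDD.collect.head
      func(in.sqlc.applySchema(dataRDD, schema))
    }

    // The transformed rows (a SchemaRDD is itself an RDD[Row] in Spark 1.2).
    def newData(d: RDD[Row], s: RDD[StructType]): RDD[Row] = applyFunc(d, s)

    // The schema of the transformed rows, wrapped in a one-element RDD.
    def newSchema(d: RDD[Row], s: RDD[StructType]): RDD[StructType] =
      in.sqlc.sparkContext.parallelize(Seq(applyFunc(d, s).schema))

    SchemaDStream(in.sqlc,
      in.dataStream.transformWith(in.schemaStream, newData _),
      in.dataStream.transformWith(in.schemaStream, newSchema _))
  }
}
```

Note that this calls `func` twice per batch (once for the rows, once for the
schema), which is another reason to be careful about performance.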

Tobias
