I am trying to create a custom streaming source in Spark 2.3.0 and getting the 
following error:

scala> 2018-03-19 17:43:20 ERROR MicroBatchExecution:91 - Query [id = 
48bb7a4c-7c66-4ad3-926b-81f8369a6efb, runId = 
50800f9b-434d-43df-8d6a-3e0fdc865aeb] terminated with error
java.lang.AssertionError: assertion failed: DataFrame returned by getBatch from 
MyDataStreamSource@6c88c38b did not have isStreaming=true
LogicalRDD [string#5], false


A similar issue was experienced in azure-eventhubs-databricks_2.11:3.3 which 
seemed to have been fixed in azure-eventhubs-databricks_2.11:3.4
See - https://github.com/Azure/azure-event-hubs-spark/issues/213

Could this be related to https://issues.apache.org/jira/browse/SPARK-21765 ?

Any help/pointers greatly appreciated !!


If it matters, here's my trivial custom streaming source:


import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext}
import org.apache.spark.sql.execution.streaming.{Offset, Source}
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import sun.font.TrueTypeFont


class MyDataStreamProvider
  extends DataSourceRegister
  with StreamSourceProvider {

  val DEFAULT_NUM_PARTITIONS = "2"
  val DEFAULT_ROWS_PER_PARTITION = "5"

  private val myDataStreamSchema: StructType = new 
StructType(Array[StructField](new StructField("string", StringType, false)))

  override def shortName(): String = "mydatastream"

  override def sourceSchema(sqlContext: SQLContext,
                            schema: Option[StructType],
                            providerName: String,
                            parameters: Map[String, String]): (String, 
StructType) = {
    ("MyDataStream", schema.getOrElse(myDataStreamSchema))
  }

  override def createSource(sqlContext: SQLContext,
                            metadataPath: String,
                            schema: Option[StructType],
                            providerName: String, parameters: Map[String, 
String]): Source = {
    val numPartitions: Int = parameters.getOrElse("numpartitions", 
DEFAULT_NUM_PARTITIONS).toInt
    val rowsPerPartition: Int = parameters.getOrElse("rowsperpartition", 
DEFAULT_ROWS_PER_PARTITION).toInt
    new MyDataStreamSource(sqlContext, schema.getOrElse(myDataStreamSchema), 
numPartitions, rowsPerPartition)
  }

}

class MyDataStreamSource(sqlContext: SQLContext,
                         override val schema: StructType,
                         numPartitions: Int,
                         numRowsPerPartition: Int)
  extends Source {

  override def getOffset: Option[Offset] = Some(new MyDataStreamOffset(offset = 
System.currentTimeMillis()))

  override def commit(end: Offset): Unit = {}

  override def stop: Unit = {}

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    val batchStreamTime = System.currentTimeMillis()
    val rdd: RDD[Row] = new MyDataStreamRDD(sqlContext.sparkContext, 
batchStreamTime, numPartitions, numRowsPerPartition)
    val df = sqlContext.createDataFrame(rdd, schema)
    df
  }

}

class MyDataStreamOffset(offset: Long)
  extends Offset {

  def value: Long = offset

  override def json: String = s"""{"offset" : ${offset}}"""

}

class MyDataStreamRDD(_sc: SparkContext,
                      batchStreamTime: Long,
                      numPartitions: Int,
                      rowsPerPartition: Int)
  extends RDD[Row](_sc, Nil) {
  override def getPartitions: Array[Partition] = {
    val partitionSeq: Seq[Int] = 0 until numPartitions
    val partitions = partitionSeq.map(partitionId => new 
MyDataStreamPartition(partitionId, batchStreamTime, rowsPerPartition))
    partitions.toArray
 }

  override def compute(partition: Partition, context: TaskContext): 
Iterator[Row] = {
    val myDataSourcePartition = partition.asInstanceOf[MyDataStreamPartition]
    val partitionId = myDataSourcePartition.index
    val rows = myDataSourcePartition.rowCount
    val time = myDataSourcePartition.batchStreamTime
    val partitionData = 1 to rows map(r => Row(s"Partition: ${partitionId} for 
time ${time}, row ${r} of ${rows}"))
    partitionData.iterator
  }

}


class MyDataStreamPartition(partitionId: Int, time: Long, rows: Int)
  extends Partition
  with Serializable {

  override def index: Int = partitionId

  override def toString: String = s"Partition: ${partitionId}  Time: ${time}  
Rows: ${rows}"

  def batchStreamTime: Long = time

  def rowCount: Int = rows

}

Reply via email to