I am trying to create a custom streaming source in Spark 2.3.0 and am getting the following error:
```
scala> 2018-03-19 17:43:20 ERROR MicroBatchExecution:91 - Query [id = 48bb7a4c-7c66-4ad3-926b-81f8369a6efb, runId = 50800f9b-434d-43df-8d6a-3e0fdc865aeb] terminated with error
java.lang.AssertionError: assertion failed: DataFrame returned by getBatch from MyDataStreamSource@6c88c38b did not have isStreaming=true
LogicalRDD [string#5], false
```

A similar issue was reported for `azure-eventhubs-databricks_2.11:3.3` and appears to have been fixed in `azure-eventhubs-databricks_2.11:3.4`; see https://github.com/Azure/azure-event-hubs-spark/issues/213. Could this be related to https://issues.apache.org/jira/browse/SPARK-21765?

Any help/pointers greatly appreciated!

If it matters, here's my trivial custom streaming source:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.execution.streaming.{Offset, Source}
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{Partition, SparkContext, TaskContext}

class MyDataStreamProvider extends DataSourceRegister with StreamSourceProvider {

  val DEFAULT_NUM_PARTITIONS = "2"
  val DEFAULT_ROWS_PER_PARTITION = "5"

  private val myDataStreamSchema: StructType =
    new StructType(Array[StructField](StructField("string", StringType, nullable = false)))

  override def shortName(): String = "mydatastream"

  override def sourceSchema(sqlContext: SQLContext,
                            schema: Option[StructType],
                            providerName: String,
                            parameters: Map[String, String]): (String, StructType) =
    ("MyDataStream", schema.getOrElse(myDataStreamSchema))

  override def createSource(sqlContext: SQLContext,
                            metadataPath: String,
                            schema: Option[StructType],
                            providerName: String,
                            parameters: Map[String, String]): Source = {
    val numPartitions: Int = parameters.getOrElse("numpartitions", DEFAULT_NUM_PARTITIONS).toInt
    val rowsPerPartition: Int = parameters.getOrElse("rowsperpartition", DEFAULT_ROWS_PER_PARTITION).toInt
    new MyDataStreamSource(sqlContext, schema.getOrElse(myDataStreamSchema), numPartitions, rowsPerPartition)
  }
}

class MyDataStreamSource(sqlContext: SQLContext,
                         override val schema: StructType,
                         numPartitions: Int,
                         numRowsPerPartition: Int) extends Source {

  // Use the current wall-clock time as the offset for each micro-batch.
  override def getOffset: Option[Offset] =
    Some(new MyDataStreamOffset(offset = System.currentTimeMillis()))

  override def commit(end: Offset): Unit = {}

  override def stop(): Unit = {}

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    val batchStreamTime = System.currentTimeMillis()
    val rdd: RDD[Row] =
      new MyDataStreamRDD(sqlContext.sparkContext, batchStreamTime, numPartitions, numRowsPerPartition)
    // This is the DataFrame the assertion complains about.
    sqlContext.createDataFrame(rdd, schema)
  }
}

class MyDataStreamOffset(offset: Long) extends Offset {
  def value: Long = offset
  override def json: String = s"""{"offset" : ${offset}}"""
}

class MyDataStreamRDD(_sc: SparkContext,
                      batchStreamTime: Long,
                      numPartitions: Int,
                      rowsPerPartition: Int) extends RDD[Row](_sc, Nil) {

  override def getPartitions: Array[Partition] =
    (0 until numPartitions)
      .map(partitionId => new MyDataStreamPartition(partitionId, batchStreamTime, rowsPerPartition))
      .toArray

  override def compute(partition: Partition, context: TaskContext): Iterator[Row] = {
    val myDataSourcePartition = partition.asInstanceOf[MyDataStreamPartition]
    val partitionId = myDataSourcePartition.index
    val rows = myDataSourcePartition.rowCount
    val time = myDataSourcePartition.batchStreamTime
    (1 to rows)
      .map(r => Row(s"Partition: ${partitionId} for time ${time}, row ${r} of ${rows}"))
      .iterator
  }
}

class MyDataStreamPartition(partitionId: Int, time: Long, rows: Int) extends Partition with Serializable {
  override def index: Int = partitionId
  override def toString: String = s"Partition: ${partitionId} Time: ${time} Rows: ${rows}"
  def batchStreamTime: Long = time
  def rowCount: Int = rows
}
```
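For completeness, this is roughly how I'm starting the stream from a spark-shell. `format("mydatastream")` assumes the provider is registered via `META-INF/services/org.apache.spark.sql.sources.DataSourceRegister`; otherwise the fully qualified class name of `MyDataStreamProvider` works too. The option names match what `createSource` reads:

```scala
// Roughly how the source is started; the console sink is just for testing.
val df = spark.readStream
  .format("mydatastream")          // shortName() from MyDataStreamProvider
  .option("numpartitions", "2")
  .option("rowsperpartition", "5")
  .load()

val query = df.writeStream
  .format("console")
  .start()                         // the AssertionError appears shortly after this

query.awaitTermination()
```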
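From digging around, the assertion seems to come from `MicroBatchExecution` checking `Dataset.isStreaming` on the DataFrame returned by `getBatch`, and as far as I can tell a DataFrame built with `sqlContext.createDataFrame(rdd, schema)` is always a batch one. This is easy to confirm in a spark-shell:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructType}

// A DataFrame created from an RDD is a batch DataFrame, which is exactly
// what the assertion is complaining about.
val rdd = spark.sparkContext.parallelize(Seq(Row("test")))
val df  = spark.sqlContext.createDataFrame(rdd, new StructType().add("string", StringType))
println(df.isStreaming)  // prints: false
```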
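Looking at the fix in the linked azure-event-hubs-spark issue, it appears to build the batch with `SQLContext.internalCreateDataFrame(..., isStreaming = true)`. That method is `private[sql]`, so (if I understand correctly) the source class would have to live in a package under `org.apache.spark.sql` for this to compile. Here is a sketch of what I think my `getBatch` would become under that assumption; the `Row` to `InternalRow` conversion via `RowEncoder` is my guess at the Spark 2.3 way to do it:

```scala
// Sketch only: compiles only if this class is declared in a package under
// org.apache.spark.sql, since internalCreateDataFrame is private[sql].
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder

override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
  val batchStreamTime = System.currentTimeMillis()
  val rdd: RDD[Row] =
    new MyDataStreamRDD(sqlContext.sparkContext, batchStreamTime, numPartitions, numRowsPerPartition)

  // Capture the schema in a local val so the closure below doesn't drag
  // in `this` (and with it the non-serializable SQLContext).
  val rowSchema = schema
  val internalRdd: RDD[InternalRow] = rdd.mapPartitions { rows =>
    val encoder = RowEncoder(rowSchema)          // Row -> InternalRow in Spark 2.x
    rows.map(row => encoder.toRow(row).copy())   // copy() because toRow reuses its buffer
  }

  // isStreaming = true is what the MicroBatchExecution assertion wants to see.
  sqlContext.internalCreateDataFrame(internalRdd, rowSchema, isStreaming = true)
}
```

Is this the intended way for a custom `Source` to return its batches, or is there a supported way to mark a DataFrame as streaming without reaching into `private[sql]` API?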