Hi Jungtaek,

Sorry about the delay in my response, and thanks a ton for responding.
I am just trying to build a data pipeline which has a bunch of stages. The goal is to use a Dataset to accumulate the transformation errors that may happen in the stages of the pipeline. As a benefit, I can pass only the filtered DataFrame on to the next stage. The stages look something like this:

val pipelineStages = List(
  new AddRowKeyStage(EvergreenSchema),
  new WriteToHBaseStage(hBaseCatalog),
  new ReplaceCharDataStage(DoubleColsReplaceMap, EvergreenSchema, DoubleCols),
  new ReplaceCharDataStage(SpecialCharMap, EvergreenSchema, StringCols),
  new DataTypeValidatorStage(EvergreenSchema),
  new DataTypeCastStage(EvergreenSchema)
)

Each stage's implementation looks something like the following. Some may return errors, while others are just side-effecting. For instance, the following stage (AddRowKeyStage) just adds a UUID column to each row and therefore returns an empty Dataset[DataError]. A DataTypeValidatorStage, on the other hand, may return a populated Dataset[DataError] along with the filtered DataFrame value.

import cats.data.Writer
import com.thoughtworks.awayday.ingest.DataFrameOps
import com.thoughtworks.awayday.ingest.UDFs.generateUUID
import com.thoughtworks.awayday.ingest.models.ErrorModels.{DataError, DataSetWithErrors}
import com.thoughtworks.awayday.ingest.stages.StageConstants.RowKey
import org.apache.spark.sql.{DataFrame, Encoder, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

class AddRowKeyStage(schemaWithRowKey: StructType)
                    (implicit spark: SparkSession, encoder: Encoder[DataError])
  extends DataStage[DataFrame] {

  override val stage: String = getClass.getSimpleName

  def apply(dataRecords: DataFrame): DataSetWithErrors[DataFrame] = addRowKeys(dataRecords)

  def addRowKeys(data: DataFrame): DataSetWithErrors[DataFrame] = {
    val colOrder = schemaWithRowKey.fields.map(_.name)
    val withRowKeyDf = data.withColumn(RowKey, lit(generateUUID()))
    val returnDf = withRowKeyDf.select(colOrder.map(col): _*)
    Writer(DataFrameOps.emptyErrors(spark, encoder), returnDf)
  }
}

For accumulating the errors at each stage, I am using the Writer monad from the Cats library, and I have arranged for the errors to be combined automatically by implementing a Semigroup for the Spark Dataset. This way, I can do the following and end up with two Datasets (one for the errors and one for the values) when I start the stream:

val validRecordsWithErrors = pipelineStages.foldLeft(initDf) {
  case (dfWithErrors, stage) =>
    for {
      df <- dfWithErrors
      applied <- stage(df)
    } yield applied
}

The validRecordsWithErrors is a combination of both the transformation errors (left side) and the DataFrame of records that have successfully passed through the stages (right side).

Now, the tricky bit is this:

val initDf = Writer(*DataFrameOps.emptyErrorStream(spark)*, sourceRawDf)

The "zero" value of the fold, and the error value for the side-effecting stages, must be an empty stream. With Spark batch, I can always use "emptyDataFrame", but I have no clue how to achieve this with Spark Structured Streaming. Unfortunately, "emptyDataFrame" has isStreaming=false, so Spark won't let me union it with the streaming error Datasets.
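In case it makes the setup clearer, the Semigroup I mentioned boils down to unioning the two error Datasets, roughly along these lines (a simplified sketch; the ErrorCombiners object name is just for illustration):

import cats.Semigroup
import org.apache.spark.sql.Dataset
import com.thoughtworks.awayday.ingest.models.ErrorModels.DataError

// Illustrative container object, just for this sketch
object ErrorCombiners {

  // Writer's flatMap needs a Semigroup for the "log" side, so giving
  // Dataset[DataError] a union-based combine is what lets the errors
  // from consecutive stages accumulate automatically during the fold.
  implicit val dataErrorSemigroup: Semigroup[Dataset[DataError]] =
    Semigroup.instance[Dataset[DataError]](_ union _)
}

With an instance like that in implicit scope, the for-comprehension in the fold unions the error Datasets of consecutive stages behind the scenes, which is also why the "zero" error value has to be a streaming Dataset.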
I am sorry if I haven't done a good job of explaining it well.

Cheers,
Arun

On Tue, Nov 6, 2018 at 7:34 AM Jungtaek Lim <kabh...@gmail.com> wrote:

> Could you explain what you're trying to do? A stream with no data produces
> no batches, so it would end up being a no-op even if it were possible.
>
> - Jungtaek Lim (HeartSaVioR)
>
> On Tue, Nov 6, 2018 at 8:29 AM, Arun Manivannan <a...@arunma.com> wrote:
>
>> Hi,
>>
>> I would like to create a "zero" value for a Structured Streaming
>> DataFrame and unfortunately, I couldn't find any leads. With Spark batch,
>> I can do an "emptyDataFrame" or "createDataFrame" with "emptyRDD", but
>> with Structured Streaming, I am lost.
>>
>> If I use "emptyDataFrame" as the zero value, I wouldn't be able to
>> join it with any other DataFrames in the program because Spark doesn't
>> allow you to mix batch and stream DataFrames (isStreaming=false for the
>> batch ones).
>>
>> Any clue is greatly appreciated. Here are the alternatives that I have
>> at the moment.
>>
>> *1. Reading from an empty file*
>> *Disadvantages: the poll is expensive because it involves IO, and it's
>> error prone in the sense that someone might accidentally update the file.*
>>
>> val emptyErrorStream = (spark: SparkSession) => {
>>   spark
>>     .readStream
>>     .format("csv")
>>     .schema(DataErrorSchema)
>>     .load("/Users/arunma/IdeaProjects/OSS/SparkDatalakeKitchenSink/src/test/resources/dummy1.txt")
>>     .as[DataError]
>> }
>>
>> *2. Use MemoryStream*
>>
>> *Disadvantages: MemoryStream itself is not recommended for production use
>> because of the ability to mutate it, but I am converting it to a DS
>> immediately. So, I am leaning towards this at the moment.*
>>
>> val emptyErrorStream = (spark: SparkSession) => {
>>   implicit val sqlC = spark.sqlContext
>>   MemoryStream[DataError].toDS()
>> }
>>
>> Cheers,
>> Arun
>>