I am trying to read a stream using my custom data source (DataSource V2, Spark 2.3),
and it fails *in the second iteration* with the following exception when
columns are pruned:

    Query [id=xxx, runId=yyy] terminated with exception:
    assertion failed: Invalid batch: a#660,b#661L,c#662,d#663,,... 26 more
    fields != b#1291L

DataFrame creation:

    val df = sparkSession.readStream.format("myV2Source").load("/")
    val df1 = df.filter(df("a") >= "-1").select("b")

Stream execution:

    import org.apache.spark.sql.streaming.{OutputMode, Trigger}

    val streamingQuery = df1
      .writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .outputMode(OutputMode.Append())
      .start()

    streamingQuery.awaitTermination()

If I remove the select (i.e. `val df1 = df.filter(df("a") >= "-1")`), it
works fine.
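One hypothesis worth ruling out (an assumption, not a confirmed diagnosis): suppose the source implements `SupportsPushDownRequiredColumns`, the engine reuses the same reader object for every micro-batch and builds each batch's columns from the reader's current `readSchema()`, and `pruneColumns` overwrites that schema while the batch plan is optimized. In that case the first batch would still see all columns and the second only `b`, which matches the shape of the error above. The Spark-free Scala sketch below models that sequence. `ToyReader` and `PruningLeakModel` are hypothetical names, and the column-count check only mimics the `Invalid batch` assertion; it is not Spark code.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, Spark-free stand-in for a reader that is reused across
// micro-batches. Assumption: this mirrors the suspected behaviour only.
final class ToyReader(fullSchema: Seq[String]) {
  // Mutable state: once pruned, every later readSchema() call sees the pruned list.
  private var currentSchema: Seq[String] = fullSchema

  def readSchema(): Seq[String] = currentSchema

  def pruneColumns(required: Seq[String]): Unit = {
    currentSchema = required
  }
}

object PruningLeakModel {
  // The query's expected columns are fixed once, before the first batch runs.
  def runBatches(reader: ToyReader, required: Seq[String], batches: Int): Seq[String] = {
    val expected = reader.readSchema()
    val results = ArrayBuffer.empty[String]
    for (i <- 1 to batches) {
      // Each batch takes its columns from whatever readSchema() returns right now.
      val batchColumns = reader.readSchema()
      if (batchColumns.size == expected.size) results += s"batch $i ok"
      else results += s"batch $i invalid: ${expected.mkString(",")} != ${batchColumns.mkString(",")}"
      // Optimizing this batch pushes the projection into the same reader instance.
      reader.pruneColumns(required)
    }
    results.toSeq
  }

  def main(args: Array[String]): Unit = {
    val reader = new ToyReader(Seq("a", "b", "c", "d"))
    runBatches(reader, Seq("b"), 2).foreach(println)
  }
}
```

Running `main` prints `batch 1 ok` followed by `batch 2 invalid: a,b,c,d != b`. Without a projection (required columns equal to the full schema), every batch passes, which would be consistent with the query working once the select is removed.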

Any idea why?
