Hi Jacek,

An interesting question! I don't know the exact answer and will be happy to learn along the way :) Below is my understanding of these two things; I hope it helps a little.

The way I see it, we can distinguish two source categories. The first is a source with a fixed schema. A good example is Apache Kafka, which exposes the topic name, key and value, and you can't change that; it's always the same whether you run the reader in Company A or in Company B. What changes is the logic that extracts data from the key, value or headers, but that's business-specific, not data-store-specific. You can find the schema implementation here:
Kafka <https://github.com/apache/spark/blob/v3.1.1/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala#L122>

The second type is a source with a user-defined schema, like an RDBMS table or a schemaless NoSQL store. Here the schema is not only business-specific but also data-store-specific; you can give the primary key column any name, and there is no fixed rule like "key" or "value" in Kafka. To avoid runtime errors (i.e. to favor a fail-fast approach), Spark can use the store's metadata to assert (analyze) the schema specified by the user and either confirm it or fail before the data is read. (I've appended a rough sketch of the two cases at the bottom of this mail.)

Best,
Bartosz.

On Mon, Mar 29, 2021 at 1:07 PM Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> I've been developing a data source with a source and sink for Spark
> Structured Streaming.
>
> I've got a question about Source.getBatch [1]:
>
> def getBatch(start: Option[Offset], end: Offset): DataFrame
>
> getBatch returns a streaming DataFrame between the offsets, so the idiom
> (?) is to have code as follows:
>
> val relation = new MyRelation(...)(sparkSession)
> val plan = LogicalRelation(relation, isStreaming = true)
> new Dataset[Row](sparkSession, plan, RowEncoder(schema))
>
> Note the use of schema [2], which is another part of the Source abstraction:
>
> def schema: StructType
>
> This is the "source" of my question. Is the above OK in a streaming sink /
> Source.getBatch?
>
> Since there are no interim operators that could change attributes (schema),
> I think it's OK.
>
> I've seen the following code and it made me wonder whether it's better
> or not compared to the solution above:
>
> val relation = new MyRelation(...)(sparkSession)
> val plan = LogicalRelation(relation, isStreaming = true)
>
> // When would we have to execute the plan?
> val qe = sparkSession.sessionState.executePlan(plan)
> new Dataset[Row](sparkSession, plan, RowEncoder(qe.analyzed.schema))
>
> When would (or do) we have to use qe.analyzed.schema vs simply schema?
> Could qe.analyzed.schema help avoid some edge cases, and is it the
> preferred approach?
>
> Thank you for any help you can offer. Much appreciated.
>
> [1]
> https://github.com/apache/spark/blob/053dd858d38e6107bc71e0aa3a4954291b74f8c8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala#L61
> [2]
> https://github.com/apache/spark/blob/053dd858d38e6107bc71e0aa3a4954291b74f8c8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala#L35
>
> Regards,
> Jacek Laskowski
> ----
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books <https://books.japila.pl/>
> Follow me on https://twitter.com/jaceklaskowski

--
Bartosz Konieczny
data engineer
https://www.waitingforcode.com
https://github.com/bartosz25/
https://twitter.com/waitingforcode
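
PS. A rough sketch of the two schema cases, to make the distinction more concrete. The Kafka-like field list below is from memory, so please double-check it against the KafkaSource code linked above; resolveSchema and inferFromMetadata are just hypothetical names to illustrate the idea, not real Spark APIs.

import org.apache.spark.sql.types._

// Case 1: a fixed schema, hard-coded in the source itself (roughly what the
// Kafka source exposes); the user cannot change these fields, only how the
// key/value bytes are later interpreted:
val kafkaLikeSchema = StructType(Seq(
  StructField("key", BinaryType),
  StructField("value", BinaryType),
  StructField("topic", StringType),
  StructField("partition", IntegerType),
  StructField("offset", LongType),
  StructField("timestamp", TimestampType),
  StructField("timestampType", IntegerType)
))

// Case 2: a user-defined schema (RDBMS table, schemaless NoSQL store, ...):
// the source either takes the schema from the user or infers it from the
// store's metadata, and that's where the fail-fast assertion can happen.
def resolveSchema(
    userSchema: Option[StructType],
    inferFromMetadata: () => StructType): StructType =
  userSchema.getOrElse(inferFromMetadata())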
For me, we can distinguish 2 different source categories. The first of them is a source with some fixed schema. A good example is Apache Kafka which exposes the topic name, key, value and you can't change that; it's always the same, whenever you run the reader in Company A or in Company B. What changes is the data extraction logic from the key, value or headers. But it's business-specific, not data store-specific. You can find the schema implementation here: Kafka <https://github.com/apache/spark/blob/v3.1.1/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala#L122> The second type is a source with user-defined schema, like a RDBMS table or a NoSQL schemaless store. Here, predicting the schema will not only be business-specific, but also data store-specific; you can set any name for a Primary Key column, there is no such rule like "key" or "value" in Kafka. To avoid runtime errors (= favor fail-fast approach before the data is read), Spark can use the metadata to assert (analyze) the schema specified by the user to confirm it or fail fast before reading the data. Best, Bartosz. On Mon, Mar 29, 2021 at 1:07 PM Jacek Laskowski <ja...@japila.pl> wrote: > Hi, > > I've been developing a data source with a source and sink for Spark > Structured Streaming. > > I've got a question about Source.getBatch [1]: > > def getBatch(start: Option[Offset], end: Offset): DataFrame > > getBatch returns a streaming DataFrame between the offsets so the idiom > (?) is to have a code as follows: > > val relation = new MyRelation(...)(sparkSession) > val plan = LogicalRelation(relation, isStreaming = true) > new Dataset[Row](sparkSession, plan, RowEncoder(schema)) > > Note the use of schema [2] that is another part of the Source abstraction: > > def schema: StructType > > This is the "source" of my question. Is the above OK in a streaming sink / > Source.getBatch? > > Since there are no interim operators that could change attributes (schema) > I think it's OK. > > I've seen the following code and that made me wonder whether it's better > or not compared to the solution above: > > val relation = new MyRelation(...)(sparkSession) > val plan = LogicalRelation(relation, isStreaming = true) > > // When would we have to execute plan? > val qe = sparkSession.sessionState.executePlan(plan) > new Dataset[Row](sparkSession, plan, RowEncoder(qe.analyzed.schema)) > > When would or do we have to use qe.analyzed.schema vs simply schema? Could > this qe.analyzed.schema help avoid some edge cases and is a preferred > approach? > > Thank you for any help you can offer. Much appreciated. > > [1] > https://github.com/apache/spark/blob/053dd858d38e6107bc71e0aa3a4954291b74f8c8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala#L61 > [2] > https://github.com/apache/spark/blob/053dd858d38e6107bc71e0aa3a4954291b74f8c8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala#L35 > > Pozdrawiam, > Jacek Laskowski > ---- > https://about.me/JacekLaskowski > "The Internals Of" Online Books <https://books.japila.pl/> > Follow me on https://twitter.com/jaceklaskowski > > <https://twitter.com/jaceklaskowski> > -- Bartosz Konieczny data engineer https://www.waitingforcode.com https://github.com/bartosz25/ https://twitter.com/waitingforcode