There isn't a query per se. I'm writing the entire dataframe from the output of the read stream. Once I got that working I was planning to test the query aspect.
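Concretely, the whole pipeline is just the following (a rough sketch; the format name and checkpoint path here are illustrative, not the connector's final API):

    val df = spark.readStream
      .format("com.samelamin.spark.bigquery")  // assumed format name, for illustration
      .load()

    df.printSchema()  // schema is correct at this point, e.g. customerid: long

    val query = df.writeStream
      .format("com.samelamin.spark.bigquery")
      .option("checkpointLocation", "/tmp/bq-checkpoint")  // illustrative path
      .start()

    // after at least one batch has run, shows how the logical plan
    // becomes a physical plan (per your suggestion below)
    query.explain()

By the time addBatch receives the dataframe from this stream, customerid has become a string.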
I'll do a bit more digging. Thank you very much for your help. Structured Streaming is very exciting and I really am enjoying writing a connector for it!

Regards
Sam

On Thu, 2 Feb 2017 at 00:02, Tathagata Das <tathagata.das1...@gmail.com> wrote:

What is the query you are applying writeStream on? Essentially, can you print the whole query?

Also, you can do StreamingQuery.explain() to see in full detail how the logical plan changes to a physical plan for a batch of data. That might help. Try doing that with some other sink to make sure the source works correctly, and then try using your sink.

If you want further debugging, then you will have to dig into the StreamExecution class in Spark and debug stuff there.

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L523

On Wed, Feb 1, 2017 at 3:49 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:

Yeah, sorry, I'm still working on it. It's on a branch you can find here <https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala>. Ignore the logging messages; I was trying to work out how the APIs work, and unfortunately, because I have to shade the dependency, I can't debug it in an IDE (that I know of!).

So I can see the correct schema here <https://github.com/samelamin/spark-bigquery/blob/Stream_reading/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySource.scala#L64> and also when the df is returned (after .load()).

But when that same df has writeStream applied to it, the addBatch dataframe has a new schema. It's similar to the old schema, but some ints have been turned into strings.

On Wed, Feb 1, 2017 at 11:40 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

I am assuming that you have written your own BigQuerySource (I don't see that code in the link you posted). In that source, you must have implemented getBatch, which uses offsets to return the DataFrame containing the data of a batch. Can you double-check whether the DataFrame returned by getBatch has the expected schema?

On Wed, Feb 1, 2017 at 3:33 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:

Thanks for the quick response, TD!

I've been trying to identify where exactly this transformation happens.

The readStream returns a dataframe with the correct schema. The minute I call writeStream, by the time I get to the addBatch method, the dataframe there has an incorrect schema. So I'm skeptical about the issue being prior to the readStream, since the output dataframe has the correct schema.

Am I missing something completely obvious?

Regards
Sam

On Wed, Feb 1, 2017 at 11:29 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

You should make sure that the schema of the streaming Dataset returned by `readStream` and the schema of the DataFrame returned by the source's getBatch match.
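For illustration, the invariant in a toy source looks like this (a sketch against Spark's internal Source trait, not the actual BigQuerySource; the column name just mirrors your logs):

    import java.util.{Arrays => JArrays}
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    import org.apache.spark.sql.types.{LongType, StructField, StructType}

    // Toy source showing where the two schemas must line up: the `schema`
    // declared to readStream and the schema of the DataFrame built in getBatch.
    class ToySource(sqlContext: SQLContext) extends Source {

      override val schema: StructType =
        StructType(StructField("customerid", LongType, nullable = true) :: Nil)

      override def getOffset: Option[Offset] = Some(LongOffset(0L))

      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
        val df = sqlContext.createDataFrame(JArrays.asList(Row(42L)), schema)
        // Worth adding while debugging: fail fast if the batch drifts from
        // the declared schema, instead of discovering it later in the sink.
        require(df.schema == schema,
          s"getBatch schema ${df.schema} does not match declared schema $schema")
        df
      }

      override def stop(): Unit = ()
    }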
On Wed, Feb 1, 2017 at 3:25 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:

Hi All

I am writing a BigQuery connector here <http://github.com/samelamin/spark-bigquery> and I am getting a strange error with schemas being overwritten when a dataframe is passed over to the Sink.

For example, the source returns this StructType:

    WARN streaming.BigQuerySource: StructType(StructField(customerid,LongType,true),

and the sink is receiving this StructType:

    WARN streaming.BigQuerySink: StructType(StructField(customerid,StringType,true)

Any idea why this might be happening? I don't have schema inference on:

    spark.conf.set("spark.sql.streaming.schemaInference", "false")

I know it's off by default, but I set it just to be sure. So I'm completely lost as to what could be causing this.

Regards
Sam
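(For reference, the sink side where the mutated schema surfaces looks roughly like this; a sketch of Spark's internal Sink trait with logging like the WARN lines above, not the connector's actual code:)

    import org.apache.log4j.Logger
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.execution.streaming.Sink

    // Sketch of a sink: addBatch is the first place the sink sees a batch,
    // so logging data.schema here surfaces the Long -> String mutation.
    class LoggingBigQuerySink extends Sink {
      private val logger = Logger.getLogger("streaming.BigQuerySink")

      override def addBatch(batchId: Long, data: DataFrame): Unit = {
        // e.g. StructType(StructField(customerid,StringType,true))
        logger.warn(data.schema.toString)
        // the real sink would write `data` out to BigQuery here
      }
    }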