I have about 100 fields in my dataset, and some of them contain nulls. Does to_json fail to convert in that case?
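For what it's worth, here is a minimal batch sketch I can use to check this locally (the local SparkSession, schema, and sample values below are illustrative, not my real 100-field dataset):

    import static org.apache.spark.sql.functions.*;

    import java.util.Arrays;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class ToJsonNullCheck {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

            // Two rows, the second with a null country, to see how to_json treats nulls.
            StructType schema = new StructType()
                    .add("name", DataTypes.StringType)
                    .add("id", DataTypes.IntegerType)
                    .add("country", DataTypes.StringType);
            Dataset<Row> df = spark.createDataFrame(
                    Arrays.asList(
                            RowFactory.create("kant", 1, "usa"),
                            RowFactory.create("john", 2, null)),
                    schema);

            // As far as I can tell, to_json does not fail on nulls; it simply
            // omits null fields, e.g. {"name":"john","id":2} for the second row.
            df.select(to_json(struct(col("*")))).as(Encoders.STRING()).show(false);
            spark.stop();
        }
    }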
Thanks!

On Tue, Sep 12, 2017 at 12:32 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi Michael,
>
> Interestingly, that doesn't seem to quite work for me for some reason.
> Here is what I have.
>
> Dataset:
>
> name | id | country
> -------------------------
> kant | 1  | usa
> john | 2  | usa
>
> And here is my code:
>
> Dataset<Row> ds = getKafkaStream(); // This dataset represents the one above
> StreamingQuery query =
>     ds.writeStream().trigger(Trigger.ProcessingTime(1000)).format("console").start();
> query.awaitTermination();
>
> *This works completely fine and I can see the rows on my console.*
>
> Now if I change it to this:
>
> Dataset<Row> ds = getKafkaStream(); // This dataset represents the one above
> Dataset<String> jsonDS =
>     ds.select(to_json(struct(ds.col("*")))).as(Encoders.STRING());
> StreamingQuery query2 =
>     jsonDS.writeStream().trigger(Trigger.ProcessingTime(1000)).format("console").start();
> query2.awaitTermination();
>
> *I don't see any rows on my console, and I made sure I waited for a while.*
>
> *The moment I change it back to the code above and run it, it works again.*
>
> On Mon, Sep 11, 2017 at 2:28 PM, Michael Armbrust <mich...@databricks.com> wrote:
>
>> The following will convert the whole row to JSON.
>>
>> import org.apache.spark.sql.functions.*
>> df.select(to_json(struct(col("*"))))
>>
>> On Sat, Sep 9, 2017 at 6:27 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Thanks Ryan! In this case I will have a Dataset<Row>, so is there a way
>>> to convert a Row to a JSON string?
>>>
>>> Thanks
>>>
>>> On Sat, Sep 9, 2017 at 5:14 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>>>
>>>> It's because "toJSON" doesn't support Structured Streaming. The current
>>>> implementation converts the Dataset to an RDD, which is not supported
>>>> by streaming queries.
>>>>
>>>> On Sat, Sep 9, 2017 at 4:40 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>>> Yes, it is a streaming dataset. So what is the problem with the
>>>>> following code?
>>>>>
>>>>> Dataset<String> ds = dataset.toJSON().map(s -> { /* some function that returns a string */ }, Encoders.STRING());
>>>>> StreamingQuery query = ds.writeStream().start();
>>>>> query.awaitTermination();
>>>>>
>>>>> On Sat, Sep 9, 2017 at 4:20 PM, Felix Cheung <felixcheun...@hotmail.com> wrote:
>>>>>
>>>>>> What is newDS?
>>>>>> If it is a streaming Dataset/DataFrame (since you have writeStream
>>>>>> there), then there seems to be an issue preventing toJSON from working.
>>>>>>
>>>>>> ------------------------------
>>>>>> *From:* kant kodali <kanth...@gmail.com>
>>>>>> *Sent:* Saturday, September 9, 2017 4:04:33 PM
>>>>>> *To:* user @spark
>>>>>> *Subject:* Queries with streaming sources must be executed with
>>>>>> writeStream.start()
>>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have the following code and I am not sure what's wrong with it. Why
>>>>>> can't I call dataset.toJSON() (which returns a Dataset)? I am using
>>>>>> Spark 2.2.0, so I am wondering if there is any workaround?
>>>>>>
>>>>>> Dataset<String> ds = newDS.toJSON().map(s -> { /* some function that returns a string */ }, Encoders.STRING());
>>>>>> StreamingQuery query = ds.writeStream().start();
>>>>>> query.awaitTermination();
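For reference, here is Michael's to_json(struct(col("*"))) pattern in a self-contained, runnable form (a minimal sketch: the built-in rate source stands in for the Kafka source above, and the app name and console options are illustrative):

    import static org.apache.spark.sql.functions.*;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.streaming.Trigger;

    public class ToJsonStreamCheck {
        public static void main(String[] args) throws Exception {
            SparkSession spark = SparkSession.builder()
                    .master("local[*]")
                    .appName("to_json-stream-check")
                    .getOrCreate();

            // The built-in rate source emits (timestamp, value) rows continuously.
            Dataset<Row> ds = spark.readStream()
                    .format("rate")
                    .option("rowsPerSecond", "1")
                    .load();

            // Pack every column of the row into a struct and serialize it to JSON.
            Dataset<String> jsonDS = ds
                    .select(to_json(struct(col("*"))))
                    .as(Encoders.STRING());

            StreamingQuery query = jsonDS.writeStream()
                    .trigger(Trigger.ProcessingTime(1000))
                    .format("console")
                    .option("truncate", "false") // print the full JSON strings
                    .start();
            query.awaitTermination();
        }
    }

If this prints JSON rows on the console but the Kafka variant does not, the difference likely lies in the source rather than in the to_json/struct projection itself.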