Hi Michael,

Interestingly, that doesn't quite work for me. Here is what I have.

Dataset

name | id | country
-----|----|--------
kant | 1  | usa
john | 2  | usa


And here is my code

Dataset<Row> ds = getKafkaStream(); // This dataset represents the one above
StreamingQuery query =
ds.writeStream().trigger(Trigger.ProcessingTime(1000)).format("console").start();
query.awaitTermination();

*This works completely fine and I can see the rows on my console.*

Now if I change it to this.

Dataset<Row> ds = getKafkaStream(); // This dataset represents the one above
Dataset<String> jsonDS =
ds.select(to_json(struct(ds.col("*")))).as(Encoders.STRING());
StreamingQuery query2 =
jsonDS.writeStream().trigger(Trigger.ProcessingTime(1000)).format("console").start();
query2.awaitTermination();

*I don't see any rows on my console, and I made sure I waited for a while.*

*The moment I change it back to the code above and run it, it works again.*
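
One way to narrow this down is to run the same to_json(struct(col("*"))) expression on a static Dataset first. The sketch below is only that check, not a fix: it assumes a local SparkSession and hard-codes the two rows above instead of reading from Kafka, and the class and method names (RowToJsonCheck, sampleRows, toJson) are made up for illustration.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.to_json;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class RowToJsonCheck {

    // Hypothetical helper: builds a static (non-streaming) Dataset
    // with the same columns as the table above.
    static Dataset<Row> sampleRows(SparkSession spark) {
        List<Row> rows = Arrays.asList(
            RowFactory.create("kant", 1, "usa"),
            RowFactory.create("john", 2, "usa"));
        StructType schema = new StructType()
            .add("name", DataTypes.StringType)
            .add("id", DataTypes.IntegerType)
            .add("country", DataTypes.StringType);
        return spark.createDataFrame(rows, schema);
    }

    // Same expression as in the streaming code: pack every column
    // into a struct and serialize that struct to one JSON string.
    static Dataset<String> toJson(Dataset<Row> ds) {
        return ds.select(to_json(struct(col("*")))).as(Encoders.STRING());
    }

    public static void main(String[] args) {
        // Assumption: a local master is enough for this check.
        SparkSession spark = SparkSession.builder()
            .appName("row-to-json-check")
            .master("local[1]")
            .getOrCreate();

        // Print the JSON strings without truncating them.
        toJson(sampleRows(spark)).show(false);

        spark.stop();
    }
}
```

If this prints one JSON string per row, the expression itself is probably fine and the difference is more likely somewhere in the streaming query.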

On Mon, Sep 11, 2017 at 2:28 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> The following will convert the whole row to JSON.
>
> import static org.apache.spark.sql.functions.*;
> df.select(to_json(struct(col("*"))))
>
> On Sat, Sep 9, 2017 at 6:27 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Thanks Ryan! In this case, I will have Dataset<Row> so is there a way to
>> convert Row to Json string?
>>
>> Thanks
>>
>> On Sat, Sep 9, 2017 at 5:14 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> It's because "toJSON" doesn't support Structured Streaming. The current
>>> implementation will convert the Dataset to an RDD, which is not supported
>>> by streaming queries.
>>>
>>> On Sat, Sep 9, 2017 at 4:40 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Yes, it is a streaming dataset. So what is the problem with the
>>>> following code?
>>>>
>>>> Dataset<String> ds = dataset.toJSON().map(()->{some function that returns 
>>>> a string});
>>>>  StreamingQuery query = ds.writeStream().start();
>>>>  query.awaitTermination();
>>>>
>>>>
>>>> On Sat, Sep 9, 2017 at 4:20 PM, Felix Cheung <felixcheun...@hotmail.com
>>>> > wrote:
>>>>
>>>>> What is newDS?
>>>>> If it is a streaming Dataset/DataFrame (since you have writeStream
>>>>> there), then there seems to be an issue preventing toJSON from working.
>>>>>
>>>>> ------------------------------
>>>>> *From:* kant kodali <kanth...@gmail.com>
>>>>> *Sent:* Saturday, September 9, 2017 4:04:33 PM
>>>>> *To:* user @spark
>>>>> *Subject:* Queries with streaming sources must be executed with
>>>>> writeStream.start()
>>>>>
>>>>> Hi All,
>>>>>
>>>>> I have the following code and I am not sure what's wrong with it. I
>>>>> cannot call dataset.toJSON() (which returns a Dataset). I am using Spark
>>>>> 2.2.0, so I am wondering if there is any workaround?
>>>>>
>>>>>  Dataset<String> ds = newDS.toJSON().map(()->{some function that returns 
>>>>> a string});
>>>>>  StreamingQuery query = ds.writeStream().start();
>>>>>  query.awaitTermination();
>>>>>
>>>>>
>>>>
>>>
>>
>
