Hi,

What about a custom streaming Sink that would stop the query after addBatch
has been called?
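
A minimal sketch of that idea follows, under some assumptions. It relies on the Sink and StreamSinkProvider traits as they exist in Spark 2.2; Sink is an internal API and may change between versions. The names FirstBatch, FirstBatchSink, FirstBatchSinkProvider and FirstBatchDriver, and the checkpoint path, are hypothetical. Because addBatch runs on the query's own execution thread, the sketch does not call stop() from inside it: the sink only signals a latch, and the driver stops the query once the latch is released.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

// Hypothetical holder shared by the sink and the driver code (same JVM).
object FirstBatch {
  val arrived = new CountDownLatch(1)
  val sample = new AtomicReference[Array[String]](Array.empty[String])
}

// Sink is internal to Spark 2.x, so treat this as version-specific.
class FirstBatchSink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // Assumes a single string column, as in the query from this thread.
    val rows = data.collect().map(_.getString(0))
    // Keep only the first non-empty batch and wake up the driver.
    if (rows.nonEmpty && FirstBatch.arrived.getCount > 0) {
      FirstBatch.sample.set(rows)
      FirstBatch.arrived.countDown()
    }
  }
}

// Spark instantiates the provider by class name, so it needs a no-arg constructor.
class FirstBatchSinkProvider extends StreamSinkProvider {
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = new FirstBatchSink
}

object FirstBatchDriver {
  // Starts the query, waits for the first non-empty batch (or the timeout),
  // then stops the query and returns whatever was captured.
  def firstBatch(records: DataFrame, timeoutSec: Long): Array[String] = {
    val query = records.writeStream
      .format(classOf[FirstBatchSinkProvider].getName)
      .option("checkpointLocation", "/tmp/first-batch-checkpoint") // hypothetical path
      .start()
    try {
      FirstBatch.arrived.await(timeoutSec, TimeUnit.SECONDS)
      FirstBatch.sample.get()
    } finally {
      query.stop()
    }
  }
}
```

If the timeout expires before any data arrives, firstBatch returns an empty array, so the caller should check for that. The captured JSON strings could then be turned into a Dataset[String] and passed to spark.read.json to infer the schema. Note that reusing the same checkpoint location makes a rerun resume from the previously recorded offsets.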

Regards,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Mon, Dec 11, 2017 at 9:15 AM, satyajit vegesna <
satyajit.apas...@gmail.com> wrote:

> Hi Jacek,
>
> For now, I am using Thread.sleep() on the driver to make sure my streaming
> query receives some data, and then I stop it before control reaches the
> code that queries the memory table.
> Let me know if there is a better way of handling it.
>
> Regards,
> Satyajit.
>
> On Sun, Dec 10, 2017 at 10:43 PM, satyajit vegesna <
> satyajit.apas...@gmail.com> wrote:
>
>> Hi Jacek,
>>
>> Thank you for responding.
>>
>> I have tried the memory sink, and below is what I did:
>>
>> val fetchValue = debeziumRecords.selectExpr("value")
>>   .withColumn("tableName",
>>     functions.get_json_object($"value".cast(StringType), "$.schema.name"))
>>   .withColumn("operation",
>>     functions.get_json_object($"value".cast(StringType), "$.payload.op"))
>>   .withColumn("payloadAfterValue",
>>     split(substring_index(debeziumRecords("value"), "\"after\":", -1), ",\"source\"").getItem(0))
>>   .drop("tableName").drop("operation").drop("value")
>>   .as[String]
>>   .writeStream
>>   .outputMode(OutputMode.Append())
>>   .queryName("record")
>>   .format("memory")
>>   .start()
>>
>> spark.sql("select * from record").show(truncate = false)
>>
>> I was expecting to be able to use the record table to read the JSON
>> string, but the table is empty on the first call, and I do not see any
>> DataFrame output after the first one.
>>
>> *The above steps do work in spark-shell, and I can do what I need to
>> there. The problem is when I try to code this in IntelliJ: the streaming
>> query keeps running, and I am not sure how to identify and stop it so
>> that I can then use the record memory table.*
>>
>> So I would like to stop the streaming query once I know there is some
>> data in my record memory table (is there a way to do that?), and then
>> use the memory table to fetch my record.
>> Any help on how to approach this programmatically, or pointers to
>> examples, would be highly appreciated.
>>
>> Regards,
>> Satyajit.
>>
>>
>>
>> On Sun, Dec 10, 2017 at 9:52 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>>> Hi,
>>>
>>> What about the memory sink? That could work.
>>>
>>> Regards,
>>> Jacek Laskowski
>>>
>>> On Mon, Dec 11, 2017 at 3:28 AM, satyajit vegesna <
>>> satyajit.apas...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I would like to infer a JSON schema from a sample of the data that I
>>>> receive from a Kafka stream (a specific topic). I have to infer the
>>>> schema because I am going to receive arbitrary JSON strings with a
>>>> different schema for each topic, so I chose to go ahead with the steps
>>>> below:
>>>>
>>>> a. readStream from Kafka (latest offset), from a single Kafka topic.
>>>> b. Somehow store the JSON string in a val and infer the schema.
>>>> c. Stop the stream.
>>>> d. Create a new readStream (smallest offset) and use the inferred schema
>>>> above to process the JSON using Spark's JSON support, such as
>>>> from_json, get_json_object and others, and run my actual business logic.
>>>>
>>>> I am not sure how to be successful with step (b). Any help would be
>>>> appreciated.
>>>> I would also like to know if there is a better approach.
>>>>
>>>> Regards,
>>>> Satyajit.
>>>>
>>>
>>>
>>
>
