Hi,

What about a custom streaming Sink that would stop the query after addBatch has been called?
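For example, something along these lines; a minimal sketch against the internal org.apache.spark.sql.execution.streaming.Sink API in Spark 2.x (not a stable public interface, and the class, package, and variable names below are hypothetical):

import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode

// Hypothetical holder the driver polls to learn that a batch has arrived.
object FirstBatchHolder {
  @volatile var rows: Array[Row] = Array.empty
  @volatile var received = false
}

// Captures the first micro-batch. Do not call query.stop() from inside
// addBatch -- it runs on the stream execution thread and would deadlock;
// signal the driver and stop the query from there instead.
class FirstBatchSink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    FirstBatchHolder.rows = data.collect() // fine for a small sample
    FirstBatchHolder.received = true
  }
}

class FirstBatchSinkProvider extends StreamSinkProvider with DataSourceRegister {
  override def shortName(): String = "first-batch"
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = new FirstBatchSink
}

And on the driver, something like:

// df is whatever streaming Dataset you are writing out.
val query = df.writeStream
  .format("com.example.FirstBatchSinkProvider") // fully-qualified provider name
  .outputMode(OutputMode.Append())
  .start()

while (!FirstBatchHolder.received) Thread.sleep(100)
query.stop()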
Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Mon, Dec 11, 2017 at 9:15 AM, satyajit vegesna <satyajit.apas...@gmail.com> wrote:

> Hi Jacek,
>
> For now, I am using Thread.sleep() on the driver to make sure my
> streaming query receives some data, and stop it before control reaches
> the query on the memory table.
> Let me know if there is any better way of handling it.
>
> Regards,
> Satyajit.
>
> On Sun, Dec 10, 2017 at 10:43 PM, satyajit vegesna <satyajit.apas...@gmail.com> wrote:
>
>> Hi Jacek,
>>
>> Thank you for responding back.
>>
>> I have tried the memory sink, and below is what I did:
>>
>> val fetchValue = debeziumRecords.selectExpr("value")
>>   .withColumn("tableName",
>>     functions.get_json_object($"value".cast(StringType), "$.schema.name"))
>>   .withColumn("operation",
>>     functions.get_json_object($"value".cast(StringType), "$.payload.op"))
>>   .withColumn("payloadAfterValue",
>>     split(substring_index(debeziumRecords("value"), "\"after\":", -1),
>>       ",\"source\"").getItem(0))
>>   .drop("tableName").drop("operation").drop("value")
>>   .as[String]
>>   .writeStream
>>   .outputMode(OutputMode.Append())
>>   .queryName("record")
>>   .format("memory")
>>   .start()
>>
>> spark.sql("select * from record").show(truncate = false)
>> // I was expecting to be able to use the record table to read the JSON
>> // string, but the table is empty on the first call, and I do not see
>> // any DataFrame output after the first one.
>>
>> *The above steps work fine in spark-shell and I can do what I need to.
>> The problem is when I code this in IntelliJ: the streaming query keeps
>> running, and I am not sure how to identify when it has data, stop the
>> streaming query, and use the record memory table.*
>>
>> So I would like to stop the streaming query once I know I have some
>> data in my record memory table (is there a way to do that?), so I can
>> stop the streaming query, use the memory table, and fetch my record.
>> Any help on how to approach the situation programmatically, or any
>> examples, would be highly appreciated.
>>
>> Regards,
>> Satyajit.
>>
>> On Sun, Dec 10, 2017 at 9:52 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>>> Hi,
>>>
>>> What about the memory sink? That could work.
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> ----
>>> https://about.me/JacekLaskowski
>>> Spark Structured Streaming https://bit.ly/spark-structured-streaming
>>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>> On Mon, Dec 11, 2017 at 3:28 AM, satyajit vegesna <satyajit.apas...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I would like to infer the JSON schema from a sample of data that I
>>>> receive from Kafka streams (a specific topic). I have to infer the
>>>> schema because I am going to receive random JSON strings with a
>>>> different schema for each topic, so I chose to go ahead with the
>>>> steps below:
>>>>
>>>> a. readStream from Kafka (latest offset), from a single Kafka topic.
>>>> b. Somehow store the JSON string into a val and infer the schema.
>>>> c. Stop the stream.
>>>> d. Create a new readStream (smallest offset) and use the above
>>>> inferred schema to process the JSON using the Spark-provided JSON
>>>> support, like from_json, get_json_object, and others, and run my
>>>> actual business logic.
>>>>
>>>> Now I am not sure how to be successful with step (b).
>>>> Any help would be appreciated.
>>>> And I would also like to know if there is any better approach.
>>>>
>>>> Regards,
>>>> Satyajit.
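For step (b), one approach that avoids stopping a stream at all is a plain batch read of the topic followed by schema inference over the sampled strings; a minimal sketch, assuming Spark 2.2+ with the spark-sql-kafka-0-10 artifact on the classpath, and placeholder broker/topic names:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

// Batch (non-streaming) read of whatever is in the topic right now.
// "localhost:9092" and "my-topic" are placeholders.
val sampleJson = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

// Infer the schema from the sampled JSON strings; spark.read.json over
// a Dataset[String] is available since Spark 2.2.
val inferredSchema = spark.read.json(sampleJson).schema

The inferred schema can then be fed straight into from_json in step (d), and steps (b) and (c) disappear.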
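If the query does have to run as a stream first (as in the memory-sink code quoted above), StreamingQuery.processAllAvailable() is a more deterministic fence than Thread.sleep(): it blocks until everything available in the source at the time of the call has been committed to the sink. It is documented as intended for testing, and it returns immediately when nothing has arrived yet, so a small polling loop is still needed; a sketch reusing fetchValue and the "record" table from the quoted code:

// fetchValue is the StreamingQuery handle returned by start() above.
// Poll until at least one row has landed in the in-memory "record" table.
while (spark.sql("select * from record").head(1).isEmpty) {
  fetchValue.processAllAvailable()
  Thread.sleep(100)
}
fetchValue.stop()

spark.sql("select * from record").show(truncate = false)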