Also, is query.stop() a graceful stop operation? What happens to data that has
already been received: will it be processed?
On Tue, Aug 15, 2017 at 7:21 PM purna pradeep
wrote:
OK, thanks.
A few more questions:
1. The documentation says onQueryProgress is not thread-safe. Would this
method still be the right place to refresh the cache? And is there no need
to restart the query if I choose the listener?
The methods are not thread-safe as they may be called from different
threads.
htt
Both work. The asynchronous method with the listener will have less
downtime; the only cost is that the first trigger/batch after the asynchronous
unpersist+persist will probably take longer, as it has to reload the data.
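For reference, a minimal sketch of the listener approach in Scala. CacheRefreshListener, shouldRefresh and refresh are hypothetical names that do not appear in this thread; only StreamingQueryListener and its three callbacks are Spark API. The AtomicBoolean guard is an assumption, added because the callbacks may be invoked from different threads.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{
  QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent
}

// Hypothetical listener: runs `refresh` after a trigger completes whenever
// `shouldRefresh` returns true. Both functions are supplied by the caller.
class CacheRefreshListener(shouldRefresh: () => Boolean, refresh: () => Unit)
    extends StreamingQueryListener {

  // Guard so that two overlapping callbacks never refresh at the same time.
  private val refreshing = new AtomicBoolean(false)

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    if (shouldRefresh() && refreshing.compareAndSet(false, true)) {
      try refresh() // e.g. df.unpersist(); reload the data; df.persist()
      finally refreshing.set(false)
    }
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}
```

The listener would be registered once, for example with sparkSes.streams.addListener(new CacheRefreshListener(...)), and the query itself keeps running while the cache is reloaded.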
On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep
wrote:
Thanks, Tathagata Das. Actually, I'm planning to do something like this:
activeQuery.stop()
// unpersist and persist the cached data frame
df.unpersist()
// read the updated data (the data size of df is around 100 GB)
df.persist()
activeQuery = startQuery()
The cached data frame size is around 100 GB, so th
You can do something like this.
def startQuery(): StreamingQuery = {
  // create your streaming dataframes
  // start the query with the same checkpoint directory
}

// handle to the active query
var activeQuery: StreamingQuery = null

while (!stopped) {
  if (activeQuery == null) { // if quer
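The loop above is cut off in the archive. As a rough sketch of the stop / refresh / start cycle discussed in this thread, written without a Spark dependency so it can be tried in a plain Scala REPL: QueryRestarter and its three function parameters are hypothetical names, and Q stands in for StreamingQuery.

```scala
// Hypothetical helper, not part of Spark. Q stands in for StreamingQuery.
//   start   : creates and starts the query (same checkpoint directory each time)
//   stop    : stops a running query, e.g. _.stop()
//   refresh : reloads the cached data, e.g. unpersist + re-read + persist
final class QueryRestarter[Q](start: () => Q, stop: Q => Unit, refresh: () => Unit) {
  private var active: Option[Q] = None

  // Start the query if none is running; returns the active query.
  def ensureRunning(): Q = synchronized {
    active.getOrElse {
      val q = start()
      active = Some(q)
      q
    }
  }

  // Stop the active query (if any), refresh the cache, then start again.
  def restart(): Q = synchronized {
    active.foreach(stop)
    active = None
    refresh()
    ensureRunning()
  }
}
```

With Spark this could be wired up roughly as new QueryRestarter[StreamingQuery](() => startQuery(), _.stop(), () => { df.unpersist(); df.persist() }), with the while loop calling restart() whenever the refresh condition holds.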
Thanks, Michael.
I guess my question was a little confusing; let me try again.
I would like to restart a streaming query programmatically, based on a
condition, while my streaming application is running. The reason is that
I want to refresh a cached data frame based on a condition, and the best way
See
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
Though I think that this currently doesn't work with the console sink.
On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep
wrote:
Hi,

I'm trying to restart a streaming query to refresh a cached data frame.

Where and how should I restart the streaming query?

val sparkSes = SparkSession
.builder
.config("spark.master", "local")
.appName("StreamingCahcePoc")
.getOrCreate()
import sparkSes.