Hi Jian,

This section of the Spark docs on scheduling within an application could be useful:
https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
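
On setting FAIR mode from Python, here is a minimal sketch rather than a
definitive setup. As far as I know, spark.scheduler.mode is read from the
configuration when the SparkContext is created, so passing it to
setLocalProperty afterwards does not change the mode; setLocalProperty is
meant for per-thread properties such as spark.scheduler.pool. The helper
names (fair_scheduler_conf, build_fair_session, start_in_pool), the local[4]
master and the pool names are my own assumptions, purely for illustration.

```python
SCHEDULER_MODE_KEY = "spark.scheduler.mode"
SCHEDULER_POOL_KEY = "spark.scheduler.pool"


def fair_scheduler_conf():
    """Return the config entries that switch the in-application scheduler to FAIR."""
    return {SCHEDULER_MODE_KEY: "FAIR"}


def build_fair_session(app_name, master="local[4]"):
    """Hypothetical helper: create the session with FAIR mode set up front.

    The mode has to be in the config before the SparkContext exists;
    if a context is already running, getOrCreate() reuses it as it is.
    """
    # Imported lazily so the pure-Python helpers above work without Spark.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.master(master).appName(app_name)
    for key, value in fair_scheduler_conf().items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


def start_in_pool(spark, writer, pool_name):
    """Hypothetical helper: start a DataStreamWriter in a named scheduler pool."""
    # The pool is a thread-local property, set just before start().
    spark.sparkContext.setLocalProperty(SCHEDULER_POOL_KEY, pool_name)
    return writer.start()
```

You would call build_fair_session("HSMSTest1") before any other SparkContext
exists, and then start_in_pool(spark, some_df.writeStream.format("console"),
"pool1") once per query, with a different pool name for each.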

You could also try giving the cluster enough resources to run both jobs
without configuring a scheduler at all.
That is, run the queries with the default scheduler, but give the Spark
cluster enough memory to run both.
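
To try the default scheduler with more resources on a single machine,
something like the sketch below might do. It is only an illustration:
local_master and run_two_queries are hypothetical helpers of mine, and I am
assuming the built-in rate source in place of your socket so that the example
does not depend on nc. Note that setMaster("local") gives Spark a single
worker thread, while local[N] gives it N.

```python
def local_master(num_queries, extra_threads=1):
    """Hypothetical helper: local master URL with a thread per query plus spare."""
    return "local[{}]".format(max(1, num_queries) + extra_threads)


def run_two_queries():
    """Start two streaming queries in one session with the default scheduler."""
    # Imported lazily so local_master() can be used without Spark installed.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .master(local_master(2))
        .appName("TwoQueriesDefaultScheduler")
        .getOrCreate()
    )

    # The rate source emits (timestamp, value) rows; no external input needed.
    source = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

    # First query: print the raw rows.
    raw_query = (
        source.writeStream.queryName("raw")
        .outputMode("append").format("console").start()
    )

    # Second query: print a derived column from the same source definition.
    doubled_query = (
        source.selectExpr("timestamp", "value * 2 AS doubled")
        .writeStream.queryName("doubled")
        .outputMode("append").format("console").start()
    )

    # Block until either query stops, instead of waiting on only one of them.
    spark.streams.awaitAnyTermination()
    return raw_query, doubled_query
```

Calling run_two_queries() should print batches from both queries to the
console until one of them is stopped.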


Regards
Amit Joshi



On Sat, May 22, 2021 at 5:41 AM <jia...@xtronica.no> wrote:

> Hi Amit;
>
>
>
> Thank you for your prompt reply and kind help. I wonder how to set the
> scheduler to FAIR mode in Python. The following code does not seem to
> work.
>
>
>
> conf = SparkConf().setMaster("local").setAppName("HSMSTest1")
>
> sc = SparkContext(conf=conf)
>
> sc.setLocalProperty('spark.scheduler.mode', 'FAIR')
>
> spark = SparkSession.builder.appName("HSMSStructedStreaming1").getOrCreate()
>
>
>
> By the way, I am using nc -lk 9999 to feed the stream. Could that be the
> cause, since the input stream can only be consumed by one query, as
> mentioned in the post below?
>
>
>
>
> https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming
>
>
>
> I would appreciate your further help and support.
>
>
>
> Best Regards,
>
>
>
> Jian Xu
>
>
>
> *From:* Amit Joshi <mailtojoshia...@gmail.com>
> *Sent:* Friday, May 21, 2021 12:52 PM
> *To:* jia...@xtronica.no
> *Cc:* user@spark.apache.org
> *Subject:* Re: multiple query with structured streaming in spark does not
> work
>
>
>
> Hi Jian,
>
>
>
> You have to use the same Spark session to run all the queries.
>
> Then start each query and wait for termination like this:
>
>
>
> q1 = writestream.start()
>
> q2 = writestream2.start()
>
> spark.streams.awaitAnyTermination()
>
>
>
> Also set the scheduler in the Spark config to the FAIR scheduler.
>
>
>
>
>
> Regards
>
> Amit Joshi
>
>
>
>
>
> On Saturday, May 22, 2021, <jia...@xtronica.no> wrote:
>
> Hi There;
>
>
>
> I am new to Spark. We are using Spark to develop our app for streaming
> sensor readings.
>
>
>
> I am having trouble getting two Structured Streaming queries to run
> concurrently.
>
>
>
> The code follows. Only one of the queries works at a time. I wonder if
> there is any way to get both running. I would appreciate help from the team.
>
>
>
> Regards,
>
>
>
> Jian Xu
>
>
>
>
>
> from datetime import datetime
>
> from pyspark.sql.functions import split, window
> from pyspark.sql.types import ArrayType, FloatType
>
> # Assumes `spark` is an existing SparkSession.
> hostName = 'localhost'
> portNumber = 9999
> wSize = '10 seconds'  # window length
> sSize = '2 seconds'   # slide interval, also used as the trigger interval
>
> def wnq_fb_func(batch_df, batch_id):
>     # Print each micro-batch of the windowed query.
>     print("batch is processed from time:{}".format(datetime.now()))
>     print(batch_df.collect())
>     batch_df.show(10, False, False)
>
> # Read comma-separated sensor readings from the socket.
> lines = spark.readStream.format('socket').option('host', hostName) \
>     .option('port', portNumber).option('includeTimestamp', True).load()
>
> nSensors = 3
>
> # Split each line into one float column per sensor.
> scols = split(lines.value, ',').cast(ArrayType(FloatType()))
> sensorCols = []
> for i in range(nSensors):
>     sensorCols.append(scols.getItem(i).alias('sensor' + str(i)))
>
> nlines = lines.select(lines.timestamp, lines.value, *sensorCols)
> nlines.printSchema()
>
> # Windowed query, handled batch by batch in wnq_fb_func.
> wnlines = nlines.select(window(nlines.timestamp, wSize, sSize).alias('TimeWindow'), *lines.columns)
> wnquery = wnlines.writeStream.trigger(processingTime=sSize) \
>     .outputMode('append').foreachBatch(wnq_fb_func).start()
>
> # Plain query, printed to the console.
> nquery = nlines.writeStream.outputMode('append').format('console').start()
>
> nquery.awaitTermination()
> wnquery.awaitTermination()
>