Hi Amit,

Thank you for your prompt reply and kind help. How do I set the scheduler
to FAIR mode in Python? The following code does not seem to work:

conf = SparkConf().setMaster("local").setAppName("HSMSTest1")
sc = SparkContext(conf=conf)
sc.setLocalProperty('spark.scheduler.mode', 'FAIR')
spark = SparkSession.builder.appName("HSMSStructedStreaming1").getOrCreate()
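
For comparison, here is a minimal sketch of one way to request FAIR scheduling from Python. It assumes the mode has to be part of the SparkConf before the SparkContext starts, whereas setLocalProperty is normally used for per-thread properties such as spark.scheduler.pool. The helper names fair_scheduler_conf and build_session are hypothetical, and pyspark is imported lazily inside build_session.

```python
def fair_scheduler_conf():
    """Settings to place in the SparkConf before the SparkContext is created."""
    return {"spark.scheduler.mode": "FAIR"}


def build_session(app_name="HSMSTest1", master="local"):
    """Hypothetical helper: build one SparkSession with FAIR scheduling requested."""
    # Imported lazily so fair_scheduler_conf() can be used without Spark installed.
    from pyspark import SparkConf
    from pyspark.sql import SparkSession

    conf = SparkConf().setMaster(master).setAppName(app_name)
    for key, value in fair_scheduler_conf().items():
        conf.set(key, value)

    # getOrCreate() reuses a context that is already running; in that case the
    # scheduler mode of the existing context is not changed by this conf.
    return SparkSession.builder.config(conf=conf).getOrCreate()
```

Calling spark = build_session() once and building every query from that single session keeps all queries on the same SparkContext.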

By the way, I am using nc -lk 9999 to feed the stream. Could the problem be
that the input stream can only be consumed by one query, as mentioned in
the post below?

https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming
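
One way to test that hypothesis is to replace nc with a small server that sends every line to all connected clients. This is only a sketch, under the assumption that each streaming query opens its own connection to the socket source. LineBroadcaster and its methods are hypothetical names and are not part of Spark or netcat.

```python
import socket
import threading
import time


class LineBroadcaster:
    """Hypothetical stand-in for `nc -lk`: sends each line to every client."""

    def __init__(self, host="localhost", port=9999):
        self._server = socket.create_server((host, port))
        self.port = self._server.getsockname()[1]
        self._clients = []
        self._lock = threading.Lock()
        threading.Thread(target=self._accept_loop, daemon=True).start()

    def _accept_loop(self):
        # Assumption: every streaming query connects separately, so each
        # accepted connection is kept and served.
        while True:
            try:
                conn, _ = self._server.accept()
            except OSError:
                return  # the listening socket was closed
            with self._lock:
                self._clients.append(conn)

    def wait_for_clients(self, count, timeout=5.0):
        """Return True once `count` clients are connected, False on timeout."""
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            with self._lock:
                if len(self._clients) >= count:
                    return True
            time.sleep(0.05)
        return False

    def send_line(self, line):
        """Send one newline-terminated line to all clients still connected."""
        data = (line.rstrip("\n") + "\n").encode("utf-8")
        with self._lock:
            alive = []
            for conn in self._clients:
                try:
                    conn.sendall(data)
                    alive.append(conn)
                except OSError:
                    conn.close()  # drop clients that have gone away
            self._clients = alive

    def close(self):
        self._server.close()
        with self._lock:
            for conn in self._clients:
                conn.close()
            self._clients = []
```

To try it, create LineBroadcaster(port=9999) before starting the queries, wait with wait_for_clients(2), and then call send_line for each sensor reading, for example from a loop over sys.stdin.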

I would appreciate your further help and support.

Best regards,

Jian Xu

From: Amit Joshi <[email protected]>
Sent: Friday, May 21, 2021 12:52 PM
To: [email protected]
Cc: [email protected]
Subject: Re: multiple query with structured streaming in spark does not work

Hi Jian,

You have to use the same Spark session to run all the queries,
and use the following to wait for termination:

q1 = writestream.start()
q2 = writestream2.start()
spark.streams.awaitAnyTermination()

Also set the scheduler mode in the Spark config to FAIR.
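
The advice above could be wrapped up roughly as follows. This is a sketch rather than a definitive implementation: start_all_and_wait is a hypothetical helper, and writers is assumed to be a list of not-yet-started DataStreamWriter objects that were all built from the same SparkSession.

```python
def start_all_and_wait(spark, writers):
    """Start every writer, then block until any one of the queries stops.

    `spark` is the shared SparkSession; `writers` are DataStreamWriter
    objects (the result of df.writeStream...) that have not been started.
    """
    # Start all queries first so none is held up by a blocking wait.
    queries = [writer.start() for writer in writers]

    # Blocks until any active query on this session terminates or fails.
    spark.streams.awaitAnyTermination()
    return queries
```

Because the wait happens only after every start() call, each query is already running before the driver blocks.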

Regards,
Amit Joshi

On Saturday, May 22, 2021, <[email protected]> wrote:
Hi there,

I am new to Spark. We are using Spark to develop our app for streaming
sensor readings.

I am having trouble getting two Structured Streaming queries to run
concurrently.

The code follows. Only one of the queries works. Is there any way to get
both running? I would appreciate help from the team.

Regards,

Jian Xu

from datetime import datetime

from pyspark.sql.functions import split, window
from pyspark.sql.types import ArrayType, FloatType

hostName = 'localhost'
portNumber = 9999
wSize = '10 seconds'  # window length
sSize = '2 seconds'   # slide interval, also used as the trigger interval

def wnq_fb_func(batch_df, batch_id):
    # Called once per micro-batch of the windowed query.
    print("batch is processed from time:{}".format(datetime.now()))
    print(batch_df.collect())
    batch_df.show(10, False, False)

# `spark` is assumed to be an existing SparkSession.
lines = spark.readStream.format('socket').option('host', hostName)\
    .option('port', portNumber).option('includeTimestamp', True).load()

nSensors = 3

# Split each comma-separated line into one float column per sensor.
scols = split(lines.value, ',').cast(ArrayType(FloatType()))
sensorCols = []
for i in range(nSensors):
    sensorCols.append(scols.getItem(i).alias('sensor' + str(i)))

nlines = lines.select(lines.timestamp, lines.value, *sensorCols)
nlines.printSchema()

wnlines = nlines.select(window(nlines.timestamp, wSize, sSize).alias('TimeWindow'),
                        *lines.columns)
wnquery = wnlines.writeStream.trigger(processingTime=sSize)\
    .outputMode('append').foreachBatch(wnq_fb_func).start()

nquery = nlines.writeStream.outputMode('append').format('console').start()
nquery.awaitTermination()
wnquery.awaitTermination()