Hi Amit,

Further to my last email, I managed to set the scheduler to FAIR in code:

conf = SparkConf().setMaster("local").setAppName("HSMSTest1").set("spark.scheduler.mode", "FAIR")

I can see in the web UI that the mode has changed, but the result is the same: it still does not work, possibly for the reason stated in the post below. My question is how to use a socket source to serve multiple queries originating from the same input stream, or whether that is simply not possible in socket streaming mode.


Regards,


Jian Xu


From: jia...@xtronica.no <jia...@xtronica.no> 
Sent: Friday, May 21, 2021 5:10 PM
To: 'Amit Joshi' <mailtojoshia...@gmail.com>
Cc: user@spark.apache.org
Subject: RE: multiple query with structured streaming in spark does not work 


Hi Amit,

Thank you for your prompt reply and kind help. I wonder how to set the scheduler to FAIR mode in Python; the following code does not seem to work:


conf = SparkConf().setMaster("local").setAppName("HSMSTest1")

sc = SparkContext(conf=conf)

sc.setLocalProperty('spark.scheduler.mode', 'FAIR')

spark = SparkSession.builder.appName("HSMSStructedStreaming1").getOrCreate()


By the way, as I am using nc -lk 9999 to feed the stream, could the cause be that the input stream can only be consumed by one query, as mentioned in the post below?


https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming

I appreciate your further help/support.


Best Regards,


Jian Xu


From: Amit Joshi <mailtojoshia...@gmail.com <mailto:mailtojoshia...@gmail.com> 
> 
Sent: Friday, May 21, 2021 12:52 PM
To: jia...@xtronica.no <mailto:jia...@xtronica.no> 
Cc: user@spark.apache.org <mailto:user@spark.apache.org> 
Subject: Re: multiple query with structured streaming in spark does not work


Hi Jian,


You have to use the same Spark session to run all the queries.

And use the following to wait for termination:

q1 = writestream1.start()

q2 = writestream2.start()

spark.streams.awaitAnyTermination()


And also set the scheduler in the Spark config to the FAIR scheduler.


Regards

Amit Joshi




On Saturday, May 22, 2021, <jia...@xtronica.no <mailto:jia...@xtronica.no> > 
wrote:

Hi there,


I am new to Spark. We are using Spark to develop our app for streaming sensor readings.


I am having trouble getting two Structured Streaming queries to work concurrently.


The code follows; it only works with one of the queries. I wonder if there is any way to get both running. I would appreciate help from the team.


Regards,


Jian Xu


from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, window
from pyspark.sql.types import ArrayType, FloatType

spark = SparkSession.builder.appName("HSMSStructedStreaming1").getOrCreate()

hostName = 'localhost'

portNumber = 9999

wSize = '10 seconds'

sSize = '2 seconds'


def wnq_fb_func(batch_df, batch_id):

    print("batch is processed from time:{}".format(datetime.now()))

    print(batch_df.collect())

    batch_df.show(10, False, False)

lines = spark.readStream.format('socket').option('host', hostName).option('port', portNumber).option('includeTimestamp', True).load()


nSensors = 3


scols = split(lines.value, ',').cast(ArrayType(FloatType()))

sensorCols = []

for i in range(nSensors):

    sensorCols.append(scols.getItem(i).alias('sensor' + str(i)))

nlines = lines.select(lines.timestamp, lines.value, *sensorCols)

nlines.printSchema()


wnlines = nlines.select(window(nlines.timestamp, wSize, sSize).alias('TimeWindow'), *lines.columns)

wnquery = wnlines.writeStream.trigger(processingTime=sSize)\
    .outputMode('append').foreachBatch(wnq_fb_func).start()


nquery = nlines.writeStream.outputMode('append').format('console').start()

nquery.awaitTermination()

wnquery.awaitTermination()

