Hi Amit,

Thank you for your prompt reply and kind help. I wonder how to set the scheduler to FAIR mode in Python. The following code does not seem to work:

conf = SparkConf().setMaster("local").setAppName("HSMSTest1")
sc = SparkContext(conf=conf)
sc.setLocalProperty('spark.scheduler.mode', 'FAIR')
spark = SparkSession.builder.appName("HSMSStructedStreaming1").getOrCreate()

By the way, I am using nc -lk 9999 to feed the input stream. Could the problem be that the input stream can only be consumed by one query, as mentioned in the post below?

https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming

I appreciate your further help and support.

Best Regards,
Jian Xu

From: Amit Joshi <mailtojoshia...@gmail.com>
Sent: Friday, May 21, 2021 12:52 PM
To: jia...@xtronica.no
Cc: user@spark.apache.org
Subject: Re: multiple query with structured streaming in spark does not work

Hi Jian,

You have to use the same Spark session to run all the queries, and use the following to wait for termination:

q1 = writestream.start
q2 = writestream2.start
spark.streams.awaitAnyTermination

Also set the scheduler in the Spark config to the FAIR scheduler.

Regards
Amit Joshi

On Saturday, May 22, 2021, <jia...@xtronica.no> wrote:

Hi There;

I am new to Spark. We are using Spark to develop our app for data streaming with sensor readings.

I am having trouble getting two queries with structured streaming to work concurrently.

Following is the code. It only works with one of them. I wonder if there is any way to get both running. I would appreciate help from the team.

Regards,
Jian Xu

hostName = 'localhost'
portNumber = 9999
wSize = '10 seconds'
sSize = '2 seconds'

def wnq_fb_func(batch_df, batch_id):
    print("batch is processed from time:{}".format(datetime.now()))
    print(batch_df.collect())
    batch_df.show(10, False, False)

lines = spark.readStream.format('socket').option('host', hostName).option('port', portNumber).option('includeTimestamp', True).load()

nSensors = 3

scols = split(lines.value, ',').cast(ArrayType(FloatType()))
sensorCols = []
for i in range(nSensors):
    sensorCols.append(scols.getItem(i).alias('sensor' + str(i)))

nlines = lines.select(lines.timestamp, lines.value, *sensorCols)
nlines.printSchema()

wnlines = nlines.select(window(nlines.timestamp, wSize, sSize).alias('TimeWindow'), *lines.columns)
wnquery = wnlines.writeStream.trigger(processingTime=sSize)\
    .outputMode('append').foreachBatch(wnq_fb_func).start()

nquery = nlines.writeStream.outputMode('append').format('console').start()
nquery.awaitTermination()
wnquery.awaitTermination()
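
For reference, the two suggestions in this thread (one shared session with the scheduler mode set in the Spark config, and waiting with spark.streams.awaitAnyTermination) could be combined roughly as below. This is a minimal sketch under stated assumptions, not a verified fix for the problem above. The names SPARK_CONF, build_session and run are hypothetical, and the local[2] master is an assumption. spark.scheduler.mode is an application-level setting that has to be in place when the SparkContext is created, whereas sc.setLocalProperty sets per-thread properties such as spark.scheduler.pool. The sketch does not settle whether a single nc -lk 9999 listener can feed two queries, which is the concern raised in the linked post.

```python
# Settings applied when the session is built. spark.scheduler.mode is passed
# through the builder so that it is present when the SparkContext is created.
SPARK_CONF = {
    "spark.master": "local[2]",  # assumption: local run with two threads
    "spark.app.name": "HSMSStructedStreaming1",
    "spark.scheduler.mode": "FAIR",
}


def build_session(conf=SPARK_CONF):
    """Create (or reuse) one SparkSession carrying the settings above."""
    # Imported here so the settings can be inspected without Spark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


def run(host="localhost", port=9999):
    """Start two queries on the same session, then wait on all of them."""
    spark = build_session()
    lines = (spark.readStream.format("socket")
             .option("host", host)
             .option("port", port)
             .option("includeTimestamp", True)
             .load())

    # Both queries are started before any blocking call is made.
    q1 = lines.writeStream.outputMode("append").format("console").start()
    q2 = (lines.writeStream.outputMode("append")
          .foreachBatch(lambda df, batch_id: df.show(10, False))
          .start())

    # Returns when any active query stops or fails, rather than blocking
    # on the first query alone.
    spark.streams.awaitAnyTermination()
```

Calling run() while a listener is active on port 9999 would start both queries. If a SparkContext already exists in the process, getOrCreate() reuses it, so the scheduler setting would need to be applied before that context is first created.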