Hi Amit;
Further to my last email: I managed to set the scheduler to FAIR via

conf = SparkConf().setMaster("local").setAppName("HSMSTest1").set("spark.scheduler.mode", "FAIR")

and I can see in the web UI that the mode has changed. The result is the same, though, so this does not solve it; the cause may be the one stated in the post. My question is how to use a socket to carry multiple queries originating from the same input stream, or whether that is simply not possible with the socket source at all.

Regards,

Jian Xu

From: jia...@xtronica.no <jia...@xtronica.no>
Sent: Friday, May 21, 2021 5:10 PM
To: 'Amit Joshi' <mailtojoshia...@gmail.com>
Cc: user@spark.apache.org
Subject: RE: multiple query with structured streaming in spark does not work

Hi Amit;

Thank you for your prompt reply and kind help. I wonder how to set the scheduler to FAIR mode in Python; the following code does not seem to work:

conf = SparkConf().setMaster("local").setAppName("HSMSTest1")
sc = SparkContext(conf=conf)
sc.setLocalProperty('spark.scheduler.mode', 'FAIR')
spark = SparkSession.builder.appName("HSMSStructedStreaming1").getOrCreate()

By the way, since I am using nc -lk 9999 to feed the stream, could the cause be that the input stream can only be consumed by one query, as mentioned in the post below?

https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming

I appreciate your further help/support.

Best Regards,

Jian Xu

From: Amit Joshi <mailtojoshia...@gmail.com>
Sent: Friday, May 21, 2021 12:52 PM
To: jia...@xtronica.no
Cc: user@spark.apache.org
Subject: Re: multiple query with structured streaming in spark does not work

Hi Jian,

You have to use the same Spark session to run all the queries, and use the following to wait for termination.
q1 = writestream.start()
q2 = writestream2.start()
spark.streams.awaitAnyTermination()

Also set the scheduler in the Spark config to the FAIR scheduler.

Regards,
Amit Joshi

On Saturday, May 22, 2021, <jia...@xtronica.no> wrote:

Hi There;

I am new to Spark. We are using Spark to develop our app for streaming sensor readings.

I am having trouble getting two queries with structured streaming to work concurrently. The code follows; it only works with one of the two. I wonder if there is any way to get both running, and would appreciate help from the team.

Regards,

Jian Xu

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, window
from pyspark.sql.types import ArrayType, FloatType

spark = SparkSession.builder.appName("HSMSStructedStreaming1").getOrCreate()

hostName = 'localhost'
portNumber = 9999
wSize = '10 seconds'
sSize = '2 seconds'

def wnq_fb_func(batch_df, batch_id):
    print("batch is processed from time:{}".format(datetime.now()))
    print(batch_df.collect())
    batch_df.show(10, False, False)

lines = spark.readStream.format('socket').option('host', hostName).option('port', portNumber).option('includeTimestamp', True).load()

nSensors = 3

scols = split(lines.value, ',').cast(ArrayType(FloatType()))
sensorCols = []
for i in range(nSensors):
    sensorCols.append(scols.getItem(i).alias('sensor' + str(i)))

nlines = lines.select(lines.timestamp, lines.value, *sensorCols)
nlines.printSchema()

wnlines = nlines.select(window(nlines.timestamp, wSize, sSize).alias('TimeWindow'), *lines.columns)
wnquery = wnlines.writeStream.trigger(processingTime=sSize)\
    .outputMode('append').foreachBatch(wnq_fb_func).start()

nquery = nlines.writeStream.outputMode('append').format('console').start()
nquery.awaitTermination()
wnquery.awaitTermination()
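A note on the FAIR-scheduler part of the thread: `spark.scheduler.mode` is a SparkContext-level setting, so it has to be on the SparkConf before the context is created (as in the `.set("spark.scheduler.mode", "FAIR")` call at the top of the thread); `sc.setLocalProperty` only selects a pool (`spark.scheduler.pool`) for jobs submitted from the current thread. Pools themselves are declared in an allocation file pointed to by `spark.scheduler.allocation.file`. A minimal sketch of such a file (the pool name and file location are hypothetical):

```xml
<?xml version="1.0"?>
<!-- fairscheduler.xml, referenced via spark.scheduler.allocation.file -->
<allocations>
  <pool name="streamingPool">  <!-- hypothetical pool name -->
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>1</minShare>
  </pool>
</allocations>
```

A query's thread can then opt in with `sc.setLocalProperty('spark.scheduler.pool', 'streamingPool')`. Note that FAIR scheduling only changes how concurrent jobs share executors; it does not make a socket source deliver the same lines to two queries, which is the other issue in this thread.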
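As for the nc -lk question: the Stack Overflow post linked above comes down to a plain TCP fact that can be demonstrated without Spark. Each streaming query opens its own connection to the socket source, but a single-connection server (which is effectively what `nc -lk` gives you) writes each line to only one of them, so the second query starves. A standalone sketch, with no Spark involved and a made-up payload:

```python
import socket
import threading

# A tiny stand-in for `nc -lk`: listen, accept ONE connection, and write a
# single "sensor" line to it. Further connections are never accepted.
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.bind(("127.0.0.1", 0))          # port 0: let the OS pick a free port
srv.listen(2)
port = srv.getsockname()[1]

def serve_first_client_only():
    conn, _ = srv.accept()          # only the FIRST connection is served
    conn.sendall(b"1.0,2.0,3.0\n")  # made-up sensor reading
    conn.close()

threading.Thread(target=serve_first_client_only, daemon=True).start()

# Two "streaming queries", each opening its own connection to the source.
q1 = socket.create_connection(("127.0.0.1", port))
q2 = socket.create_connection(("127.0.0.1", port))
q2.settimeout(0.5)

first = q1.recv(1024)               # q1 receives the line
try:
    second = q2.recv(1024)          # q2 receives nothing and times out
except socket.timeout:
    second = None

print(first)   # b'1.0,2.0,3.0\n'
print(second)  # None
q1.close()
q2.close()
```

The usual ways around this are to fan the work out from a single stream (e.g. one `foreachBatch` that does both jobs), or to use a replayable multi-consumer source such as Kafka, where each query gets its own offsets.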