Hi there,

I am new to Spark. We are using Spark to develop an application that streams sensor readings.

I am having trouble getting two Structured Streaming queries to run concurrently.

The code is below. Only one of the two queries ever works. Is there any way to get both of them running? I would appreciate any help from the team.

Regards,

Jian Xu

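from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, window
from pyspark.sql.types import ArrayType, FloatType

# In the pyspark shell `spark` already exists; a standalone script has to create it.
spark = SparkSession.builder.appName('sensorStreaming').getOrCreate()

hostName = 'localhost'
portNumber = 9999
wSize = '10 seconds'  # window length
sSize = '2 seconds'   # slide interval, also used as the trigger interval

def wnq_fb_func(batch_df, batch_id):
    # Called once per micro-batch of the windowed query.
    print("batch is processed from time:{}".format(datetime.now()))
    print(batch_df.collect())
    batch_df.show(10, False, False)  # n=10, truncate=False, vertical=False

# Read comma-separated sensor readings from a TCP socket;
# includeTimestamp adds a 'timestamp' column next to 'value'.
lines = spark.readStream.format('socket').option('host', hostName) \
    .option('port', portNumber).option('includeTimestamp', True).load()

nSensors = 3

# Split each line on commas, cast the pieces to floats, one column per sensor.
scols = split(lines.value, ',').cast(ArrayType(FloatType()))
sensorCols = []
for i in range(nSensors):
    sensorCols.append(scols.getItem(i).alias('sensor' + str(i)))

nlines = lines.select(lines.timestamp, lines.value, *sensorCols)
nlines.printSchema()

# Query 1: tag each row with the 10-second windows (sliding every 2 seconds) it falls into.
wnlines = nlines.select(window(nlines.timestamp, wSize, sSize).alias('TimeWindow'), *lines.columns)
wnquery = wnlines.writeStream.trigger(processingTime=sSize) \
    .outputMode('append').foreachBatch(wnq_fb_func).start()

# Query 2: print the parsed rows to the console.
nquery = nlines.writeStream.outputMode('append').format('console').start()

# Both queries run in the background once start() returns;
# awaitTermination() only blocks the driver thread.
nquery.awaitTermination()
wnquery.awaitTermination()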
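To check my understanding of the windowed query: with wSize = 10 seconds and sSize = 2 seconds, each row should fall into 10 / 2 = 5 overlapping windows. Below is a rough plain-Python sketch of that assignment. It assumes windows are aligned to multiples of the slide (no startTime offset) and uses timestamps in whole seconds. `sliding_windows` is a hypothetical helper for illustration, not part of PySpark.

```python
def sliding_windows(t, window_size, slide):
    """Return the [start, end) windows that contain timestamp t.

    Assumption: windows are aligned to multiples of `slide` (offset 0),
    meant to mirror window(col, '10 seconds', '2 seconds') without startTime.
    """
    # Latest window start at or before t.
    start = t - (t % slide)
    windows = []
    # Step back one slide at a time while the window [start, start + size) still covers t.
    while start > t - window_size:
        windows.append((start, start + window_size))
        start -= slide
    return sorted(windows)


# A reading stamped at t = 13 s lands in five overlapping windows.
print(sliding_windows(13, 10, 2))
# → [(4, 14), (6, 16), (8, 18), (10, 20), (12, 22)]
```

If I read the `window` docs correctly, this is also why each reading should show up five times in the wnq_fb_func output, once per TimeWindow.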