Hi Mich,

Thanks. It seems 'complete' mode is supported only when the query has
streaming aggregations.
I get this error after changing the output mode:

pyspark.sql.utils.AnalysisException: Complete output mode not supported
when there are no streaming aggregations on streaming DataFrames/Datasets;

Project [value#8, topic#9, partition#10, timestamp#12]
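
For reference, a minimal sketch of how the output mode could be chosen so that Spark accepts the query: 'complete' only when the streaming DataFrame is aggregated, 'append' otherwise. The helpers `pick_output_mode` and `start_console_query`, the `has_aggregation` flag and the checkpoint paths are hypothetical names for illustration, not part of the Spark API. Only the `writeStream` calls themselves are real PySpark.

```python
# Sketch only: pick an output mode that Structured Streaming will accept.
# pick_output_mode and start_console_query are hypothetical helpers,
# not Spark APIs; the caller states whether the query aggregates.

def pick_output_mode(has_aggregation: bool) -> str:
    """Return 'complete' only for aggregated queries, otherwise 'append'."""
    return "complete" if has_aggregation else "append"


def start_console_query(df, has_aggregation: bool, checkpoint_path: str):
    """Start a console sink on a streaming DataFrame with a compatible mode."""
    return (
        df.writeStream
        .outputMode(pick_output_mode(has_aggregation))
        .format("console")
        .option("truncate", "false")
        .option("checkpointLocation", checkpoint_path)
        .start()
    )

# Usage (assumes `df` is a streaming DataFrame read from Kafka;
# the checkpoint paths are placeholders):
#   start_console_query(df.select("value", "topic", "partition", "timestamp"),
#                       False, "/tmp/ckpt-plain")   # plain projection: append
#   start_console_query(df.groupBy("topic").count(),
#                       True, "/tmp/ckpt-agg")      # aggregation: complete
```

With the plain projection shown in the plan above, this would fall back to 'append', which should avoid the AnalysisException.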

On Tue, Feb 1, 2022 at 4:05 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hm.
>
> If I recall correctly, you should try
> outputMode('complete') with format('console'):
>
>             result = resultMF. \
>                      writeStream. \
>                      outputMode('complete'). \
>                      option("numRows", 1000). \
>                      option("truncate", "false"). \
>                      format('console'). \
>                      option('checkpointLocation', checkpoint_path). \
>                      queryName("temperature"). \
>                      start()
>
> On another example I have
>
>            result = streamingDataFrame.select( \
>                      col("parsed_value.rowkey").alias("rowkey") \
>                    , col("parsed_value.timeissued").alias("timeissued") \
>                    , col("parsed_value.temperature").alias("temperature")). \
>                      writeStream. \
>                      outputMode('append'). \
>                      option("truncate", "false"). \
>                      foreachBatch(temperatures). \
>                      trigger(processingTime='60 seconds'). \
>                      option('checkpointLocation', checkpoint_path). \
>                      queryName("temperature"). \
>                      start()
>
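> import datetime
> from pyspark.sql import functions as F
> from pyspark.sql.functions import col
>
> def temperatures(df, batchId):
>     # Called once per micro-batch; df is a regular (non-streaming) DataFrame.
>     if len(df.take(1)) > 0:
>         df.show(100, False)
>         df.persist()
>         # Average temperature for this batch, rounded to a whole number.
>         AvgTemp = df.select(F.round(F.avg(col("temperature")))).collect()[0][0]
>         df.unpersist()
>         now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
>         print(f"Average temperature at {now} from batchId {batchId} is {AvgTemp} degrees")
>     else:
>         print("DataFrame is empty")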
>
> HTH
>
> On Tue, 1 Feb 2022 at 23:45, karan alang <karan.al...@gmail.com> wrote:
>
>> Hello Spark Experts,
>>
>> I have a simple Structured Streaming program that reads data from Kafka
>> and writes to the console. It works in batch mode (i.e. spark.read or
>> df.write), but not in streaming mode.
>>
>> Details are in this Stack Overflow question:
>>
>>
>> https://stackoverflow.com/questions/70948967/structured-streaming-not-writing-records-to-console-when-using-writestream-ba
>>
>> Any input on how to fix or debug this?
>> TIA!
>>
>
