That is not the right way to use a watermark with the append output mode. `withWatermark` must be applied before the aggregation, something like this:

df.withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "30 seconds"))
  .agg(...)
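In your Java code that would look roughly like the following (a sketch, not tested; `deviceEvents` stands for whatever streaming Dataset you are aggregating, and the `deviceid` grouping column is illustrative):

    import static org.apache.spark.sql.functions.*;

    // Declare the watermark on the event-time column *before* grouping,
    // so Spark knows when a 30-second window can be finalized and appended.
    Dataset<Row> deviceBasicAgg = deviceEvents
        .withWatermark("eventtime", "30 seconds")
        .groupBy(window(col("eventtime"), "30 seconds"), col("deviceid"))
        .count();

    // With the watermark in place, the parquet sink accepts 'append' mode.
    StreamingQuery query = deviceBasicAgg.writeStream()
        .format("parquet")
        .trigger(ProcessingTime.create("10 seconds"))
        .outputMode("append")
        .option("checkpointLocation", "/tmp/summary/checkpoints/")
        .option("path", "/data/summary/devices/")
        .start();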
Read more here -
https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html

On Mon, Jun 19, 2017 at 7:27 PM, kaniska Mandal <kaniska.man...@gmail.com> wrote:

> Hi Burak,
>
> Per your suggestion, I have specified
>
>     deviceBasicAgg.withWatermark("eventtime", "30 seconds");
>
> before invoking deviceBasicAgg.writeStream()...
>
> But I am still facing ~
>
> org.apache.spark.sql.AnalysisException: Append output mode not supported
> when there are streaming aggregations on streaming DataFrames/DataSets;
>
> I am OK with 'complete' output mode.
>
> =================================================
>
> I tried another approach - creating a parquet file from the in-memory
> dataset ~ which seems to work.
>
> But I need only the delta, not the cumulative count. Since 'append' mode
> does not support streaming aggregations, I have to use 'complete' output
> mode.
>
>     StreamingQuery streamingQry = deviceBasicAgg.writeStream()
>         .format("memory")
>         .trigger(ProcessingTime.create("5 seconds"))
>         .queryName("deviceBasicAggSummary")
>         .outputMode("complete")
>         .option("checkpointLocation", "/tmp/parquet/checkpoints/")
>         .start();
>
>     streamingQry.awaitTermination();
>
>     Thread.sleep(5000);
>
>     while (true) {
>         Dataset<Row> deviceBasicAggSummaryData = spark.table("deviceBasicAggSummary");
>         deviceBasicAggSummaryData.toDF().write().parquet(
>             "/data/summary/devices/" + new Date().getTime() + "/");
>     }
>
> =================================================
>
> So what's the best practice for 'low latency query on distributed data'
> using Spark SQL and Structured Streaming?
>
> Thanks
> Kaniska
>
>
> On Mon, Jun 19, 2017 at 11:55 AM, Burak Yavuz <brk...@gmail.com> wrote:
>
>> Hi Kaniska,
>>
>> In order to use append mode with aggregations, you need to set an event
>> time watermark (using `withWatermark`). Otherwise, Spark doesn't know when
>> to output an aggregation result as "final".
>>
>> Best,
>> Burak
>>
>> On Mon, Jun 19, 2017 at 11:03 AM, kaniska Mandal <kaniska.man...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> My goal is to ~
>>> (1) either chain streaming aggregations in a single query, OR
>>> (2) run multiple streaming aggregations and save data in some meaningful
>>> format to execute low latency / failsafe OLAP queries
>>>
>>> So my first choice is parquet format, but I failed to make it work!
>>>
>>> I am using spark-streaming_2.11-2.1.1
>>>
>>> I am facing the following error -
>>> org.apache.spark.sql.AnalysisException: Append output mode not
>>> supported when there are streaming aggregations on streaming
>>> DataFrames/DataSets;
>>>
>>> - for the following syntax
>>>
>>>     StreamingQuery streamingQry = tagBasicAgg.writeStream()
>>>         .format("parquet")
>>>         .trigger(ProcessingTime.create("10 seconds"))
>>>         .queryName("tagAggSummary")
>>>         .outputMode("append")
>>>         .option("checkpointLocation", "/tmp/summary/checkpoints/")
>>>         .option("path", "/data/summary/tags/")
>>>         .start();
>>>
>>> But parquet doesn't support 'complete' outputMode.
>>>
>>> So is parquet supported only for batch queries, NOT for streaming
>>> queries?
>>>
>>> - note that console outputMode is working fine!
>>>
>>> Any help will be much appreciated.
>>>
>>> Thanks
>>> Kaniska
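On the low-latency OLAP question: once the watermark is declared before the aggregation, the parquet sink works with 'append' output mode, so the memory-sink-plus-while-loop workaround should not be needed. The appended files form an ordinary parquet table that any batch job can query. A rough sketch (Java; the path and view name are just illustrations):

    // Query the streaming job's parquet output from a separate batch application.
    Dataset<Row> summary = spark.read().parquet("/data/summary/devices/");
    summary.createOrReplaceTempView("deviceSummary");
    spark.sql("SELECT * FROM deviceSummary").show();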