That is not the right way to use a watermark with append output mode.
`withWatermark` must be applied to the streaming DataFrame *before* the
aggregation, not to the aggregated result. Something like this:

df.withWatermark("timestamp", "1 hour")
  .groupBy(window(col("timestamp"), "30 seconds"))
  .agg(...)

Read more here -
https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html
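
For reference, here is a fuller sketch in the Java API used later in this
thread. The source dataset `events`, the column names `eventtime` /
`deviceId`, and the paths are placeholders for illustration, not a
confirmed schema:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.ProcessingTime;
import org.apache.spark.sql.streaming.StreamingQuery;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

// 'events' is the raw streaming Dataset<Row>; the watermark is applied
// to it BEFORE the aggregation, not to the aggregated result.
Dataset<Row> deviceBasicAgg = events
    .withWatermark("eventtime", "30 seconds")
    .groupBy(window(col("eventtime"), "30 seconds"), col("deviceId"))
    .count();

// With the watermark in place, append mode is allowed: each window's
// count is written out exactly once, after the watermark passes the
// end of that window.
StreamingQuery query = deviceBasicAgg.writeStream()
    .format("parquet")
    .option("path", "/data/summary/devices/")
    .option("checkpointLocation", "/tmp/summary/checkpoints/")
    .trigger(ProcessingTime.create("10 seconds"))
    .outputMode("append")
    .start();

query.awaitTermination();

Note that this also gives you the per-window delta you asked about, not a
cumulative count - each row written is the final count for one window.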


On Mon, Jun 19, 2017 at 7:27 PM, kaniska Mandal <kaniska.man...@gmail.com>
wrote:

> Hi Burak,
>
> Per your suggestion, I have specified
> > deviceBasicAgg.withWatermark("eventtime", "30 seconds");
> before invoking deviceBasicAgg.writeStream()...
>
> But I am still getting ~
>
> org.apache.spark.sql.AnalysisException: Append output mode not supported
> when there are streaming aggregations on streaming DataFrames/DataSets;
>
> I am OK with 'complete' output mode.
>
> =================================================
>
> I tried another approach - creating parquet files from the in-memory
> dataset - which seems to work.
>
> But I need only the delta, not the cumulative count. Since 'append' mode
> does not support streaming aggregations, I have to use the 'complete'
> output mode.
>
> StreamingQuery streamingQry = deviceBasicAgg.writeStream()
>     .format("memory")
>     .trigger(ProcessingTime.create("5 seconds"))
>     .queryName("deviceBasicAggSummary")
>     .outputMode("complete")
>     .option("checkpointLocation", "/tmp/parquet/checkpoints/")
>     .start();
>
> // Poll the in-memory table every 5 seconds and write a parquet snapshot.
> while (true) {
>   Thread.sleep(5000);
>   Dataset<Row> deviceBasicAggSummaryData = spark.table("deviceBasicAggSummary");
>   deviceBasicAggSummaryData.write().parquet(
>       "/data/summary/devices/" + new Date().getTime() + "/");
> }
>
> =================================================
>
> So what's the best practice for 'low latency query on distributed data'
> using Spark SQL and Structured Streaming?
>
>
> Thanks
>
> Kaniska
>
>
>
> On Mon, Jun 19, 2017 at 11:55 AM, Burak Yavuz <brk...@gmail.com> wrote:
>
>> Hi Kaniska,
>>
>> In order to use append mode with aggregations, you need to set an event
>> time watermark (using `withWatermark`). Otherwise, Spark doesn't know when
>> to output an aggregation result as "final".
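>>
>> For example (a sketch with hypothetical column names), the watermark
>> goes on the streaming Dataset before the groupBy:
>>
>>   events.withWatermark("eventtime", "10 minutes")
>>         .groupBy(window(col("eventtime"), "5 minutes"))
>>         .count();
>>
>> With this, the count for a window such as [12:00, 12:05) is emitted in
>> append mode only after the watermark (max event time seen minus 10
>> minutes) passes 12:05, i.e. once Spark considers that result final.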
>>
>> Best,
>> Burak
>>
>> On Mon, Jun 19, 2017 at 11:03 AM, kaniska Mandal <
>> kaniska.man...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> My goal is to either ~
>>> (1) chain streaming aggregations in a single query, OR
>>> (2) run multiple streaming aggregations and save the data in some
>>> meaningful format for low-latency / failsafe OLAP queries
>>>
>>> So my first choice is the parquet format, but I failed to make it work!
>>>
>>> I am using spark-streaming_2.11-2.1.1
>>>
>>> I am facing the following error -
>>> org.apache.spark.sql.AnalysisException: Append output mode not
>>> supported when there are streaming aggregations on streaming
>>> DataFrames/DataSets;
>>>
>>> - for the following syntax
>>>
>>>  StreamingQuery streamingQry = tagBasicAgg.writeStream()
>>>      .format("parquet")
>>>      .trigger(ProcessingTime.create("10 seconds"))
>>>      .queryName("tagAggSummary")
>>>      .outputMode("append")
>>>      .option("checkpointLocation", "/tmp/summary/checkpoints/")
>>>      .option("path", "/data/summary/tags/")
>>>      .start();
>>>
>>> But parquet doesn't support the 'complete' output mode.
>>>
>>> So is parquet supported only for batch queries, NOT for streaming
>>> queries?
>>>
>>> - note that the console output mode works fine!
>>>
>>> Any help will be much appreciated.
>>>
>>> Thanks
>>> Kaniska
>>>
>>>
>>
>
