Hi Burak,

Per your suggestion, I have specified
> deviceBasicAgg.withWatermark("eventtime", "30 seconds");
before invoking deviceBasicAgg.writeStream()...

But I am still getting this error:

org.apache.spark.sql.AnalysisException: Append output mode not supported
when there are streaming aggregations on streaming DataFrames/DataSets;

I am OK with 'complete' output mode.
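
As a rough illustration of Burak's point below about results becoming "final", here is a plain-Java simulation of how a watermark lets a windowed count be emitted exactly once. It is only a sketch and makes no Spark calls: the class WatermarkSim, its method onEvent, and the 10-second window are hypothetical, and the rule used (watermark = latest event time seen minus the allowed delay; a window is emitted once its end is at or before the watermark) is a simplification of what the engine does per trigger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java simulation of append-mode finalization; not Spark code. */
final class WatermarkSim {
    private final long windowMs;
    private final long delayMs;
    // Watermark starts at "nothing seen yet".
    private long watermark = Long.MIN_VALUE;
    // Window start -> running count, for windows that are not final yet.
    private final TreeMap<Long, Long> open = new TreeMap<>();

    WatermarkSim(long windowMs, long delayMs) {
        this.windowMs = windowMs;
        this.delayMs = delayMs;
    }

    /** Adds one event and returns the windows ("start=count") it finalizes. */
    List<String> onEvent(long eventTimeMs) {
        List<String> emitted = new ArrayList<>();
        long start = Math.floorDiv(eventTimeMs, windowMs) * windowMs;
        if (start + windowMs <= watermark) {
            return emitted; // too late: this window was already finalized
        }
        open.merge(start, 1L, Long::sum);
        // The watermark only moves forward.
        watermark = Math.max(watermark, eventTimeMs - delayMs);
        // Emit every open window whose end is at or before the watermark.
        while (!open.isEmpty() && open.firstKey() + windowMs <= watermark) {
            Map.Entry<Long, Long> done = open.pollFirstEntry();
            emitted.add(done.getKey() + "=" + done.getValue());
        }
        return emitted;
    }
}
```

With a 10-second window and a 30-second delay, the window starting at 0 is emitted only once an event at 40 seconds or later arrives, and an event for that window arriving afterwards is dropped. Note that in Spark, going by the Structured Streaming guide, withWatermark returns a new Dataset and has to be applied on the event-time column before the groupBy, with the event-time window (or column) as part of the grouping key; calling it on the already aggregated Dataset and discarding the result would leave the query unchanged.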

=================================================

I tried another approach: creating Parquet files from the in-memory dataset,
which seems to work.

But I need only the delta, not the cumulative count. Since 'append' mode
does not support my streaming aggregation, I have to use 'complete' output mode.

StreamingQuery streamingQry = deviceBasicAgg.writeStream()
              .format("memory")
              .trigger(ProcessingTime.create("5 seconds"))
              .queryName("deviceBasicAggSummary")
              .outputMode("complete")
              .option("checkpointLocation", "/tmp/parquet/checkpoints/")
              .start();

streamingQry.awaitTermination();

Thread.sleep(5000);

while(true) {
    Dataset<Row> deviceBasicAggSummaryData = spark.table("deviceBasicAggSummary");
    deviceBasicAggSummaryData.toDF().write()
            .parquet("/data/summary/devices/" + new Date().getTime() + "/");
}
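
Since 'complete' mode rewrites the whole cumulative table on every trigger, one way to get only the delta might be to diff two successive snapshots of the counts. A minimal sketch in plain Java, assuming the snapshots have been collected into maps keyed by device id; SnapshotDelta and delta are hypothetical names, not part of Spark:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: derives per-trigger deltas from cumulative counts. */
final class SnapshotDelta {

    /**
     * Returns (current - previous) for every key in current.
     * Keys missing from previous count as 0; unchanged keys are omitted.
     */
    static Map<String, Long> delta(Map<String, Long> previous,
                                   Map<String, Long> current) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : current.entrySet()) {
            long before = previous.getOrDefault(e.getKey(), 0L);
            long diff = e.getValue() - before;
            if (diff != 0L) {
                result.put(e.getKey(), diff);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> previous = new HashMap<>();
        previous.put("device-1", 10L);
        previous.put("device-2", 4L);

        Map<String, Long> current = new HashMap<>();
        current.put("device-1", 13L); // 3 new events since the last snapshot
        current.put("device-2", 4L);  // unchanged, so left out of the delta
        current.put("device-3", 7L);  // first seen in this snapshot

        System.out.println(delta(previous, current));
    }
}
```

This keeps only the previous snapshot in memory, so it would suit small summary tables; it does not by itself make the Parquet output any less cumulative.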

=================================================

So what's the best practice for 'low latency query on distributed data'
using Spark SQL and Structured Streaming?


Thanks

Kaniska



On Mon, Jun 19, 2017 at 11:55 AM, Burak Yavuz <brk...@gmail.com> wrote:

> Hi Kaniska,
>
> In order to use append mode with aggregations, you need to set an event
> time watermark (using `withWatermark`). Otherwise, Spark doesn't know when
> to output an aggregation result as "final".
>
> Best,
> Burak
>
> On Mon, Jun 19, 2017 at 11:03 AM, kaniska Mandal <kaniska.man...@gmail.com> wrote:
>
>> Hi,
>>
>> My goal is to:
>> (1) either chain streaming aggregations in a single query OR
>> (2) run multiple streaming aggregations and save data in some meaningful
>> format to execute low latency / failsafe OLAP queries
>>
>> So my first choice is the Parquet format, but I failed to make it work!
>>
>> I am using spark-streaming_2.11-2.1.1
>>
>> I am facing the following error -
>> org.apache.spark.sql.AnalysisException: Append output mode not supported
>> when there are streaming aggregations on streaming DataFrames/DataSets;
>>
>> - for the following syntax
>>
>>  StreamingQuery streamingQry = tagBasicAgg.writeStream()
>>               .format("parquet")
>>               .trigger(ProcessingTime.create("10 seconds"))
>>               .queryName("tagAggSummary")
>>               .outputMode("append")
>>               .option("checkpointLocation", "/tmp/summary/checkpoints/")
>>               .option("path", "/data/summary/tags/")
>>               .start();
>>
>> But Parquet doesn't support 'complete' output mode.
>>
>> So is Parquet supported only for batch queries, not for streaming
>> queries?
>>
>> Note that the console output works fine!
>>
>> Any help will be much appreciated.
>>
>> Thanks
>> Kaniska
>>
>>
>
