Re: UpsertStreamTableSink for Aggregate Only Query

2020-06-08 Thread Jark Wu
Hi Satyam, I guess your destination database table doesn't have a primary key, right? If so, I think the upcoming 1.11 release with the new sink interface (FLIP-95) can resolve this better. In the new sink interface: - the primary key is always defined in the Flink SQL DDL - the plann

Re: UpsertStreamTableSink for Aggregate Only Query

2020-06-08 Thread Satyam Shekhar
Hi Jark, I wish to atomically update the destination with remove-insert. To pick that strategy, I need some "hint" from Flink that the output is a global aggregation with no grouping key, and that appends should overwrite the previous value. I am also exploring handling the issue in the upstream

Re: UpsertStreamTableSink for Aggregate Only Query

2020-06-07 Thread Jark Wu
Hi Satyam, Currently, `UpsertStreamTableSink` requires the query to contain a primary key, which is passed to the sink via `UpsertStreamTableSink#setKeyFields`. If the query has no primary key, an error is thrown, as you have seen. It should work for all group by queries (if no projectio

Re: UpsertStreamTableSink for Aggregate Only Query

2020-06-05 Thread Arvid Heise
Instead of changing the query, I used to embed it in a larger context for similar work. So if you get an arbitrary query X which produces exactly one result (e.g. X = select sum(revenue) from lineorder group by 1) then you can craft a query where you add a dummy pk to the result. Table or

Re: UpsertStreamTableSink for Aggregate Only Query

2020-06-05 Thread Satyam Shekhar
Hey Arvid, Thanks for the reply. As you suggested, rewriting the query to add a dummy output column and a group by clause, "select 1, sum(revenue) from lineorder group by 1", does add a unique key column to the output, and the pipeline succeeds. However, the application may get arbitrary SQL from the
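To illustrate why a constant key gives the overwrite behaviour discussed in this thread, here is a minimal sketch in plain Java. It does not use any Flink API: `ConstantKeyUpsertSketch`, `KeyedStore`, and `runningSum` are hypothetical names, and the in-memory map only stands in for a destination table keyed by the dummy column. It assumes each new aggregate value arrives as an upsert message for key 1, roughly what "select 1, sum(revenue) from lineorder group by 1" would be expected to emit.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConstantKeyUpsertSketch {

    /** Hypothetical in-memory stand-in for a destination table keyed by the dummy column. */
    static final class KeyedStore {
        private final Map<Integer, Long> rows = new LinkedHashMap<>();

        /** Applies one change: upsert == true writes or overwrites the key, false deletes it. */
        void apply(boolean upsert, int key, long value) {
            if (upsert) {
                rows.put(key, value);
            } else {
                rows.remove(key);
            }
        }

        /** Returns a copy of the current table contents. */
        Map<Integer, Long> snapshot() {
            return new LinkedHashMap<>(rows);
        }
    }

    /** Feeds every intermediate sum into the store under the constant key 1. */
    static Map<Integer, Long> runningSum(long[] revenues) {
        KeyedStore store = new KeyedStore();
        long sum = 0;
        for (long revenue : revenues) {
            sum += revenue;
            // Same key every time, so the newest sum replaces the previous one.
            store.apply(true, 1, sum);
        }
        return store.snapshot();
    }

    public static void main(String[] args) {
        // Three input rows produce three upserts but only one stored row.
        System.out.println(runningSum(new long[] {10, 20, 5})); // prints {1=35}
    }
}
```

Because every update carries the same key, the store never holds more than one row, which is the property an aggregate-only query needs from its sink.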

Re: UpsertStreamTableSink for Aggregate Only Query

2020-06-05 Thread Arvid Heise
Hi Satyam, you are right, there seems to be a disconnect between the Javadoc and the implementation. Jark probably knows more. In your case, couldn't you just add a dummy column containing a constant key? select 'revenue' AS name, sum(revenue) from lineorder and then set the dummy field as PK? On Fri,

UpsertStreamTableSink for Aggregate Only Query

2020-06-05 Thread Satyam Shekhar
Hello, I am using Flink as the query engine to build an alerting/monitoring application. One of the use cases in our product requires continuously tracking and charting the output of an aggregate-only SQL query, for example, select sum(revenue) from lineorder. A desirable property from the output