Hi Satyam,

I guess your destination database table doesn't have a primary key, right? If that is the case, I think the upcoming 1.11 release with the new sink interface (FLIP-95) may resolve this better.
In the new sink interface:
- the primary key is always defined in the Flink SQL DDL
- the planner will not infer or validate the primary key of the query anymore
- you can find out whether the query contains UPDATE/DELETE changes or is an INSERT-only query via the parameter of `DynamicTableSink#getChangelogMode(queryChangeMode)`

So if the `queryChangeMode` contains UPDATE changes and the DDL doesn't have any PK, you can set a flag in your sink to indicate it should work in "remove-insert" mode.

Best,
Jark

On Mon, 8 Jun 2020 at 15:40, Satyam Shekhar <satyamshek...@gmail.com> wrote:

> Hi Jark,
>
> I wish to atomically update the destination with remove-insert. To pick
> that strategy, I need some "hint" from Flink that the output is a global
> aggregation with no grouping key, and that appends should overwrite the
> previous value.
>
> I am also exploring handling the issue in the upstream server (in the query
> generation layer), where I have this knowledge based on the context (similar
> to what Arvid suggested). I may be able to get around this problem by
> handling it upstream.
>
> Regards,
> Satyam
>
> On Sun, Jun 7, 2020 at 8:05 PM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Satyam,
>>
>> Currently, `UpsertStreamTableSink` requires the query to contain a
>> primary key, and the key will be set via
>> `UpsertStreamTableSink#setKeyFields`.
>> If there is no primary key in the query, an error will be thrown, as you
>> can see.
>>
>> It should work for all GROUP BY queries (if there is no projection on the
>> grouping key after the aggregation).
>> A global aggregation is special: it doesn't have a primary key. But an
>> upsert sink requires a primary key; otherwise it doesn't know which row to
>> update.
>> How would you write such a result into an external database without a
>> primary key? Would you write it in append fashion, or remove-insert
>> fashion?
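[Editor's note: the sink-mode decision Jark describes above can be sketched in plain Java. The class, enum, and method names below are illustrative only, not the real Flink API; in an actual 1.11 `DynamicTableSink`, this decision would be made inside `getChangelogMode`, which receives the query's changelog mode from the planner.]

```java
// Illustrative sketch only; names are NOT the real Flink API.
public class SinkModeSketch {

    enum WriteMode { APPEND, UPSERT, REMOVE_INSERT }

    // queryHasUpdates:  whether the query produces UPDATE/DELETE changes
    // ddlHasPrimaryKey: whether the CREATE TABLE DDL declares a PRIMARY KEY
    static WriteMode chooseWriteMode(boolean queryHasUpdates, boolean ddlHasPrimaryKey) {
        if (!queryHasUpdates) {
            return WriteMode.APPEND;        // insert-only query: just append rows
        }
        if (ddlHasPrimaryKey) {
            return WriteMode.UPSERT;        // the PK tells the sink which row to update
        }
        return WriteMode.REMOVE_INSERT;     // updates but no PK: remove old row, insert new one
    }

    public static void main(String[] args) {
        // A global aggregation produces updates but has no key.
        System.out.println(chooseWriteMode(true, false)); // REMOVE_INSERT
    }
}
```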
>>
>> Best,
>> Jark
>>
>>
>> On Sat, 6 Jun 2020 at 04:32, Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Instead of changing the query, I used to embed the query in a larger
>>> context for similar work.
>>>
>>> So if you get an arbitrary query X which produces exactly one result
>>> (e.g. X = select sum(revenue) from lineorder group by 1), then you can
>>> craft a query where you add a dummy pk to the result:
>>>
>>> Table original = env.sqlQuery(X);
>>> Table withDummy = original.select("'dummy' as pk, *");
>>>
>>> On Fri, Jun 5, 2020 at 9:59 PM Satyam Shekhar <satyamshek...@gmail.com>
>>> wrote:
>>>
>>>> Hey Arvid,
>>>>
>>>> Thanks for the reply.
>>>>
>>>> As you suggested, rewriting the query to add a dummy output and group
>>>> by clause - "select 1, sum(revenue) from lineorder group by 1" - does
>>>> add a unique key column to the output, and the pipeline succeeds.
>>>>
>>>> However, the application may receive arbitrary SQL from the upstream
>>>> server. This makes the solution tricky - I'd have to change the query to
>>>> add a dummy grouping key for every grouping node in the query and a
>>>> projection node to drop the dummy key. I can try to account for this
>>>> upstream (in the query generation layer), but I would prefer to have it
>>>> solved within the execution engine itself.
>>>>
>>>> Regards,
>>>> Satyam
>>>>
>>>> On Fri, Jun 5, 2020 at 11:59 AM Arvid Heise <ar...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi Satyam,
>>>>>
>>>>> you are right, there seems to be a disconnect between the javadoc and
>>>>> the implementation. Jark probably knows more.
>>>>>
>>>>> In your case, couldn't you just add a dummy column containing a
>>>>> constant key?
>>>>>
>>>>> select 'revenue' AS name, sum(revenue) from lineorder
>>>>>
>>>>> and then set the dummy field as PK?
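[Editor's note: the dummy-key wrapping suggested above can also be done purely at the SQL string level, which fits Satyam's case of receiving arbitrary SQL from upstream. A minimal sketch, assuming the inner query yields exactly one row; `wrapWithDummyKey` and the alias `t` are illustrative, not part of any Flink API.]

```java
// Illustrative sketch only; wrapWithDummyKey is NOT a Flink API.
public class DummyKeySketch {

    static String wrapWithDummyKey(String query) {
        // The constant 'pk' column gives an upsert sink a single-valued key,
        // so the one-row result is updated in place instead of retracted to empty.
        return "SELECT 'pk' AS pk, t.* FROM (" + query + ") t";
    }

    public static void main(String[] args) {
        System.out.println(wrapWithDummyKey("select sum(revenue) from lineorder"));
        // SELECT 'pk' AS pk, t.* FROM (select sum(revenue) from lineorder) t
    }
}
```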
>>>>>
>>>>> On Fri, Jun 5, 2020 at 12:28 PM Satyam Shekhar <satyamshek...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I am using Flink as the query engine to build an alerting/monitoring
>>>>>> application. One of the use cases in our product requires continuously
>>>>>> tracking and charting the output of an aggregate-only SQL query, for
>>>>>> example, select sum(revenue) from lineorder. A desirable property of
>>>>>> the output of a Flink job for such a query is that there is always
>>>>>> exactly 1 row in the result set (or that the number of rows does not
>>>>>> fall to 0 due to retractions of previous output). In other words, I
>>>>>> need upsert-"like" semantics for the output of the query.
>>>>>>
>>>>>> I was hopeful after reading the comments in UpsertStreamTableSink.java
>>>>>> that this condition is accounted for in the implementation; however, a
>>>>>> pipeline with the above query writing to a concrete
>>>>>> UpsertStreamTableSink fails with the following error -
>>>>>> "UpsertStreamTableSink requires that Table has" + " a full primary
>>>>>> keys if it is updated." Here are the relevant comments from
>>>>>> UpsertStreamTableSink.java for reference -
>>>>>>
>>>>>> ```
>>>>>> Configures the unique key fields of the {@link Table} to write. The
>>>>>> method is called after {@link TableSink#configure(String[],
>>>>>> TypeInformation[])}.
>>>>>>
>>>>>> <p>The keys array might be empty, if the table consists of a single
>>>>>> (updated) record. If the table does not have a key and is append-only,
>>>>>> the keys attribute is null.
>>>>>>
>>>>>> @param keys the field names of the table's keys, an empty array if
>>>>>> the table has a single row, and null if the table is append-only and
>>>>>> has no key.
>>>>>> void setKeyFields(String[] keys);
>>>>>> ```
>>>>>>
>>>>>> The code in StreamExec(Legacy)Sink.scala appears to conform to the
>>>>>> observed failure and does not match the comment about "an empty key
>>>>>> array if the table consists of a single record".
>>>>>>
>>>>>> With that context, I have the following questions -
>>>>>>
>>>>>> 1. Is UpsertStreamTableSink expected to consume the output of such
>>>>>> aggregate-only queries? Or is my interpretation of the code and
>>>>>> comment wrong, and I have misconfigured UpsertStreamTableSink?
>>>>>> 2. If the answer to (1) is no, are there any recommended patterns for
>>>>>> solving this use case such that the client never observes an empty
>>>>>> result set for the output of this query?
>>>>>>
>>>>>> Regards,
>>>>>> Satyam
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Arvid Heise | Senior Java Developer
>>>>>
>>>>> <https://www.ververica.com/>
>>>>>
>>>>> Follow us @VervericaData
>>>>>
>>>>> --
>>>>>
>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>>> Conference
>>>>>
>>>>> Stream Processing | Event Driven | Real Time
>>>>>
>>>>> --
>>>>>
>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>
>>>>> --
>>>>> Ververica GmbH
>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason,
>>>>> Ji (Toni) Cheng
>>>>
>>>
>>