Hi Jark,

I wish to atomically update the destination with remove-insert. To pick that
strategy, I need some "hint" from Flink that the output is a global
aggregation with no grouping key, and that appends should overwrite the
previous value.
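To make the semantics I'm after concrete, here is a toy sketch (plain Python, not the Flink sink API; the class and method names are illustrative, not real Flink identifiers) of how a sink could behave once it knows the query is a global aggregation: treat the empty key array as a constant singleton key, so each new aggregate value overwrites the single result row and retractions never leave the result set empty.

```python
# Toy model of an upsert-style sink for a global aggregate (no grouping key).
# Illustrative only -- not Flink's actual UpsertStreamTableSink interface.

class SingleRowUpsertSink:
    """Treats an empty key list as a constant singleton key: an insert
    overwrites the single materialized row, and a retraction is ignored
    so the result set never drops to zero rows between updates."""

    def __init__(self):
        self.row = None  # the single materialized result row

    def emit(self, change, value):
        # change is "+I" (insert/upsert) or "-D" (retract/delete)
        if change == "+I":
            self.row = value  # overwrite the previous value in place
        elif change == "-D":
            pass              # keep the last value until the new one arrives

    def result(self):
        return [] if self.row is None else [self.row]

sink = SingleRowUpsertSink()
# change stream for: select sum(revenue) from lineorder
for change, value in [("+I", 10), ("-D", 10), ("+I", 25)]:
    sink.emit(change, value)
print(sink.result())  # [25] -- exactly one row, never empty mid-update
```

This is only a model of the destination-side behavior I would like to select when Flink signals a global aggregation.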
I am also exploring handling the issue in the upstream server (in the query
generation layer), where I have this knowledge from context (similar to what
Arvid suggested). I may be able to get around this problem by handling it
upstream.

Regards,
Satyam

On Sun, Jun 7, 2020 at 8:05 PM Jark Wu <imj...@gmail.com> wrote:

> Hi Satyam,
>
> Currently, `UpsertStreamTableSink` requires the query to contain a primary
> key, and the key will be set via `UpsertStreamTableSink#setKeyFields`.
> If there is no primary key in the query, an error will be thrown, as you
> can see.
>
> It should work for all group-by queries (as long as no projection drops the
> group-by key after the aggregation).
> Global aggregation is special: it doesn't have a primary key. But an
> upsert sink requires a primary key; otherwise it doesn't know which row to
> update.
> How would you write such a result into an external database with no primary
> key? Would you write it in append fashion, or remove-insert fashion?
>
> Best,
> Jark
>
>
> On Sat, 6 Jun 2020 at 04:32, Arvid Heise <ar...@ververica.com> wrote:
>
>> Instead of changing the query, I used to embed the query in a larger
>> context for similar work.
>>
>> So if you get an arbitrary query X which produces exactly one result
>> (e.g. X = select sum(revenue) from lineorder group by 1) then you can
>> craft a query where you add a dummy pk to the result:
>>
>> Table original = env.sqlQuery(X);
>> Table withDummy = original.select("'dummy' as pk, *");
>>
>> On Fri, Jun 5, 2020 at 9:59 PM Satyam Shekhar <satyamshek...@gmail.com>
>> wrote:
>>
>>> Hey Arvid,
>>>
>>> Thanks for the reply.
>>>
>>> As you suggested, rewriting the query to add a dummy output column and
>>> group-by clause - "select 1, sum(revenue) from lineorder group by 1" -
>>> does add a unique key column to the output, and the pipeline succeeds.
>>>
>>> However, the application may get arbitrary SQL from the upstream server.
>>> This makes the solution tricky - I'd have to change the query to add a
>>> dummy grouping key for all grouping nodes in the query and a projection
>>> node to drop the dummy key. I can try to account for this upstream (in
>>> the query generation layer), but I would prefer to have it solved within
>>> the execution engine itself.
>>>
>>> Regards,
>>> Satyam
>>>
>>> On Fri, Jun 5, 2020 at 11:59 AM Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> Hi Satyam,
>>>>
>>>> you are right, there seems to be a disconnect between the javadoc and
>>>> the implementation. Jark probably knows more.
>>>>
>>>> In your case, couldn't you just add a dummy column containing a
>>>> constant key?
>>>>
>>>> select 'revenue' AS name, sum(revenue) from lineorder
>>>>
>>>> and then set the dummy field as PK?
>>>>
>>>> On Fri, Jun 5, 2020 at 12:28 PM Satyam Shekhar <satyamshek...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am using Flink as the query engine to build an alerting/monitoring
>>>>> application. One of the use cases in our product requires continuously
>>>>> tracking and charting the output of an aggregate-only SQL query,
>>>>> for example, select sum(revenue) from lineorder. A desirable property
>>>>> of the output of a Flink job for such a query is that there is always
>>>>> exactly 1 row in the result set (or that the number of rows does not
>>>>> fall to 0 due to retractions of previous output). In other words, I
>>>>> need upsert "like" semantics for the output of the query.
>>>>>
>>>>> I was hopeful after reading the comments in UpsertStreamTableSink.java
>>>>> that this condition is accounted for in the implementation; however, a
>>>>> pipeline with the above query writing to a concrete
>>>>> UpsertStreamTableSink fails with the following error -
>>>>> "UpsertStreamTableSink requires that Table has" + " a full primary
>>>>> keys if it is updated."
>>>>> Here are the relevant comments from UpsertStreamTableSink.java for
>>>>> reference -
>>>>>
>>>>> ```
>>>>> Configures the unique key fields of the {@link Table} to write. The
>>>>> method is called after {@link TableSink#configure(String[],
>>>>> TypeInformation[])}.
>>>>>
>>>>> <p>The keys array might be empty, if the table consists of a single
>>>>> (updated) record. If the table does not have a key and is append-only,
>>>>> the keys attribute is null.
>>>>>
>>>>> @param keys the field names of the table's keys, an empty array if the
>>>>> table has a single row, and null if the table is append-only and has no
>>>>> key.
>>>>> void setKeyFields(String[] keys);
>>>>> ```
>>>>>
>>>>> The code in StreamExec(Legacy)Sink.scala appears to conform to the
>>>>> observed failure and does not match the comment about an "empty key
>>>>> array if the table consists of a single record".
>>>>>
>>>>> With that context, I have the following questions -
>>>>>
>>>>> 1. Is UpsertStreamTableSink expected to consume the output of such
>>>>> aggregate-only queries? Or is my interpretation of the code and comment
>>>>> wrong, and I have misconfigured UpsertStreamTableSink?
>>>>> 2. If the answer to (1) is no, are there any recommended patterns for
>>>>> solving this use case such that the client never observes an empty
>>>>> result set for the output of this query?
>>>>>
>>>>> Regards,
>>>>> Satyam
>>>>
>>>>
>>>> --
>>>>
>>>> Arvid Heise | Senior Java Developer
>>>>
>>>> <https://www.ververica.com/>
>>>>
>>>> Follow us @VervericaData
>>>>
>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>> Conference
>>>>
>>>> Stream Processing | Event Driven | Real Time
>>>>
>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>
>>>> Ververica GmbH
>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>>> (Toni) Cheng