Hi Satyam,

you are right, there seems to be a disconnect between the javadoc and the implementation. Jark probably knows more.
In your case, couldn't you just add a dummy column containing a constant key, e.g.

    select 'revenue' AS name, sum(revenue) from lineorder

and then set the dummy field as the PK?

On Fri, Jun 5, 2020 at 12:28 PM Satyam Shekhar <satyamshek...@gmail.com> wrote:

> Hello,
>
> I am using Flink as the query engine to build an alerting/monitoring
> application. One of the use cases in our product requires continuously
> tracking and charting the output of an aggregate only SQL query,
> for example, select sum(revenue) from lineorder. A desirable property
> from the output of Flink job for such a query is that there is always
> exactly 1 row in the result set (or that the number of rows does not fall
> to 0 due to retractions for previous output). In other words, I need
> upsert "like" semantics for the output of the query.
>
> I was hopeful after reading comments in UpsertStreamTableSink.java that
> this condition is accounted for in the implementation, however, a pipeline
> with above query writing to a concrete UpsertStreamTableSink fails with the
> following error - "UpsertStreamTableSink requires that Table has" + " a
> full primary keys if it is updated." Here are the relevant comments from
> UpsertStreamTableSink.java for reference -
>
> ```
> Configures the unique key fields of the {@link Table} to write. The method
> is called after {@link TableSink#configure(String[], TypeInformation[])}.
>
> <p>The keys array might be empty, if the table consists of a single
> (updated) record. If the table does not have a key and is append-only, the
> keys attribute is null.
>
> @param keys the field names of the table's keys, an empty array if the
> table has a single row, and null if the table is append-only and has no key.
> void setKeyFields(String[] keys);
> ```
>
> The code in StreamExec(Legacy)Sink.scala appears to conform to observed
> failure and does not match the comment about "empty key array if the table
> consists of a single record".
>
> With that context, I have the following questions -
>
> 1. Is the UpsertStreamTableSink expected to consume the output of such
> aggregate only queries? Or is my interpretation of the code and comment
> wrong and I have misconfigured UpsertStreamTableSink?
> 2. If the answer to (1) is no, are there any recommended patterns for
> solving this use-case such that the client never observes an empty result
> set for the output of this query?
>
> Regards,
> Satyam
>

--
Arvid Heise | Senior Java Developer
<https://www.ververica.com/>
Follow us @VervericaData
--
Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
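The difference between retract and upsert output for the dummy-key suggestion above can be sketched in plain Java. This is a hypothetical, self-contained simulation, not Flink code. `ConstantKeyUpsertDemo`, `RetractTable` and `UpsertTable` are illustrative names, and the sketch assumes that an external store applies each message as soon as it arrives. With retractions, a reader can see an empty result between the retraction of the old sum and the insertion of the new one. Keying the single row by a constant such as `'revenue'` turns every update into an overwrite, which is also what the javadoc's "empty key array, single row" case would amount to. Whether the legacy planner accepts a literal column as the upsert key is not established here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical simulation only; this is NOT Flink's UpsertStreamTableSink API.
 * It models how a downstream store sees the running result of
 *   select 'revenue' AS name, sum(revenue) from lineorder
 * when fed (a) retract messages or (b) upserts keyed by the constant column.
 */
public class ConstantKeyUpsertDemo {

    /** Retract-style store: rows are added and deleted by value. */
    static final class RetractTable {
        private final List<Long> rows = new ArrayList<>();

        void apply(boolean add, long value) {
            if (add) {
                rows.add(value);
            } else {
                rows.remove(Long.valueOf(value)); // delete the previously emitted row
            }
        }

        int size() { return rows.size(); }
    }

    /** Upsert-style store: rows are keyed, so an update overwrites in place. */
    static final class UpsertTable {
        private final Map<String, Long> rows = new LinkedHashMap<>();

        void upsert(String key, long value) { rows.put(key, value); }
        int size() { return rows.size(); }
        Long get(String key) { return rows.get(key); }
    }

    /** Replays running SUM updates as retract/add pairs; returns the minimum observable row count. */
    static int minRowsRetract(long[] revenues) {
        RetractTable table = new RetractTable();
        int min = Integer.MAX_VALUE;
        long sum = 0;
        boolean emitted = false;
        for (long r : revenues) {
            if (emitted) {
                table.apply(false, sum);            // retract the old aggregate...
                min = Math.min(min, table.size());  // ...a reader may see 0 rows here
            }
            sum += r;
            table.apply(true, sum);                 // ...then add the new aggregate
            emitted = true;
            min = Math.min(min, table.size());
        }
        return min;
    }

    /** Replays the same updates as upserts under the constant key "revenue". */
    static int minRowsUpsert(long[] revenues, UpsertTable table) {
        int min = Integer.MAX_VALUE;
        long sum = 0;
        for (long r : revenues) {
            sum += r;
            table.upsert("revenue", sum);           // constant key: always the same row
            min = Math.min(min, table.size());
        }
        return min;
    }

    public static void main(String[] args) {
        long[] revenues = {100, 250, 50};
        UpsertTable upsert = new UpsertTable();
        System.out.println("retract min rows: " + minRowsRetract(revenues));
        System.out.println("upsert  min rows: " + minRowsUpsert(revenues, upsert));
        System.out.println("upsert  final:    " + upsert.get("revenue"));
    }
}
```

With the sample input, the retract variant can expose 0 rows, while the upsert variant never drops below 1 row and ends with the value 400 under the key `revenue`.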