For similar use cases, instead of changing the query itself, I used to embed the query in a larger context.
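That "larger context" idea can be sketched with plain string composition: wrap the arbitrary query in an outer SELECT that prepends a constant key column. This is a minimal illustration, not code from the thread; the helper name `withDummyKey` and the outer alias `t` are made up for the example.

```java
// Hypothetical sketch: wrap an arbitrary single-row query X in an outer
// SELECT that adds a constant 'dummy' column usable as a primary key.
public class DummyKeyWrapper {

    // Returns a new query whose first column is the constant key 'dummy'.
    static String withDummyKey(String query) {
        return "SELECT 'dummy' AS pk, t.* FROM (" + query + ") t";
    }

    public static void main(String[] args) {
        String x = "select sum(revenue) from lineorder";
        // Prints: SELECT 'dummy' AS pk, t.* FROM (select sum(revenue) from lineorder) t
        System.out.println(withDummyKey(x));
    }
}
```

Because the wrapping happens around the query text, the upstream query generator does not need to know anything about the inner query's shape, as long as it produces exactly one row.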
So if you get an arbitrary query X which produces exactly one result (e.g.
X = select sum(revenue) from lineorder group by 1), then you can craft a
query where you add a dummy pk to the result:

Table original = env.sqlQuery(X);
Table withDummy = original.select("'dummy' as pk, *");

On Fri, Jun 5, 2020 at 9:59 PM Satyam Shekhar <satyamshek...@gmail.com> wrote:

> Hey Arvid,
>
> Thanks for the reply.
>
> As you suggested, rewriting the query to add a dummy output and group-by
> clause - "select 1, sum(revenue) from lineorder group by 1" - does add
> a unique key column to the output, and the pipeline succeeds.
>
> However, the application may receive arbitrary SQL from the upstream
> server. This makes the solution tricky - I'd have to change the query to
> add a dummy grouping key for all grouping nodes in the query and a
> projection node to drop the dummy key. I can try to account for this
> upstream (in the query generation layer), but I would prefer to have it
> solved within the execution engine itself.
>
> Regards,
> Satyam
>
> On Fri, Jun 5, 2020 at 11:59 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Satyam,
>>
>> you are right, there seems to be a disconnect between the javadoc and
>> the implementation. Jark probably knows more.
>>
>> In your case, couldn't you just add a dummy column containing a constant
>> key?
>>
>> select 'revenue' AS name, sum(revenue) from lineorder
>>
>> and then set the dummy field as PK?
>>
>> On Fri, Jun 5, 2020 at 12:28 PM Satyam Shekhar <satyamshek...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I am using Flink as the query engine to build an alerting/monitoring
>>> application. One of the use cases in our product requires continuously
>>> tracking and charting the output of an aggregate-only SQL query,
>>> for example, select sum(revenue) from lineorder.
>>> A desirable property of the output of the Flink job for such a query is
>>> that there is always exactly 1 row in the result set (or that the number
>>> of rows does not fall to 0 due to retractions of previous output). In
>>> other words, I need upsert-"like" semantics for the output of the query.
>>>
>>> I was hopeful after reading the comments in UpsertStreamTableSink.java
>>> that this condition is accounted for in the implementation; however, a
>>> pipeline with the above query writing to a concrete UpsertStreamTableSink
>>> fails with the following error: "UpsertStreamTableSink requires that
>>> Table has a full primary keys if it is updated." Here are the relevant
>>> comments from UpsertStreamTableSink.java for reference -
>>>
>>> ```
>>> Configures the unique key fields of the {@link Table} to write. The
>>> method is called after {@link TableSink#configure(String[],
>>> TypeInformation[])}.
>>>
>>> <p>The keys array might be empty, if the table consists of a single
>>> (updated) record. If the table does not have a key and is append-only,
>>> the keys attribute is null.
>>>
>>> @param keys the field names of the table's keys, an empty array if the
>>> table has a single row, and null if the table is append-only and has no
>>> key.
>>> void setKeyFields(String[] keys);
>>> ```
>>>
>>> The code in StreamExec(Legacy)Sink.scala appears to conform to the
>>> observed failure and does not match the comment about an "empty key
>>> array if the table consists of a single record".
>>>
>>> With that context, I have the following questions -
>>>
>>> 1. Is UpsertStreamTableSink expected to consume the output of such
>>> aggregate-only queries? Or is my interpretation of the code and comment
>>> wrong, and have I misconfigured UpsertStreamTableSink?
>>> 2. If the answer to (1) is no, are there any recommended patterns for
>>> solving this use case such that the client never observes an empty
>>> result set for the output of this query?
>>>
>>> Regards,
>>> Satyam
>>>
>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>