Hello, I am using Flink as the query engine to build an alerting/monitoring application. One of the use cases in our product requires continuously tracking and charting the output of an aggregate-only SQL query, for example, select sum(revenue) from lineorder. A desirable property of the Flink job's output for such a query is that there is always exactly 1 row in the result set (or at least that the number of rows never falls to 0 because of a retraction of the previous output). In other words, I need upsert-like semantics for the output of the query.
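
To make the requirement concrete, here is a minimal plain-Java sketch of the behaviour I am trying to avoid versus the one I want. It does not use any Flink APIs; the class and method names are hypothetical, and it assumes (as I understand it) that a retract stream encodes an update as a retract message followed by an add message, while an upsert on an empty key simply overwrites the single row.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustration only: hypothetical names, no Flink APIs involved. */
public class SingleRowSemantics {

    /** Row count seen by a reader after every message when updates arrive as retract + add. */
    static List<Integer> rowCountsUnderRetract(long[] sums) {
        List<Long> rows = new ArrayList<>();
        List<Integer> counts = new ArrayList<>();
        for (long sum : sums) {
            if (!rows.isEmpty()) {
                rows.remove(0);          // retract message for the previous sum
                counts.add(rows.size()); // the result set is empty at this point
            }
            rows.add(sum);               // add message for the new sum
            counts.add(rows.size());
        }
        return counts;
    }

    /** Row count seen by a reader after every message when updates arrive as upserts on an empty key. */
    static List<Integer> rowCountsUnderUpsert(long[] sums) {
        // An empty key means "the table has a single row": every upsert hits the same entry.
        List<Object> emptyKey = Collections.emptyList();
        Map<List<Object>, Long> rows = new HashMap<>();
        List<Integer> counts = new ArrayList<>();
        for (long sum : sums) {
            rows.put(emptyKey, sum);     // overwrite in place, no intermediate delete
            counts.add(rows.size());
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] sums = {10, 25, 40};      // successive values of sum(revenue), made up for the example
        System.out.println("retract: " + rowCountsUnderRetract(sums));
        System.out.println("upsert:  " + rowCountsUnderUpsert(sums));
    }
}
```

With retract-style updates a client polling the result can observe 0 rows between the retract and the add; with the upsert-style update the count stays at 1 once the first value has arrived.
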

I was hopeful, after reading the comments in UpsertStreamTableSink.java, that this condition is accounted for in the implementation. However, a pipeline with the above query writing to a concrete UpsertStreamTableSink fails with the following error:

"UpsertStreamTableSink requires that Table has" + " a full primary keys if it is updated."

Here are the relevant comments from UpsertStreamTableSink.java for reference:

```
/**
 * Configures the unique key fields of the {@link Table} to write. The method is called after
 * {@link TableSink#configure(String[], TypeInformation[])}.
 *
 * <p>The keys array might be empty, if the table consists of a single (updated) record. If the
 * table does not have a key and is append-only, the keys attribute is null.
 *
 * @param keys the field names of the table's keys, an empty array if the table has a single row,
 *     and null if the table is append-only and has no key.
 */
void setKeyFields(String[] keys);
```

The code in StreamExec(Legacy)Sink.scala appears to be consistent with the observed failure, and does not match the comment about an "empty key array if the table consists of a single record".

With that context, I have the following questions:

1. Is the UpsertStreamTableSink expected to consume the output of such aggregate-only queries? Or is my interpretation of the code and comment wrong, and I have misconfigured the UpsertStreamTableSink?
2. If the answer to (1) is no, are there any recommended patterns for solving this use case such that the client never observes an empty result set for the output of this query?

Regards,
Satyam