Hi Satyam,

you are right, there seems to be a disconnect between javadoc and
implementation. Jark probably knows more.

In your case, couldn't you just add a dummy column containing a constant
key?

select 'revenue' AS name, sum(revenue) from lineorder

and then set the dummy field as PK?

On Fri, Jun 5, 2020 at 12:28 PM Satyam Shekhar <satyamshek...@gmail.com>
wrote:

> Hello,
>
> I am using Flink as the query engine to build an alerting/monitoring
> application. One of the use cases in our product requires continuously
> tracking and charting the output of an aggregate only SQL query,
> for example, select sum(revenue) from lineorder. A desirable property
> from the output of Flink job for such a query is that there is always
> exactly 1 row in the result set (or that the number of rows does not fall
> to 0 due to retractions for previous output).  In other words, I need
> upsert "like" semantics for the output of the query.
>
> I was hopeful after reading comments in UpsertStreamTableSink.java that
> this condition is accounted for in the implementation, however, a pipeline
> with above query writing to a concrete UpsertStreamTableSink fails with the
> following error  - "UpsertStreamTableSink requires that Table has" + " a
> full primary keys if it is updated." Here are the relevant comments from
> UpsertStreamTableSink.java for reference -
>
> ```
> Configures the unique key fields of the {@link Table} to write. The method
> is called after {@link TableSink#configure(String[], TypeInformation[])}.
>
> <p>The keys array might be empty, if the table consists of a single
> (updated) record. If the table does not have a key and is append-only, the
> keys attribute is null.
>
> @param keys the field names of the table's keys, an empty array if the
> table has a single row, and null if the table is append-only and has no key.
> void setKeyFields(String[] keys);
> ```
>
> The code in StreamExec(Legacy)Sink.scala appears to conform to observed
> failure and does not match the comment about "empty key array if the table
> consists of a single record".
>
>  With that context, I have the following questions -
>
> 1. Is the UpsertStreamTableSink expected to consume the output of such
> aggregate only queries? Or is my interpretation of the code and comment
> wrong and I have misconfigured UpsertStreamTableSink?
> 2. If the answer to (1) is no, are there any recommended patterns for
> solving this use-case such that the client never observes an empty result
> set for the output of this query?
>
> Regards,
> Satyam
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Reply via email to