Instead of changing the query itself, I have previously embedded the query
in a larger context for similar tasks.

So if you get an arbitrary query X which produces exactly one result (e.g.
X = select sum(revenue) from lineorder group by 1) then you can craft a
query where you add a dummy pk to the result.

Table original = env.sqlQuery(X);
Table withDummy = original.select("'dummy' as pk, *");
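
If the query only arrives as a SQL string, the same embedding can also be
done at the string level before handing it to the engine. The following is
a minimal sketch, assuming the engine accepts X as a derived table (a
subquery in the FROM clause). The class DummyKeyWrapper and the method
withDummyKey are hypothetical names for illustration, not part of Flink,
and I have not verified that the planner derives the constant column as a
unique key in every version.

```java
final class DummyKeyWrapper {

    /**
     * Embeds an arbitrary query in an outer SELECT that adds a constant
     * key column. Hypothetical helper, not a Flink API.
     */
    public static String withDummyKey(String query, String keyName) {
        // Drop a trailing semicolon so the query is usable as a subquery.
        String inner = query.trim();
        if (inner.endsWith(";")) {
            inner = inner.substring(0, inner.length() - 1).trim();
        }
        // The original query stays untouched inside the parentheses.
        return "SELECT 'dummy' AS " + keyName + ", t.* FROM (" + inner + ") AS t";
    }

    public static void main(String[] args) {
        String x = "select sum(revenue) from lineorder";
        System.out.println(withDummyKey(x, "pk"));
    }
}
```

The upside is that X is never parsed or rewritten, so it works for any
query shape; the downside is that it only helps if the sink can use the
constant column as its key.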

On Fri, Jun 5, 2020 at 9:59 PM Satyam Shekhar <satyamshek...@gmail.com>
wrote:

> Hey Arvid,
>
> Thanks for the reply.
>
> As you suggested, rewriting the query to add a dummy output and group by
> the clause - "select 1, sum(revenue) from lineorder group by 1" does add
> a unique key column to the output, and the pipeline succeeds.
>
> However, the application may get arbitrary SQL from the upstream server.
> This makes the solution tricky - I'd have to change the query to add a
> dummy grouping key to every grouping node in the query and a projection
> node to drop the dummy key. I can try to account for this upstream (in the
> query generation layer) but I would prefer to have it solved within the
> execution engine itself.
>
> Regards,
> Satyam
>
> On Fri, Jun 5, 2020 at 11:59 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Satyam,
>>
>> you are right, there seems to be a disconnect between javadoc and
>> implementation. Jark probably knows more.
>>
>> In your case, couldn't you just add a dummy column containing a constant
>> key?
>>
>> select 'revenue' AS name, sum(revenue) from lineorder
>>
>> and then set the dummy field as PK?
>>
>> On Fri, Jun 5, 2020 at 12:28 PM Satyam Shekhar <satyamshek...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I am using Flink as the query engine to build an alerting/monitoring
>>> application. One of the use cases in our product requires continuously
>>> tracking and charting the output of an aggregate-only SQL query,
>>> for example, select sum(revenue) from lineorder. A desirable property
>>> of the output of the Flink job for such a query is that there is always
>>> exactly 1 row in the result set (or that the number of rows does not fall
>>> to 0 due to retractions of previous output). In other words, I need
>>> upsert-like semantics for the output of the query.
>>>
>>> I was hopeful after reading the comments in UpsertStreamTableSink.java
>>> that this condition is accounted for in the implementation. However, a
>>> pipeline with the above query writing to a concrete UpsertStreamTableSink
>>> fails with the following error - "UpsertStreamTableSink requires that
>>> Table has a full primary keys if it is updated." Here are the relevant
>>> comments from UpsertStreamTableSink.java for reference -
>>>
>>> ```
>>> Configures the unique key fields of the {@link Table} to write. The
>>> method is called after {@link TableSink#configure(String[],
>>> TypeInformation[])}.
>>>
>>> <p>The keys array might be empty, if the table consists of a single
>>> (updated) record. If the table does not have a key and is append-only, the
>>> keys attribute is null.
>>>
>>> @param keys the field names of the table's keys, an empty array if the
>>> table has a single row, and null if the table is append-only and has no key.
>>> void setKeyFields(String[] keys);
>>> ```
>>>
>>> The code in StreamExec(Legacy)Sink.scala is consistent with the observed
>>> failure and does not match the comment about "empty key array if the table
>>> consists of a single record".
>>>
>>> With that context, I have the following questions -
>>>
>>> 1. Is the UpsertStreamTableSink expected to consume the output of such
>>> aggregate-only queries? Or is my interpretation of the code and comment
>>> wrong and I have misconfigured UpsertStreamTableSink?
>>> 2. If the answer to (1) is no, are there any recommended patterns for
>>> solving this use-case such that the client never observes an empty result
>>> set for the output of this query?
>>>
>>> Regards,
>>> Satyam
>>>
>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>
