Hi Satyam,

Currently, `UpsertStreamTableSink` requires the query to have a primary
key, which is passed to the sink via `UpsertStreamTableSink#setKeyFields`.
If the query has no primary key, an error is thrown, as you have seen.

It should work for all GROUP BY queries, as long as the group-by keys are not
projected away after the aggregation.
A global aggregation is special: it doesn't have a primary key. But an upsert
sink requires a primary key; otherwise it doesn't know which row to update.
How would you write such a result into an external database if there is no
primary key? Would you write the rows in append fashion, or in remove-insert
fashion?
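
To make these options concrete, here is a minimal plain-Java sketch. It does
not use any Flink API: the class `SingleRowSinkSketch`, its methods, and the
sample sums are all made up for illustration, and the in-memory list/map only
stands in for an external table. It compares append, remove-insert, and an
upsert under a constant key (the dummy-key idea Arvid suggested below).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for an external table; not part of Flink. */
final class SingleRowSinkSketch {

    /** Append fashion: every update of the aggregate becomes a new row. */
    static List<Long> writeAppend(long[] updates) {
        List<Long> rows = new ArrayList<>();
        for (long value : updates) {
            rows.add(value);
        }
        return rows;
    }

    /** Remove-insert fashion: delete the old row, then insert the new one. */
    static List<Long> writeRemoveInsert(long[] updates) {
        List<Long> rows = new ArrayList<>();
        for (long value : updates) {
            rows.clear();    // a reader looking at the table here sees zero rows
            rows.add(value);
        }
        return rows;
    }

    /** Upsert fashion: a constant key lets each update overwrite the same row. */
    static Map<String, Long> writeUpsert(String constantKey, long[] updates) {
        Map<String, Long> rows = new HashMap<>();
        for (long value : updates) {
            rows.put(constantKey, value);
        }
        return rows;
    }

    public static void main(String[] args) {
        long[] sums = {10L, 25L, 40L}; // successive values of sum(revenue)
        System.out.println("append:        " + writeAppend(sums));
        System.out.println("remove-insert: " + writeRemoveInsert(sums));
        System.out.println("upsert:        " + writeUpsert("dummy", sums));
    }
}
```

In this sketch the append variant ends with three rows, while the other two
end with one. Only the keyed upsert never passes through an empty table, which
is the property asked for in the original question.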

Best,
Jark


On Sat, 6 Jun 2020 at 04:32, Arvid Heise <ar...@ververica.com> wrote:

> Instead of changing the query, I have embedded the query in a larger
> context for similar tasks.
>
> So if you get an arbitrary query X which produces exactly one result (e.g.
> X = select sum(revenue) from lineorder group by 1) then you can craft a
> query where you add a dummy pk to the result.
>
> Table original = env.sqlQuery(X);
> Table withDummy = original.select("'dummy' as pk, *");
>
> On Fri, Jun 5, 2020 at 9:59 PM Satyam Shekhar <satyamshek...@gmail.com>
> wrote:
>
>> Hey Arvid,
>>
>> Thanks for the reply.
>>
>> As you suggested, rewriting the query to add a dummy output column and a
>> group by clause - "select 1, sum(revenue) from lineorder group by 1" -
>> does add a unique key column to the output, and the pipeline succeeds.
>>
>> However, the application may get arbitrary SQL from the upstream server.
>> This makes the solution tricky - I'd have to change the query to add a
>> dummy grouping key for all grouping nodes in the query and a projection
>> node to drop the dummy key. I can try to account for this upstream (in the
>> query generation layer), but I would prefer to have it solved within the
>> execution engine itself.
>>
>> Regards,
>> Satyam
>>
>> On Fri, Jun 5, 2020 at 11:59 AM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Satyam,
>>>
>>> You are right, there seems to be a disconnect between the Javadoc and the
>>> implementation. Jark probably knows more.
>>>
>>> In your case, couldn't you just add a dummy column containing a constant
>>> key?
>>>
>>> select 'revenue' AS name, sum(revenue) from lineorder
>>>
>>> and then set the dummy field as PK?
>>>
>>> On Fri, Jun 5, 2020 at 12:28 PM Satyam Shekhar <satyamshek...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am using Flink as the query engine to build an alerting/monitoring
>>>> application. One of the use cases in our product requires continuously
>>>> tracking and charting the output of an aggregate-only SQL query,
>>>> for example, select sum(revenue) from lineorder. A desirable property
>>>> of the output of the Flink job for such a query is that there is always
>>>> exactly 1 row in the result set (or that the number of rows does not fall
>>>> to 0 due to retractions of previous output). In other words, I need
>>>> upsert-like semantics for the output of the query.
>>>>
>>>> I was hopeful after reading the comments in UpsertStreamTableSink.java that
>>>> this condition is accounted for in the implementation. However, a pipeline
>>>> with the above query writing to a concrete UpsertStreamTableSink fails with
>>>> the following error - "UpsertStreamTableSink requires that Table has" + "
>>>> a full primary keys if it is updated." Here are the relevant comments
>>>> from UpsertStreamTableSink.java for reference -
>>>>
>>>> ```
>>>> /**
>>>>  * Configures the unique key fields of the {@link Table} to write. The
>>>>  * method is called after {@link TableSink#configure(String[],
>>>>  * TypeInformation[])}.
>>>>  *
>>>>  * <p>The keys array might be empty, if the table consists of a single
>>>>  * (updated) record. If the table does not have a key and is append-only, the
>>>>  * keys attribute is null.
>>>>  *
>>>>  * @param keys the field names of the table's keys, an empty array if the
>>>>  * table has a single row, and null if the table is append-only and has no
>>>>  * key.
>>>>  */
>>>> void setKeyFields(String[] keys);
>>>> ```
>>>>
>>>> The code in StreamExec(Legacy)Sink.scala appears to be consistent with the
>>>> observed failure and does not match the comment about an "empty key array
>>>> if the table consists of a single record".
>>>>
>>>> With that context, I have the following questions -
>>>>
>>>> 1. Is the UpsertStreamTableSink expected to consume the output of such
>>>> aggregate-only queries? Or is my interpretation of the code and comment
>>>> wrong and I have misconfigured UpsertStreamTableSink?
>>>> 2. If the answer to (1) is no, are there any recommended patterns for
>>>> solving this use-case such that the client never observes an empty result
>>>> set for the output of this query?
>>>>
>>>> Regards,
>>>> Satyam
>>>>
>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>
>
