Hi Jark,

I wish to atomically update the destination with remove-insert. To pick
that strategy, I need some "hint" from Flink that the output is a global
aggregation with no grouping key, and that appends should overwrite the
previous value.
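
As a rough sketch of the remove-insert strategy for a single-row result, assuming an in-memory destination: `SingleRowDestination` and its methods are hypothetical names for illustration, not Flink APIs. The previous row is dropped and the new row installed in one atomic swap, so a reader never observes an empty result between the two steps.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory destination for illustration; not a Flink API.
final class SingleRowDestination<R> {
    // The whole result set is held as one immutable snapshot.
    private final AtomicReference<List<R>> snapshot =
            new AtomicReference<>(Collections.emptyList());

    // Remove-insert: replace the previous row with the new one in a single
    // atomic swap, so a concurrent reader sees either the old row or the
    // new row, never an intermediate empty set.
    void removeInsert(R newRow) {
        snapshot.set(Collections.singletonList(newRow));
    }

    // Returns the current snapshot (empty only before the first write).
    List<R> read() {
        return snapshot.get();
    }
}
```

With a real external database, the same idea would presumably map to a delete and an insert inside one transaction; that part is an assumption and depends on the destination.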

I am also exploring handling the issue in the upstream server (in the query
generation layer), where I have this knowledge based on the context (similar
to what Arvid suggested). I may be able to get around this problem by
handling it upstream.
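
A minimal sketch of what the upstream handling could look like, assuming the query generation layer already knows that a query returns exactly one row. `DummyKeyWrapper` and `withDummyKey` are hypothetical names, and the wrapping follows the dummy-key idea quoted below; whether the planner actually derives a unique key from a constant column is not confirmed in this thread.

```java
// Hypothetical helper for the query generation layer; not a Flink API.
final class DummyKeyWrapper {
    private DummyKeyWrapper() {}

    // Wraps a query known to return exactly one row so that its output
    // carries a constant key column named "pk".
    static String withDummyKey(String singleRowQuery) {
        String trimmed = singleRowQuery.trim();
        // Drop a trailing semicolon so the query can be nested as a subquery.
        if (trimmed.endsWith(";")) {
            trimmed = trimmed.substring(0, trimmed.length() - 1).trim();
        }
        return "SELECT 'dummy' AS pk, t.* FROM (" + trimmed + ") AS t";
    }
}
```

For example, `DummyKeyWrapper.withDummyKey("select sum(revenue) from lineorder")` produces a query whose first output column is the constant `pk`, leaving the original query text untouched.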


On Sun, Jun 7, 2020 at 8:05 PM Jark Wu <imj...@gmail.com> wrote:

> Hi Satyam,
> Currently, `UpsertStreamTableSink` requires the query to contain a primary
> key, and the key will be set to `UpsertStreamTableSink#setKeyFields`.
> If there is no primary key in the query, an error will be thrown as you
> can see.
> It should work for all group-by queries (as long as the group-by keys are
> not projected away after the aggregation).
> Global aggregation is special, it doesn't have a primary key. But an
> upsert sink requires a primary key, otherwise it doesn't know which row to
> update.
> How would you write such a result into an external database if there is no
> primary key? Would you write it in append fashion or in remove-insert fashion?
> Best,
> Jark
> On Sat, 6 Jun 2020 at 04:32, Arvid Heise <ar...@ververica.com> wrote:
>> Instead of changing the query, I have embedded the query in a larger
>> context for similar work.
>> So if you get an arbitrary query X which produces exactly one result
>> (e.g. X = select sum(revenue) from lineorder group by 1) then you can
>> craft a query where you add a dummy pk to the result.
>> Table original = env.sqlQuery(X);
>> Table withDummy = original.select("'dummy' as pk, *");
>> On Fri, Jun 5, 2020 at 9:59 PM Satyam Shekhar <satyamshek...@gmail.com>
>> wrote:
>>> Hey Arvid,
>>> Thanks for the reply.
>>> As you suggested, rewriting the query to add a dummy output column and a
>>> group-by clause - "select 1, sum(revenue) from lineorder group by 1" - does
>>> add a unique key column to the output, and the pipeline succeeds.
>>> However, the application may get arbitrary SQL from the upstream server.
>>> This makes the solution tricky - I'd have to change the query to add a dummy
>>> grouping key to every grouping node in the query and a projection node to
>>> drop the dummy key. I can try to account for this upstream (in the query
>>> generation layer), but I would prefer to have it solved within the
>>> execution engine itself.
>>> Regards,
>>> Satyam
>>> On Fri, Jun 5, 2020 at 11:59 AM Arvid Heise <ar...@ververica.com> wrote:
>>>> Hi Satyam,
>>>> you are right, there seems to be a disconnect between javadoc and
>>>> implementation. Jark probably knows more.
>>>> In your case, couldn't you just add a dummy column containing a
>>>> constant key?
>>>> select 'revenue' AS name, sum(revenue) from lineorder
>>>> and then set the dummy field as PK?
>>>> On Fri, Jun 5, 2020 at 12:28 PM Satyam Shekhar <satyamshek...@gmail.com>
>>>> wrote:
>>>>> Hello,
>>>>> I am using Flink as the query engine to build an alerting/monitoring
>>>>> application. One of the use cases in our product requires continuously
>>>>> tracking and charting the output of an aggregate-only SQL query,
>>>>> for example, select sum(revenue) from lineorder. A desirable property
>>>>> of the output of the Flink job for such a query is that there is always
>>>>> exactly 1 row in the result set (or that the number of rows does not fall
>>>>> to 0 due to retractions of previous output). In other words, I need
>>>>> upsert-like semantics for the output of the query.
>>>>> I was hopeful after reading the comments in UpsertStreamTableSink.java
>>>>> that this condition is accounted for in the implementation. However, a
>>>>> pipeline with the above query writing to a concrete UpsertStreamTableSink
>>>>> fails with the following error - "UpsertStreamTableSink requires that
>>>>> Table has a full primary keys if it is updated." Here are the
>>>>> relevant comments from UpsertStreamTableSink.java for reference -
>>>>> ```
>>>>> /**
>>>>>  * Configures the unique key fields of the {@link Table} to write. The
>>>>>  * method is called after {@link TableSink#configure(String[],
>>>>>  * TypeInformation[])}.
>>>>>  *
>>>>>  * <p>The keys array might be empty, if the table consists of a single
>>>>>  * (updated) record. If the table does not have a key and is append-only, the
>>>>>  * keys attribute is null.
>>>>>  *
>>>>>  * @param keys the field names of the table's keys, an empty array if the
>>>>>  * table has a single row, and null if the table is append-only and has no
>>>>>  * key.
>>>>>  */
>>>>> void setKeyFields(String[] keys);
>>>>> ```
>>>>> The code in StreamExec(Legacy)Sink.scala appears to conform to the
>>>>> observed failure and does not match the comment about "empty key array if
>>>>> the table consists of a single record".
>>>>> With that context, I have the following questions -
>>>>> 1. Is the UpsertStreamTableSink expected to consume the output of such
>>>>> aggregate-only queries? Or is my interpretation of the code and comment
>>>>> wrong, and have I misconfigured UpsertStreamTableSink?
>>>>> 2. If the answer to (1) is no, are there any recommended patterns for
>>>>> solving this use-case such that the client never observes an empty result
>>>>> set for the output of this query?
>>>>> Regards,
>>>>> Satyam
>>>> --
>>>> Arvid Heise | Senior Java Developer
>>>> <https://www.ververica.com/>
>>>> Follow us @VervericaData
>>>> --
>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>> Conference
>>>> Stream Processing | Event Driven | Real Time
>>>> --
>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>> --
>>>> Ververica GmbH
>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>>> (Toni) Cheng
