Hi Jark,

I would like to update the destination atomically in remove-insert fashion.
To pick that strategy, I need some "hint" from Flink that the output is a
global aggregation with no grouping key, and that each appended result
should overwrite the previous value.
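
To make the remove-insert idea concrete, here is a minimal sketch, assuming
an in-memory stand-in for the destination. SingleRowDestination and its
methods are hypothetical names, not Flink APIs. In a real database the same
step would typically be a DELETE followed by an INSERT in one transaction.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stand-in for an external table that holds the result of a
 * global aggregation. Not a Flink API; names are illustrative only.
 */
class SingleRowDestination {
    private final Object lock = new Object();
    private final List<Long> rows = new ArrayList<>();

    /** Replaces whatever is stored with the new value in one atomic step. */
    void removeInsert(long newValue) {
        synchronized (lock) {
            // Remove and insert happen under the same lock, so a reader
            // using snapshot() never observes the intermediate empty state.
            rows.clear();
            rows.add(newValue);
        }
    }

    /** Returns a copy of the rows currently stored. */
    List<Long> snapshot() {
        synchronized (lock) {
            return new ArrayList<>(rows);
        }
    }
}
```

Once the first result has been written, every snapshot contains exactly one
row, which is the property I am after.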

I am also exploring handling the issue in the upstream server (in the query
generation layer), where I have this knowledge from the context (similar
to what Arvid suggested). I may be able to work around the problem by
handling it upstream.
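
A minimal sketch of what the query generation layer could emit when it knows
the query is a global aggregation, reusing the "group by 1" form that worked
earlier in this thread. GlobalAggregateQuery and withConstantKey are
hypothetical names, and the sketch assumes a single aggregate expression over
a single table.

```java
/** Hypothetical helper for the query generation layer; not part of Flink. */
final class GlobalAggregateQuery {
    private GlobalAggregateQuery() {}

    /**
     * Builds a global aggregation that groups by a constant, in the same
     * form as "select 1, sum(revenue) from lineorder group by 1".
     * Inputs are concatenated as-is; a real generator would validate them.
     */
    static String withConstantKey(String aggregateExpr, String table) {
        return "SELECT 1, " + aggregateExpr + " FROM " + table + " GROUP BY 1";
    }
}
```

The consumer of the result would then have to ignore or drop the constant
column.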

Regards,
Satyam

On Sun, Jun 7, 2020 at 8:05 PM Jark Wu <imj...@gmail.com> wrote:

> Hi Satyam,
>
> Currently, `UpsertStreamTableSink` requires the query to contain a primary
> key, and the key is passed to the sink via `UpsertStreamTableSink#setKeyFields`.
> If there is no primary key in the query, an error is thrown, as you
> have seen.
>
> It should work for all group-by queries (provided the group-by keys are not
> projected away after the aggregation).
> Global aggregation is special: it doesn't have a primary key. But an
> upsert sink requires a primary key; otherwise it doesn't know which row to
> update.
> How would you write such a result into an external database if there is no
> primary key? Would you write the rows in append fashion or in remove-insert
> fashion?
>
> Best,
> Jark
>
>
> On Sat, 6 Jun 2020 at 04:32, Arvid Heise <ar...@ververica.com> wrote:
>
>> Instead of changing the query, I used to embed the query in a larger
>> context for similar tasks.
>>
>> So if you get an arbitrary query X that produces exactly one result
>> (e.g. X = select sum(revenue) from lineorder group by 1), then you can
>> craft a query that adds a dummy pk to the result.
>>
>> Table original = env.sqlQuery(X);
>> Table withDummy = original.select("'dummy' as pk, *");
>>
>> On Fri, Jun 5, 2020 at 9:59 PM Satyam Shekhar <satyamshek...@gmail.com>
>> wrote:
>>
>>> Hey Arvid,
>>>
>>> Thanks for the reply.
>>>
>>> As you suggested, rewriting the query to add a dummy output column and a
>>> group by clause - "select 1, sum(revenue) from lineorder group by 1" - does
>>> add a unique key column to the output, and the pipeline succeeds.
>>>
>>> However, the application may get arbitrary SQL from the upstream server.
>>> This makes the solution tricky - I'd have to change the query to add a dummy
>>> grouping key to every grouping node in the query and a projection node to
>>> drop the dummy key. I can try to account for this upstream (in the query
>>> generation layer), but I would prefer to have it solved within the
>>> execution engine itself.
>>>
>>> Regards,
>>> Satyam
>>>
>>> On Fri, Jun 5, 2020 at 11:59 AM Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> Hi Satyam,
>>>>
>>>> you are right, there seems to be a disconnect between javadoc and
>>>> implementation. Jark probably knows more.
>>>>
>>>> In your case, couldn't you just add a dummy column containing a
>>>> constant key?
>>>>
>>>> select 'revenue' AS name, sum(revenue) from lineorder
>>>>
>>>> and then set the dummy field as PK?
>>>>
>>>> On Fri, Jun 5, 2020 at 12:28 PM Satyam Shekhar <satyamshek...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am using Flink as the query engine to build an alerting/monitoring
>>>>> application. One of the use cases in our product requires continuously
>>>>> tracking and charting the output of an aggregate-only SQL query,
>>>>> for example, select sum(revenue) from lineorder. A desirable property
>>>>> of the Flink job's output for such a query is that there is always
>>>>> exactly 1 row in the result set (or at least that the number of rows does
>>>>> not fall to 0 due to retractions of previous output). In other words, I
>>>>> need upsert-like semantics for the output of the query.
>>>>>
>>>>> I was hopeful after reading the comments in UpsertStreamTableSink.java
>>>>> that this condition is accounted for in the implementation. However, a
>>>>> pipeline with the above query writing to a concrete UpsertStreamTableSink
>>>>> fails with the following error - "UpsertStreamTableSink requires that
>>>>> Table has" + " a full primary keys if it is updated." Here are the
>>>>> relevant comments from UpsertStreamTableSink.java for reference -
>>>>>
>>>>> ```
>>>>> /**
>>>>>  * Configures the unique key fields of the {@link Table} to write. The
>>>>>  * method is called after {@link TableSink#configure(String[],
>>>>>  * TypeInformation[])}.
>>>>>  *
>>>>>  * <p>The keys array might be empty, if the table consists of a single
>>>>>  * (updated) record. If the table does not have a key and is append-only, the
>>>>>  * keys attribute is null.
>>>>>  *
>>>>>  * @param keys the field names of the table's keys, an empty array if the
>>>>>  * table has a single row, and null if the table is append-only and has no key.
>>>>>  */
>>>>> void setKeyFields(String[] keys);
>>>>> ```
>>>>>
>>>>> The code in StreamExec(Legacy)Sink.scala appears to be consistent with the
>>>>> observed failure and does not match the comment about an "empty key array
>>>>> if the table consists of a single record".
>>>>>
>>>>> With that context, I have the following questions -
>>>>>
>>>>> 1. Is the UpsertStreamTableSink expected to consume the output of such
>>>>> aggregate-only queries? Or is my interpretation of the code and comment
>>>>> wrong, and have I misconfigured the UpsertStreamTableSink?
>>>>> 2. If the answer to (1) is no, are there any recommended patterns for
>>>>> solving this use-case such that the client never observes an empty result
>>>>> set for the output of this query?
>>>>>
>>>>> Regards,
>>>>> Satyam
>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Arvid Heise | Senior Java Developer
>>>>
>>>> <https://www.ververica.com/>
>>>>
>>>> Follow us @VervericaData
>>>>
>>>> --
>>>>
>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>> Conference
>>>>
>>>> Stream Processing | Event Driven | Real Time
>>>>
>>>> --
>>>>
>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>
>>>> --
>>>> Ververica GmbH
>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>>> (Toni) Cheng
>>>>
>>>
>>
>
