Hi Satyam,

I guess your destination database table doesn't have a primary key, right?
If this is the case, I think maybe the upcoming 1.11 release with new sink
interface (FLIP-95) can better resolve this.

In the new sink interface:
- the primary key is always defined on Flink SQL DDL
- the planner will not infer or validate the primary key of the query
anymore.
- you can get either the query contains UPDATE/DELETE changes or is an
INSERT only query vis the parameter of
`DynamicTableSink#getChangelogMode(queryChangeMode)`

So if the `queryChangeMode` contains UPDATE changes, and DDL doesn't have
any PK, you can set a flag in your sink to indicate it should work in
"remove-insert" mode.

Best,
Jark

On Mon, 8 Jun 2020 at 15:40, Satyam Shekhar <satyamshek...@gmail.com> wrote:

> Hi Jark,
>
> I wish to atomically update the destination with remove-insert. To pick
> that strategy, I need some "hint" from Flink that the output is a global
> aggregation with no grouping key, and that appends should overwrite the
> previous value.
>
> I am also exploring handling the issue in the upstream server (in query
> generation layer) where I have this knowledge based on the context (similar
> to what Arvid suggested). I may be able to get around this problem by
> handling it upstream.
>
> Regards,
> Satyam
>
> On Sun, Jun 7, 2020 at 8:05 PM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Satyam,
>>
>> Currently, `UpsertStreamTableSink` requires the query to contain a
>> primary key, and the key will be set to
>> `UpsertStreamTableSink#setKeyFields`.
>> If there is no primary key in the query, an error will be thrown as you
>> can see.
>>
>> It should work for all the group by queries (if no projection on the
>> group by after the aggregation).
>> Global aggregation is special, it doesn't have a primary key. But an
>> upsert sink requires a primary key, otherwise it doesn't know which row to
>> update.
>> How would you write such a result into an external database if no primary
>> key? Will you write them in append fashion, or remove-insert fashion?
>>
>> Best,
>> Jark
>>
>>
>> On Sat, 6 Jun 2020 at 04:32, Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Instead of changing the query, I used to embed the query in a larger
>>> context for similar works.
>>>
>>> So if you get an arbitrary query X which produces exactly one result
>>> (e.g. X = select sum(revenue) from lineorder group by 1) then you can
>>> craft a query where you add a dummy pk to the result.
>>>
>>> Table original = env.sqlQuery(X);
>>> Table withDummy = original.select("'dummy' as pk, *');
>>>
>>> On Fri, Jun 5, 2020 at 9:59 PM Satyam Shekhar <satyamshek...@gmail.com>
>>> wrote:
>>>
>>>> Hey Arvid,
>>>>
>>>> Thanks for the reply.
>>>>
>>>> As you suggested, rewriting the query to add a dummy output and group
>>>> by the clause - "select 1, sum(revenue) from lineorder group by 1"
>>>> does add a unique key column to the output, and the pipeline succeeds.
>>>>
>>>> However, the application may get arbitrary SQL from the upstream
>>>> server. This makes the solution tricky - I'd have to change the query to
>>>> add dummy grouping key for all grouping nodes in the query and projection
>>>> node to drop the dummy key. I can try to account for this upstream (in
>>>> query generation layer) but it would prefer to have it solved within the
>>>> execution engine itself.
>>>>
>>>> Regards,
>>>> Satyam
>>>>
>>>> On Fri, Jun 5, 2020 at 11:59 AM Arvid Heise <ar...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi Satyam,
>>>>>
>>>>> you are right, there seems to be a disconnect between javadoc and
>>>>> implementation. Jark probably knows more.
>>>>>
>>>>> In your case, couldn't you just add a dummy column containing a
>>>>> constant key?
>>>>>
>>>>> select 'revenue' AS name, sum(revenue) from lineorder
>>>>>
>>>>> and then set the dummy field as PK?
>>>>>
>>>>> On Fri, Jun 5, 2020 at 12:28 PM Satyam Shekhar <
>>>>> satyamshek...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I am using Flink as the query engine to build an alerting/monitoring
>>>>>> application. One of the use cases in our product requires continuously
>>>>>> tracking and charting the output of an aggregate only SQL query,
>>>>>> for example, select sum(revenue) from lineorder. A desirable
>>>>>> property from the output of Flink job for such a query is that there is
>>>>>> always exactly 1 row in the result set (or that the number of rows does 
>>>>>> not
>>>>>> fall to 0 due to retractions for previous output).  In other words, I 
>>>>>> need
>>>>>> upsert "like" semantics for the output of the query.
>>>>>>
>>>>>> I was hopeful after reading comments in UpsertStreamTableSink.java
>>>>>> that this condition is accounted for in the implementation, however, a
>>>>>> pipeline with above query writing to a concrete UpsertStreamTableSink 
>>>>>> fails
>>>>>> with the following error  - "UpsertStreamTableSink requires that
>>>>>> Table has" + " a full primary keys if it is updated." Here are the
>>>>>> relevant comments from UpsertStreamTableSink.java for reference -
>>>>>>
>>>>>> ```
>>>>>> Configures the unique key fields of the {@link Table} to write. The
>>>>>> method is called after {@link TableSink#configure(String[],
>>>>>> TypeInformation[])}.
>>>>>>
>>>>>> <p>The keys array might be empty, if the table consists of a single
>>>>>> (updated) record. If the table does not have a key and is append-only, 
>>>>>> the
>>>>>> keys attribute is null.
>>>>>>
>>>>>> @param keys the field names of the table's keys, an empty array if
>>>>>> the table has a single row, and null if the table is append-only and has 
>>>>>> no
>>>>>> key.
>>>>>> void setKeyFields(String[] keys);
>>>>>> ```
>>>>>>
>>>>>> The code in StreamExec(Legacy)Sink.scala appears to conform to
>>>>>> observed failure and does not match the comment about "empty key array if
>>>>>> the table consists of a single record".
>>>>>>
>>>>>>  With that context, I have the following questions -
>>>>>>
>>>>>> 1. Is the UpsertStreamTableSink expected to consume the output of
>>>>>> such aggregate only queries? Or is my interpretation of the code and
>>>>>> comment wrong and I have misconfigured UpsertStreamTableSink?
>>>>>> 2. If the answer to (1) is no, are there any recommended patterns for
>>>>>> solving this use-case such that the client never observes an empty result
>>>>>> set for the output of this query?
>>>>>>
>>>>>> Regards,
>>>>>> Satyam
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Arvid Heise | Senior Java Developer
>>>>>
>>>>> <https://www.ververica.com/>
>>>>>
>>>>> Follow us @VervericaData
>>>>>
>>>>> --
>>>>>
>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>>> Conference
>>>>>
>>>>> Stream Processing | Event Driven | Real Time
>>>>>
>>>>> --
>>>>>
>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>
>>>>> --
>>>>> Ververica GmbH
>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason,
>>>>> Ji (Toni) Cheng
>>>>>
>>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>

Reply via email to