Hi Polarisary,

Look at the semantics your SQL wants to express: Top N, More practically
is: Top 1.
- Top N produce stream with primary keys contains row number, but your sql
didn't select row number, so there is not primary key.
- UpsertStreamTableSink requires primary key, So there is an exception.

The solution I suggest is:
- Solution 1: select row number too. So this Top N sql will produce primary
key.
- Solution 2: add "select c1, c2, c3, c4 from 'Top N SQL' group by c1, c2,
c3, c4" to convert this sql to sql with primary key.
- Solution 3: Optimize blink planner to produce primary key without row
number in special Top 1 SQL. I created [1] to improve it.

[1] https://issues.apache.org/jira/browse/FLINK-15678

Best,
Jingsong Lee

On Mon, Jan 20, 2020 at 9:58 AM Polarisary <polaris...@gmail.com> wrote:

> Hi,
> I use flink 1.9.1, sql as follows,
> ----------------------------------------
> INSERT INTO a
> SELECT c1, c2, c3, c4
> FROM (
> SELECT *,ROW_NUMBER() OVER (PARTITION BY c1, c2, c3 ORDER BY c4 DESC) AS
> rownum" +
> FROM t)
> WHERE rownum <= 1
> ----------------------------------------
> This sql,returns RetractStream, but flink JDBCUpsertTableSink is
> UpsertStreamTableSink, so this sql throw Exception as follows:
> ----------------------------------------
> Exception in thread "main" org.apache.flink.table.api.TableException:
> UpsertStreamTableSink requires that Table has a full primary keys if it is
> updated.
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114)
>
> ----------------------------------------
> My problem is how to insert to mysql.
>
> polaris...@gmail.com
>
>
>
>
>

-- 
Best, Jingsong Lee

Reply via email to