Hi Polarisary, Look at the semantics your SQL wants to express: Top N, More practically is: Top 1. - Top N produce stream with primary keys contains row number, but your sql didn't select row number, so there is not primary key. - UpsertStreamTableSink requires primary key, So there is an exception.
The solution I suggest is: - Solution 1: select row number too. So this Top N sql will produce primary key. - Solution 2: add "select c1, c2, c3, c4 from 'Top N SQL' group by c1, c2, c3, c4" to convert this sql to sql with primary key. - Solution 3: Optimize blink planner to produce primary key without row number in special Top 1 SQL. I created [1] to improve it. [1] https://issues.apache.org/jira/browse/FLINK-15678 Best, Jingsong Lee On Mon, Jan 20, 2020 at 9:58 AM Polarisary <polaris...@gmail.com> wrote: > Hi, > I use flink 1.9.1, sql as follows, > ---------------------------------------- > INSERT INTO a > SELECT c1, c2, c3, c4 > FROM ( > SELECT *,ROW_NUMBER() OVER (PARTITION BY c1, c2, c3 ORDER BY c4 DESC) AS > rownum" + > FROM t) > WHERE rownum <= 1 > ---------------------------------------- > This sql,returns RetractStream, but flink JDBCUpsertTableSink is > UpsertStreamTableSink, so this sql throw Exception as follows: > ---------------------------------------- > Exception in thread "main" org.apache.flink.table.api.TableException: > UpsertStreamTableSink requires that Table has a full primary keys if it is > updated. > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114) > > ---------------------------------------- > My problem is how to insert to mysql. > > polaris...@gmail.com > > > > > -- Best, Jingsong Lee