Hi,

I am using Flink 1.9.1 with the following SQL:

----------------------------------------
INSERT INTO a
SELECT c1, c2, c3, c4
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY c1, c2, c3 ORDER BY c4 DESC) AS rownum
  FROM t)
WHERE rownum <= 1
----------------------------------------

This query produces a retract stream, but Flink's JDBCUpsertTableSink is an UpsertStreamTableSink, so the query throws the following exception:

----------------------------------------
Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114)
----------------------------------------

My question is: how can I insert the result of this query into MySQL?

polaris...@gmail.com
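
P.S. To make the mismatch concrete, below is a minimal plain-Java sketch (no Flink dependency) of how I understand the two stream types. The class and method names (`RetractToUpsert`, `top1`, `applyAsUpserts`) are hypothetical and only for illustration, and `top1` is a simplified model of the query above, not Flink's actual operator. It assumes that every retraction for a key is immediately followed by an add for the same key, which should hold for `rownum <= 1` over an append-only source, so retractions can be dropped and each add written as an upsert keyed on (c1, c2, c3).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Flink dependency); all names here are hypothetical.
final class RetractToUpsert {

    // One input row of table t.
    static final class Row {
        final String c1, c2, c3;
        final long c4;

        Row(String c1, String c2, String c3, long c4) {
            this.c1 = c1;
            this.c2 = c2;
            this.c3 = c3;
            this.c4 = c4;
        }

        // The PARTITION BY columns double as the upsert key.
        String key() {
            return c1 + "|" + c2 + "|" + c3;
        }
    }

    // One retract-stream message: isAdd == true is an add, false is a retraction.
    static final class Message {
        final boolean isAdd;
        final Row row;

        Message(boolean isAdd, Row row) {
            this.isAdd = isAdd;
            this.row = row;
        }
    }

    // Simplified model of the Top-1 query: when a row with a strictly larger c4
    // arrives for a key, retract the previous winner and add the new one.
    static List<Message> top1(List<Row> input) {
        Map<String, Row> best = new LinkedHashMap<>();
        List<Message> out = new ArrayList<>();
        for (Row r : input) {
            Row current = best.get(r.key());
            if (current == null) {
                out.add(new Message(true, r));
                best.put(r.key(), r);
            } else if (r.c4 > current.c4) {
                out.add(new Message(false, current));
                out.add(new Message(true, r));
                best.put(r.key(), r);
            }
        }
        return out;
    }

    // Fold the retract stream into a keyed table: ignore retractions and let each
    // add overwrite the row stored under its key (what an upsert would do).
    static Map<String, Long> applyAsUpserts(List<Message> messages) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (Message m : messages) {
            if (m.isAdd) {
                table.put(m.row.key(), m.row.c4);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row("a", "b", "c", 1));
        rows.add(new Row("a", "b", "c", 5));
        rows.add(new Row("x", "y", "z", 2));
        rows.add(new Row("a", "b", "c", 3));
        System.out.println(applyAsUpserts(top1(rows)));
    }
}
```

In a real job, one possible route (an assumption on my part, not verified on 1.9.1) would be to obtain the messages with `StreamTableEnvironment.toRetractStream(table, Row.class)`, keep only those whose flag is true, and write them through a custom sink that issues MySQL `INSERT ... ON DUPLICATE KEY UPDATE` against a table with a unique key on (c1, c2, c3).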