Hi,
I use flink 1.9.1, sql as follows,
----------------------------------------
INSERT INTO a
        SELECT c1, c2, c3, c4
               FROM (
                     SELECT *,ROW_NUMBER() OVER (PARTITION BY c1, c2, c3 ORDER 
BY c4 DESC) AS rownum" +
                         FROM t)
               WHERE rownum <= 1
----------------------------------------
This sql,returns RetractStream, but flink JDBCUpsertTableSink is 
UpsertStreamTableSink, so this sql throw Exception as follows:
----------------------------------------
Exception in thread "main" org.apache.flink.table.api.TableException: 
UpsertStreamTableSink requires that Table has a full primary keys if it is 
updated.
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114)

----------------------------------------
My problem is how to insert to mysql.

polaris...@gmail.com




Reply via email to