自己实现了一个 Kudu connector,执行 SQL 时报错:
2020-09-09 18:34:59,442 WARN org.apache.flink.table.client.cli.CliClient
[] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL
statement.
at
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:579)
~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:515)
~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at
org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:596)
~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at
org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:315)
~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_262]
at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212)
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
Caused by: org.apache.flink.table.api.TableException: UpsertStreamTableSink
requires that Table has a full primary keys if it is updated.
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1256)
~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.getPipeline(StreamTableEnvironmentImpl.java:327)
~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$createPipeline$1(ExecutionContext.java:284)
~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255)
~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at
org.apache.flink.table.client.gateway.local.ExecutionContext.createPipeline(ExecutionContext.java:281)
~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
at
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
... 8 more
本意是想调用新的 DynamicTableSink,但实际上却调用了旧的 UpsertStreamTableSink。