[ https://issues.apache.org/jira/browse/FLINK-22498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17334438#comment-17334438 ]
Jark Wu commented on FLINK-22498:
---------------------------------

This is a problem in the Kudu sink; the Kudu sink implementation should be upgraded to the new {{DynamicTableSink}} interface.

> cast the primary key for source table that has a decimal primary key as string, and then insert into a kudu table that has a string primary key throw the exception : UpsertStreamTableSink requires that Table has a full primary keys if it is updated
> ------------------------------------------------------------------------
>
>                 Key: FLINK-22498
>                 URL: https://issues.apache.org/jira/browse/FLINK-22498
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>   Affects Versions: 1.12.1
>         Environment: flink 1.12.1
>                      jdk 1.8
>                      hive 2.1.1
>                      kudu 1.10.0
>                      kafka 2.0.0
>            Reporter: Carl
>            Priority: Critical
>         Attachments: bug.rar
>
>   Original Estimate: 240h
>  Remaining Estimate: 240h
>
> *1. source table:*
> CREATE TABLE ddl_source (
>   appl_seq DECIMAL(16,2),
>   name STRING,
>   PRIMARY KEY(appl_seq) NOT ENFORCED
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'ogg-json-03',
>   'properties.bootstrap.servers' = 'xxxx:9092',
>   'value.format' = 'canal-json'
> )
>
> *2. sink table:* created with Impala
> create table rt_dwd.test_bug(
>   pk string,
>   name string,
>   primary key (pk)
> ) partition by hash (pk) partitions 5 stored as kudu
> TBLPROPERTIES ('kudu.master_addresses' = 'xxxx:7051');
>
> *3. execute sql:* using the Blink planner
> insert into kuducatalog.default_database.`rt_dwd.test_bug`
> select CAST(appl_seq AS STRING), name from ddl_source
>
> *This throws an exception:*
> Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
>
> *Case A: if we use the following source table, no exception is thrown:*
> CREATE TABLE ddl_source (
>   appl_seq STRING,
>   name STRING,
>   PRIMARY KEY(appl_seq) NOT ENFORCED
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'ogg-json-03',
>   'properties.bootstrap.servers' = 'xxxx:9092',
>   'value.format' = 'canal-json'
> )
>
> *Case B: or, if we create the Kudu table with a decimal primary key and use the following SQL, no exception is thrown:*
> _DDL:_
> create table rt_dwd.test_bug(
>   pk decimal(16,2),
>   name string,
>   primary key (pk)
> ) partition by hash (pk) partitions 5 stored as kudu
> TBLPROPERTIES ('kudu.master_addresses' = 'xxxx:7051');
> _DML:_
> insert into kuducatalog.default_database.`rt_dwd.test_bug`
> select appl_seq, name from ddl_source
>
> *From debugging the source code, the problem may be related to the SQL parsing engine.*
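
One plausible reading of the three cases above is that a legacy upsert sink gets its key fields from the query plan, and that the planner stops treating a column as a key once it passes through a type-changing CAST. The sketch below is a toy model of that reading only. It is not Flink code: the class, the methods and the "same type keeps the key" rule are all hypothetical, and the planner's real rule for which casts preserve a key may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Toy model only: none of these names are Flink classes. */
public class UpsertKeySketch {

    /** A projected column: the source column it reads, that column's type, and the output type. */
    static final class Projection {
        final String sourceColumn;
        final String sourceType;
        final String outputType;

        Projection(String sourceColumn, String sourceType, String outputType) {
            this.sourceColumn = sourceColumn;
            this.sourceType = sourceType;
            this.outputType = outputType;
        }
    }

    /** Assumed rule: a column stays a key only if the projection does not change its type. */
    static boolean keepsKey(Projection p) {
        return p.sourceType.equals(p.outputType);
    }

    /** Returns the output positions that form the key, or empty if any key column is lost. */
    static Optional<List<Integer>> deriveKey(List<String> primaryKey, List<Projection> select) {
        List<Integer> positions = new ArrayList<>();
        for (String pk : primaryKey) {
            int found = -1;
            for (int i = 0; i < select.size(); i++) {
                Projection p = select.get(i);
                if (p.sourceColumn.equals(pk) && keepsKey(p)) {
                    found = i;
                    break;
                }
            }
            if (found < 0) {
                return Optional.empty();
            }
            positions.add(found);
        }
        return Optional.of(positions);
    }

    /** Mimics the check applied to an updating query written to a legacy upsert sink. */
    static String validate(List<String> primaryKey, List<Projection> select) {
        return deriveKey(primaryKey, select).isPresent()
                ? "ok"
                : "TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.";
    }

    public static void main(String[] args) {
        List<String> pk = Arrays.asList("appl_seq");
        Projection name = new Projection("name", "STRING", "STRING");

        // Reported case: DECIMAL(16,2) key cast to STRING.
        System.out.println(validate(pk, Arrays.asList(
                new Projection("appl_seq", "DECIMAL(16,2)", "STRING"), name)));
        // Case A: STRING key, so the cast to STRING does not change the type.
        System.out.println(validate(pk, Arrays.asList(
                new Projection("appl_seq", "STRING", "STRING"), name)));
        // Case B: DECIMAL(16,2) key selected without a cast.
        System.out.println(validate(pk, Arrays.asList(
                new Projection("appl_seq", "DECIMAL(16,2)", "DECIMAL(16,2)"), name)));
    }
}
```

Under this model only the reported case fails, which matches the behaviour described in the issue. It would also fit the comment above: the exception is raised in StreamExecLegacySink, the legacy sink path, which a sink built on {{DynamicTableSink}} would presumably no longer go through.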