Hi Lake, This is not a problem of HBaseUpsertTableSink. This is because the query loses primary key (e.g. concat(key1, key2) will lose the primary key information [key1, key2] currently.), but the validation of inserting checks the upsert query should have a primary key. That’s why the exception is thrown.
IMO, in order to fix this problem, we need to enrich the primary key inference to support all kinds of built-in function/operators. But this is a large work which means it may not happen in 1.9.1. Regards, Jark On Thu, 12 Sep 2019 at 14:39, LakeShen <shenleifight...@gmail.com> wrote: > Hi community , when I create the hbase sink table in my flink ddl sql > ,just like this: > > > > > > *create table sink_hbase_table( rowkey VARCHAR, cf > row( kdt_it_count bigint )) with (xxxxxx);* > > and I run my flink task , it throws the exception like this : > *UpsertStreamTableSink requires that Table has a full primary keys if it is > updated.* > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:115) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50) > at > > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50) > at > > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61) > at > > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) > at > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > > I saw the flink source code , I find that in HBaseUpsertTableSink , the > method setKeyFields doesnt' has any code content,in StreamExecSink class,I > saw the code content like this : > *//TODO UpsertStreamTableSink setKeyFields interface should be > Array[Array[String]]* > but now the UpsertStreamTableSink setKeyFields interface is Array[String], > it seems like the conflict with the above content. > Could we use HBaseUpsertTableSink in our flink task?Thanks your reply. >