Hi community, when I create the HBase sink table in my Flink SQL DDL like this:
*create table sink_hbase_table( rowkey VARCHAR, cf row( kdt_it_count bigint )) with (xxxxxx);*

and run my Flink job, it throws the following exception:

*UpsertStreamTableSink requires that Table has a full primary keys if it is updated.*
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:115)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

I looked at the Flink source code and found that in HBaseUpsertTableSink the method setKeyFields has an empty body. In the StreamExecSink class I also saw this comment:

*//TODO UpsertStreamTableSink setKeyFields interface should be Array[Array[String]]*

but the UpsertStreamTableSink setKeyFields interface currently takes Array[String], which seems to conflict with that comment.

Can we use HBaseUpsertTableSink in our Flink job? Thanks for your reply.