Hi community , when I create the hbase sink table  in my  flink ddl sql
,just like this:





*create table sink_hbase_table(    rowkey                 VARCHAR,    cf
                  row(    kdt_it_count  bigint )) with (xxxxxx);*

and I run my flink task , it throws the exception like this :
*UpsertStreamTableSink requires that Table has a full primary keys if it is
updated.*
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:115)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

I saw the flink source code , I find that in HBaseUpsertTableSink , the
method setKeyFields doesnt' has any code content,in StreamExecSink class,I
saw the code content like this :
*//TODO UpsertStreamTableSink setKeyFields interface should be
Array[Array[String]]*
but now the  UpsertStreamTableSink setKeyFields interface is Array[String],
it seems like the conflict with the above content.
Could we use HBaseUpsertTableSink in our flink task?Thanks your reply.

Reply via email to