Hi Lake,

This is not a problem with HBaseUpsertTableSink.
The query itself loses its primary key: for example, concat(key1, key2)
currently drops the primary key information [key1, key2].
The insert validation, however, requires an upsert query to have a
primary key. That is why the exception is thrown.
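
To make the mechanism concrete, here is a rough toy model in Python. It is
only a sketch under assumptions, not the planner's actual code: the names
FieldRef, Call and infer_keys are hypothetical, and the real inference in
Flink works on the relational plan. The idea it shows is that a key column
survives a projection only when it is passed through unchanged, so wrapping
the key columns in a function call makes the key unknown downstream.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FieldRef:
    """Hypothetical: a projection that passes an input column through unchanged."""
    name: str


@dataclass(frozen=True)
class Call:
    """Hypothetical: a projection that applies a function to input columns."""
    func: str
    args: tuple


def infer_keys(input_keys: set, projections: dict) -> Optional[list]:
    """Return the output columns that form the key, or None if the key is lost.

    input_keys:  input column names that form the unique key
    projections: output column name -> FieldRef or Call
    """
    passed = {}
    for out_name, expr in projections.items():
        # Only a direct column reference keeps the key information.
        if isinstance(expr, FieldRef) and expr.name in input_keys:
            passed.setdefault(expr.name, out_name)
    # Every key column must survive, otherwise the key is treated as unknown.
    if set(passed) == set(input_keys):
        return sorted(passed.values())
    return None


# Key columns selected directly: the key is preserved.
direct = infer_keys(
    {"key1", "key2"},
    {"key1": FieldRef("key1"), "key2": FieldRef("key2"), "cnt": FieldRef("cnt")},
)

# Key columns wrapped in concat(...): the key is lost.
concatenated = infer_keys(
    {"key1", "key2"},
    {"rowkey": Call("concat", ("key1", "key2")), "cnt": FieldRef("cnt")},
)
```

In this model, direct is ["key1", "key2"] while concatenated is None, which
corresponds to the case where the upsert sink validation fails.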

IMO, to fix this we need to extend the primary key inference to support
all kinds of built-in functions and operators.
That is a large piece of work, so it may not make it into 1.9.1.

Regards,
Jark

On Thu, 12 Sep 2019 at 14:39, LakeShen <shenleifight...@gmail.com> wrote:

> Hi community, when I create the HBase sink table in my Flink DDL SQL
> like this:
>
> create table sink_hbase_table(
>     rowkey VARCHAR,
>     cf row(kdt_it_count bigint)
> ) with (xxxxxx);
>
> and run my Flink job, it throws this exception:
> UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:115)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>
> I looked at the Flink source code and found that in HBaseUpsertTableSink the
> method setKeyFields has an empty body. In the StreamExecSink class I
> saw this comment:
> //TODO UpsertStreamTableSink setKeyFields interface should be Array[Array[String]]
> but the UpsertStreamTableSink setKeyFields interface currently takes Array[String],
> which seems to conflict with the comment above.
> Can we use HBaseUpsertTableSink in our Flink job? Thanks for your reply.
>
