JankoWilliam created FLINK-33566:
------------------------------------

             Summary: HBase sql-connector needs overwrite the rowKey
                 Key: FLINK-33566
                 URL: https://issues.apache.org/jira/browse/FLINK-33566
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / HBase
    Affects Versions: 1.18.0
         Environment: flink: 1.18.0

hbase: 2.2.3

{{{}flink-connector-hbase-2.2:{}}}{{{}3.0.0-1.18{}}}{{{}{}}}
            Reporter: JankoWilliam
             Fix For: hbase-3.0.0


When I want to write label values of 50+to a rowkey column family in HBase (all 
values are label values of 0/1), for example:
{"id":"1111","q1":"0","q2":"1","q3":"0","q4":"1",...,"q49":"0","q50":"1"}
Here are four label values for example:
{"id":"1111","q1":"0","q2":"1","q3":"0","q4":"1"}
{code:java}
--source:
CREATE TABLE kafka_table_(
 `id` STRING,
 `q1` STRING,
 `q2` STRING,
 `q3` STRING,
 `q4` STRING
)  WITH (
...
   'connector'='kafka',
   'format'='json',
...
);
--sink:
CREATE TABLE hbase_table_ (
 rowkey  STRING,
 cf ROW<q1 INT,q2 INT,q3 INT,q4 INT>,
 PRIMARY KEY (rowkey ) NOT ENFORCED
) WITH (
  'connector' = 'my-hbase-2.2',
  'table-name' = 'test_table',
  'zookeeper.quorum' = '127.0.0.1'
);
--insert:
insert into hbase_table_ 
select 
 id AS rowkey ,
 ROW( cast(q1 as INT),cast(q2 as INT),cast(q3 as INT),cast(q4 as INT)) as cf
 from kafka_table_  ;{code}
hbase:
hbase(main):016:0> scan 'test_table'
ROW                                                        COLUMN+CELL          
                                                                                
                                                                      
 1111                                                      column=cf:q1, 
timestamp=0000000000001, value=\x00\x00\x00\x00                                 
                                                                             
 1111                                                      column=cf:q2, 
timestamp=0000000000001, value=\x00\x00\x00\x01                                 
                                                                             
 1111                                                      column=cf:q3, 
timestamp=0000000000001, value=\x00\x00\x00\x00                                 
                                                                             
 1111                                                      column=cf:q4, 
timestamp=0000000000001, value=\x00\x00\x00\x01 
 
Upstream data has a fixed value of 50+k-v data, among which very few value 
values are 1 (the default label value is 0). For example, only 1 or 2 values 
are 1: q2=1, q4=1, so I want HBase to store the following values:
hbase(main):016:0> scan 'test_table'
ROW                                                        COLUMN+CELL          
                                                                                
                                                                      
 1111                                                      column=cf:q2, 
timestamp=0000000000001, value=\x00\x00\x00\x01                                 
                                                                            
 1111                                                      column=cf:q4, 
timestamp=0000000000001, value=\x00\x00\x00\x01 
 
When I use the "sink. ignore null value" keyword here, It just don't update the 
null value, and downstream third parties will still read all the values (such 
as 50+), but there are only 2 values that are truly 1:
{code:java}
--sink:
CREATE TABLE hbase_table_ (
 rowkey  STRING,
 cf ROW<q1 INT,q2 INT,q3 INT,q4 INT>,
 PRIMARY KEY (rowkey ) NOT ENFORCED
) WITH (
  'connector' = 'my-hbase-2.2',
  'table-name' = 'test_table',
  'sink.ignore-null-value' = 'true',
  'zookeeper.quorum' = '127.0.0.1'
); 
--insert:
insert into hbase_table_ 
select 
 id AS rowkey ,
 ROW(
 case when q1 <> '0' then cast(q1 as INT) else null end,
 case when q2 <> '0' then cast(q2 as INT) else null end,
 case when q3 <> '0' then cast(q3 as INT) else null end,
 case when q4 <> '0' then cast(q4 as INT) else null end 
) as cf
 from kafka_table_ ; {code}
hbase(main):016:0> scan 'test_table'
ROW                                                        COLUMN+CELL          
                                                                                
                                                                      
 1111                                                      column=cf:q1, 
timestamp=0000000000001, value=\x00\x00\x00\x00                                 
                                                                             
 1111                                                      column=cf:q2, 
timestamp=0000000000002, value=\x00\x00\x00\x01                                 
                                                                             
 1111                                                      column=cf:q3, 
timestamp=0000000000001, value=\x00\x00\x00\x00                                 
                                                                             
 1111                                                      column=cf:q4, 
timestamp=0000000000002, value=\x00\x00\x00\x01 
 
There are no other configurations available, so I hope to have the function of 
overwriting and writing rowKey, that is, deleting the rowkey before adding new 
data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to