JankoWilliam created FLINK-33566:
------------------------------------

Summary: HBase sql-connector needs an option to overwrite the rowKey
Key: FLINK-33566
URL: https://issues.apache.org/jira/browse/FLINK-33566
Project: Flink
Issue Type: Improvement
Components: Connectors / HBase
Affects Versions: 1.18.0
Environment: flink: 1.18.0
hbase: 2.2.3
flink-connector-hbase-2.2: 3.0.0-1.18
Reporter: JankoWilliam
Fix For: hbase-3.0.0

I want to write 50+ label values per rowkey into an HBase column family (all values are 0/1 labels), for example:

{"id":"1111","q1":"0","q2":"1","q3":"0","q4":"1",...,"q49":"0","q50":"1"}

Here are four label values as an example:

{"id":"1111","q1":"0","q2":"1","q3":"0","q4":"1"}

{code:java}
-- source:
CREATE TABLE kafka_table_ (
  `id` STRING,
  `q1` STRING,
  `q2` STRING,
  `q3` STRING,
  `q4` STRING
) WITH (
  ...
  'connector' = 'kafka',
  'format' = 'json',
  ...
);

-- sink:
CREATE TABLE hbase_table_ (
  rowkey STRING,
  cf ROW<q1 INT, q2 INT, q3 INT, q4 INT>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'my-hbase-2.2',
  'table-name' = 'test_table',
  'zookeeper.quorum' = '127.0.0.1'
);

-- insert:
INSERT INTO hbase_table_
SELECT id AS rowkey,
       ROW(CAST(q1 AS INT), CAST(q2 AS INT), CAST(q3 AS INT), CAST(q4 AS INT)) AS cf
FROM kafka_table_;
{code}

hbase:
{code}
hbase(main):016:0> scan 'test_table'
ROW     COLUMN+CELL
 1111   column=cf:q1, timestamp=0000000000001, value=\x00\x00\x00\x00
 1111   column=cf:q2, timestamp=0000000000001, value=\x00\x00\x00\x01
 1111   column=cf:q3, timestamp=0000000000001, value=\x00\x00\x00\x00
 1111   column=cf:q4, timestamp=0000000000001, value=\x00\x00\x00\x01
{code}

The upstream data always carries the fixed set of 50+ k-v pairs, and very few of the values are 1 (the default label value is 0). For example, when only one or two values are 1 (q2=1, q4=1), I want HBase to store only these cells:

{code}
hbase(main):016:0> scan 'test_table'
ROW     COLUMN+CELL
 1111   column=cf:q2, timestamp=0000000000001, value=\x00\x00\x00\x01
 1111   column=cf:q4, timestamp=0000000000001, value=\x00\x00\x00\x01
{code}

When I use the 'sink.ignore-null-value' option here, it only skips updating the null values, so downstream third parties will still read all the cells (the 50+), of which only 2 are truly 1:

{code:java}
-- sink:
CREATE TABLE hbase_table_ (
  rowkey STRING,
  cf ROW<q1 INT, q2 INT, q3 INT, q4 INT>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'my-hbase-2.2',
  'table-name' = 'test_table',
  'sink.ignore-null-value' = 'true',
  'zookeeper.quorum' = '127.0.0.1'
);

-- insert:
INSERT INTO hbase_table_
SELECT id AS rowkey,
       ROW(
         CASE WHEN q1 <> '0' THEN CAST(q1 AS INT) ELSE NULL END,
         CASE WHEN q2 <> '0' THEN CAST(q2 AS INT) ELSE NULL END,
         CASE WHEN q3 <> '0' THEN CAST(q3 AS INT) ELSE NULL END,
         CASE WHEN q4 <> '0' THEN CAST(q4 AS INT) ELSE NULL END
       ) AS cf
FROM kafka_table_;
{code}

{code}
hbase(main):016:0> scan 'test_table'
ROW     COLUMN+CELL
 1111   column=cf:q1, timestamp=0000000000001, value=\x00\x00\x00\x00
 1111   column=cf:q2, timestamp=0000000000002, value=\x00\x00\x00\x01
 1111   column=cf:q3, timestamp=0000000000001, value=\x00\x00\x00\x00
 1111   column=cf:q4, timestamp=0000000000002, value=\x00\x00\x00\x01
{code}

There is no other configuration available for this, so I hope for an overwrite function on the rowKey, that is, deleting the row before the new data is added.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
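The difference between today's ignore-null-value behavior and the requested overwrite (delete-then-put) semantics can be sketched in plain Java, modelling a row as a map of qualifier to value. This is only an illustration of the intended semantics; the class and method names are hypothetical and are not connector code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RowOverwriteSketch {

    // Current 'sink.ignore-null-value' behavior: null fields are simply
    // skipped, so stale cells from earlier writes survive in the stored row.
    public static Map<String, Integer> ignoreNullUpsert(Map<String, Integer> stored,
                                                        Map<String, Integer> incoming) {
        Map<String, Integer> result = new LinkedHashMap<>(stored);
        incoming.forEach((qualifier, value) -> {
            if (value != null) {
                result.put(qualifier, value); // only non-null cells are written
            }
        });
        return result;
    }

    // Requested behavior: delete the whole row first, then put only the
    // non-null cells, so the row holds exactly the 1-valued labels.
    public static Map<String, Integer> overwriteRow(Map<String, Integer> incoming) {
        Map<String, Integer> result = new LinkedHashMap<>(); // old row deleted
        incoming.forEach((qualifier, value) -> {
            if (value != null) {
                result.put(qualifier, value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        // Row as written by the first (non-null) insert: q1..q4 all present.
        Map<String, Integer> stored = new LinkedHashMap<>();
        stored.put("q1", 0);
        stored.put("q2", 1);
        stored.put("q3", 0);
        stored.put("q4", 1);

        // Incoming update where the 0-labels were turned into NULLs by CASE WHEN.
        Map<String, Integer> incoming = new LinkedHashMap<>();
        incoming.put("q1", null);
        incoming.put("q2", 1);
        incoming.put("q3", null);
        incoming.put("q4", 1);

        System.out.println("ignore-null-value: " + ignoreNullUpsert(stored, incoming));
        System.out.println("overwrite:         " + overwriteRow(incoming));
    }
}
```

With ignore-null-value the stale cf:q1 and cf:q3 cells remain readable, while the overwrite variant leaves only cf:q2 and cf:q4, which matches the desired scan output above.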