[ 
https://issues.apache.org/jira/browse/FLINK-26123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Carl updated FLINK-26123:
-------------------------
    Component/s: Connectors / Kafka

> The data of the upsert-kafka source cannot be written to HBase under sql 
> where conditions
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-26123
>                 URL: https://issues.apache.org/jira/browse/FLINK-26123
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Table SQL / API, Table SQL / Planner
>    Affects Versions: 1.13.1
>            Reporter: Carl
>            Priority: Major
>         Attachments: image-2022-02-14-16-43-13-172.png, 
> image-2022-02-14-16-47-26-820.png, image-2022-02-14-17-14-43-158.png, 
> image-2022-02-14-17-15-27-611.png, image-2022-02-14-17-18-23-864.png, 
> image-2022-02-14-17-20-04-228.png, image-2022-02-14-17-30-02-525.png, 
> image-2022-02-14-17-32-40-475.png
>
>
> *1. source table :*
> *(1) kafka topic :*
> ./kafka-topics.sh --create --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 
> --replication-factor 2 --partitions 2 --topic sink-hbase-where-01
> *(2) flink kafka table :*
> create table hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source(
>     id         string
>     , ver      int
>     , dp_last_chg_time    timestamp(3)
>     , kafka_ts            timestamp(3) METADATA FROM 'timestamp' VIRTUAL
>     , load_ts             AS PROCTIME()
>     , ts                  as dp_last_chg_time
>     , WATERMARK FOR dp_last_chg_time AS dp_last_chg_time - INTERVAL '20' SECOND
>     , PRIMARY KEY (id) not enforced
> )WITH (
>     'connector' = 'upsert-kafka',
>     'topic' = 'sink-hbase-where-01',
>     'properties.group.id' = 'sink-hbase-where-group-01',
>     'properties.zookeeper.connect' = '...',
>     'properties.bootstrap.servers' = '...',
>     'key.format' = 'json',
>     'key.json.ignore-parse-errors' = 'true',
>     'value.format' = 'json',
>     'value.json.ignore-parse-errors' = 'true',
>     'value.fields-include' = 'ALL'
> );
>  
> *2. sink table :*
> CREATE TABLE hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink (
>     `pk` STRING
>     , info1 ROW<ver string, dp_last_chg_time string, kafka_ts string, load_ts string>
>     , PRIMARY KEY (`pk`) NOT ENFORCED
> ) WITH (
>     'connector' = 'hbase-2.2',
>     'table-name' = 'sink-hbase-where',
>     'sink.buffer-flush.max-size' = '0',
>     'sink.buffer-flush.max-rows' ='0',
>     'zookeeper.quorum' = '...'
> );
>  
> *3. flink sql :* 
> insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
> select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
> from (
>     select cast(t1.id as string) as keys
>         , cast(t1.ver as string) as ver
>         , cast(t1.ts as string) as dp_last_chg_time
>         , cast(t1.kafka_ts as string) as kafka_ts
>         , cast(TIMESTAMPADD(HOUR,8,t1.load_ts) as string) as load_ts
>     from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
>     *where cast(t1.id as string) = '555'*
> )
>  
> *4. test*
> *test 1 :*
> (1) produce kafka message:
> {"id":"555"},{"id":"555","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}
> (2) scan hbase table :
> !image-2022-02-14-17-14-43-158.png!
> (3) produce kafka message:
> {"id":"555"},{"id":"555","ver":2,"dp_last_chg_time":"2022-02-14 12:04:00"}
> (4) scan hbase table :
> !image-2022-02-14-17-15-27-611.png!
> (5) produce kafka message:
> {"id":"666"},{"id":"666","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}
> (6) scan hbase table :
> !image-2022-02-14-17-18-23-864.png!
>  
> *test 2 :*
> (1) cancel the Flink app in IDEA
> (2) truncate hbase table :
> !image-2022-02-14-16-47-26-820.png!
> (3) run the Flink app in IDEA
> (4) scan hbase table :
> No data was written to HBase.
> !image-2022-02-14-17-20-04-228.png!
>  
> *test 3 :*
> (1) cancel the Flink app in IDEA
> (2) comment out the where condition :
> insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
> select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
> from (
>     select cast(t1.id as string) as keys
>         , cast(t1.ver as string) as ver
>         , cast(t1.ts as string) as dp_last_chg_time
>         , cast(t1.kafka_ts as string) as kafka_ts
>         , cast(TIMESTAMPADD(HOUR,8,t1.load_ts) as string) as load_ts
>     from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
>     -- where cast(t1.id as string) = '555'
> )
> (3) run the Flink app in IDEA
> (4) scan hbase table :
> The data was written to HBase :
> !image-2022-02-14-17-30-02-525.png!
>  
> *test 4 :*
> (1) cancel the Flink app in IDEA
> (2) change the where condition to '666' :
> insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
> select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
> from (
>     select cast(t1.id as string) as keys
>         , cast(t1.ver as string) as ver
>         , cast(t1.ts as string) as dp_last_chg_time
>         , cast(t1.kafka_ts as string) as kafka_ts
>         , cast(TIMESTAMPADD(HOUR,8,t1.load_ts) as string) as load_ts
>     from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
>     *where cast(t1.id as string) = '666'*
> )
> (3) run the Flink app in IDEA
> (4) scan hbase table :
> The data was written to HBase :
> !image-2022-02-14-17-32-40-475.png!
>  
> Summary of the test data (key, value) :
> (1) two records have the primary key '555' :
> {"id":"555"},{"id":"555","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}
> {"id":"555"},{"id":"555","ver":2,"dp_last_chg_time":"2022-02-14 12:04:00"}
> (2) one record has the primary key '666' :
> {"id":"666"},{"id":"666","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}
>  
> *Re-running the Flink SQL in IDEA leads to the following conclusions :*
> (1) With the following SQL (no where condition), the data for both keys is written :
> insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
> select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
> from (
>     select cast(t1.id as string) as keys
>         , cast(t1.ver as string) as ver
>         , cast(t1.ts as string) as dp_last_chg_time
>         , cast(t1.kafka_ts as string) as kafka_ts
>         , cast(TIMESTAMPADD(HOUR,8,t1.load_ts) as string) as load_ts
>     from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
> )
> (2) With the following SQL, no data is written :
> insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
> select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
> from (
>     select cast(t1.id as string) as keys
>         , cast(t1.ver as string) as ver
>         , cast(t1.ts as string) as dp_last_chg_time
>         , cast(t1.kafka_ts as string) as kafka_ts
>         , cast(TIMESTAMPADD(HOUR,8,t1.load_ts) as string) as load_ts
>     from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
>     *where cast(t1.id as string) = '555'*
> )
> (3) With the following SQL, the '666' data is written :
> insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
> select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
> from (
>     select cast(t1.id as string) as keys
>         , cast(t1.ver as string) as ver
>         , cast(t1.ts as string) as dp_last_chg_time
>         , cast(t1.kafka_ts as string) as kafka_ts
>         , cast(TIMESTAMPADD(HOUR,8,t1.load_ts) as string) as load_ts
>     from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
>     *where cast(t1.id as string) = '666'*
> )
>  

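To reproduce test 1, the key and the value of each message have to reach Kafka as separate fields. The report writes them separated by a comma, which is ambiguous because the JSON itself contains commas. A minimal sketch, assuming the messages are fed to `kafka-console-producer.sh` with `parse.key=true` and a `key.separator` of `|`; the separator and the helper name `format_record` are illustrative choices, not part of the report:

```python
import json

# Assumed separator; it must not occur inside the JSON key or value.
KEY_SEPARATOR = "|"


def format_record(record_id, ver, dp_last_chg_time, sep=KEY_SEPARATOR):
    """Return one 'key<sep>value' line for a keyed console producer."""
    # Compact separators reproduce the JSON layout used in the report.
    key = json.dumps({"id": record_id}, separators=(",", ":"))
    value = json.dumps(
        {"id": record_id, "ver": ver, "dp_last_chg_time": dp_last_chg_time},
        separators=(",", ":"),
    )
    return f"{key}{sep}{value}"


# The three messages from test 1, in the order they were produced.
TEST_MESSAGES = [
    format_record("555", 1, "2022-02-14 12:04:00"),
    format_record("555", 2, "2022-02-14 12:04:00"),
    format_record("666", 1, "2022-02-14 12:04:00"),
]

if __name__ == "__main__":
    for line in TEST_MESSAGES:
        print(line)
```

Each printed line could then be pasted into a console producer started with `--property parse.key=true --property key.separator='|'` against the `sink-hbase-where-01` topic.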

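The sink contents one would expect for the three statements can be modelled with plain upsert semantics, where the latest value per primary key wins and a null value deletes the key. This is a simplified model, not Flink's implementation; `materialize` and `expected_sink` are hypothetical helper names. It only states the expected result and does not explain why the '555' filter produced no rows.

```python
# Records from test 1 as (key, value) pairs, in produce order.
CHANGELOG = [
    ("555", {"ver": 1, "dp_last_chg_time": "2022-02-14 12:04:00"}),
    ("555", {"ver": 2, "dp_last_chg_time": "2022-02-14 12:04:00"}),
    ("666", {"ver": 1, "dp_last_chg_time": "2022-02-14 12:04:00"}),
]


def materialize(changelog):
    """Apply upsert semantics: latest value per key wins, None deletes."""
    state = {}
    for key, value in changelog:
        if value is None:
            state.pop(key, None)  # tombstone: drop the key if present
        else:
            state[key] = value    # insert or update
    return state


def expected_sink(changelog, wanted_id=None):
    """Rows the sink should hold, optionally restricted to one id."""
    table = materialize(changelog)
    return {
        key: value
        for key, value in table.items()
        if wanted_id is None or key == wanted_id
    }


if __name__ == "__main__":
    # No filter, then the two where conditions used in the report.
    for wanted in (None, "555", "666"):
        print(wanted, expected_sink(CHANGELOG, wanted))
```

Under this model the '555' filter should leave a single row with `ver` 2, whereas test 2 observed an empty HBase table.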

--
This message was sent by Atlassian Jira
(v8.20.1#820001)