[ https://issues.apache.org/jira/browse/FLINK-26123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Carl updated FLINK-26123:
-------------------------
    Component/s: Connectors / Kafka

> The data of the upsert-kafka source cannot be written to HBase under SQL WHERE conditions
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-26123
>                 URL: https://issues.apache.org/jira/browse/FLINK-26123
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Table SQL / API, Table SQL / Planner
>    Affects Versions: 1.13.1
>            Reporter: Carl
>            Priority: Major
>         Attachments: image-2022-02-14-16-43-13-172.png,
> image-2022-02-14-16-47-26-820.png, image-2022-02-14-17-14-43-158.png,
> image-2022-02-14-17-15-27-611.png, image-2022-02-14-17-18-23-864.png,
> image-2022-02-14-17-20-04-228.png, image-2022-02-14-17-30-02-525.png,
> image-2022-02-14-17-32-40-475.png
>
>
> *1. Source table*
>
> *(1) Kafka topic:*
> ./kafka-topics.sh --create --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 --replication-factor 2 --partitions 2 --topic sink-hbase-where-01
>
> *(2) Flink Kafka table:*
> create table hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source (
>     id string
>     , ver int
>     , dp_last_chg_time timestamp(3)
>     , kafka_ts timestamp(3) METADATA FROM 'timestamp' VIRTUAL
>     , load_ts AS PROCTIME()
>     , ts AS dp_last_chg_time
>     , WATERMARK FOR dp_last_chg_time AS dp_last_chg_time - INTERVAL '20' SECOND
>     , PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>     'connector' = 'upsert-kafka',
>     'topic' = 'sink-hbase-where-01',
>     'properties.group.id' = 'sink-hbase-where-group-01',
>     'properties.zookeeper.connect' = '...',
>     'properties.bootstrap.servers' = '...',
>     'key.format' = 'json',
>     'key.json.ignore-parse-errors' = 'true',
>     'value.format' = 'json',
>     'value.json.ignore-parse-errors' = 'true',
>     'value.fields-include' = 'ALL'
> );
>
> *2. Sink table*
> CREATE TABLE hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink (
>     `pk` STRING
>     , info1 ROW<ver string, dp_last_chg_time string, kafka_ts string, load_ts string>
>     , PRIMARY KEY (`pk`) NOT ENFORCED
> ) WITH (
>     'connector' = 'hbase-2.2',
>     'table-name' = 'sink-hbase-where',
>     'sink.buffer-flush.max-size' = '0',
>     'sink.buffer-flush.max-rows' = '0',
>     'zookeeper.quorum' = '...'
> );
>
> *3. Flink SQL*
> insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
> select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
> from (
>     select cast(t1.id as string) as keys
>         , cast(t1.ver as string) as ver
>         , cast(t1.ts as string) as dp_last_chg_time
>         , cast(t1.kafka_ts as string) as kafka_ts
>         , cast(TIMESTAMPADD(HOUR, 8, t1.load_ts) as string) as load_ts
>     from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
>     *where cast(t1.id as string) = '555'*
> )
>
> *4. Tests*
>
> *Test 1:*
> (1) Produce a Kafka message (key, value):
> {"id":"555"}, {"id":"555","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}
> (2) Scan the HBase table:
> !image-2022-02-14-17-14-43-158.png!
> (3) Produce a Kafka message (key, value):
> {"id":"555"}, {"id":"555","ver":2,"dp_last_chg_time":"2022-02-14 12:04:00"}
> (4) Scan the HBase table:
> !image-2022-02-14-17-15-27-611.png!
> (5) Produce a Kafka message (key, value):
> {"id":"666"}, {"id":"666","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}
> (6) Scan the HBase table:
> !image-2022-02-14-17-18-23-864.png!
>
> *Test 2:*
> (1) Cancel the Flink app in IDEA.
> (2) Truncate the HBase table:
> !image-2022-02-14-16-47-26-820.png!
> (3) Run the Flink app in IDEA again.
> (4) Scan the HBase table: no data was written to HBase.
> !image-2022-02-14-17-20-04-228.png!
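>
> For anyone reproducing the tests above, one way to publish the keyed JSON
> records is Kafka's console producer; a minimal sketch (the broker addresses
> and the '|' key separator are placeholders, not taken from this report):
>
> # broker addresses below are placeholders; use the actual cluster addresses
> ./kafka-console-producer.sh \
>     --broker-list kafka01:9092,kafka02:9092,kafka03:9092 \
>     --topic sink-hbase-where-01 \
>     --property "parse.key=true" \
>     --property "key.separator=|"
> # each line typed afterwards is one record, split into key and value on '|':
> {"id":"555"}|{"id":"555","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}
> {"id":"555"}|{"id":"555","ver":2,"dp_last_chg_time":"2022-02-14 12:04:00"}
> {"id":"666"}|{"id":"666","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}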
>
> *Test 3:*
> (1) Cancel the Flink app in IDEA.
> (2) Remove the WHERE condition:
> insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
> select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
> from (
>     select cast(t1.id as string) as keys
>         , cast(t1.ver as string) as ver
>         , cast(t1.ts as string) as dp_last_chg_time
>         , cast(t1.kafka_ts as string) as kafka_ts
>         , cast(TIMESTAMPADD(HOUR, 8, t1.load_ts) as string) as load_ts
>     from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
>     -- where cast(t1.id as string) = '555'
> )
> (3) Run the Flink app in IDEA again.
> (4) Scan the HBase table: the data was written to HBase:
> !image-2022-02-14-17-30-02-525.png!
>
> *Test 4:*
> (1) Cancel the Flink app in IDEA.
> (2) Update the WHERE condition to '666':
> insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
> select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
> from (
>     select cast(t1.id as string) as keys
>         , cast(t1.ver as string) as ver
>         , cast(t1.ts as string) as dp_last_chg_time
>         , cast(t1.kafka_ts as string) as kafka_ts
>         , cast(TIMESTAMPADD(HOUR, 8, t1.load_ts) as string) as load_ts
>     from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
>     *where cast(t1.id as string) = '666'*
> )
> (3) Run the Flink app in IDEA again.
> (4) Scan the HBase table: the data was written to HBase:
> !image-2022-02-14-17-32-40-475.png!
>
> From the test results:
> (1) There are two records with primary key '555':
> {"id":"555"}, {"id":"555","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}
> {"id":"555"}, {"id":"555","ver":2,"dp_last_chg_time":"2022-02-14 12:04:00"}
> (2) There is one record with primary key '666':
> {"id":"666"}, {"id":"666","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}
>
> *Re-running the Flink SQL in IDEA leads to the following conclusions:*
> (1) With the following SQL, both keys are written:
> insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
> select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
> from (
>     select cast(t1.id as string) as keys
>         , cast(t1.ver as string) as ver
>         , cast(t1.ts as string) as dp_last_chg_time
>         , cast(t1.kafka_ts as string) as kafka_ts
>         , cast(TIMESTAMPADD(HOUR, 8, t1.load_ts) as string) as load_ts
>     from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
> )
> (2) With the following SQL, no data is written:
> insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
> select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
> from (
>     select cast(t1.id as string) as keys
>         , cast(t1.ver as string) as ver
>         , cast(t1.ts as string) as dp_last_chg_time
>         , cast(t1.kafka_ts as string) as kafka_ts
>         , cast(TIMESTAMPADD(HOUR, 8, t1.load_ts) as string) as load_ts
>     from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
>     *where cast(t1.id as string) = '555'*
> )
> (3) With the following SQL, the '666' data is written:
> insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
> select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
> from (
>     select cast(t1.id as string) as keys
>         , cast(t1.ver as string) as ver
>         , cast(t1.ts as string) as dp_last_chg_time
>         , cast(t1.kafka_ts as string) as kafka_ts
>         , cast(TIMESTAMPADD(HOUR, 8, t1.load_ts) as string) as load_ts
>     from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
>     *where cast(t1.id as string) = '666'*
> )
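>
> One way to narrow this down may be to compare the physical plans of the
> three statements above: for an upsert-kafka source the planner adds a
> ChangelogNormalize step, and where the filter lands relative to that step
> could matter. A minimal sketch using Flink's standard EXPLAIN syntax (only
> the EXPLAIN wrapper is new here; the query is the one from the tests):
>
> EXPLAIN PLAN FOR
> insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
> select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
> from (
>     select cast(t1.id as string) as keys
>         , cast(t1.ver as string) as ver
>         , cast(t1.ts as string) as dp_last_chg_time
>         , cast(t1.kafka_ts as string) as kafka_ts
>         , cast(TIMESTAMPADD(HOUR, 8, t1.load_ts) as string) as load_ts
>     from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
>     where cast(t1.id as string) = '555'
> )
>
> Running this once with and once without the WHERE clause should show whether
> the Calc (filter) node sits before or after ChangelogNormalize in each plan.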