LiuZeshan created FLINK-32139:
---------------------------------

Summary: Data accidentally deleted and not deleted when upsert sink to hbase
Key: FLINK-32139
URL: https://issues.apache.org/jira/browse/FLINK-32139
Project: Flink
Issue Type: Bug
Components: Connectors / HBase
Reporter: LiuZeshan
h4. *Problem background*

We ran into two problems when synchronizing MySQL to HBase with the MySQL CDC and HBase connectors: some data was accidentally deleted, and some data that should have been deleted was not.

h4. *Reproduction steps*

1. A Flink job with parallelism 1 synchronizes a MySQL table into HBase. The SinkMaterializer is turned off by setting {{table.exec.sink.upsert-materialize = 'NONE'}}.

The MySQL table schema is as follows:
{code:sql}
CREATE TABLE `source_sample_1001` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(200) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  `weight` float DEFAULT NULL,
  PRIMARY KEY (`id`)
);
{code}

The source table definition in Flink is as follows:
{code:sql}
CREATE TABLE `source_sample_1001` (
  `id` bigint,
  `name` String,
  `age` bigint,
  `weight` float,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '${ip}',
  'port' = '3306',
  'username' = '${user}',
  'password' = '${password}',
  'database-name' = 'testdb_0010',
  'table-name' = 'source_sample_1001'
);
{code}

The HBase sink table is created in the {{testdb_0011}} namespace:
{noformat}
create 'testdb_0011:source_sample_1001', 'data'
describe 'testdb_0011:source_sample_1001'

# describe output
Table testdb_0011:source_sample_1001 is ENABLED
testdb_0011:source_sample_1001
COLUMN FAMILIES DESCRIPTION
{NAME => 'data', BLOOMFILTER => 'ROW', IN_MEMORY => 'false', VERSIONS => '1', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', COMPRESSION => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
{noformat}

The sink table definition in Flink:
{code:sql}
CREATE TABLE `hbase_sink1` (
  `id` STRING COMMENT 'unique id',
  `data` ROW<
    `name` string,
    `age` bigint,
    `weight` float
  >,
  primary key(`id`) not enforced
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'testdb_0011:source_sample_1001',
  'zookeeper.quorum' = '${hbase.zookeeper.quorum}'
);
{code}

The DML in Flink that synchronizes the data:
{code:sql}
INSERT INTO `hbase_sink1`
SELECT
  REVERSE(CONCAT_WS('', CAST(`id` AS VARCHAR))) AS `id`,
  ROW(`name`, `age`, `weight`)
FROM `source_sample_1001`;
{code}

2. Another Flink job writes datagen data into the MySQL table {{source_sample_1001}}. The id ranges from 1 to 10_000, so {{source_sample_1001}} holds at most 10_000 records.
{code:sql}
CREATE TABLE datagen_source (
  `id` int,
  `name` String,
  `age` int,
  `weight` float
) WITH (
  'connector' = 'datagen',
  'fields.id.kind' = 'random',
  'fields.id.min' = '1',
  'fields.id.max' = '10000',
  'fields.name.length' = '20',
  'rows-per-second' = '5000'
);

CREATE TABLE `source_sample_1001` (
  `id` bigint,
  `name` String,
  `age` bigint,
  `weight` float,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://${ip}:3306/testdb_0010?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
  'table-name' = 'source_sample_1001',
  'username' = '${user}',
  'password' = '${password}',
  'sink.buffer-flush.max-rows' = '500',
  'sink.buffer-flush.interval' = '1s'
);

-- dml
INSERT INTO `source_sample_1001` SELECT * FROM `datagen_source`;
{code}

3. A bash script deletes the rows of the MySQL table {{source_sample_1001}} in batches of 10 ids.
{code:bash}
#!/bin/bash
mysql1="mysql -h${ip} -u${user} -p${password}"
batch=10
for ((i=1; ; i++)); do
  echo "iteration $i start"
  # delete ids [j, j+batch) in one statement
  for ((j=1; j<=10000; j+=batch)); do
    $mysql1 -e "delete from testdb_0010.source_sample_1001 where id >= $j and id < $((j+batch))"
  done
  echo "iteration $i end"
  sleep 10
done
{code}

4. Start the two Flink jobs above and the bash script, then wait several minutes (usually 5 minutes is enough). Note that the deleting bash script is necessary to reproduce the problem.

5. Stop the bash script and wait for the datagen Flink job to fill the MySQL table up to 10_000 records, then stop the datagen job. Wait for the HBase sink job to read all of the binlog of the MySQL table {{source_sample_1001}}.

6. Check the HBase table to reproduce the data loss issue. As shown below, 67 records were lost in one test run.
{noformat}
hbase(main):006:0> count 'testdb_0011:source_sample_1001'
9933 row(s)
Took 0.8724 seconds
=> 9933
{noformat}

Pick one of the missing records and check its raw data in HBase:
{noformat}
hbase(main):008:0> get 'testdb_0011:source_sample_1001', '24'
COLUMN    CELL
0 row(s)
Took 0.0029 seconds

hbase(main):009:0> scan 'testdb_0011:source_sample_1001', {RAW => true, VERSIONS => 1000, STARTROW => '24', STOPROW => '24'}
ROW    COLUMN+CELL
 24    column=data:name, timestamp=2023-05-20T21:17:44.884, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:44.884, value=3a8f571c25a9d9040ef3
 24    column=data:name, timestamp=2023-05-20T21:17:43.769, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:43.769, value=5aada98281ee0a961841
 24    column=data:name, timestamp=2023-05-20T21:17:42.902, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:42.902, value=599790a9a641e6121ab3
 24    column=data:name, timestamp=2023-05-20T21:17:41.614, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:41.614, value=4ece6410d32959457f80
 24    column=data:name, timestamp=2023-05-20T21:17:40.885, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:40.885, value=9edcfcf1c958a7e4ae2a
 24    column=data:name, timestamp=2023-05-20T21:17:40.841, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:40.841, value=3d82dcf982d5bcd5b6b7
 24    column=data:name, timestamp=2023-05-20T21:17:39.788, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:39.788, value=2888a338b65caaf15b30
 24    column=data:name, timestamp=2023-05-20T21:17:35.799, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:35.799, value=a8d7549e18ef0c0e8674
 24    column=data:name, timestamp=2023-05-20T21:17:35.688, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:35.688, value=ada7237e52d030dcef7a
 24    column=data:name, timestamp=2023-05-20T21:17:35.650, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:35.650, value=482feed26918dcdc911e
 24    column=data:name, timestamp=2023-05-20T21:17:34.885, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:34.885, value=36d6bdd585dbb65dedb7
 24    column=data:name, timestamp=2023-05-20T21:17:33.905, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:33.905, value=6e15c4462f8435040700
 24    column=data:name, timestamp=2023-05-20T21:17:33.803, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:33.803, value=d122df5afd4eac32da72
 24    column=data:name, timestamp=2023-05-20T21:17:33.693, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:33.693, value=ed603d47fedb3852b520
 24    column=data:name, timestamp=2023-05-20T21:17:31.784, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:31.784, value=1ebdd5fe6310850b8098
 24    column=data:name, timestamp=2023-05-20T21:17:30.684, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:30.684, value=cc628ba45d1ad07fce2f
 24    column=data:name, timestamp=2023-05-20T21:17:29.812, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:29.812, value=c1d4df6e987bdb3cd0a3
 24    column=data:name, timestamp=2023-05-20T21:17:29.590, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:29.590, value=535557700ca01c6b6b1e
 24    column=data:name, timestamp=2023-05-20T21:17:28.876, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:28.876, value=a63c2ebfefc82eab4bcf
 24    column=data:name, timestamp=2023-05-20T21:17:28.565, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:28.565, value=dd2b24ff0dfa672c49ba
 24    column=data:name, timestamp=2023-05-20T21:17:27.879, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:27.879, value=69dbe1287c2bc54781ab
 24    column=data:name, timestamp=2023-05-20T21:17:27.699, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:27.699, value=775d06dcbf1148e665ee
 24    column=data:name, timestamp=2023-05-20T21:17:24.209, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:24.209, value=e23c010ab06125c88870
 24    column=data:name, timestamp=2023-05-20T21:17:22.480, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:20.716, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:18.678, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:17.720, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:16.858, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:16.682, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:15.753, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:14.571, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:11.572, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:09.681, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:08.792, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:05.888, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:05.754, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:03.626, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:02.652, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:01.790, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:17:00.986, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:16:59.797, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:16:58.982, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:16:58.781, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:16:58.626, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:16:58.149, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:16:56.610, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:16:51.655, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:16:51.458, type=Delete
 24    column=data:name, timestamp=2023-05-20T21:16:44.860, type=Delete
1 row(s)
Took 0.1466 seconds
{noformat}

7. Start the bash script again to delete all data from the MySQL table, and wait for the HBase sink job to read all of the binlog of the MySQL table {{source_sample_1001}}.

8. Check the HBase table to reproduce the issue of data not being deleted. As shown below, 6 records were not deleted in this test run.
{noformat}
hbase(main):012:0> count 'testdb_0011:source_sample_1001'
6 row(s)
Took 0.5121 seconds
=> 6
{noformat}

Check the raw data of one of these records in HBase:
{noformat}
hbase(main):013:0> get 'testdb_0011:source_sample_1001', '3668'
COLUMN       CELL
 data:name   timestamp=2023-05-20T21:17:26.714, value=ebb15f905622340d0351
1 row(s)
Took 0.0037 seconds

hbase(main):014:0> scan 'testdb_0011:source_sample_1001', {RAW => true, VERSIONS => 1000, STARTROW => '3668', STOPROW => '3668'}
ROW    COLUMN+CELL
 3668  column=data:name, timestamp=2023-05-20T21:17:45.728, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:17:45.728, value=c675a12c7cbed27599c3
 3668  column=data:name, timestamp=2023-05-20T21:17:44.693, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:17:44.693, value=413921aa1ac44f545954
 3668  column=data:name, timestamp=2023-05-20T21:17:43.854, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:17:43.854, value=7d44b0efc0923e4035b7
 3668  column=data:name, timestamp=2023-05-20T21:17:41.721, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:17:41.721, value=60bfaef81bf8efdf781a
 3668  column=data:name, timestamp=2023-05-20T21:17:40.763, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:17:40.763, value=2c371f9cd3909dd3b3f8
 3668  column=data:name, timestamp=2023-05-20T21:17:37.872, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:17:37.872, value=9e32087cb39065976e50
 3668  column=data:name, timestamp=2023-05-20T21:17:32.573, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:17:32.573, value=708364bf84dad4a04170
 3668  column=data:name, timestamp=2023-05-20T21:17:26.811, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:17:26.811, value=c0e8e11eed3f8410dea9
 3668  column=data:name, timestamp=2023-05-20T21:17:26.714, value=ebb15f905622340d0351
 3668  column=data:name, timestamp=2023-05-20T21:17:24.310, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:17:24.310, value=21681a161ed2ccbe884e
 3668  column=data:name, timestamp=2023-05-20T21:17:23.508, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:17:23.508, value=a1ef547a9efd57a7a0e2
 3668  column=data:name, timestamp=2023-05-20T21:17:22.788, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:17:22.788, value=34e688060e6c40f4f83b
 3668  column=data:name, timestamp=2023-05-20T21:17:21.746, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:17:17.761, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:17:12.610, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:17:11.909, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:17:07.846, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:17:06.901, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:17:06.758, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:17:06.569, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:17:02.689, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:17:00.344, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:16:59.961, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:16:59.415, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:16:58.916, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:16:58.781, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:16:58.718, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:16:58.339, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:16:56.340, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:16:55.883, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:16:55.683, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:16:55.056, type=Delete
 3668  column=data:name, timestamp=2023-05-20T21:16:46.845, type=Delete
1 row(s)
Took 0.0457 seconds
{noformat}

h4. *Reason for the problem*

The [HBase connector|https://github.com/apache/flink/blob/06688f345f6793a8964ec00002175f44cda13c33/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L177] uses the [Delete key type|https://github.com/apache/hbase/blob/c05ee564d3026688bcfdc456071059c7c8409694/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java#L380] [without a timestamp|https://github.com/apache/flink/blob/06688f345f6793a8964ec00002175f44cda13c33/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L168], which, according to the HBase Javadoc, will {{delete the latest version of the specified column. This is an expensive call in that on the server-side, it first does a get to find the latest versions timestamp. Then it adds a delete using the fetched cells timestamp}}. This causes the following issues.

Problem 1: When update data is written, the HBase server may assign the same timestamp to the -U and the +U of one update. Because an HBase delete marker also masks puts with the same timestamp that are written after it, the -U deletes the +U data as well, and the data is accidentally deleted. This is the problem reported in https://issues.apache.org/jira/browse/FLINK-28910. The raw scan of row {{24}} above matches this: every value, including the newest one at 21:17:44.884, has a Delete marker with exactly the same timestamp.

Problem 2: When a column has multiple versions in HBase, deleting it removes only the latest version and exposes an earlier one, so the data is not deleted. In the raw scan of row {{3668}} above, the value at 21:17:26.714 has no Delete marker with the same timestamp, so {{get}} returns it once all newer versions have been deleted.

h4. *Solution proposal*

Use the [DeleteColumn key type|https://github.com/apache/hbase/blob/c05ee564d3026688bcfdc456071059c7c8409694/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java#L322] and set strictly increasing timestamps on the [put|https://github.com/lzshlzsh/flink/blob/a2341810a244b97a3af32951e17efbc49f570cdd/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L138] and [delete|https://github.com/lzshlzsh/flink/blob/a2341810a244b97a3af32951e17efbc49f570cdd/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L170] mutations. A DeleteColumn mutation deletes all versions of the specified column with a timestamp less than or equal to the specified timestamp. Since each mutation from the writer then carries a larger timestamp than the previous one, a delete can no longer share its timestamp with the following put (Problem 1), and it covers every older version instead of only the latest one (Problem 2).

I have tested the proposed solution for several days, and neither the accidental deletion nor the non-deletion issue has occurred.
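To make the two problems and the proposed fix easier to follow, here is a small, self-contained Java sketch. It does not use the HBase client; it is a toy in-memory model of a single column under simplified assumptions: a timestamp-less Delete takes the timestamp of the latest visible version (or the server's current time when there is none), a Delete marker masks only the version with exactly its timestamp, a DeleteColumn marker masks every version at or below its timestamp, and a marker wins over a put with the same timestamp. The names {{ColumnModel}}, {{MonotonicClock}} and {{HBaseDeleteDemo}} are hypothetical, and {{MonotonicClock}} (max of wall-clock time and previous value + 1) is only one possible reading of "strictly increasing timestamp", not the code in the linked branch.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical demo replaying Problem 1, Problem 2 and the proposed fix on a toy model. */
class HBaseDeleteDemo {

    /** Problem 1: +I, -U and +U of one key all reach the server in the same millisecond. */
    static String sameMillisUpdate(boolean proposedFix) {
        ColumnModel col = new ColumnModel();
        MonotonicClock clock = new MonotonicClock();
        long now = 100L; // assumed wall-clock millisecond shared by all three mutations
        if (proposedFix) {
            col.put(clock.next(now), "v1");     // +I at 100
            col.deleteColumns(clock.next(now)); // -U as DeleteColumn at 101
            col.put(clock.next(now), "v2");     // +U at 102, stays visible
        } else {
            col.put(now, "v1");                 // +I, server-assigned ts 100
            col.deleteLatestVersion(now);       // -U: marker copies the latest ts, 100
            col.put(now, "v2");                 // +U at 100, masked by that marker
        }
        return col.get();
    }

    /** Problem 2: two versions exist (e.g. two upserts, no delete in between), then a delete. */
    static String deleteAfterTwoVersions(boolean proposedFix) {
        ColumnModel col = new ColumnModel();
        MonotonicClock clock = new MonotonicClock();
        if (proposedFix) {
            col.put(clock.next(100L), "old");
            col.put(clock.next(200L), "new");
            col.deleteColumns(clock.next(300L)); // masks every version <= 300
        } else {
            col.put(100L, "old");
            col.put(200L, "new");
            col.deleteLatestVersion(300L);       // marker copies ts 200 only, "old" reappears
        }
        return col.get();
    }

    public static void main(String[] args) {
        System.out.println("Problem 1, current:  " + sameMillisUpdate(false));
        System.out.println("Problem 1, proposed: " + sameMillisUpdate(true));
        System.out.println("Problem 2, current:  " + deleteAfterTwoVersions(false));
        System.out.println("Problem 2, proposed: " + deleteAfterTwoVersions(true));
    }
}

/** Hypothetical helper: returns max(wallClockMillis, previous + 1), so values strictly increase. */
class MonotonicClock {
    private long last = Long.MIN_VALUE;

    long next(long wallClockMillis) {
        last = Math.max(wallClockMillis, last + 1);
        return last;
    }
}

/** Toy model of one HBase column (row + family:qualifier) under the assumptions stated above. */
class ColumnModel {
    private final TreeMap<Long, String> versions = new TreeMap<>(); // ts -> value
    private final Set<Long> versionDeletes = new HashSet<>();        // "Delete" markers
    private long columnDeleteUpTo = Long.MIN_VALUE;                  // newest "DeleteColumn" marker

    void put(long ts, String value) {
        versions.put(ts, value); // a put with an existing ts replaces the value in this model
    }

    /** Mirrors the Javadoc of a timestamp-less version delete: get the latest ts, mark only it. */
    void deleteLatestVersion(long serverNow) {
        Long latest = latestVisibleTs();
        versionDeletes.add(latest != null ? latest : serverNow);
    }

    /** Mirrors a DeleteColumn marker: masks all versions with ts <= the given ts. */
    void deleteColumns(long ts) {
        columnDeleteUpTo = Math.max(columnDeleteUpTo, ts);
    }

    private boolean masked(long ts) {
        return ts <= columnDeleteUpTo || versionDeletes.contains(ts);
    }

    private Long latestVisibleTs() {
        for (long ts : versions.descendingKeySet()) {
            if (!masked(ts)) {
                return ts;
            }
        }
        return null;
    }

    /** Value a plain get would return, or null when the column looks deleted. */
    String get() {
        Long ts = latestVisibleTs();
        return ts == null ? null : versions.get(ts);
    }
}
```

Running {{HBaseDeleteDemo}} prints {{null}} (Problem 1) and {{old}} (Problem 2) for the current behaviour, and {{v2}} and {{null}} for the proposed one. One trade-off of this particular clock: when a single writer emits more than one mutation per millisecond for a sustained period, the generated timestamps run ahead of wall-clock time, and the ordering only holds within one {{MonotonicClock}} instance.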