[ https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17477570#comment-17477570 ]
Martijn Visser commented on FLINK-25560:
----------------------------------------

[~Bruce Wong] Looking at your PR, there have already been two reviewers. You should reach consensus with them on the requested changes rather than asking for more reviewers.

Add "sink.delete.mode" in HBase SQL connector for retracting the latest version or all versions in changelog mode
-----------------------------------------------------------------------------------------------------------------

                Key: FLINK-25560
                URL: https://issues.apache.org/jira/browse/FLINK-25560
            Project: Flink
         Issue Type: Improvement
         Components: Connectors / HBase
           Reporter: Bruce Wong
           Assignee: Jing Ge
           Priority: Major
             Labels: pull-request-available
        Attachments: image-2022-01-11-20-02-17-780.png, image-2022-01-11-20-04-48-299.png, image-2022-01-11-20-05-53-217.png, image-2022-01-11-20-07-43-900.png, image-2022-01-11-20-09-29-074.png

h1. Motivation

When we synchronize data from MySQL to HBase, we found that deleting a row in MySQL does not delete all versions of the corresponding HBase cells; only the latest version is removed, which leads to incorrect semantics. So we want to add a parameter that controls whether a retraction deletes the latest version or all versions.
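For background on the two delete semantics (this sketch is not from the issue or the PR): in the plain HBase client API, {{Delete#addColumn}} removes only the most recent version of a cell, while {{Delete#addColumns}} removes every stored version. The table, row key, and column names below merely mirror the test program that follows.

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class DeleteSemanticsDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("dim_hbase"))) {
            byte[] rowkey = Bytes.toBytes("some-upc");
            byte[] cf = Bytes.toBytes("cf");
            byte[] col = Bytes.toBytes("name");

            // Removes only the latest version of the cell; older versions stay readable.
            table.delete(new Delete(rowkey).addColumn(cf, col));

            // Removes all versions of the cell, which is what the proposed
            // 'sink.delete.mode' = 'all-versions' is meant to achieve.
            table.delete(new Delete(rowkey).addColumns(cf, col));
        }
    }
}
{code}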
h1. Usage

The test code is as follows.

{code:java}
package com.bruce;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;

public class KafkaToHBase {

    public static void main(String[] args) {
        // Local environment with the web UI enabled for inspecting the job.
        Configuration cfg = new Configuration();
        cfg.setBoolean(LOCAL_START_WEBSERVER, true);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
        env.setParallelism(1);

        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);

        // Debezium changelog source: produces INSERT/UPDATE/DELETE rows.
        String source = "CREATE TEMPORARY TABLE IF NOT EXISTS kafka_llspay_bundles (\n" +
                "  id STRING,\n" +
                "  category_id STRING,\n" +
                "  upc STRING,\n" +
                "  `name` STRING,\n" +
                "  price_cents STRING,\n" +
                "  original_price_cents STRING,\n" +
                "  short_desc STRING,\n" +
                "  `desc` STRING,\n" +
                "  cover_url STRING,\n" +
                "  created_at STRING,\n" +
                "  updated_at STRING,\n" +
                "  deleted_at STRING,\n" +
                "  extra STRING,\n" +
                "  status STRING,\n" +
                "  scholarship_cents STRING,\n" +
                "  is_payback STRING,\n" +
                "  is_support_iap STRING,\n" +
                "  iap_product_id STRING,\n" +
                "  neo_product_code STRING,\n" +
                "  paid_redirect_url STRING,\n" +
                "  subscription_type STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'dim-bundles',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'properties.group.id' = 'vvp_dev',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'value.format' = 'debezium-json',\n" +
                "  'value.debezium-json.schema-include' = 'true',\n" +
                "  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
                ")";

        // HBase sink with the proposed option: delete all versions on retraction.
        String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
                "  rowkey STRING,\n" +
                "  cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING, " +
                "price_cents STRING, original_price_cents STRING, short_desc STRING, " +
                "`desc` STRING, cover_url STRING, created_at STRING, updated_at STRING, " +
                "deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING, " +
                "is_payback STRING, is_support_iap STRING, iap_product_id STRING, " +
                "neo_product_code STRING, paid_redirect_url STRING, subscription_type STRING>\n" +
                ") WITH (\n" +
                "  'connector' = 'hbase-2.2',\n" +
                "  'table-name' = 'dim_hbase',\n" +
                "  'zookeeper.quorum' = 'localhost:2181',\n" +
                "  'sink.buffer-flush.max-size' = '0',\n" +
                "  'sink.buffer-flush.max-rows' = '1',\n" +
                "  'sink.delete.mode' = 'all-versions'\n" +
                ")";

        String dml = "INSERT INTO dim_hbase\n" +
                "SELECT\n" +
                "  upc AS rowkey,\n" +
                "  ROW(id, category_id, upc, `name`, price_cents, original_price_cents, " +
                "short_desc, `desc`, cover_url, created_at, updated_at, deleted_at, extra, " +
                "status, scholarship_cents, is_payback, is_support_iap, iap_product_id, " +
                "neo_product_code, paid_redirect_url, subscription_type)\n" +
                "FROM kafka_llspay_bundles";

        tEnv.executeSql(source);
        tEnv.executeSql(sink);
        tEnv.executeSql(dml);
    }
}
{code}

h1. Test

# After the test, we found that the delete only affected the column family specified in the Flink SQL DML statement, without affecting other column families.

!image-2022-01-11-20-02-17-780.png|width=793,height=294!

!image-2022-01-11-20-04-48-299.png|width=793,height=338!

!image-2022-01-11-20-05-53-217.png|width=856,height=479!

Writing a table with two CFs, then deleting through one CF:

!image-2022-01-11-20-07-43-900.png|width=859,height=422!

The data of cf2 is still there, so the delete only removed the CF specified in the Flink SQL DML statement, without affecting the other CF.

!image-2022-01-11-20-09-29-074.png|width=877,height=495!

h1. Reference

See the related task: FLINK-25330
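For anyone following the PR, a note on how such a setting is usually exposed (this is our sketch, not code from the PR): connector options in Flink are declared as {{ConfigOption}} constants next to the existing {{sink.*}} options. The key below matches the DDL above; the containing class name, default value, and description are assumptions.

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public final class HBaseSinkDeleteOptions {

    /** Hypothetical declaration; key taken from the DDL above, default assumed. */
    public static final ConfigOption<String> SINK_DELETE_MODE =
            ConfigOptions.key("sink.delete.mode")
                    .stringType()
                    .defaultValue("latest-version")
                    .withDescription(
                            "Whether a DELETE/UPDATE_BEFORE record removes only the "
                                    + "latest version ('latest-version') or all versions "
                                    + "('all-versions') of the written HBase cells.");

    private HBaseSinkDeleteOptions() {}
}
{code}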