[ https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17477570#comment-17477570 ]

Martijn Visser commented on FLINK-25560:
----------------------------------------

[~Bruce Wong] Looking at your PR, two reviewers have already been involved. You 
should reach consensus with them on the requested changes rather than asking 
for more reviewers. 

> Add "sink.delete.mode" in HBase sql connector for retracting the latest 
> version or all versions in changelog mode
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-25560
>                 URL: https://issues.apache.org/jira/browse/FLINK-25560
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / HBase
>            Reporter: Bruce Wong
>            Assignee: Jing Ge
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2022-01-11-20-02-17-780.png, 
> image-2022-01-11-20-04-48-299.png, image-2022-01-11-20-05-53-217.png, 
> image-2022-01-11-20-07-43-900.png, image-2022-01-11-20-09-29-074.png
>
>
> h1. Motivation
> When we synchronize data from MySQL to HBase, we found that a delete in 
> MySQL only removes the latest version of the corresponding HBase cells, not 
> all versions, which leads to incorrect semantics. We therefore want to add a 
> parameter that controls whether a retraction deletes only the latest version 
> or all versions.
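> For background, the HBase client {{Delete}} API already distinguishes these 
> two granularities, which is what the proposed option would map onto. A 
> minimal sketch (class and variable names are just for illustration, not part 
> of this PR):
> {code:java}
> import org.apache.hadoop.hbase.client.Delete;
> import org.apache.hadoop.hbase.util.Bytes;
>
> public class DeleteModes {
>     public static void main(String[] args) {
>         byte[] row = Bytes.toBytes("rowkey");
>         byte[] cf = Bytes.toBytes("cf");
>         byte[] qualifier = Bytes.toBytes("id");
>
>         // addColumn deletes only the most recent version of the cell.
>         Delete latestOnly = new Delete(row).addColumn(cf, qualifier);
>
>         // addColumns (plural) deletes all versions of the cell.
>         Delete allVersions = new Delete(row).addColumns(cf, qualifier);
>
>         System.out.println(latestOnly + " vs " + allVersions);
>     }
> }
> {code}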
> h1. Usage
> The test code is as follows.
> {code:java}
> package com.bruce;
>
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableConfig;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> import static org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;
>
> public class KafkaToHBase {
>     public static void main(String[] args) throws Exception {
>         Configuration cfg = new Configuration();
>         cfg.setBoolean(LOCAL_START_WEBSERVER, true);
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
>         env.setParallelism(1);
>         EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>                 .inStreamingMode()
>                 .build();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
> //        TableConfig config = tEnv.getConfig();
> //        config.setIdleStateRetention(Duration.ofHours(2));
>
>         // Kafka source consuming the Debezium changelog of the MySQL table.
>         String source = "CREATE TEMPORARY TABLE IF NOT EXISTS kafka_llspay_bundles(\n" +
>                 "  id                    STRING,\n" +
>                 "  category_id           STRING,\n" +
>                 "  upc                   STRING,\n" +
>                 "  `name`                STRING,\n" +
>                 "  price_cents           STRING,\n" +
>                 "  original_price_cents  STRING,\n" +
>                 "  short_desc            STRING,\n" +
>                 "  `desc`                STRING,\n" +
>                 "  cover_url             STRING,\n" +
>                 "  created_at            STRING,\n" +
>                 "  updated_at            STRING,\n" +
>                 "  deleted_at            STRING,\n" +
>                 "  extra                 STRING,\n" +
>                 "  status                STRING,\n" +
>                 "  scholarship_cents     STRING,\n" +
>                 "  is_payback            STRING,\n" +
>                 "  is_support_iap        STRING,\n" +
>                 "  iap_product_id        STRING,\n" +
>                 "  neo_product_code      STRING,\n" +
>                 "  paid_redirect_url     STRING,\n" +
>                 "  subscription_type     STRING\n" +
>                 ") WITH (\n" +
>                 "  'connector' = 'kafka',\n" +
>                 "  'topic' = 'dim-bundles',\n" +
>                 "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
>                 "  'properties.group.id' = 'vvp_dev',\n" +
>                 "  'scan.startup.mode' = 'latest-offset',\n" +
>                 "  'value.debezium-json.schema-include' = 'true',\n" +
>                 "  'value.format' = 'debezium-json',\n" +
>                 "  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
>                 ")";
>
>         // HBase sink; 'sink.delete.mode' = 'all-versions' is the option proposed here.
>         String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
>                 "  rowkey STRING,\n" +
>                 "  cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING, " +
>                 "price_cents STRING, original_price_cents STRING, short_desc STRING, " +
>                 "`desc` STRING, cover_url STRING, created_at STRING, updated_at STRING, " +
>                 "deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING, " +
>                 "is_payback STRING, is_support_iap STRING, iap_product_id STRING, " +
>                 "neo_product_code STRING, paid_redirect_url STRING, subscription_type STRING>\n" +
>                 ") WITH (\n" +
>                 "  'connector' = 'hbase-2.2',\n" +
>                 "  'table-name' = 'dim_hbase',\n" +
>                 "  'zookeeper.quorum' = 'localhost:2181',\n" +
>                 "  'sink.buffer-flush.max-size' = '0',\n" +
>                 "  'sink.buffer-flush.max-rows' = '1',\n" +
>                 "  'sink.delete.mode' = 'all-versions'\n" +
>                 ")";
>
>         String dml = "INSERT INTO dim_hbase\n" +
>                 "SELECT\n" +
>                 "  upc AS rowkey,\n" +
>                 "  ROW(id, category_id, upc, `name`, price_cents, original_price_cents, " +
>                 "short_desc, `desc`, cover_url, created_at, updated_at, deleted_at, extra, " +
>                 "status, scholarship_cents, is_payback, is_support_iap, iap_product_id, " +
>                 "neo_product_code, paid_redirect_url, subscription_type)\n" +
>                 "FROM kafka_llspay_bundles";
>
>         tEnv.executeSql(source);
>         tEnv.executeSql(sink);
>         // await() blocks so the local job keeps running instead of exiting immediately.
>         tEnv.executeSql(dml).await();
>     }
> }
> {code}
> h1. Test
>  # After the test, we found that only the CF specified in the Flink SQL DML 
> statement was deleted, without affecting other CFs.
> !image-2022-01-11-20-02-17-780.png|width=793,height=294!
> !image-2022-01-11-20-04-48-299.png|width=793,height=338!
> !image-2022-01-11-20-05-53-217.png|width=856,height=479!
> Next, a table with two CFs, where the job writes to only one of them:
> !image-2022-01-11-20-07-43-900.png|width=859,height=422!
> The data in cf2 is still there, confirming that only the CF specified in the 
> Flink SQL DML statement was deleted, without affecting the other CF.
> !image-2022-01-11-20-09-29-074.png|width=877,height=495!
> h1. Reference
> Please see the related task: FLINK-25330
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
