CDC format like debezium-json and canal-json support read ROWKIND metadata.


1. Our first scenario is syncing data of operational tables into our streaming 
warehouse. 
All operational data in mysql should NOT be physically deleted, so we use 
"is_deleted" column to do logical delete, and there should NOT be any delete 
operations happen on our streaming warehouse. 
But as data grows up quickly we need to delete old data such as half year ago 
in operational table to keep table size manageable and ensure the query 
performance not to be decreased. These deleted records for maintain purpose 
should be not synced into our streaming warehouse. So we have to filter our 
them in our flink sql jobs. But currently it is not convenient to do ROWKIND 
filtering. That is why I ask flink support read ROWKIND metadata by ROW_KIND() 
function. Then we can use the following flink sql to do filtering. For example:
   
    create table customer_source (
      id BIGINT PRIMARY KEY NOT ENFORCED,
      name STRING,
      region STRING
   ) with (
      'connector' = 'kafka',
      'format' = 'canal-json',
      ...
   );


   create table customer_sink (
      id BIGINT PRIMARY KEY NOT ENFORCED,
      name STRING,
      region STRING
   ) with (
      'connector' = 'paimon'
      ...
   );


  INSERT INTO customer_sink SELECT * FROM customer_source WHERE ROW_KIND() <> 
'-D';


2. Out secondary scenario is we need sink aggregation result into MQ which does 
NOT support retract data. Although flink provide upsert kafka connector, but 
unfortunetly our sink system is NOT kafka, so we have to write customized 
connector like upsert-kafka again. If flink sql support filter data by ROWKIND, 
we don't need write any more upsert-xxx connector. For example,
  
   create table customer_source (
      id BIGINT PRIMARY KEY NOT ENFORCED,
      name STRING,
      region STRING
   ) with (
      'connector' = 'kafka',
      'format' = 'canal-json',
      ...
   );


   create table customer_agg_sink (
      region STRING,
      cust_count BIGINT
   ) with (
      'connector' = 'MQ',
      'format' = 'json',
      ...
   );


   INSERT INTO customer_agg_sink SELECT * FROM (SELECT region, count(1) as 
cust_count  from customer_source group by region) t WHERE ROW_KIND() <> '-U' 
AND         ROW_KIND() <>     '-D';


How do you think? Looking forward to your feedback, thanks!

Reply via email to