我想使用 Flink SQL 的 mysql-cdc connector 直接将 MySQL 表数据实时同步进 Hive,运行后抛出如下异常:

Exception in thread "main" org.apache.flink.table.api.TableException:
AppendStreamTableSink doesn't support consuming update and delete changes
which is produced by node TableSourceScan(table=[[hive_catalog, cdc,
team]], fields=[team_id, team_name, create_time, update_time])

我的问题:
1. 是不是因为 Hive 2 不支持 delete/update?如果换成 Hive 3 能否支持呢?
2. 如果要支持这种场景,是不是需要在中间加一层 Kafka(通过 changelog-json 格式),即 cdc -> kafka,然后 kafka -> hive streaming?谢谢!
3. DATE_FORMAT 函数输出的时间是 UTC 的,怎样转换成 GMT+8?只能通过 UDF 吗?

SQL 语句如下:

-- Make sure the `cdc` database exists; the INSERT further down reads from cdc.team.
CREATE DATABASE IF NOT EXISTS cdc

-- Remove any earlier definition of cdc.team (no error if it is absent).
DROP TABLE IF EXISTS cdc.team

-- Changelog source over the MySQL table test.team, read through the mysql-cdc connector.
-- Its stream carries update and delete changes as well as inserts (see the TableException
-- quoted at the top of this message), so an append-only sink cannot consume it directly.
-- The table is created under its qualified name so that it matches the preceding
-- DROP TABLE cdc.team and the cdc.team reference in the INSERT, whatever the current
-- database happens to be.
CREATE TABLE cdc.team (
    team_id BIGINT,
    team_name STRING,
    create_time TIMESTAMP,
    update_time TIMESTAMP,
    -- computed processing-time attribute; it is not read from the MySQL table
    proctime AS PROCTIME()
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root',
  'database-name' = 'test',
  'table-name' = 'team'
)

-- Make sure the `ods` database exists before ods.team is (re)created in it.
CREATE DATABASE IF NOT EXISTS ods

-- Remove any earlier definition of ods.team (no error if it is absent).
DROP TABLE IF EXISTS ods.team

-- Hive target table stored as Parquet and partitioned by date / hour / minute strings.
-- The column list and the partition list must not end with a comma before the closing ')'.
-- The timestamp pattern has to stay on a single line: the partition values are substituted
-- into it to build a 'yyyy-MM-dd HH:mm:ss' string, with exactly one space between the
-- date part and the time part.
-- NOTE(review): the 'partition-time' commit trigger relies on watermarks, and the source
-- table cdc.team declares none (only a proctime attribute) -- confirm that partitions
-- actually get committed, or switch the trigger to 'process-time'.
-- NOTE(review): per the TableException quoted at the top of this message, this sink only
-- accepts append-only input, so a changelog source cannot be written into it directly.
CREATE TABLE ods.team (
  team_id BIGINT,
  team_name STRING,
  create_time TIMESTAMP,
  update_time TIMESTAMP
) PARTITIONED BY (
  ts_date STRING,
  ts_hour STRING,
  ts_minute STRING
) STORED AS PARQUET TBLPROPERTIES (
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 min',
  'sink.partition-commit.policy.kind' = 'metastore,success-file',
  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
)

-- Continuously copy rows from cdc.team into ods.team.
-- The last three expressions fill the partition columns of ods.team by position; the
-- aliases are there for readability only.
-- my_date_format is a function defined outside this snippet; presumably it formats the
-- timestamp with the given pattern in the given time zone -- TODO confirm.
INSERT INTO ods.team
SELECT
  t.team_id,
  t.team_name,
  t.create_time,
  t.update_time,
  my_date_format(t.create_time, 'yyyy-MM-dd', 'Asia/Shanghai') AS ts_date,
  my_date_format(t.create_time, 'HH', 'Asia/Shanghai') AS ts_hour,
  my_date_format(t.create_time, 'mm', 'Asia/Shanghai') AS ts_minute
FROM cdc.team AS t

回复