1. 是的。目前 Hive不支持直接消费 changlog ,这个主要原因是 hive 对 cdc 的支持不是很好。即使是 hive ACID/transaction 功能,由于其与其他计算引擎集成的不好,也鲜有人用。
2. cdc -> kafka -> hive streaming 的方案是可行的,不过 kafka -> hive streaming 相当于原始数据同步,到 hive 中仍然是 cdc logs 内容,并没有实时合并,需要用户自己写 query 在 hive 中进行合并。merge过程可以参考这篇文章[1]。 3. 你可以 ts + INTERVAL '8' HOUR PS: 在1.12中,我们计划让 hive 也能直接写 changelog 数据,这样的话 cdc 可以直接 -> hive streaming,不需要中间的 kafka。 不过到了 hive 中后,仍然需要另外写 query 将数据做实时merge。 Best, Jark On Sat, 31 Oct 2020 at 13:26, 罗显宴 <[email protected]> wrote: > hive3可以hive2不可以,换了kafka也没用吧,hive3之前一般都不支持数据仓库的更改。不知道回答的对不对,欢迎指正。 > > > | | > 罗显宴 > | > | > 邮箱:[email protected] > | > > 签名由 网易邮箱大师 定制 > > 在2020年10月31日 12:06,陈帅 写道: > 我想使用flink sql的mysql-cdc connector直接将mysql表数据实时同步进hive,运行后抛 > > Exception in thread "main" org.apache.flink.table.api.TableException: > AppendStreamTableSink doesn't support consuming update and delete changes > which is produced by node TableSourceScan(table=[[hive_catalog, cdc, > team]], fields=[team_id, team_name, create_time, update_time]) > > 我的问题: > 1. 是不是因为hive2不支持delete/update,如果换hive 3能否支持呢? > 2. 如果要支持这种场景是不是中间需要加一层kafka介质(通过 changelog-json 格式),即cdc -> kafka,然后kafka > -> hive streaming? 谢谢! > 3. DATE_FORMAT函数出来的时间是UTC的,怎么转成GMT+8,只能通过UDF么? > > sql语句如下 > > CREATE DATABASE IF NOT EXISTS cdc > > DROP TABLE IF EXISTS cdc.team > > CREATE TABLE team( > team_id BIGINT, > team_name STRING, > create_time TIMESTAMP, > update_time TIMESTAMP, > proctime as proctime() > ) WITH ( > 'connector' = 'mysql-cdc', > 'hostname' = 'localhost', > 'port' = '3306', > 'username' = 'root', > 'password' = 'root', > 'database-name' = 'test', > 'table-name' = 'team' > ) > > CREATE DATABASE IF NOT EXISTS ods > > DROP TABLE IF EXISTS ods.team > > CREATE TABLE ods.team ( > team_id BIGINT, > team_name STRING, > create_time TIMESTAMP, > update_time TIMESTAMP, > ) PARTITIONED BY ( > ts_date STRING, > ts_hour STRING, > ts_minute STRING, > ) STORED AS PARQUET TBLPROPERTIES ( > 'sink.partition-commit.trigger' = 'partition-time', > 'sink.partition-commit.delay' = '1 min', > 'sink.partition-commit.policy.kind' = 'metastore,success-file', > 'partition.time-extractor.timestamp-pattern' = '$ts_date > $ts_hour:$ts_minute:00' > ) > > INSERT INTO ods.team > SELECT team_id, team_name, create_time, update_time, > my_date_format(create_time,'yyyy-MM-dd', 'Asia/Shanghai'), > my_date_format(create_time,'HH', 'Asia/Shanghai'), > my_date_format(create_time,'mm', 'Asia/Shanghai') > FROM cdc.team >
