Finally: after running "msck repair table team;" in the hive shell, the written data becomes queryable. Does flink hive streaming not register hive partitions automatically, or am I using it the wrong way?
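With the 'partition-time' commit trigger, a partition is only committed (and registered in the metastore) once the watermark passes the time that 'partition.time-extractor.timestamp-pattern' reconstructs from the partition values, and the default extractor parses that reconstructed string as "yyyy-MM-dd HH:mm:ss". A minimal sketch of that parse step (the exact set of formats the default extractor accepts is an assumption here; note the listings below format dt with 'yyyyMMdd'):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

public class TimestampPatternCheck {
    // Format the default partition-time extractor is documented to expect
    // (a sketch; see the filesystem connector docs for the full list).
    private static final DateTimeFormatter EXPECTED =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static boolean parses(String candidate) {
        try {
            LocalDateTime.parse(candidate, EXPECTED);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Pattern '$dt $hr:$mi:00' with dt formatted as 'yyyyMMdd' yields:
        System.out.println(parses("20201101 16:30:00")); // false
        // whereas dt formatted as 'yyyy-MM-dd' would yield:
        System.out.println(parses("2020-11-01 16:30:00")); // true
    }
}
```

If the reconstructed string cannot be parsed, the partition never gets committed by the trigger, which would match the symptom of files existing but only becoming visible after msck repair.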
陈帅 <[email protected]> 于2020年11月1日周日 下午5:24写道: > 改用 TEXTFILE 存储hive表数据以便下载hive文件观察内容 > ") STORED AS TEXTFILE TBLPROPERTIES (" > > 这是生成的hive表建表语句 > > hive> show create table team; > OK > CREATE TABLE `team`( > `team_id` int, > `team_name` string, > `create_time` string, > `update_time` string, > `op` string) > PARTITIONED BY ( > `dt` string, > `hr` string, > `mi` string) > ROW FORMAT SERDE > 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' > STORED AS INPUTFORMAT > 'org.apache.hadoop.mapred.TextInputFormat' > OUTPUTFORMAT > 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat' > LOCATION > 'hdfs://localhost:9000/user/hive/warehouse/ods.db/team' > TBLPROPERTIES ( > 'is_generic'='false', > 'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00', > 'sink.partition-commit.delay'='1 min', > 'sink.partition-commit.policy.kind'='metastore,success-file', > 'sink.partition-commit.trigger'='partition-time', > 'transient_lastDdlTime'='1604222266') > Time taken: 0.252 seconds, Fetched: 25 row(s) > > 另外,下载了hive文件内容如下 > 1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31 11:25:38<0x01>INSERT > > 还是查询不到结果 > hive> select * from team; > OK > Time taken: 0.326 seconds > > 陈帅 <[email protected]> 于2020年11月1日周日 下午5:10写道: > >> >> 之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。 >> 生成的hive分区文件路径类似于 /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/ >> part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3 >> >> 陈帅 <[email protected]> 于2020年11月1日周日 下午4:43写道: >> >>> 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive >>> shell查不到数据。 >>> >>> import com.alibaba.fastjson.JSON; >>> import com.alibaba.fastjson.JSONObject; >>> import org.apache.flink.api.common.serialization.SimpleStringSchema; >>> import org.apache.flink.api.common.typeinfo.TypeInformation; >>> import org.apache.flink.api.common.typeinfo.Types; >>> import org.apache.flink.api.java.typeutils.RowTypeInfo; >>> import 
org.apache.flink.streaming.api.CheckpointingMode; >>> import org.apache.flink.streaming.api.TimeCharacteristic; >>> import org.apache.flink.streaming.api.datastream.DataStream; >>> import >>> org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; >>> import >>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >>> import >>> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; >>> import org.apache.flink.streaming.api.windowing.time.Time; >>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; >>> import >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; >>> import org.apache.flink.table.api.EnvironmentSettings; >>> import org.apache.flink.table.api.SqlDialect; >>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; >>> import org.apache.flink.table.catalog.hive.HiveCatalog; >>> import org.apache.flink.types.Row; >>> import org.apache.flink.types.RowKind; >>> >>> import java.time.Duration; >>> import java.time.Instant; >>> import java.time.LocalDateTime; >>> import java.time.ZoneId; >>> import java.time.format.DateTimeFormatter; >>> import java.util.Properties; >>> >>> public class MysqlCDC2Hive { >>> >>> private static final DateTimeFormatter dtf = >>> DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); >>> >>> public static void main(String[] args) throws Exception { >>> StreamExecutionEnvironment streamEnv = >>> StreamExecutionEnvironment.getExecutionEnvironment(); >>> >>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >>> streamEnv.setParallelism(3); >>> streamEnv.enableCheckpointing(60000); >>> >>> EnvironmentSettings tableEnvSettings = >>> EnvironmentSettings.newInstance() >>> .useBlinkPlanner() >>> .inStreamingMode() >>> .build(); >>> StreamTableEnvironment tableEnv = >>> StreamTableEnvironment.create(streamEnv, tableEnvSettings); >>> >>> 
        tableEnv.getConfig().getConfiguration().set(
                ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
                CheckpointingMode.EXACTLY_ONCE);
        tableEnv.getConfig().getConfiguration().set(
                ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                Duration.ofMinutes(1));

        String catalogName = "hive_catalog";
        HiveCatalog catalog = new HiveCatalog(
                catalogName,
                "default",
                "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
                "2.3.4"
        );
        tableEnv.registerCatalog(catalogName, catalog);
        tableEnv.useCatalog(catalogName);

        MyDateFormat2 myDateFormat = new MyDateFormat2();
        tableEnv.registerFunction("my_date_format", myDateFormat);

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
        tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
        tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
                "    team_id INT,\n" +
                "    team_name STRING,\n" +
                "    create_time TIMESTAMP,\n" +
                "    update_time TIMESTAMP,\n" +
                "    proctime as proctime()\n" +
                ") WITH (\n" +
                "    'connector' = 'mysql-cdc',\n" +
                "    'hostname' = 'localhost',\n" +
                "    'port' = '3306',\n" +
                "    'username' = 'root',\n" +
                "    'password' = 'root',\n" +
                "    'database-name' = 'test',\n" +
                "    'table-name' = 'team'\n" +
                ")");

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
        tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team");
        tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
                "    team_id INT,\n" +
                "    team_name STRING,\n" +
                "    create_time TIMESTAMP,\n" +
                "    update_time TIMESTAMP\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic' = 'team',\n" +
                "    'scan.startup.mode' = 'earliest-offset',\n" +
                "    'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "    'format' = 'changelog-json'\n" +
                ")");

        tableEnv.executeSql("INSERT INTO kafka.team \n" +
                "SELECT team_id, team_name, create_time, update_time \n" +
                "FROM cdc.team");

        // Build a stream that carries the op field
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test1");

        FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
                "team",
                new SimpleStringSchema(),
                properties
        ).setStartFromEarliest();

        DataStream<String> ds = streamEnv.addSource(consumer);

        String[] fieldNames = {"team_id", "team_name", "create_time", "update_time", "op"};
        TypeInformation[] types = {Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING};
        DataStream<Row> ds2 = ds.map(str -> {
            JSONObject jsonObject = JSON.parseObject(str);
            String op = jsonObject.getString("op");
            JSONObject data = jsonObject.getJSONObject("data");
            int arity = fieldNames.length;
            Row row = new Row(arity);
            row.setField(0, data.get("team_id"));
            row.setField(1, data.get("team_name"));
            row.setField(2, data.get("create_time"));
            row.setField(3, data.get("update_time"));
            String operation = getOperation(op);
            row.setField(4, operation);
            return row;
        }, new RowTypeInfo(types, fieldNames))
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(1)) {
                    @Override
                    public long extractTimestamp(Row row) {
                        String dt = (String) row.getField(2);
                        LocalDateTime ldt = LocalDateTime.parse(dt, dtf);
                        Instant instant = ldt.atZone(ZoneId.systemDefault()).toInstant();
                        return instant.toEpochMilli();
                    }
                });

        tableEnv.registerDataStream("merged_team", ds2);

        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
        tableEnv.executeSql("DROP TABLE IF EXISTS ods.team");
        tableEnv.executeSql("CREATE TABLE ods.team (\n" +
                "    team_id INT,\n" +
                "    team_name STRING,\n" +
                "    create_time STRING,\n" +
                "    update_time STRING,\n" +
                "    op STRING\n" +
                ") PARTITIONED BY (\n" +
                "    dt STRING,\n" +
                "    hr STRING,\n" +
                "    mi STRING\n" +
                ") STORED AS PARQUET TBLPROPERTIES (\n" +
                "    'sink.partition-commit.trigger' = 'partition-time',\n" +
                "    'sink.partition-commit.delay' = '1 min',\n" +
                "    'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
                "    'partition.time-extractor.timestamp-pattern' = '$dt $hr:$mi:00'\n" +
                ")");

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("INSERT INTO ods.team \n" +
                "SELECT team_id, team_name, create_time, update_time, op, \n" +
                "    DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyyMMdd') as dt, \n" +
                "    DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'HH') as hr, \n" +
                "    DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'mm') as mi \n" +
                "FROM merged_team");
        tableEnv.execute("MysqlCDC2Hive2");

        streamEnv.execute("");
    }

    private static String getOperation(String op) {
        String operation = "INSERT";
        for (RowKind rk : RowKind.values()) {
            if (rk.shortString().equals(op)) {
                switch (rk) {
                    case UPDATE_BEFORE:
                    case UPDATE_AFTER:
                        operation = "UPDATE";
                        break;
                    case DELETE:
                        operation = "DELETE";
                        break;
                    case INSERT:
                    default:
                        operation = "INSERT";
                        break;
                }
                break;
            }
        }
        return operation;
    }
}

On Sun, 1 Nov 2020 at 11:04, Jark Wu <[email protected]> wrote:

Did you check whether the hive files are actually generated?
Looking at your code above, the kafka->hive pipeline has no watermark, while the "partition-time" trigger policy is driven by watermarks, so that may be why no data shows up in hive.

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#sink-partition-commit-trigger

On Sat, 31 Oct 2020 at 17:25, 陈帅 <[email protected]> wrote:

Thanks Jark for the detailed answer; I tried it along the lines you suggested. The problem I ran into: without hive partitioning, writing and reading both work, but after enabling time partitioning on the hive table, the write succeeds yet the hive shell returns no data, even though the table schema is correct. (I have commented out the partition code below.) Could you help me see what I am doing wrong?

A sample cdc -> kafka message looks like:
{"data":{"team_id":1001,"team_name":"Sun","create_time":"2020-10-31 11:25:38","update_time":"2020-10-31 11:25:38"},"op":"+I"}

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.time.Duration;
import java.util.Properties;
public class MysqlCDC2Hive {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        streamEnv.setParallelism(3);
        streamEnv.enableCheckpointing(60000);

        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv =
                StreamTableEnvironment.create(streamEnv, tableEnvSettings);

        tableEnv.getConfig().getConfiguration().set(
                ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
                CheckpointingMode.EXACTLY_ONCE);
        tableEnv.getConfig().getConfiguration().set(
                ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                Duration.ofMinutes(1));

        String catalogName = "hive_catalog";
        HiveCatalog catalog = new HiveCatalog(
                catalogName,
                "default",
                "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
                "2.3.4"
        );
        tableEnv.registerCatalog(catalogName, catalog);
        tableEnv.useCatalog(catalogName);

        MyDateFormat2 myDateFormat = new MyDateFormat2();
        tableEnv.registerFunction("my_date_format", myDateFormat);

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
        tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
        tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
                "    team_id INT,\n" +
                "    team_name STRING,\n" +
                "    create_time TIMESTAMP,\n" +
                "    update_time TIMESTAMP,\n" +
                "    proctime as proctime()\n" +
                ") WITH (\n" +
                "    'connector' = 'mysql-cdc',\n" +
                "    'hostname' = 'localhost',\n" +
                "    'port' = '3306',\n" +
                "    'username' = 'root',\n" +
                "    'password' = 'root',\n" +
                "    'database-name' = 'test',\n" +
                "    'table-name' = 'team'\n" +
                ")");
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
        tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team");
        tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
                "    team_id INT,\n" +
                "    team_name STRING,\n" +
                "    create_time TIMESTAMP,\n" +
                "    update_time TIMESTAMP\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic' = 'team',\n" +
                "    'scan.startup.mode' = 'earliest-offset',\n" +
                "    'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "    'format' = 'changelog-json'\n" +
                ")");

        tableEnv.executeSql("INSERT INTO kafka.team \n" +
                "SELECT team_id, team_name, create_time, update_time \n" +
                "FROM cdc.team");

        // Build a stream that carries the op field
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
                "team",
                new SimpleStringSchema(),
                properties
        ).setStartFromEarliest();

        DataStream<String> ds = streamEnv.addSource(consumer);

        String[] fieldNames = {"team_id", "team_name", "create_time", "update_time", "op"};
        TypeInformation[] types = {Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING};
        DataStream<Row> ds2 = ds.map(str -> {
            JSONObject jsonObject = JSON.parseObject(str);
            String op = jsonObject.getString("op");
            JSONObject data = jsonObject.getJSONObject("data");
            int arity = fieldNames.length;
            Row row = new Row(arity);
            row.setField(0, data.get("team_id"));
            row.setField(1, data.get("team_name"));
            row.setField(2, data.get("create_time"));
            row.setField(3, data.get("update_time"));
            String operation = getOperation(op);
            row.setField(4, operation);
            return row;
        }, new RowTypeInfo(types, fieldNames));
        tableEnv.registerDataStream("merged_team", ds2);

        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
        tableEnv.executeSql("DROP TABLE IF EXISTS ods.team");
        tableEnv.executeSql("CREATE TABLE ods.team (\n" +
                "    team_id INT,\n" +
                "    team_name STRING,\n" +
                "    create_time STRING,\n" +
                "    update_time STRING,\n" +
                "    op STRING\n" +
//                ") PARTITIONED BY (\n" +
//                "    ts_date STRING,\n" +
//                "    ts_hour STRING,\n" +
//                "    ts_minute STRING\n" +
                ") STORED AS PARQUET TBLPROPERTIES (\n" +
                "    'sink.partition-commit.trigger' = 'partition-time',\n" +
                "    'sink.partition-commit.delay' = '1 min',\n" +
                "    'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
                "    'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'\n" +
                ")");

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("INSERT INTO ods.team \n" +
                "SELECT team_id, team_name, create_time, update_time, op \n" +
//                "    DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyyMMdd') as ts_date, \n" +
//                "    DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'HH') as ts_hour, \n" +
//                "    DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'mm') as ts_minute \n" +
                "FROM merged_team");
        tableEnv.execute("MysqlCDC2Hive2");

        streamEnv.execute("");
    }

    private static String getOperation(String op) {
        String operation = "INSERT";
        for (RowKind rk : RowKind.values()) {
            if (rk.shortString().equals(op)) {
                switch (rk) {
                    case UPDATE_BEFORE:
                    case UPDATE_AFTER:
                        operation = "UPDATE";
                        break;
                    case DELETE:
                        operation = "DELETE";
                        break;
                    case INSERT:
                    default:
                        operation = "INSERT";
                        break;
                }
                break;
            }
        }
        return operation;
    }
}

On Sat, 31 Oct 2020 at 13:45, Jark Wu <[email protected]> wrote:

1. Yes. Hive currently cannot consume changelogs directly, mainly because hive's CDC support is not good. Even the hive ACID/transaction feature is rarely used, since it integrates poorly with other compute engines.

2. The cdc -> kafka -> hive streaming approach works, but kafka -> hive streaming amounts to raw-data synchronization: what lands in hive is still the cdc log content, with no real-time merge, so you have to write your own query to merge it inside hive. The merge process is described in [1].

3. You can use ts + INTERVAL '8' HOUR.

PS: In 1.12 we plan to let hive write changelog data directly, so that cdc -> hive streaming works without kafka in between. Once the data is in hive, though, you would still need a separate query to merge it in real time.

Best,
Jark

On Sat, 31 Oct 2020 at 13:26, 罗显宴 <[email protected]> wrote:

hive3 can, hive2 cannot; switching to kafka would not help either, as warehouses before hive3 generally do not support updates. Not sure my answer is right, corrections welcome.

罗显宴
Email: [email protected]

On 31 Oct 2020 at 12:06, 陈帅 wrote:

I want to use flink sql's mysql-cdc connector to sync a mysql table into hive in real time, but at runtime it throws:

Exception in thread "main" org.apache.flink.table.api.TableException:
AppendStreamTableSink doesn't support consuming update and delete changes
which is produced by node TableSourceScan(table=[[hive_catalog, cdc, team]], fields=[team_id, team_name, create_time, update_time])

My questions:
1. Is this because hive2 does not support delete/update, and would hive 3 support it?
2. To support this scenario, do I need a kafka layer in between (using the changelog-json format), i.e. cdc -> kafka, then kafka -> hive streaming? Thanks!
3. The time produced by the DATE_FORMAT function is UTC; is a UDF the only way to convert it to GMT+8?
The SQL statements are as follows:

CREATE DATABASE IF NOT EXISTS cdc

DROP TABLE IF EXISTS cdc.team

CREATE TABLE team(
    team_id BIGINT,
    team_name STRING,
    create_time TIMESTAMP,
    update_time TIMESTAMP,
    proctime as proctime()
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'test',
    'table-name' = 'team'
)

CREATE DATABASE IF NOT EXISTS ods

DROP TABLE IF EXISTS ods.team

CREATE TABLE ods.team (
    team_id BIGINT,
    team_name STRING,
    create_time TIMESTAMP,
    update_time TIMESTAMP,
) PARTITIONED BY (
    ts_date STRING,
    ts_hour STRING,
    ts_minute STRING,
) STORED AS PARQUET TBLPROPERTIES (
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.delay' = '1 min',
    'sink.partition-commit.policy.kind' = 'metastore,success-file',
    'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
)

INSERT INTO ods.team
SELECT team_id, team_name, create_time, update_time,
    my_date_format(create_time,'yyyy-MM-dd', 'Asia/Shanghai'),
    my_date_format(create_time,'HH', 'Asia/Shanghai'),
    my_date_format(create_time,'mm', 'Asia/Shanghai')
FROM cdc.team
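The MyDateFormat2 UDF referenced throughout the thread is never shown. The core of such a timezone-shifting formatter can be sketched with java.time alone; this is a guess at its behavior (interpret the input as UTC, shift to the target zone, then format), not the author's actual implementation:

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class TimeZoneFormat {
    private static final DateTimeFormatter INPUT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Hypothetical core of my_date_format(ts, pattern, zone):
    // parse the timestamp string as UTC, shift it to the given zone,
    // and format the shifted time with the given pattern.
    public static String format(String utcTimestamp, String pattern, String zone) {
        LocalDateTime ldt = LocalDateTime.parse(utcTimestamp, INPUT);
        ZonedDateTime shifted = ldt.atZone(ZoneOffset.UTC)
                .withZoneSameInstant(ZoneId.of(zone));
        return shifted.format(DateTimeFormatter.ofPattern(pattern));
    }

    public static void main(String[] args) {
        String ts = "2020-10-31 11:25:38"; // create_time from the sample message
        System.out.println(format(ts, "yyyy-MM-dd", "Asia/Shanghai")); // 2020-10-31
        System.out.println(format(ts, "HH", "Asia/Shanghai"));         // 19
        System.out.println(format(ts, "mm", "Asia/Shanghai"));         // 25
    }
}
```

In the actual UDF this logic would sit in the eval method of an org.apache.flink.table.functions.ScalarFunction subclass so it can be registered via registerFunction, as done in the listings above.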
