I changed the Hive table to be stored as TEXTFILE so that I could download the Hive data files and inspect their contents.
") STORED AS TEXTFILE TBLPROPERTIES ("这是生成的hive表建表语句 hive> show create table team; OK CREATE TABLE `team`( `team_id` int, `team_name` string, `create_time` string, `update_time` string, `op` string) PARTITIONED BY ( `dt` string, `hr` string, `mi` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat' LOCATION 'hdfs://localhost:9000/user/hive/warehouse/ods.db/team' TBLPROPERTIES ( 'is_generic'='false', 'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00', 'sink.partition-commit.delay'='1 min', 'sink.partition-commit.policy.kind'='metastore,success-file', 'sink.partition-commit.trigger'='partition-time', 'transient_lastDdlTime'='1604222266') Time taken: 0.252 seconds, Fetched: 25 row(s) 另外,下载了hive文件内容如下 1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31 11:25:38<0x01>INSERT 还是查询不到结果 hive> select * from team; OK Time taken: 0.326 seconds 陈帅 <[email protected]> 于2020年11月1日周日 下午5:10写道: > > 之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。 > 生成的hive分区文件路径类似于 /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/ > part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3 > > 陈帅 <[email protected]> 于2020年11月1日周日 下午4:43写道: > >> 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive >> shell查不到数据。 >> >> import com.alibaba.fastjson.JSON; >> import com.alibaba.fastjson.JSONObject; >> import org.apache.flink.api.common.serialization.SimpleStringSchema; >> import org.apache.flink.api.common.typeinfo.TypeInformation; >> import org.apache.flink.api.common.typeinfo.Types; >> import org.apache.flink.api.java.typeutils.RowTypeInfo; >> import org.apache.flink.streaming.api.CheckpointingMode; >> import org.apache.flink.streaming.api.TimeCharacteristic; >> import org.apache.flink.streaming.api.datastream.DataStream; >> import >> org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; >> import >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >> import >> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; >> import org.apache.flink.streaming.api.windowing.time.Time; >> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; >> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; >> import org.apache.flink.table.api.EnvironmentSettings; >> import org.apache.flink.table.api.SqlDialect; >> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; >> import org.apache.flink.table.catalog.hive.HiveCatalog; >> import org.apache.flink.types.Row; >> import org.apache.flink.types.RowKind; >> >> import java.time.Duration; >> import java.time.Instant; >> import java.time.LocalDateTime; >> import java.time.ZoneId; >> import java.time.format.DateTimeFormatter; >> import java.util.Properties; >> >> public class MysqlCDC2Hive { >> >> private static final DateTimeFormatter dtf = >> DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); >> >> public static void main(String[] args) throws Exception { >> StreamExecutionEnvironment streamEnv = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> >> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >> streamEnv.setParallelism(3); >> streamEnv.enableCheckpointing(60000); >> >> EnvironmentSettings tableEnvSettings = >> EnvironmentSettings.newInstance() >> .useBlinkPlanner() >> .inStreamingMode() >> 
On Sun, Nov 1, 2020 at 5:10 PM, 陈帅 <[email protected]> wrote:

Before I added the watermark and the partitioning, the job wrote Hive files that could also be queried back. Only after partitioning was enabled are the Hive files generated but not queryable, so I don't think it has much to do with whether the watermark is set. The generated Hive partition files have paths like
/user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3

On Sun, Nov 1, 2020 at 4:43 PM, 陈帅 <[email protected]> wrote:

I checked that the Hive files are generated, partitioned the way I defined. Following your suggestion I added a watermark on the ds2 stream; after running, the Hive files are generated as well, but the data still cannot be queried through the hive shell.

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

public class MysqlCDC2Hive {

    private static final DateTimeFormatter dtf =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        streamEnv.setParallelism(3);
        streamEnv.enableCheckpointing(60000);

        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv =
                StreamTableEnvironment.create(streamEnv, tableEnvSettings);
        tableEnv.getConfig().getConfiguration().set(
                ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
                CheckpointingMode.EXACTLY_ONCE);
        tableEnv.getConfig().getConfiguration().set(
                ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                Duration.ofMinutes(1));

        String catalogName = "hive_catalog";
        HiveCatalog catalog = new HiveCatalog(
                catalogName,
                "default",
                "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
                "2.3.4"
        );
        tableEnv.registerCatalog(catalogName, catalog);
        tableEnv.useCatalog(catalogName);

        MyDateFormat2 myDateFormat = new MyDateFormat2();
        tableEnv.registerFunction("my_date_format", myDateFormat);

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
        tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
        tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
                "    team_id INT,\n" +
                "    team_name STRING,\n" +
                "    create_time TIMESTAMP,\n" +
                "    update_time TIMESTAMP,\n" +
                "    proctime as proctime()\n" +
                ") WITH (\n" +
                "    'connector' = 'mysql-cdc',\n" +
                "    'hostname' = 'localhost',\n" +
                "    'port' = '3306',\n" +
                "    'username' = 'root',\n" +
                "    'password' = 'root',\n" +
                "    'database-name' = 'test',\n" +
                "    'table-name' = 'team'\n" +
                ")");

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
        tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team");
        tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
                "    team_id INT,\n" +
                "    team_name STRING,\n" +
                "    create_time TIMESTAMP,\n" +
                "    update_time TIMESTAMP\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic' = 'team',\n" +
                "    'scan.startup.mode' = 'earliest-offset',\n" +
                "    'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "    'format' = 'changelog-json'\n" +
                ")");

        tableEnv.executeSql("INSERT INTO kafka.team \n" +
                "SELECT team_id, team_name, create_time, update_time \n" +
                "FROM cdc.team");

        // Define a stream that carries the op field
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test1");

        FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
                "team",
                new SimpleStringSchema(),
                properties
        ).setStartFromEarliest();

        DataStream<String> ds = streamEnv.addSource(consumer);

        String[] fieldNames = {"team_id", "team_name", "create_time", "update_time", "op"};
        TypeInformation[] types = {Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING};
        DataStream<Row> ds2 = ds.map(str -> {
            JSONObject jsonObject = JSON.parseObject(str);
            String op = jsonObject.getString("op");
            JSONObject data = jsonObject.getJSONObject("data");
            int arity = fieldNames.length;
            Row row = new Row(arity);
            row.setField(0, data.get("team_id"));
            row.setField(1, data.get("team_name"));
            row.setField(2, data.get("create_time"));
            row.setField(3, data.get("update_time"));
            String operation = getOperation(op);
            row.setField(4, operation);
            return row;
        }, new RowTypeInfo(types, fieldNames))
        // watermark added on ds2, per Jark's suggestion
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(1)) {
            @Override
            public long extractTimestamp(Row row) {
                String dt = (String) row.getField(2);
                LocalDateTime ldt = LocalDateTime.parse(dt, dtf);
                Instant instant = ldt.atZone(ZoneId.systemDefault()).toInstant();
                long timeInMillis = instant.toEpochMilli();
                return timeInMillis;
            }
        });

        tableEnv.registerDataStream("merged_team", ds2);

        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
        tableEnv.executeSql("DROP TABLE IF EXISTS ods.team");
        tableEnv.executeSql("CREATE TABLE ods.team (\n" +
                "  team_id INT,\n" +
                "  team_name STRING,\n" +
                "  create_time STRING,\n" +
                "  update_time STRING,\n" +
                "  op STRING\n" +
                ") PARTITIONED BY (\n" +
                "  dt STRING,\n" +
                "  hr STRING,\n" +
                "  mi STRING\n" +
                ") STORED AS PARQUET TBLPROPERTIES (\n" +
                "  'sink.partition-commit.trigger' = 'partition-time',\n" +
                "  'sink.partition-commit.delay' = '1 min',\n" +
                "  'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
                "  'partition.time-extractor.timestamp-pattern' = '$dt $hr:$mi:00'\n" +
                ")");

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("INSERT INTO ods.team \n" +
                "SELECT team_id, team_name, create_time, update_time, op, \n" +
                "  DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyyMMdd') as dt, \n" +
                "  DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'HH') as hr, \n" +
                "  DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'mm') as mi \n" +
                "FROM merged_team");
        tableEnv.execute("MysqlCDC2Hive2");

        streamEnv.execute("");
    }

    private static String getOperation(String op) {
        String operation = "INSERT";
        for (RowKind rk : RowKind.values()) {
            if (rk.shortString().equals(op)) {
                switch (rk) {
                    case UPDATE_BEFORE:
                    case UPDATE_AFTER:
                        operation = "UPDATE";
                        break;
                    case DELETE:
                        operation = "DELETE";
                        break;
                    case INSERT:
                    default:
                        operation = "INSERT";
                        break;
                }
                break;
            }
        }
        return operation;
    }
}
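A side note on the driver above, sketched under the assumption that this runs on Flink 1.11: executeSql("INSERT ...") already submits each INSERT as its own job the moment it is called, so the trailing tableEnv.execute("MysqlCDC2Hive2") and streamEnv.execute("") are redundant and may fail with "No operators defined". Grouping the two inserts in a StatementSet submits a single job instead:

import org.apache.flink.table.api.StatementSet;

// Sketch: one job for both pipelines instead of two implicit submissions.
StatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO kafka.team \n" +
        "SELECT team_id, team_name, create_time, update_time FROM cdc.team");
stmtSet.addInsertSql("INSERT INTO ods.team \n" +
        "SELECT team_id, team_name, create_time, update_time, op, \n" +
        "  DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyyMMdd'), \n" +
        "  DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'HH'), \n" +
        "  DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'mm') \n" +
        "FROM merged_team");
stmtSet.execute();  // single job; no further execute() calls needed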
On Sun, Nov 1, 2020 at 11:04 AM, Jark Wu <[email protected]> wrote:

Could you check whether the Hive files are being generated properly?

Looking at your code above, there is no watermark in the kafka -> hive pipeline, while the "partition-time" trigger policy is driven by watermarks, so that may be why there is no data in Hive.

Best,
Jark

[1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#sink-partition-commit-trigger
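Two ways to address the missing watermark, sketched under the assumption that the kafka -> hive leg is written in pure SQL (the DataStream variant later in this thread achieves the same with assignTimestampsAndWatermarks): declare a watermark on the Kafka source table so 'partition-time' commits can advance, or switch the sink to processing-time commits, which need no watermark:

-- Option 1 (sketch): an event-time attribute plus watermark on the Kafka
-- source, so the 'partition-time' commit trigger has a watermark to advance on.
CREATE TABLE kafka.team (
    team_id INT,
    team_name STRING,
    create_time TIMESTAMP(3),
    update_time TIMESTAMP(3),
    WATERMARK FOR create_time AS create_time - INTERVAL '1' MINUTE
) WITH (
    'connector' = 'kafka',
    'topic' = 'team',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'changelog-json'
);

-- Option 2 (sketch): commit on processing time instead; no watermark needed,
-- at the cost of commits that ignore event time. In the Hive sink's
-- TBLPROPERTIES:
--   'sink.partition-commit.trigger' = 'process-time'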
On Sat, 31 Oct 2020 at 17:25, 陈帅 <[email protected]> wrote:

Thanks, Jark, for the detailed answer. I tried it along the lines you suggested. The problem I ran into: without Hive partitioning, writing and reading both work; but once time-based partitioning is enabled on the Hive table, the write succeeds yet no data can be queried through the hive shell, even though the table schema is correct. (I have commented the partitioning code out below.) Could you help me see what I got wrong?

A sample cdc -> kafka message looks like:

{"data":{"team_id":1001,"team_name":"Sun","create_time":"2020-10-31 11:25:38","update_time":"2020-10-31 11:25:38"},"op":"+I"}

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.time.Duration;
import java.util.Properties;

public class MysqlCDC2Hive {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        streamEnv.setParallelism(3);
        streamEnv.enableCheckpointing(60000);

        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv =
                StreamTableEnvironment.create(streamEnv, tableEnvSettings);
        tableEnv.getConfig().getConfiguration().set(
                ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
                CheckpointingMode.EXACTLY_ONCE);
        tableEnv.getConfig().getConfiguration().set(
                ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                Duration.ofMinutes(1));

        String catalogName = "hive_catalog";
        HiveCatalog catalog = new HiveCatalog(
                catalogName,
                "default",
                "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
                "2.3.4"
        );
        tableEnv.registerCatalog(catalogName, catalog);
        tableEnv.useCatalog(catalogName);

        MyDateFormat2 myDateFormat = new MyDateFormat2();
        tableEnv.registerFunction("my_date_format", myDateFormat);

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
        tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
        tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
                "    team_id INT,\n" +
                "    team_name STRING,\n" +
                "    create_time TIMESTAMP,\n" +
                "    update_time TIMESTAMP,\n" +
                "    proctime as proctime()\n" +
                ") WITH (\n" +
                "    'connector' = 'mysql-cdc',\n" +
                "    'hostname' = 'localhost',\n" +
                "    'port' = '3306',\n" +
                "    'username' = 'root',\n" +
                "    'password' = 'root',\n" +
                "    'database-name' = 'test',\n" +
                "    'table-name' = 'team'\n" +
                ")");

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
        tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team");
        tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
                "    team_id INT,\n" +
                "    team_name STRING,\n" +
                "    create_time TIMESTAMP,\n" +
                "    update_time TIMESTAMP\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic' = 'team',\n" +
                "    'scan.startup.mode' = 'earliest-offset',\n" +
                "    'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "    'format' = 'changelog-json'\n" +
                ")");

        tableEnv.executeSql("INSERT INTO kafka.team \n" +
                "SELECT team_id, team_name, create_time, update_time \n" +
                "FROM cdc.team");

        // Define a stream that carries the op field
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
                "team",
                new SimpleStringSchema(),
                properties
        ).setStartFromEarliest();

        DataStream<String> ds = streamEnv.addSource(consumer);

        String[] fieldNames = {"team_id", "team_name", "create_time", "update_time", "op"};
        TypeInformation[] types = {Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING};
        DataStream<Row> ds2 = ds.map(str -> {
            JSONObject jsonObject = JSON.parseObject(str);
            String op = jsonObject.getString("op");
            JSONObject data = jsonObject.getJSONObject("data");
            int arity = fieldNames.length;
            Row row = new Row(arity);
            row.setField(0, data.get("team_id"));
            row.setField(1, data.get("team_name"));
            row.setField(2, data.get("create_time"));
            row.setField(3, data.get("update_time"));
            String operation = getOperation(op);
            row.setField(4, operation);
            return row;
        }, new RowTypeInfo(types, fieldNames));

        tableEnv.registerDataStream("merged_team", ds2);

        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
        tableEnv.executeSql("DROP TABLE IF EXISTS ods.team");
        tableEnv.executeSql("CREATE TABLE ods.team (\n" +
                "  team_id INT,\n" +
                "  team_name STRING,\n" +
                "  create_time STRING,\n" +
                "  update_time STRING,\n" +
                "  op STRING\n" +
//              ") PARTITIONED BY (\n" +
//              "  ts_date STRING,\n" +
//              "  ts_hour STRING,\n" +
//              "  ts_minute STRING\n" +
                ") STORED AS PARQUET TBLPROPERTIES (\n" +
                "  'sink.partition-commit.trigger' = 'partition-time',\n" +
                "  'sink.partition-commit.delay' = '1 min',\n" +
                "  'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
                "  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'\n" +
                ")");

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("INSERT INTO ods.team \n" +
                "SELECT team_id, team_name, create_time, update_time, op \n" +
//              "  , DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyyMMdd') as ts_date \n" +
//              "  , DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'HH') as ts_hour \n" +
//              "  , DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'mm') as ts_minute \n" +
                "FROM merged_team");
        tableEnv.execute("MysqlCDC2Hive2");

        streamEnv.execute("");
    }

    private static String getOperation(String op) {
        String operation = "INSERT";
        for (RowKind rk : RowKind.values()) {
            if (rk.shortString().equals(op)) {
                switch (rk) {
                    case UPDATE_BEFORE:
                    case UPDATE_AFTER:
                        operation = "UPDATE";
                        break;
                    case DELETE:
                        operation = "DELETE";
                        break;
                    case INSERT:
                    default:
                        operation = "INSERT";
                        break;
                }
                break;
            }
        }
        return operation;
    }
}
On Sat, Oct 31, 2020 at 1:45 PM, Jark Wu <[email protected]> wrote:

1. Yes. Hive currently cannot consume a changelog directly, mainly because Hive's CDC support is poor. Even Hive's ACID/transaction feature is rarely used, since it integrates badly with other compute engines.

2. The cdc -> kafka -> hive streaming approach is feasible, but kafka -> hive streaming amounts to raw-data synchronization: what lands in Hive is still the cdc log, with no real-time merging, so you have to write your own query to merge it in Hive. See [1] for the merge process.

3. You can use ts + INTERVAL '8' HOUR.

PS: In 1.12 we plan to let Hive write changelog data directly, so cdc can go straight to hive streaming without Kafka in the middle. Once the data is in Hive, though, you would still need a separate query to merge it in real time.

Best,
Jark
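A sketch of the merge query Jark describes in point 2 (my own illustration of the idea, not the query from the referenced article): keep only the latest change per key and drop deletes:

-- HiveQL sketch: collapse the raw change log in ods.team into the current
-- snapshot, assuming team_id is the primary key and update_time orders changes.
SELECT team_id, team_name, create_time, update_time
FROM (
    SELECT t.*,
           ROW_NUMBER() OVER (PARTITION BY team_id
                              ORDER BY update_time DESC) AS rn
    FROM ods.team t
) ranked
WHERE rn = 1 AND op <> 'DELETE';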
On Sat, 31 Oct 2020 at 13:26, 罗显宴 <[email protected]> wrote:

Hive 3 can, Hive 2 cannot; adding Kafka would not help either, since before Hive 3 data warehouses generally did not support updates. I'm not sure this answer is right; corrections welcome.

On Sat, Oct 31, 2020 at 12:06 PM, 陈帅 wrote:

I want to use Flink SQL's mysql-cdc connector to sync MySQL table data directly into Hive in real time. When I run it, it throws:

Exception in thread "main" org.apache.flink.table.api.TableException:
AppendStreamTableSink doesn't support consuming update and delete changes
which is produced by node TableSourceScan(table=[[hive_catalog, cdc, team]],
fields=[team_id, team_name, create_time, update_time])

My questions:
1. Is this because Hive 2 does not support delete/update? Would switching to Hive 3 help?
2. To support this scenario, do I need a Kafka layer in between (using the changelog-json format), i.e. cdc -> kafka, then kafka -> hive streaming? Thanks!
3. The time produced by DATE_FORMAT is in UTC; how do I convert it to GMT+8? Only through a UDF?

The SQL statements are as follows:

CREATE DATABASE IF NOT EXISTS cdc;

DROP TABLE IF EXISTS cdc.team;

CREATE TABLE team(
    team_id BIGINT,
    team_name STRING,
    create_time TIMESTAMP,
    update_time TIMESTAMP,
    proctime as proctime()
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'test',
    'table-name' = 'team'
);

CREATE DATABASE IF NOT EXISTS ods;

DROP TABLE IF EXISTS ods.team;

CREATE TABLE ods.team (
    team_id BIGINT,
    team_name STRING,
    create_time TIMESTAMP,
    update_time TIMESTAMP
) PARTITIONED BY (
    ts_date STRING,
    ts_hour STRING,
    ts_minute STRING
) STORED AS PARQUET TBLPROPERTIES (
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.delay' = '1 min',
    'sink.partition-commit.policy.kind' = 'metastore,success-file',
    'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
);

INSERT INTO ods.team
SELECT team_id, team_name, create_time, update_time,
    my_date_format(create_time,'yyyy-MM-dd', 'Asia/Shanghai'),
    my_date_format(create_time,'HH', 'Asia/Shanghai'),
    my_date_format(create_time,'mm', 'Asia/Shanghai')
FROM cdc.team;
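For question 3, Jark's answer earlier in the thread (ts + INTERVAL '8' HOUR) can replace the my_date_format UDF. A sketch, assuming create_time arrives in UTC and the target zone is GMT+8:

-- Flink SQL sketch: shift the UTC timestamp by 8 hours before formatting,
-- instead of the custom my_date_format UDF.
SELECT team_id, team_name, create_time, update_time,
    DATE_FORMAT(create_time + INTERVAL '8' HOUR, 'yyyy-MM-dd') AS ts_date,
    DATE_FORMAT(create_time + INTERVAL '8' HOUR, 'HH') AS ts_hour,
    DATE_FORMAT(create_time + INTERVAL '8' HOUR, 'mm') AS ts_minute
FROM cdc.team;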
