各位老师好,我最近在做Flink查询Hive的功能,需要用到窗口处理数据,在编写代码过程中无法设置水印,我看官网看到Table API & SQL
设置事件时间有三种方式:
1、在 DDL 中定义
2、在 DataStream 到 Table 转换时定义
3、使用 TableSource 定义
而我使用的是HiveCatalog查询hive,貌似用不上以上三种方式。所以我想问问各位老师,有没有一种办法可以直接在Table上设置某个字段为事件事件,并且设置水印?
另外说明,我的第一版代码是将Table转换为DataSteam,然后再设置水印和窗口,但是执行转换过程非常耗时,并且在源码中
toDataSteam()方法的注释上也说“表生态系统的类型系统比DataStream API的类型系统更丰富”,因此开始考虑使用Table或SQL解决问题。
以下是我的第一版代码
// flink 集成 hive
System.out.println("初始化Flink环境");
String hiveVersion = "3.1.2";
String catalogName = "myhive";
String defaultDatabase = "dwd_1580_egd_finishing_mill_lv1_202302";
String hiveConfDir = "/usr/hdp/3.1.4.0-315/apache-hive-3.1.2-bin/conf";
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
System.out.println("定义hive环境");
// 定义 hive catalog 参数:catalog名称、数据库名称、对象名称
HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir,
hiveVersion);
tableEnv.registerCatalog(catalogName, hive);
// 将 HiveCatalog 设置为 session 的当前 catalog
tableEnv.useCatalog(catalogName);
tableEnv.useDatabase(defaultDatabase);
// 设置 hive 并行度
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setInteger("table.exec.hive.infer-source-parallelism.max",
sourceParallelism); // Default 1000
// 使用 HiveTableSource
System.out.println("定义查询条件");
// 定义查询条件
Table table = tableEnv
.from(catalogName + "." + databaseName + "." + tableName)
.select(DATETIME + "," + fields + "," + YEAR + "," + MONTH + "," + DAY
+ "," + HOUR)
.filter($(YEAR).isEqual(year))
.filter($(MONTH).isEqual(startMonth))
.filter($(DAY).isGreaterOrEqual(startDay))
.filter($(HOUR).isGreaterOrEqual(startHour))
.filter($(DAY).isLessOrEqual(endDay))
.filter($(HOUR).isLessOrEqual(endHour));
tableEnv.createTemporaryView("myTable", table);
// Table 转 Stream,非常耗时
System.out.println("Table to Stream");
DataStream<Row> resultStream = tableEnv.toDataStream(table);
// 水印及窗口设置
System.out.println("水印及窗口");
resultStream
.assignTimestampsAndWatermarks(WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((SerializableTimestampAssigner<Row>)
(element, recordTimestamp) -> {
long datetime = 0;
try {
datetime = new SimpleDateFormat(DATEFORMAT)
.parse(element.getFieldAs(DATETIME).toString())
.getTime();
} catch (ParseException e) {
e.printStackTrace();
}
return datetime;
}))
.windowAll(TumblingEventTimeWindows.of(Time.seconds(windowTime)))
| |
ZhaoShuKang
|
|
[email protected]
|