You can add a ts_row field of type LocalDateTime to UserBehavior, parsed from the yyyy-MM-dd HH:mm:ss string,
and then declare the schema like this:
Table behaviorTable = tEnv.fromDataStream(behaviorStream, Schema.newBuilder()
        .column("ts_row", "TIMESTAMP(3)")
        .column("uid", "BIGINT")
        .column("ts", "BIGINT")
        .watermark("ts_row", "ts_row - INTERVAL '10' SECOND")
        .build());
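
As a rough illustration of how a yyyy-MM-dd HH:mm:ss string could be turned into a LocalDateTime for a field such as ts_row, here is a minimal sketch that uses only java.time. The names TsRowParsing and parseTsRow are made up for this example; they are not part of UserBehavior or of Flink.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TsRowParsing {
    // Pattern matching the date-time string carried by each record in the thread.
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Parses a "yyyy-MM-dd HH:mm:ss" string into a LocalDateTime (no time zone attached). */
    public static LocalDateTime parseTsRow(String text) {
        return LocalDateTime.parse(text, FORMATTER);
    }

    public static void main(String[] args) {
        // Prints 2017-11-27T03:44:31
        System.out.println(parseTsRow("2017-11-27 03:44:31"));
    }
}
```

Because LocalDateTime carries no zone, the value stays exactly what the string says, which is the wall-clock time the sender expects to see.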
When a DataStream is converted to a Table and the record timestamp of the
DataStream is used as the time attribute, the rowtime in Flink
is of type TIMESTAMP WITHOUT TIME ZONE,
so the epoch value is rendered as a UTC wall-clock time. See:
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/time_attributes/
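
To see the same offset outside Flink, the sketch below renders the epoch value 1511725471000 from this thread as a LocalDateTime in UTC and in Asia/Shanghai, using plain java.time. EpochZoneDemo and toLocal are hypothetical names for illustration only, and the code does not show how Flink performs the conversion internally.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class EpochZoneDemo {
    /** Renders epoch milliseconds as the wall-clock time seen in the given zone. */
    public static LocalDateTime toLocal(long epochMillis, ZoneId zone) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone);
    }

    public static void main(String[] args) {
        long ts = 1511725471000L;
        // Prints 2017-11-26T19:44:31 (the value shown in the job output)
        System.out.println(toLocal(ts, ZoneOffset.UTC));
        // Prints 2017-11-27T03:44:31 (the value the sender expects)
        System.out.println(toLocal(ts, ZoneId.of("Asia/Shanghai")));
    }
}
```

The two results differ by exactly eight hours, which matches the gap between the printed rows and the expected local time.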
------------------ Original Message ------------------
From: "user-zh" <[email protected]>;
Sent: Friday, May 13, 2022, 10:07 PM
To: "user-zh"<[email protected]>;
Subject: Re: Re: Time is incorrect after converting DataStream to Table
I tried that, but it did not work.
On 2022-05-13 21:52:51, "renzhaojin" <[email protected]> wrote:
>Hello, you could try setting the time zone like this:
>Configuration configuration = tableEnvironment.getConfig().getConfiguration();
>
>configuration.setString("table.local-time-zone", "Asia/Shanghai");
>
>
>On May 13, 2022 at 21:40, <[email protected]> wrote:
>
>
>After converting a DataStream to a Table, the time values come out wrong. The code is as follows:
```
DataStream<UserBehavior> sourceStream = env.fromElements(
    new UserBehavior(1001L, 3827899L, 2920476L, "pv", 1511713473000L, "2017-11-27 00:24:33"),
    new UserBehavior(1001L, 3745169L, 2891509L, "pv", 1511725471000L, "2017-11-27 03:44:31"),
    new UserBehavior(1001L, 1531036L, 2920476L, "pv", 1511733732000L, "2017-11-27 06:02:12"),
    new UserBehavior(1001L, 2266567L, 4145813L, "pv", 1511741471000L, "2017-11-27 08:11:11"),
    new UserBehavior(1001L, 2951368L, 1080785L, "pv", 1511750828000L, "2017-11-27 10:47:08"),
    new UserBehavior(1002L, 5002615L, 2520377L, "pv", 1511752985000L, "2017-11-27 11:23:05")
);

// Assign timestamps and watermarks
DataStream<UserBehavior> behaviorStream = sourceStream.assignTimestampsAndWatermarks(
    WatermarkStrategy.<UserBehavior>forBoundedOutOfOrderness(Duration.ofMinutes(1))
        .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
            @Override
            public long extractTimestamp(UserBehavior behavior, long recordTimestamp) {
                return behavior.getTs();
            }
        })
);

// Convert the DataStream to a Table
Table behaviorTable = tEnv.fromDataStream(behaviorStream, $("uid"), $("ts"), $("ts_row").rowtime());
tEnv.createTemporaryView("user_behavior", behaviorTable);
Table resultTable = tEnv.sqlQuery("SELECT uid, ts, ts_row FROM user_behavior");
DataStream<Row> resultStream = tEnv.toAppendStream(resultTable, Row.class);
resultStream.print();
```
The output is:
```
3> +I[1001, 1511725471000, 2017-11-26T19:44:31]
4> +I[1001, 1511733732000, 2017-11-26T22:02:12]
3> +I[1002, 1511752985000, 2017-11-27T03:23:05]
2> +I[1001, 1511713473000, 2017-11-26T16:24:33]
2> +I[1001, 1511750828000, 2017-11-27T02:47:08]
1> +I[1001, 1511741471000, 2017-11-27T00:11:11]
```
1511725471000 -> 2017-11-27 03:44:31
The printed time is eight hours behind.