看起来你又踩到了一个坑,你用的是1.10.0吧?可以切换到1.10.1试一下,有两个bug已经在1.10.1中修复了。

Zhou Zach <[email protected]> 于2020年6月12日周五 下午3:47写道:

> 还是不行,
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type
> [org.apache.logging.slf4j.Log4jLoggerFactory]
> ERROR StatusLogger No log4j2 configuration file found. Using default
> configuration: logging only errors to the console.
> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> SQL parse failed. Encountered "time FROM" at line 1, column 44.
> Was expecting one of:
>     "CURSOR" ...
>     "EXISTS" ...
>     "NOT" ...
>     "ROW" ...
>     "(" ...
>     "+" ...
>     "-" ...
>     <UNSIGNED_INTEGER_LITERAL> ...
>     <DECIMAL_NUMERIC_LITERAL> ...
>     <APPROX_NUMERIC_LITERAL> ...
>     <BINARY_STRING_LITERAL> ...
>     <PREFIXED_STRING_LITERAL> ...
>     <QUOTED_STRING> ...
>     <UNICODE_STRING_LITERAL> ...
>     "TRUE" ...
>     "FALSE" ...
>     "UNKNOWN" ...
>     "NULL" ...
>     <LBRACE_D> ...
>     <LBRACE_T> ...
>     <LBRACE_TS> ...
>     "DATE" ...
>     "TIME" <QUOTED_STRING> ...
>     "TIMESTAMP" ...
>     "INTERVAL" ...
>     "?" ...
>     "CAST" ...
>     "EXTRACT" ...
>     "POSITION" ...
>     "CONVERT" ...
>     "TRANSLATE" ...
>     "OVERLAY" ...
>     "FLOOR" ...
>     "CEIL" ...
>     "CEILING" ...
>     "SUBSTRING" ...
>     "TRIM" ...
>     "CLASSIFIER" ...
>     "MATCH_NUMBER" ...
>     "RUNNING" ...
>     "PREV" ...
>     "NEXT" ...
>     "JSON_EXISTS" ...
>     "JSON_VALUE" ...
>     "JSON_QUERY" ...
>     "JSON_OBJECT" ...
>     "JSON_OBJECTAGG" ...
>     "JSON_ARRAY" ...
>     "JSON_ARRAYAGG" ...
>     <LBRACE_FN> ...
>     "MULTISET" ...
>     "ARRAY" ...
>     "MAP" ...
>     "PERIOD" ...
>     "SPECIFIC" ...
>     <IDENTIFIER> ...
>     <QUOTED_IDENTIFIER> ...
>     <BACK_QUOTED_IDENTIFIER> ...
>     <BRACKET_QUOTED_IDENTIFIER> ...
>     <UNICODE_QUOTED_IDENTIFIER> ...
>     "ABS" ...
>     "AVG" ...
>     "CARDINALITY" ...
>     "CHAR_LENGTH" ...
>     "CHARACTER_LENGTH" ...
>     "COALESCE" ...
>     "COLLECT" ...
>     "COVAR_POP" ...
>     "COVAR_SAMP" ...
>     "CUME_DIST" ...
>     "COUNT" ...
>     "CURRENT_DATE" ...
>     "CURRENT_TIME" ...
>     "CURRENT_TIMESTAMP" ...
>     "DENSE_RANK" ...
>     "ELEMENT" ...
>     "EXP" ...
>     "FIRST_VALUE" ...
>     "FUSION" ...
>     "GROUPING" ...
>     "HOUR" ...
>     "LAG" ...
>     "LEAD" ...
>     "LEFT" ...
>     "LAST_VALUE" ...
>     "LN" ...
>     "LOCALTIME" ...
>     "LOCALTIMESTAMP" ...
>     "LOWER" ...
>     "MAX" ...
>     "MIN" ...
>     "MINUTE" ...
>     "MOD" ...
>     "MONTH" ...
>     "NTH_VALUE" ...
>     "NTILE" ...
>     "NULLIF" ...
>     "OCTET_LENGTH" ...
>     "PERCENT_RANK" ...
>     "POWER" ...
>     "RANK" ...
>     "REGR_COUNT" ...
>     "REGR_SXX" ...
>     "REGR_SYY" ...
>     "RIGHT" ...
>     "ROW_NUMBER" ...
>     "SECOND" ...
>     "SQRT" ...
>     "STDDEV_POP" ...
>     "STDDEV_SAMP" ...
>     "SUM" ...
>     "UPPER" ...
>     "TRUNCATE" ...
>     "USER" ...
>     "VAR_POP" ...
>     "VAR_SAMP" ...
>     "YEAR" ...
>     "CURRENT_CATALOG" ...
>     "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>     "CURRENT_PATH" ...
>     "CURRENT_ROLE" ...
>     "CURRENT_SCHEMA" ...
>     "CURRENT_USER" ...
>     "SESSION_USER" ...
>     "SYSTEM_USER" ...
>     "NEW" ...
>     "CASE" ...
>     "CURRENT" ...
>
> at
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
> at
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> at
> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63)
> at
> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>
>
> query:
>
>
> streamTableEnv.sqlUpdate(
> """
>     |
>     |CREATE TABLE user_behavior (
>     |    uid VARCHAR,
>     |    phoneType VARCHAR,
>     |    clickCount INT,
>     |    proctime AS PROCTIME(),
>     |    `time` TIMESTAMP(3)
>     |) WITH (
>     |    'connector.type' = 'kafka',
>     |    'connector.version' = 'universal',
>     |    'connector.topic' = 'user_behavior',
>     |    'connector.startup-mode' = 'earliest-offset',
>     |    'connector.properties.0.key' = 'zookeeper.connect',
>     |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>     |    'connector.properties.1.key' = 'bootstrap.servers',
>     |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>     |    'update-mode' = 'append',
>     |    'format.type' = 'json',
>     |    'format.derive-schema' = 'true'
>     |)
>     |""".stripMargin)
> streamTableEnv.sqlUpdate(
> """
>     |
>     |insert into  user_cnt
>     |SELECT
>     |  cast(b.`time` as string), u.age
>     |FROM
>     |  user_behavior AS b
>     |  JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
>     |  ON b.uid = u.uid
>     |
>     |""".stripMargin)
>
>
>
>
>
>
> 不过,PROCTIME() AS proctime 放在select 后面可以执行成功,proctime AS PROCTIME()
> 放在select 后面也不行。
>
>
>
>
>
>
>
>
> 在 2020-06-12 15:29:49,"Benchao Li" <[email protected]> 写道:
> >你写反了,是proctime AS PROCTIME()。
> >计算列跟普通query里面的AS是反着的。
> >
> >Zhou Zach <[email protected]> 于2020年6月12日周五 下午2:24写道:
> >
> >> flink 1.10.0:
> >> 在create table中,加PROCTIME() AS proctime字段报错
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-06-12 14:08:11,"Benchao Li" <[email protected]> 写道:
> >> >Hi,
> >> >
> >> >Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是事件时间。
> >> >可以参考下[1]
> >> >
> >> >[1]
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
> >> >
> >> >Zhou Zach <[email protected]> 于2020年6月12日周五 下午1:33写道:
> >> >
> >> >> SLF4J: Class path contains multiple SLF4J bindings.
> >> >>
> >> >> SLF4J: Found binding in
> >> >>
> >>
> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> >> >>
> >> >> SLF4J: Found binding in
> >> >>
> >>
> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> >> >>
> >> >> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> >> >> explanation.
> >> >>
> >> >> SLF4J: Actual binding is of type
> >> >> [org.apache.logging.slf4j.Log4jLoggerFactory]
> >> >>
> >> >> ERROR StatusLogger No log4j2 configuration file found. Using default
> >> >> configuration: logging only errors to the console.
> >> >>
> >> >> Exception in thread "main" org.apache.flink.table.api.TableException:
> >> >> Cannot generate a valid execution plan for the given query:
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
> >> >> fields=[time, sum_age])
> >> >>
> >> >> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
> >> >>
> >> >>    +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
> >> >>
> >> >>       :- FlinkLogicalCalc(select=[uid, time])
> >> >>
> >> >>       :  +- FlinkLogicalTableSourceScan(table=[[default_catalog,
> >> >> default_database, user_behavior, source: [KafkaTableSource(uid,
> >> phoneType,
> >> >> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
> >> >>
> >> >>       +- FlinkLogicalSnapshot(period=[$cor0.time])
> >> >>
> >> >>          +- FlinkLogicalCalc(select=[uid, age])
> >> >>
> >> >>             +- FlinkLogicalTableSourceScan(table=[[default_catalog,
> >> >> default_database, users, source: [MysqlAsyncLookupTableSource(uid,
> sex,
> >> >> age, created_time)]]], fields=[uid, sex, age, created_time])
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF'
> left
> >> >> table's proctime field, doesn't support 'PROCTIME()'
> >> >>
> >> >> Please check the documentation for the set of currently supported SQL
> >> >> features.
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> >> >>
> >> >> at
> >> >>
> >>
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> >> >>
> >> >> at
> >> >>
> >>
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> >> >>
> >> >> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >> >>
> >> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >> >>
> >> >> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >> >>
> >> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >> >>
> >> >> at
> >> >>
> >>
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> >> >>
> >> >> at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> >> >>
> >> >> at
> >> >>
> >>
> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
> >> >>
> >> >> at
> >> >>
> >>
> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
> >> >>
> >> >> Caused by: org.apache.flink.table.api.TableException: Temporal table
> >> join
> >> >> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
> >> >> field, doesn't support 'PROCTIME()'
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
> >> >>
> >> >> at
> >> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
> >> >>
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
> >> >>
> >> >> ... 20 more
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> query:
> >> >>
> >> >>
> >> >> val streamExecutionEnv =
> >> StreamExecutionEnvironment.getExecutionEnvironment
> >> >> val blinkEnvSettings =
> >> >>
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >> >> val streamTableEnv =
> >> >> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)
> >> >>
> >> >> streamTableEnv.sqlUpdate(
> >> >> """
> >> >>     |
> >> >>     |CREATE TABLE user_behavior (
> >> >>     |    uid VARCHAR,
> >> >>     |    phoneType VARCHAR,
> >> >>     |    clickCount INT,
> >> >>     |    `time` TIMESTAMP(3)
> >> >>     |) WITH (
> >> >>     |    'connector.type' = 'kafka',
> >> >>     |    'connector.version' = 'universal',
> >> >>     |    'connector.topic' = 'user_behavior',
> >> >>     |    'connector.startup-mode' = 'earliest-offset',
> >> >>     |    'connector.properties.0.key' = 'zookeeper.connect',
> >> >>     |    'connector.properties.0.value' =
> >> 'cdh1:2181,cdh2:2181,cdh3:2181',
> >> >>     |    'connector.properties.1.key' = 'bootstrap.servers',
> >> >>     |    'connector.properties.1.value' =
> >> 'cdh1:9092,cdh2:9092,cdh3:9092',
> >> >>     |    'update-mode' = 'append',
> >> >>     |    'format.type' = 'json',
> >> >>     |    'format.derive-schema' = 'true'
> >> >>     |)
> >> >>     |""".stripMargin)
> >> >> streamTableEnv.sqlUpdate(
> >> >> """
> >> >>     |
> >> >>     |CREATE TABLE user_cnt (
> >> >>     |    `time` VARCHAR,
> >> >>     |    sum_age INT
> >> >>     |) WITH (
> >> >>     |    'connector.type' = 'jdbc',
> >> >>     |    'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
> >> >>     |    'connector.table' = 'user_cnt',
> >> >>     |    'connector.username' = 'root',
> >> >>     |    'connector.password' = '123456',
> >> >>     |    'connector.write.flush.max-rows' = '1'
> >> >>     |)
> >> >>     |""".stripMargin)
> >> >> val userTableSource = new MysqlAsyncLookupTableSource(
> >> >> Array("uid", "sex", "age", "created_time"),
> >> >> Array(),
> >> >> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
> >> >> streamTableEnv.registerTableSource("users", userTableSource)
> >> >> streamTableEnv.sqlUpdate(
> >> >> """
> >> >>     |
> >> >>     |insert into  user_cnt
> >> >>     |SELECT
> >> >>     |  cast(b.`time` as string), u.age
> >> >>     |FROM
> >> >>     |  user_behavior AS b
> >> >>     |  JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
> >> >>     |  ON b.uid = u.uid
> >> >>     |
> >> >>     |""".stripMargin)
> >> >> streamTableEnv.execute("Temporal table join")
> >>
>

回复