Hi, Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是事件时间。 可以参考下[1]
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html Zhou Zach <[email protected]> 于2020年6月12日周五 下午1:33写道: > SLF4J: Class path contains multiple SLF4J bindings. > > SLF4J: Found binding in > [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] > > SLF4J: Found binding in > [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] > > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > > SLF4J: Actual binding is of type > [org.apache.logging.slf4j.Log4jLoggerFactory] > > ERROR StatusLogger No log4j2 configuration file found. Using default > configuration: logging only errors to the console. > > Exception in thread "main" org.apache.flink.table.api.TableException: > Cannot generate a valid execution plan for the given query: > > > > > FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`], > fields=[time, sum_age]) > > +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age]) > > +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner]) > > :- FlinkLogicalCalc(select=[uid, time]) > > : +- FlinkLogicalTableSourceScan(table=[[default_catalog, > default_database, user_behavior, source: [KafkaTableSource(uid, phoneType, > clickCount, time)]]], fields=[uid, phoneType, clickCount, time]) > > +- FlinkLogicalSnapshot(period=[$cor0.time]) > > +- FlinkLogicalCalc(select=[uid, age]) > > +- FlinkLogicalTableSourceScan(table=[[default_catalog, > default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex, > age, created_time)]]], fields=[uid, sex, age, created_time]) > > > > > Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left > table's proctime field, doesn't support 'PROCTIME()' > > Please check the documentation for the set of currently supported SQL > features. > > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78) > > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170) > > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90) > > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248) > > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151) > > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) > > at > org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90) > > at > org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala) > > Caused by: org.apache.flink.table.api.TableException: Temporal table join > currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime > field, doesn't support 'PROCTIME()' > > at > org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67) > > at > org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147) > > at > org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161) > > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263) > > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) > > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) > > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) > > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) > > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247) > > at > org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534) > > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807) > > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) > > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) > > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) > > at > org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) > > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) > > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) > > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) > > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) > > at > org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) > > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) > > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) > > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) > > at > org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529) > > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324) > > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) > > ... 20 more > > > > > query: > > > val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment > val blinkEnvSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > val streamTableEnv = > StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings) > > streamTableEnv.sqlUpdate( > """ > | > |CREATE TABLE user_behavior ( > | uid VARCHAR, > | phoneType VARCHAR, > | clickCount INT, > | `time` TIMESTAMP(3) > |) WITH ( > | 'connector.type' = 'kafka', > | 'connector.version' = 'universal', > | 'connector.topic' = 'user_behavior', > | 'connector.startup-mode' = 'earliest-offset', > | 'connector.properties.0.key' = 'zookeeper.connect', > | 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181', > | 'connector.properties.1.key' = 'bootstrap.servers', > | 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092', > | 'update-mode' = 'append', > | 'format.type' = 'json', > | 'format.derive-schema' = 'true' > |) > |""".stripMargin) > streamTableEnv.sqlUpdate( > """ > | > |CREATE TABLE user_cnt ( > | `time` VARCHAR, > | sum_age INT > |) WITH ( > | 'connector.type' = 'jdbc', > | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard', > | 'connector.table' = 'user_cnt', > | 'connector.username' = 'root', > | 'connector.password' = '123456', > | 'connector.write.flush.max-rows' = '1' > |) > |""".stripMargin) > val userTableSource = new MysqlAsyncLookupTableSource( > Array("uid", "sex", "age", "created_time"), > Array(), > Array(Types.STRING, Types.STRING, Types.INT, Types.STRING)) > streamTableEnv.registerTableSource("users", userTableSource) > streamTableEnv.sqlUpdate( > """ > | > |insert into user_cnt > |SELECT > | cast(b.`time` as string), u.age > |FROM > | user_behavior AS b > | JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u > | ON b.uid = u.uid > | > |""".stripMargin) > streamTableEnv.execute("Temporal table join")
