SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`], fields=[time, sum_age]) +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age]) +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner]) :- FlinkLogicalCalc(select=[uid, time]) : +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, user_behavior, source: [KafkaTableSource(uid, phoneType, clickCount, time)]]], fields=[uid, phoneType, clickCount, time]) +- FlinkLogicalSnapshot(period=[$cor0.time]) +- FlinkLogicalCalc(select=[uid, age]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex, age, created_time)]]], fields=[uid, sex, age, created_time]) Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' Please check the documentation for the set of currently supported SQL features. at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) at org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90) at org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala) Caused by: org.apache.flink.table.api.TableException: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()' at org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67) at org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147) at org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161) at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263) at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247) at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534) at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807) at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) ... 20 more query: val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings) streamTableEnv.sqlUpdate( """ | |CREATE TABLE user_behavior ( | uid VARCHAR, | phoneType VARCHAR, | clickCount INT, | `time` TIMESTAMP(3) |) WITH ( | 'connector.type' = 'kafka', | 'connector.version' = 'universal', | 'connector.topic' = 'user_behavior', | 'connector.startup-mode' = 'earliest-offset', | 'connector.properties.0.key' = 'zookeeper.connect', | 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181', | 'connector.properties.1.key' = 'bootstrap.servers', | 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092', | 'update-mode' = 'append', | 'format.type' = 'json', | 'format.derive-schema' = 'true' |) |""".stripMargin) streamTableEnv.sqlUpdate( """ | |CREATE TABLE user_cnt ( | `time` VARCHAR, | sum_age INT |) WITH ( | 'connector.type' = 'jdbc', | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard', | 'connector.table' = 'user_cnt', | 'connector.username' = 'root', | 'connector.password' = '123456', | 'connector.write.flush.max-rows' = '1' |) |""".stripMargin) val userTableSource = new MysqlAsyncLookupTableSource( Array("uid", "sex", "age", "created_time"), Array(), Array(Types.STRING, Types.STRING, Types.INT, Types.STRING)) streamTableEnv.registerTableSource("users", userTableSource) streamTableEnv.sqlUpdate( """ | |insert into user_cnt |SELECT | cast(b.`time` as string), u.age |FROM | user_behavior AS b | JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u | ON b.uid = u.uid | |""".stripMargin) streamTableEnv.execute("Temporal table join")
