SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in 
[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in 
[jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

ERROR StatusLogger No log4j2 configuration file found. Using default 
configuration: logging only errors to the console.

Exception in thread "main" org.apache.flink.table.api.TableException: Cannot 
generate a valid execution plan for the given query: 




FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`], 
fields=[time, sum_age])

+- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])

   +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])

      :- FlinkLogicalCalc(select=[uid, time])

      :  +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, user_behavior, source: [KafkaTableSource(uid, phoneType, 
clickCount, time)]]], fields=[uid, phoneType, clickCount, time])

      +- FlinkLogicalSnapshot(period=[$cor0.time])

         +- FlinkLogicalCalc(select=[uid, age])

            +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex, age, 
created_time)]]], fields=[uid, sex, age, created_time])




Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left 
table's proctime field, doesn't support 'PROCTIME()'

Please check the documentation for the set of currently supported SQL features.

at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)

at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)

at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)

at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)

at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)

at scala.collection.Iterator$class.foreach(Iterator.scala:891)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)

at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)

at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)

at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)

at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)

at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)

at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)

at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)

at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)

at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)

at 
org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)

at 
org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)

Caused by: org.apache.flink.table.api.TableException: Temporal table join 
currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, 
doesn't support 'PROCTIME()'

at 
org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)

at 
org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)

at 
org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)

at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)

at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)

at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)

at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)

at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)

at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)

at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)

at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)

at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)

at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)

... 20 more




query:


val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
val blinkEnvSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamTableEnv = 
StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)

streamTableEnv.sqlUpdate(
"""
    |
    |CREATE TABLE user_behavior (
    |    uid VARCHAR,
    |    phoneType VARCHAR,
    |    clickCount INT,
    |    `time` TIMESTAMP(3)
    |) WITH (
    |    'connector.type' = 'kafka',
    |    'connector.version' = 'universal',
    |    'connector.topic' = 'user_behavior',
    |    'connector.startup-mode' = 'earliest-offset',
    |    'connector.properties.0.key' = 'zookeeper.connect',
    |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |    'connector.properties.1.key' = 'bootstrap.servers',
    |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |    'update-mode' = 'append',
    |    'format.type' = 'json',
    |    'format.derive-schema' = 'true'
    |)
    |""".stripMargin)
streamTableEnv.sqlUpdate(
"""
    |
    |CREATE TABLE user_cnt (
    |    `time` VARCHAR,
    |    sum_age INT
    |) WITH (
    |    'connector.type' = 'jdbc',
    |    'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
    |    'connector.table' = 'user_cnt',
    |    'connector.username' = 'root',
    |    'connector.password' = '123456',
    |    'connector.write.flush.max-rows' = '1'
    |)
    |""".stripMargin)
val userTableSource = new MysqlAsyncLookupTableSource(
Array("uid", "sex", "age", "created_time"),
Array(),
Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
streamTableEnv.registerTableSource("users", userTableSource)
streamTableEnv.sqlUpdate(
"""
    |
    |insert into  user_cnt
    |SELECT
    |  cast(b.`time` as string), u.age
    |FROM
    |  user_behavior AS b
    |  JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
    |  ON b.uid = u.uid
    |
    |""".stripMargin)
streamTableEnv.execute("Temporal table join")

回复