flink 1.10.0:
在 CREATE TABLE 语句中添加 PROCTIME() AS proctime 字段时报错。
















在 2020-06-12 14:08:11,"Benchao Li" <[email protected]> 写道:
>Hi,
>
>Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是处理时间属性。
>可以参考下[1]
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>
>Zhou Zach <[email protected]> 于2020年6月12日周五 下午1:33写道:
>
>> SLF4J: Class path contains multiple SLF4J bindings.
>>
>> SLF4J: Found binding in
>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>
>> SLF4J: Found binding in
>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>>
>> SLF4J: Actual binding is of type
>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>
>> ERROR StatusLogger No log4j2 configuration file found. Using default
>> configuration: logging only errors to the console.
>>
>> Exception in thread "main" org.apache.flink.table.api.TableException:
>> Cannot generate a valid execution plan for the given query:
>>
>>
>>
>>
>> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
>> fields=[time, sum_age])
>>
>> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>>
>>    +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>>
>>       :- FlinkLogicalCalc(select=[uid, time])
>>
>>       :  +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>> default_database, user_behavior, source: [KafkaTableSource(uid, phoneType,
>> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>>
>>       +- FlinkLogicalSnapshot(period=[$cor0.time])
>>
>>          +- FlinkLogicalCalc(select=[uid, age])
>>
>>             +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>> default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex,
>> age, created_time)]]], fields=[uid, sex, age, created_time])
>>
>>
>>
>>
>> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left
>> table's proctime field, doesn't support 'PROCTIME()'
>>
>> Please check the documentation for the set of currently supported SQL
>> features.
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>>
>> at
>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>
>> at
>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>
>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>
>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>
>> at
>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>>
>> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>>
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>>
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>>
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>>
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>>
>> at
>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>>
>> at
>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>
>> Caused by: org.apache.flink.table.api.TableException: Temporal table join
>> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
>> field, doesn't support 'PROCTIME()'
>>
>> at
>> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
>>
>> at
>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
>>
>> at
>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>>
>> at
>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>>
>> at
>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>>
>> at
>> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
>>
>> at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
>>
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>>
>> ... 20 more
>>
>>
>>
>>
>> query:
>>
>>
>> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> val blinkEnvSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> val streamTableEnv =
>> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)
>>
>> streamTableEnv.sqlUpdate(
>> """
>>     |
>>     |CREATE TABLE user_behavior (
>>     |    uid VARCHAR,
>>     |    phoneType VARCHAR,
>>     |    clickCount INT,
>>     |    `time` TIMESTAMP(3)
>>     |) WITH (
>>     |    'connector.type' = 'kafka',
>>     |    'connector.version' = 'universal',
>>     |    'connector.topic' = 'user_behavior',
>>     |    'connector.startup-mode' = 'earliest-offset',
>>     |    'connector.properties.0.key' = 'zookeeper.connect',
>>     |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>>     |    'connector.properties.1.key' = 'bootstrap.servers',
>>     |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>>     |    'update-mode' = 'append',
>>     |    'format.type' = 'json',
>>     |    'format.derive-schema' = 'true'
>>     |)
>>     |""".stripMargin)
>> streamTableEnv.sqlUpdate(
>> """
>>     |
>>     |CREATE TABLE user_cnt (
>>     |    `time` VARCHAR,
>>     |    sum_age INT
>>     |) WITH (
>>     |    'connector.type' = 'jdbc',
>>     |    'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
>>     |    'connector.table' = 'user_cnt',
>>     |    'connector.username' = 'root',
>>     |    'connector.password' = '123456',
>>     |    'connector.write.flush.max-rows' = '1'
>>     |)
>>     |""".stripMargin)
>> val userTableSource = new MysqlAsyncLookupTableSource(
>> Array("uid", "sex", "age", "created_time"),
>> Array(),
>> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
>> streamTableEnv.registerTableSource("users", userTableSource)
>> streamTableEnv.sqlUpdate(
>> """
>>     |
>>     |insert into  user_cnt
>>     |SELECT
>>     |  cast(b.`time` as string), u.age
>>     |FROM
>>     |  user_behavior AS b
>>     |  JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
>>     |  ON b.uid = u.uid
>>     |
>>     |""".stripMargin)
>> streamTableEnv.execute("Temporal table join")

回复