看起来你又踩到了一个坑,你用的是1.10.0吧?可以切换到1.10.1试一下,有两个bug已经在1.10.1中修复了。
Zhou Zach <[email protected]> 于2020年6月12日周五 下午3:47写道: > 还是不行, > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type > [org.apache.logging.slf4j.Log4jLoggerFactory] > ERROR StatusLogger No log4j2 configuration file found. Using default > configuration: logging only errors to the console. > Exception in thread "main" org.apache.flink.table.api.SqlParserException: > SQL parse failed. Encountered "time FROM" at line 1, column 44. > Was expecting one of: > "CURSOR" ... > "EXISTS" ... > "NOT" ... > "ROW" ... > "(" ... > "+" ... > "-" ... > <UNSIGNED_INTEGER_LITERAL> ... > <DECIMAL_NUMERIC_LITERAL> ... > <APPROX_NUMERIC_LITERAL> ... > <BINARY_STRING_LITERAL> ... > <PREFIXED_STRING_LITERAL> ... > <QUOTED_STRING> ... > <UNICODE_STRING_LITERAL> ... > "TRUE" ... > "FALSE" ... > "UNKNOWN" ... > "NULL" ... > <LBRACE_D> ... > <LBRACE_T> ... > <LBRACE_TS> ... > "DATE" ... > "TIME" <QUOTED_STRING> ... > "TIMESTAMP" ... > "INTERVAL" ... > "?" ... > "CAST" ... > "EXTRACT" ... > "POSITION" ... > "CONVERT" ... > "TRANSLATE" ... > "OVERLAY" ... > "FLOOR" ... > "CEIL" ... > "CEILING" ... > "SUBSTRING" ... > "TRIM" ... > "CLASSIFIER" ... > "MATCH_NUMBER" ... > "RUNNING" ... > "PREV" ... > "NEXT" ... > "JSON_EXISTS" ... > "JSON_VALUE" ... > "JSON_QUERY" ... > "JSON_OBJECT" ... > "JSON_OBJECTAGG" ... > "JSON_ARRAY" ... > "JSON_ARRAYAGG" ... > <LBRACE_FN> ... > "MULTISET" ... > "ARRAY" ... > "MAP" ... > "PERIOD" ... > "SPECIFIC" ... > <IDENTIFIER> ... > <QUOTED_IDENTIFIER> ... > <BACK_QUOTED_IDENTIFIER> ... > <BRACKET_QUOTED_IDENTIFIER> ... > <UNICODE_QUOTED_IDENTIFIER> ... > "ABS" ... > "AVG" ... > "CARDINALITY" ... > "CHAR_LENGTH" ... > "CHARACTER_LENGTH" ... > "COALESCE" ... > "COLLECT" ... > "COVAR_POP" ... > "COVAR_SAMP" ... > "CUME_DIST" ... > "COUNT" ... > "CURRENT_DATE" ... > "CURRENT_TIME" ... > "CURRENT_TIMESTAMP" ... > "DENSE_RANK" ... > "ELEMENT" ... > "EXP" ... > "FIRST_VALUE" ... > "FUSION" ... > "GROUPING" ... > "HOUR" ... > "LAG" ... > "LEAD" ... > "LEFT" ... > "LAST_VALUE" ... > "LN" ... > "LOCALTIME" ... > "LOCALTIMESTAMP" ... > "LOWER" ... > "MAX" ... > "MIN" ... > "MINUTE" ... > "MOD" ... > "MONTH" ... > "NTH_VALUE" ... > "NTILE" ... > "NULLIF" ... > "OCTET_LENGTH" ... > "PERCENT_RANK" ... > "POWER" ... > "RANK" ... > "REGR_COUNT" ... > "REGR_SXX" ... > "REGR_SYY" ... > "RIGHT" ... > "ROW_NUMBER" ... > "SECOND" ... > "SQRT" ... > "STDDEV_POP" ... > "STDDEV_SAMP" ... > "SUM" ... > "UPPER" ... > "TRUNCATE" ... > "USER" ... > "VAR_POP" ... > "VAR_SAMP" ... > "YEAR" ... > "CURRENT_CATALOG" ... > "CURRENT_DEFAULT_TRANSFORM_GROUP" ... > "CURRENT_PATH" ... > "CURRENT_ROLE" ... > "CURRENT_SCHEMA" ... > "CURRENT_USER" ... > "SESSION_USER" ... > "SYSTEM_USER" ... > "NEW" ... > "CASE" ... > "CURRENT" ... > > at > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50) > at > org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79) > at > org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111) > at > org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563) > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) > at > org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63) > at > org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala) > > > query: > > > streamTableEnv.sqlUpdate( > """ > | > |CREATE TABLE user_behavior ( > | uid VARCHAR, > | phoneType VARCHAR, > | clickCount INT, > | proctime AS PROCTIME(), > | `time` TIMESTAMP(3) > |) WITH ( > | 'connector.type' = 'kafka', > | 'connector.version' = 'universal', > | 'connector.topic' = 'user_behavior', > | 'connector.startup-mode' = 'earliest-offset', > | 'connector.properties.0.key' = 'zookeeper.connect', > | 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181', > | 'connector.properties.1.key' = 'bootstrap.servers', > | 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092', > | 'update-mode' = 'append', > | 'format.type' = 'json', > | 'format.derive-schema' = 'true' > |) > |""".stripMargin) > streamTableEnv.sqlUpdate( > """ > | > |insert into user_cnt > |SELECT > | cast(b.`time` as string), u.age > |FROM > | user_behavior AS b > | JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u > | ON b.uid = u.uid > | > |""".stripMargin) > > > > > > > 不过,PROCTIME() AS proctime 放在select 后面可以执行成功,proctime AS PROCTIME() > 放在select 后面也不行。 > > > > > > > > > 在 2020-06-12 15:29:49,"Benchao Li" <[email protected]> 写道: > >你写反了,是proctime AS PROCTIME()。 > >计算列跟普通query里面的AS是反着的。 > > > >Zhou Zach <[email protected]> 于2020年6月12日周五 下午2:24写道: > > > >> flink 1.10.0: > >> 在create table中,加PROCTIME() AS proctime字段报错 > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 在 2020-06-12 14:08:11,"Benchao Li" <[email protected]> 写道: > >> >Hi, > >> > > >> >Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是事件时间。 > >> >可以参考下[1] > >> > > >> >[1] > >> > > >> > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html > >> > > >> >Zhou Zach <[email protected]> 于2020年6月12日周五 下午1:33写道: > >> > > >> >> SLF4J: Class path contains multiple SLF4J bindings. > >> >> > >> >> SLF4J: Found binding in > >> >> > >> > [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] > >> >> > >> >> SLF4J: Found binding in > >> >> > >> > [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] > >> >> > >> >> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > >> >> explanation. > >> >> > >> >> SLF4J: Actual binding is of type > >> >> [org.apache.logging.slf4j.Log4jLoggerFactory] > >> >> > >> >> ERROR StatusLogger No log4j2 configuration file found. Using default > >> >> configuration: logging only errors to the console. > >> >> > >> >> Exception in thread "main" org.apache.flink.table.api.TableException: > >> >> Cannot generate a valid execution plan for the given query: > >> >> > >> >> > >> >> > >> >> > >> >> > FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`], > >> >> fields=[time, sum_age]) > >> >> > >> >> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age]) > >> >> > >> >> +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner]) > >> >> > >> >> :- FlinkLogicalCalc(select=[uid, time]) > >> >> > >> >> : +- FlinkLogicalTableSourceScan(table=[[default_catalog, > >> >> default_database, user_behavior, source: [KafkaTableSource(uid, > >> phoneType, > >> >> clickCount, time)]]], fields=[uid, phoneType, clickCount, time]) > >> >> > >> >> +- FlinkLogicalSnapshot(period=[$cor0.time]) > >> >> > >> >> +- FlinkLogicalCalc(select=[uid, age]) > >> >> > >> >> +- FlinkLogicalTableSourceScan(table=[[default_catalog, > >> >> default_database, users, source: [MysqlAsyncLookupTableSource(uid, > sex, > >> >> age, created_time)]]], fields=[uid, sex, age, created_time]) > >> >> > >> >> > >> >> > >> >> > >> >> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' > left > >> >> table's proctime field, doesn't support 'PROCTIME()' > >> >> > >> >> Please check the documentation for the set of currently supported SQL > >> >> features. > >> >> > >> >> at > >> >> > >> > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78) > >> >> > >> >> at > >> >> > >> > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > >> >> > >> >> at > >> >> > >> > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > >> >> > >> >> at > >> >> > >> > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > >> >> > >> >> at > >> >> > >> > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > >> >> > >> >> at scala.collection.Iterator$class.foreach(Iterator.scala:891) > >> >> > >> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > >> >> > >> >> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > >> >> > >> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > >> >> > >> >> at > >> >> > >> > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > >> >> > >> >> at > scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > >> >> > >> >> at > >> >> > >> > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > >> >> > >> >> at > >> >> > >> > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170) > >> >> > >> >> at > >> >> > >> > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90) > >> >> > >> >> at > >> >> > >> > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > >> >> > >> >> at > >> >> > >> > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248) > >> >> > >> >> at > >> >> > >> > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151) > >> >> > >> >> at > >> >> > >> > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > >> >> > >> >> at > >> >> > >> > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) > >> >> > >> >> at > >> >> > >> > org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90) > >> >> > >> >> at > >> >> > >> > org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala) > >> >> > >> >> Caused by: org.apache.flink.table.api.TableException: Temporal table > >> join > >> >> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime > >> >> field, doesn't support 'PROCTIME()' > >> >> > >> >> at > >> >> > >> > org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67) > >> >> > >> >> at > >> >> > >> > org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147) > >> >> > >> >> at > >> >> > >> > org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868) > >> >> > >> >> at > >> >> > >> > org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529) > >> >> > >> >> at > >> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324) > >> >> > >> >> at > >> >> > >> > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) > >> >> > >> >> ... 20 more > >> >> > >> >> > >> >> > >> >> > >> >> query: > >> >> > >> >> > >> >> val streamExecutionEnv = > >> StreamExecutionEnvironment.getExecutionEnvironment > >> >> val blinkEnvSettings = > >> >> > >> > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > >> >> val streamTableEnv = > >> >> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings) > >> >> > >> >> streamTableEnv.sqlUpdate( > >> >> """ > >> >> | > >> >> |CREATE TABLE user_behavior ( > >> >> | uid VARCHAR, > >> >> | phoneType VARCHAR, > >> >> | clickCount INT, > >> >> | `time` TIMESTAMP(3) > >> >> |) WITH ( > >> >> | 'connector.type' = 'kafka', > >> >> | 'connector.version' = 'universal', > >> >> | 'connector.topic' = 'user_behavior', > >> >> | 'connector.startup-mode' = 'earliest-offset', > >> >> | 'connector.properties.0.key' = 'zookeeper.connect', > >> >> | 'connector.properties.0.value' = > >> 'cdh1:2181,cdh2:2181,cdh3:2181', > >> >> | 'connector.properties.1.key' = 'bootstrap.servers', > >> >> | 'connector.properties.1.value' = > >> 'cdh1:9092,cdh2:9092,cdh3:9092', > >> >> | 'update-mode' = 'append', > >> >> | 'format.type' = 'json', > >> >> | 'format.derive-schema' = 'true' > >> >> |) > >> >> |""".stripMargin) > >> >> streamTableEnv.sqlUpdate( > >> >> """ > >> >> | > >> >> |CREATE TABLE user_cnt ( > >> >> | `time` VARCHAR, > >> >> | sum_age INT > >> >> |) WITH ( > >> >> | 'connector.type' = 'jdbc', > >> >> | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard', > >> >> | 'connector.table' = 'user_cnt', > >> >> | 'connector.username' = 'root', > >> >> | 'connector.password' = '123456', > >> >> | 'connector.write.flush.max-rows' = '1' > >> >> |) > >> >> |""".stripMargin) > >> >> val userTableSource = new MysqlAsyncLookupTableSource( > >> >> Array("uid", "sex", "age", "created_time"), > >> >> Array(), > >> >> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING)) > >> >> streamTableEnv.registerTableSource("users", userTableSource) > >> >> streamTableEnv.sqlUpdate( > >> >> """ > >> >> | > >> >> |insert into user_cnt > >> >> |SELECT > >> >> | cast(b.`time` as string), u.age > >> >> |FROM > >> >> | user_behavior AS b > >> >> | JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u > >> >> | ON b.uid = u.uid > >> >> | > >> >> |""".stripMargin) > >> >> streamTableEnv.execute("Temporal table join") > >> >
