还是不行,
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
ERROR StatusLogger No log4j2 configuration file found. Using default
configuration: logging only errors to the console.
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL
parse failed. Encountered "time FROM" at line 1, column 44.
Was expecting one of:
"CURSOR" ...
"EXISTS" ...
"NOT" ...
"ROW" ...
"(" ...
"+" ...
"-" ...
<UNSIGNED_INTEGER_LITERAL> ...
<DECIMAL_NUMERIC_LITERAL> ...
<APPROX_NUMERIC_LITERAL> ...
<BINARY_STRING_LITERAL> ...
<PREFIXED_STRING_LITERAL> ...
<QUOTED_STRING> ...
<UNICODE_STRING_LITERAL> ...
"TRUE" ...
"FALSE" ...
"UNKNOWN" ...
"NULL" ...
<LBRACE_D> ...
<LBRACE_T> ...
<LBRACE_TS> ...
"DATE" ...
"TIME" <QUOTED_STRING> ...
"TIMESTAMP" ...
"INTERVAL" ...
"?" ...
"CAST" ...
"EXTRACT" ...
"POSITION" ...
"CONVERT" ...
"TRANSLATE" ...
"OVERLAY" ...
"FLOOR" ...
"CEIL" ...
"CEILING" ...
"SUBSTRING" ...
"TRIM" ...
"CLASSIFIER" ...
"MATCH_NUMBER" ...
"RUNNING" ...
"PREV" ...
"NEXT" ...
"JSON_EXISTS" ...
"JSON_VALUE" ...
"JSON_QUERY" ...
"JSON_OBJECT" ...
"JSON_OBJECTAGG" ...
"JSON_ARRAY" ...
"JSON_ARRAYAGG" ...
<LBRACE_FN> ...
"MULTISET" ...
"ARRAY" ...
"MAP" ...
"PERIOD" ...
"SPECIFIC" ...
<IDENTIFIER> ...
<QUOTED_IDENTIFIER> ...
<BACK_QUOTED_IDENTIFIER> ...
<BRACKET_QUOTED_IDENTIFIER> ...
<UNICODE_QUOTED_IDENTIFIER> ...
"ABS" ...
"AVG" ...
"CARDINALITY" ...
"CHAR_LENGTH" ...
"CHARACTER_LENGTH" ...
"COALESCE" ...
"COLLECT" ...
"COVAR_POP" ...
"COVAR_SAMP" ...
"CUME_DIST" ...
"COUNT" ...
"CURRENT_DATE" ...
"CURRENT_TIME" ...
"CURRENT_TIMESTAMP" ...
"DENSE_RANK" ...
"ELEMENT" ...
"EXP" ...
"FIRST_VALUE" ...
"FUSION" ...
"GROUPING" ...
"HOUR" ...
"LAG" ...
"LEAD" ...
"LEFT" ...
"LAST_VALUE" ...
"LN" ...
"LOCALTIME" ...
"LOCALTIMESTAMP" ...
"LOWER" ...
"MAX" ...
"MIN" ...
"MINUTE" ...
"MOD" ...
"MONTH" ...
"NTH_VALUE" ...
"NTILE" ...
"NULLIF" ...
"OCTET_LENGTH" ...
"PERCENT_RANK" ...
"POWER" ...
"RANK" ...
"REGR_COUNT" ...
"REGR_SXX" ...
"REGR_SYY" ...
"RIGHT" ...
"ROW_NUMBER" ...
"SECOND" ...
"SQRT" ...
"STDDEV_POP" ...
"STDDEV_SAMP" ...
"SUM" ...
"UPPER" ...
"TRUNCATE" ...
"USER" ...
"VAR_POP" ...
"VAR_SAMP" ...
"YEAR" ...
"CURRENT_CATALOG" ...
"CURRENT_DEFAULT_TRANSFORM_GROUP" ...
"CURRENT_PATH" ...
"CURRENT_ROLE" ...
"CURRENT_SCHEMA" ...
"CURRENT_USER" ...
"SESSION_USER" ...
"SYSTEM_USER" ...
"NEW" ...
"CASE" ...
"CURRENT" ...
at
org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
at
org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
at
org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:63)
at
org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
query:
streamTableEnv.sqlUpdate(
"""
|
|CREATE TABLE user_behavior (
| uid VARCHAR,
| phoneType VARCHAR,
| clickCount INT,
| proctime AS PROCTIME(),
| `time` TIMESTAMP(3)
|) WITH (
| 'connector.type' = 'kafka',
| 'connector.version' = 'universal',
| 'connector.topic' = 'user_behavior',
| 'connector.startup-mode' = 'earliest-offset',
| 'connector.properties.0.key' = 'zookeeper.connect',
| 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
| 'connector.properties.1.key' = 'bootstrap.servers',
| 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
| 'update-mode' = 'append',
| 'format.type' = 'json',
| 'format.derive-schema' = 'true'
|)
|""".stripMargin)
streamTableEnv.sqlUpdate(
"""
|
|insert into user_cnt
|SELECT
| cast(b.`time` as string), u.age
|FROM
| user_behavior AS b
| JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
| ON b.uid = u.uid
|
|""".stripMargin)
不过,PROCTIME() AS proctime 放在select 后面可以执行成功,proctime AS PROCTIME() 放在select
后面也不行。
在 2020-06-12 15:29:49,"Benchao Li" <[email protected]> 写道:
>你写反了,是proctime AS PROCTIME()。
>计算列跟普通query里面的AS是反着的。
>
>Zhou Zach <[email protected]> 于2020年6月12日周五 下午2:24写道:
>
>> flink 1.10.0:
>> 在create table中,加PROCTIME() AS proctime字段报错
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-06-12 14:08:11,"Benchao Li" <[email protected]> 写道:
>> >Hi,
>> >
>> >Temporal Table join的时候需要是处理时间,你现在这个b.`time`是一个普通的时间戳,而不是事件时间。
>> >可以参考下[1]
>> >
>> >[1]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>> >
>> >Zhou Zach <[email protected]> 于2020年6月12日周五 下午1:33写道:
>> >
>> >> SLF4J: Class path contains multiple SLF4J bindings.
>> >>
>> >> SLF4J: Found binding in
>> >>
>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> >>
>> >> SLF4J: Found binding in
>> >>
>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> >>
>> >> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> >> explanation.
>> >>
>> >> SLF4J: Actual binding is of type
>> >> [org.apache.logging.slf4j.Log4jLoggerFactory]
>> >>
>> >> ERROR StatusLogger No log4j2 configuration file found. Using default
>> >> configuration: logging only errors to the console.
>> >>
>> >> Exception in thread "main" org.apache.flink.table.api.TableException:
>> >> Cannot generate a valid execution plan for the given query:
>> >>
>> >>
>> >>
>> >>
>> >> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
>> >> fields=[time, sum_age])
>> >>
>> >> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>> >>
>> >> +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>> >>
>> >> :- FlinkLogicalCalc(select=[uid, time])
>> >>
>> >> : +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>> >> default_database, user_behavior, source: [KafkaTableSource(uid,
>> phoneType,
>> >> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>> >>
>> >> +- FlinkLogicalSnapshot(period=[$cor0.time])
>> >>
>> >> +- FlinkLogicalCalc(select=[uid, age])
>> >>
>> >> +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>> >> default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex,
>> >> age, created_time)]]], fields=[uid, sex, age, created_time])
>> >>
>> >>
>> >>
>> >>
>> >> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left
>> >> table's proctime field, doesn't support 'PROCTIME()'
>> >>
>> >> Please check the documentation for the set of currently supported SQL
>> >> features.
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>> >>
>> >> at
>> >>
>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>> >>
>> >> at
>> >>
>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>> >>
>> >> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> >>
>> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> >>
>> >> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> >>
>> >> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> >>
>> >> at
>> >>
>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>> >>
>> >> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>> >>
>> >> at
>> >>
>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>> >>
>> >> at
>> >>
>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>> >>
>> >> Caused by: org.apache.flink.table.api.TableException: Temporal table
>> join
>> >> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
>> >> field, doesn't support 'PROCTIME()'
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>> >>
>> >> at
>> >>
>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>> >>
>> >> at
>> >>
>> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>> >>
>> >> at
>> >>
>> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
>> >>
>> >> at
>> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
>> >>
>> >> at
>> >>
>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>> >>
>> >> ... 20 more
>> >>
>> >>
>> >>
>> >>
>> >> query:
>> >>
>> >>
>> >> val streamExecutionEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment
>> >> val blinkEnvSettings =
>> >>
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> >> val streamTableEnv =
>> >> StreamTableEnvironment.create(streamExecutionEnv,blinkEnvSettings)
>> >>
>> >> streamTableEnv.sqlUpdate(
>> >> """
>> >> |
>> >> |CREATE TABLE user_behavior (
>> >> | uid VARCHAR,
>> >> | phoneType VARCHAR,
>> >> | clickCount INT,
>> >> | `time` TIMESTAMP(3)
>> >> |) WITH (
>> >> | 'connector.type' = 'kafka',
>> >> | 'connector.version' = 'universal',
>> >> | 'connector.topic' = 'user_behavior',
>> >> | 'connector.startup-mode' = 'earliest-offset',
>> >> | 'connector.properties.0.key' = 'zookeeper.connect',
>> >> | 'connector.properties.0.value' =
>> 'cdh1:2181,cdh2:2181,cdh3:2181',
>> >> | 'connector.properties.1.key' = 'bootstrap.servers',
>> >> | 'connector.properties.1.value' =
>> 'cdh1:9092,cdh2:9092,cdh3:9092',
>> >> | 'update-mode' = 'append',
>> >> | 'format.type' = 'json',
>> >> | 'format.derive-schema' = 'true'
>> >> |)
>> >> |""".stripMargin)
>> >> streamTableEnv.sqlUpdate(
>> >> """
>> >> |
>> >> |CREATE TABLE user_cnt (
>> >> | `time` VARCHAR,
>> >> | sum_age INT
>> >> |) WITH (
>> >> | 'connector.type' = 'jdbc',
>> >> | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
>> >> | 'connector.table' = 'user_cnt',
>> >> | 'connector.username' = 'root',
>> >> | 'connector.password' = '123456',
>> >> | 'connector.write.flush.max-rows' = '1'
>> >> |)
>> >> |""".stripMargin)
>> >> val userTableSource = new MysqlAsyncLookupTableSource(
>> >> Array("uid", "sex", "age", "created_time"),
>> >> Array(),
>> >> Array(Types.STRING, Types.STRING, Types.INT, Types.STRING))
>> >> streamTableEnv.registerTableSource("users", userTableSource)
>> >> streamTableEnv.sqlUpdate(
>> >> """
>> >> |
>> >> |insert into user_cnt
>> >> |SELECT
>> >> | cast(b.`time` as string), u.age
>> >> |FROM
>> >> | user_behavior AS b
>> >> | JOIN users FOR SYSTEM_TIME AS OF b.`time` AS u
>> >> | ON b.uid = u.uid
>> >> |
>> >> |""".stripMargin)
>> >> streamTableEnv.execute("Temporal table join")
>>