[ https://issues.apache.org/jira/browse/FLINK-28111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martijn Visser updated FLINK-28111: ----------------------------------- Affects Version/s: 1.13.5 > flinksql use hivecatalog cause union all operation lost 'eventTime > attribute' > ------------------------------------------------------------------------------- > > Key: FLINK-28111 > URL: https://issues.apache.org/jira/browse/FLINK-28111 > Project: Flink > Issue Type: Bug > Components: Table SQL / API, Table SQL / Planner, Table SQL / Runtime > Affects Versions: 1.12.4, 1.13.5 > Environment: flink 1.12.4 > hadoop 2.6.5 > hive 1.1.0 > Reporter: yutao > Priority: Major > > In my scenario , i have 2 topics have same schema ; i register them to > table and define eventtime. > then create view use union all 2 table ,and use view group by tumble > window ; > but when set hivecatalog ,sql can not run ;just like this: > Exception in thread "main" org.apache.flink.table.api.TableException: Window > aggregate can only be defined over a time attribute column, but TIMESTAMP(3) > encountered. > > *The complete code is as follows* > {code:java} > package com.unicom.test; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.SqlDialect; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import org.apache.flink.table.catalog.hive.HiveCatalog; > /** > * > * @author yt > */ > public class DataGenAndPrintSink { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > EnvironmentSettings envSetting = EnvironmentSettings > .newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build(); > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, > envSetting); > String defaultDatabase = "dc_dw" ; > String catalogName = "dc_catalog"; > HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, > "hdfs://beh/flink/hive/conf","1.1.0"); > tableEnv.registerCatalog(catalogName, hive); > tableEnv.useCatalog(catalogName); > tableEnv.useDatabase(defaultDatabase); > tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > String sourceDDL = "CREATE TABLE IF NOT EXISTS source_table (\n" + > " -- 维度数据\n" + > " order_id STRING,\n" + > " -- 用户 id\n" + > " user_id BIGINT,\n" + > " -- 用户\n" + > " price BIGINT,\n" + > " -- 事件时间戳\n" + > " row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n" + > " -- watermark 设置\n" + > " WATERMARK FOR row_time AS row_time - INTERVAL '5' > SECOND\n" + > ") WITH (\n" + > " 'connector' = 'datagen',\n" + > " 'rows-per-second' = '10',\n" + > " 'fields.order_id.length' = '1',\n" + > " 'fields.user_id.min' = '1',\n" + > " 'fields.user_id.max' = '100000',\n" + > " 'fields.price.min' = '1',\n" + > " 'fields.price.max' = '100000'\n" + > ")"; > String sourceDDL_2 = "CREATE TABLE IF NOT EXISTS source_table_2 (\n" + > " -- 维度数据\n" + > " order_id STRING,\n" + > " -- 用户 id\n" + > " user_id BIGINT,\n" + > " -- 用户\n" + > " price BIGINT,\n" + > " -- 事件时间戳\n" + > " row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n" + > " -- watermark 设置\n" + > " WATERMARK FOR row_time AS row_time - INTERVAL '5' > SECOND\n" + > ") WITH (\n" + > " 'connector' = 'datagen',\n" + > " 'rows-per-second' = '10',\n" + > " 'fields.order_id.length' = '1',\n" + > " 'fields.user_id.min' = '1',\n" + > " 'fields.user_id.max' = '100000',\n" + > " 'fields.price.min' = '1',\n" + > " 'fields.price.max' = '100000'\n" + > ")"; > tableEnv.executeSql(sourceDDL); > tableEnv.executeSql(sourceDDL_2); > String view = "create view IF NOT EXISTS test_view as select * from > (select * from source_table union all select * from source_table_2) tb1"; > tableEnv.executeSql(view); > > String sqlGroup = "select > count(*),UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS > STRING)) * 1000 as window_start from test_view group by > order_id,tumble(row_time, interval '1' minute)"; > tableEnv.executeSql(sqlGroup).print(); > } > } > {code} > -- This message was sent by Atlassian Jira (v8.20.7#820007)