[ 
https://issues.apache.org/jira/browse/FLINK-28111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-28111:
-----------------------------------
    Affects Version/s: 1.13.5

> flinksql use hivecatalog cause union all  operation lost  'eventTime 
> attribute'
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-28111
>                 URL: https://issues.apache.org/jira/browse/FLINK-28111
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API, Table SQL / Planner, Table SQL / Runtime
>    Affects Versions: 1.12.4, 1.13.5
>         Environment: flink 1.12.4
> hadoop 2.6.5
> hive 1.1.0
>            Reporter: yutao
>            Priority: Major
>
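>  In my scenario, I have 2 topics that share the same schema; I register them 
> as tables and define an event-time attribute (with a watermark) on each.
> I then create a view that combines the 2 tables with UNION ALL, and run a 
> GROUP BY over a tumble window on that view.
> However, when a HiveCatalog is set as the current catalog, the SQL cannot 
> run; it fails like this: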
> Exception in thread "main" org.apache.flink.table.api.TableException: Window 
> aggregate can only be defined over a time attribute column, but TIMESTAMP(3) 
> encountered.
>  
>  *The complete code is as follows*
> {code:java}
> package com.unicom.test;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.SqlDialect;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.catalog.hive.HiveCatalog;
> /**
>  *
>  * @author yt
>  */
> public class DataGenAndPrintSink {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         EnvironmentSettings envSetting = EnvironmentSettings
>                 .newInstance()
>                 .useBlinkPlanner()
>                 .inStreamingMode()
>                 .build();
>         StreamTableEnvironment tableEnv =  StreamTableEnvironment.create(env, 
> envSetting);
>         String defaultDatabase = "dc_dw" ;
>         String catalogName = "dc_catalog";
>         HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, 
> "hdfs://beh/flink/hive/conf","1.1.0");
>         tableEnv.registerCatalog(catalogName, hive);
>         tableEnv.useCatalog(catalogName);
>         tableEnv.useDatabase(defaultDatabase);
>         tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>         String sourceDDL = "CREATE TABLE IF NOT EXISTS source_table (\n" +
>                 "    -- 维度数据\n" +
>                 "    order_id  STRING,\n" +
>                 "    -- 用户 id\n" +
>                 "    user_id BIGINT,\n" +
>                 "    -- 用户\n" +
>                 "    price BIGINT,\n" +
>                 "    -- 事件时间戳\n" +
>                 "    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n" +
>                 "    -- watermark 设置\n" +
>                 "    WATERMARK FOR row_time AS row_time - INTERVAL '5' 
> SECOND\n" +
>                 ") WITH (\n" +
>                 "  'connector' = 'datagen',\n" +
>                 "  'rows-per-second' = '10',\n" +
>                 "  'fields.order_id.length' = '1',\n" +
>                 "  'fields.user_id.min' = '1',\n" +
>                 "  'fields.user_id.max' = '100000',\n" +
>                 "  'fields.price.min' = '1',\n" +
>                 "  'fields.price.max' = '100000'\n" +
>                 ")";
>         String sourceDDL_2 = "CREATE TABLE IF NOT EXISTS source_table_2 (\n" +
>                 "    -- 维度数据\n" +
>                 "    order_id  STRING,\n" +
>                 "    -- 用户 id\n" +
>                 "    user_id BIGINT,\n" +
>                 "    -- 用户\n" +
>                 "    price BIGINT,\n" +
>                 "    -- 事件时间戳\n" +
>                 "    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n" +
>                 "    -- watermark 设置\n" +
>                 "    WATERMARK FOR row_time AS row_time - INTERVAL '5' 
> SECOND\n" +
>                 ") WITH (\n" +
>                 "  'connector' = 'datagen',\n" +
>                 "  'rows-per-second' = '10',\n" +
>                 "  'fields.order_id.length' = '1',\n" +
>                 "  'fields.user_id.min' = '1',\n" +
>                 "  'fields.user_id.max' = '100000',\n" +
>                 "  'fields.price.min' = '1',\n" +
>                 "  'fields.price.max' = '100000'\n" +
>                 ")";
>         tableEnv.executeSql(sourceDDL);
>         tableEnv.executeSql(sourceDDL_2);
>         String view = "create view IF NOT EXISTS test_view as select * from 
> (select * from source_table union all select * from source_table_2) tb1";
>         tableEnv.executeSql(view);
>         
>         String sqlGroup = "select 
> count(*),UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS 
> STRING)) * 1000  as window_start from test_view group by 
> order_id,tumble(row_time, interval '1' minute)";
>         tableEnv.executeSql(sqlGroup).print();
>     }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to