[ https://issues.apache.org/jira/browse/FLINK-18070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-18070:
-----------------------------------
    Labels: pull-request-available  (was: )

> Time attribute been materialized after sub graph optimize
> ---------------------------------------------------------
>
>                 Key: FLINK-18070
>                 URL: https://issues.apache.org/jira/browse/FLINK-18070
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.10.0
>            Reporter: YufeiLiu
>            Assignee: YufeiLiu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0
>
>
> Hi, I want to run a window aggregate on a temporary view that feeds multiple
> sinks, but it throws an exception:
> {code:java}
> java.lang.AssertionError: type mismatch:
> ref:
> TIME ATTRIBUTE(PROCTIME) NOT NULL
> input:
> TIMESTAMP(3) NOT NULL
> {code}
> I looked into the optimizer logic; there is a comment in
> {{CommonSubGraphBasedOptimizer}}:
> "1. In general, for multi-sinks users tend to use VIEW which is a natural
> common sub-graph."
> After sub-graph optimization, the time attribute from the source has been
> converted to the basic TIMESTAMP type by {{FlinkRelTimeIndicatorProgram}}.
> But my view is created from a simple query, so in theory I think the time
> attribute should not need to be materialized.
> Here is my code:
> {code:java}
> // connector.type COLLECTION is for debug use
> tableEnv.sqlUpdate("CREATE TABLE source (\n" +
>     " `ts` AS PROCTIME(),\n" +
>     " `order_type` INT\n" +
>     ") WITH (\n" +
>     " 'connector.type' = 'COLLECTION',\n" +
>     " 'format.type' = 'json'\n" +
>     ")\n");
> tableEnv.createTemporaryView("source_view", tableEnv.sqlQuery("SELECT * FROM source"));
> tableEnv.sqlUpdate("CREATE TABLE sink (\n" +
>     " `result` BIGINT\n" +
>     ") WITH (\n" +
>     " 'connector.type' = 'COLLECTION',\n" +
>     " 'format.type' = 'json'\n" +
>     ")\n");
> tableEnv.sqlUpdate("INSERT INTO sink \n" +
>     "SELECT\n" +
>     " COUNT(1)\n" +
>     "FROM\n" +
>     " `source_view`\n" +
>     "WHERE\n" +
>     " `order_type` = 33\n" +
>     "GROUP BY\n" +
>     " TUMBLE(`ts`, INTERVAL '5' SECOND)\n");
> tableEnv.sqlUpdate("INSERT INTO sink \n" +
>     "SELECT\n" +
>     " COUNT(1)\n" +
>     "FROM\n" +
>     " `source_view`\n" +
>     "WHERE\n" +
>     " `order_type` = 34\n" +
>     "GROUP BY\n" +
>     " TUMBLE(`ts`, INTERVAL '5' SECOND)\n");
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)