[ https://issues.apache.org/jira/browse/FLINK-10851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683800#comment-16683800 ]
frank wang commented on FLINK-10851:
------------------------------------

I don't think so. In

tableEnv.sqlUpdate("insert into `kafka.sdkafka.product_4` select filedName1, filedName2 from `kafka.sdkafka.order_4`")

the "kafka.sdkafka.product_4" will be split into new String[]{"kafka","sdkafka","product_4"}, and each element has its own meaning.

> sqlUpdate support complex insert grammar
> ----------------------------------------
>
>                 Key: FLINK-10851
>                 URL: https://issues.apache.org/jira/browse/FLINK-10851
>             Project: Flink
>          Issue Type: Bug
>            Reporter: frank wang
>            Priority: Major
>              Labels: pull-request-available
>
> My code is
> {{tableEnv.sqlUpdate("insert into kafka.sdkafka.product_4 select filedName1, filedName2 from kafka.sdkafka.order_4");}}
> but Flink fails with the error "No table was registered under the name kafka".
> After I modified the code as shown below, it works.
> The current code in TableEnvironment.scala is:
> {code:java}
> def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
>   val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
>   // parse the sql query
>   val parsed = planner.parse(stmt)
>   parsed match {
>     case insert: SqlInsert =>
>       // validate the SQL query
>       val query = insert.getSource
>       val validatedQuery = planner.validate(query)
>       // get query result as Table
>       val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))
>       // get name of sink table
>       val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
>       // insert query result into sink table
>       insertInto(queryResult, targetTableName, config)
>     case _ =>
>       throw new TableException(
>         "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
>   }
> }
> {code}
> It should be changed to this:
> {code:java}
> def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
>   val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
>   // parse the sql query
>   val parsed = planner.parse(stmt)
>   parsed match {
>     case insert: SqlInsert =>
>       // validate the SQL query
>       val query = insert.getSource
>       val validatedQuery = planner.validate(query)
>       // get query result as Table
>       val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))
>       // get name of sink table: use the full identifier instead of only its first part
>       //val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
>       val targetTableName = insert.getTargetTable.toString
>       // insert query result into sink table
>       insertInto(queryResult, targetTableName, config)
>     case _ =>
>       throw new TableException(
>         "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
>   }
> }
> {code}
>
> I hope this can be accepted, thanks.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
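
The difference between the two versions of `targetTableName` can be illustrated with a small standalone sketch. It does not use Flink or Calcite: `Identifier` is a hypothetical stand-in for `SqlIdentifier`, and it assumes that `toString` joins the name parts with dots. `firstPart` and `fullPath` are illustrative names, not Flink APIs.

```scala
// Hypothetical stand-in for Calcite's SqlIdentifier: only the list of name parts.
final case class Identifier(names: List[String]) {
  // Assumption: the real toString joins the name parts with dots.
  override def toString: String = names.mkString(".")
}

object TargetTableNameSketch {
  // Mirrors the current code: only the first name part is used as the table name.
  def firstPart(id: Identifier): String = id.names.head

  // Mirrors the proposed change: the whole dotted path is used as the table name.
  def fullPath(id: Identifier): String = id.toString

  def main(args: Array[String]): Unit = {
    // The three-part name from the issue, already split into its parts.
    val target = Identifier(List("kafka", "sdkafka", "product_4"))

    println(firstPart(target)) // prints "kafka"
    println(fullPath(target))  // prints "kafka.sdkafka.product_4"
  }
}
```

With only the first part, the lookup is for a table named "kafka", which matches the reported "No table was registered under the name kafka" error. With the full path, the lookup is for "kafka.sdkafka.product_4".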