[ https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147933#comment-16147933 ]
ASF GitHub Bot commented on FLINK-6442:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3829#discussion_r136163423

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -800,6 +794,66 @@ class Table(
       }
     
       /**
    +    * Writes the [[Table]] to a [[TableSink]] specified by the given name. The tableSink name
    +    * represents a registered [[TableSink]] which defines an external storage location.
    +    *
    +    * A batch [[Table]] can only be written to a
    +    * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a
    +    * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
    +    * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
    +    * [[org.apache.flink.table.sinks.UpsertStreamTableSink]].
    +    *
    +    * @param tableName Name of the [[TableSink]] to which the [[Table]] is written.
    +    * @tparam T The data type that the [[TableSink]] expects.
    +    */
    +  def insertInto[T](tableName: String): Unit = {
    +    insertInto(tableName, QueryConfig.getQueryConfigFromTableEnv(this.tableEnv))
    +  }
    +
    +  /**
    +    * Writes the [[Table]] to a [[TableSink]] specified by the given name. The tableSink name
    +    * represents a registered [[TableSink]] which defines an external storage location.
    +    *
    +    * A batch [[Table]] can only be written to a
    +    * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a
    +    * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
    +    * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
    +    * [[org.apache.flink.table.sinks.UpsertStreamTableSink]].
    +    *
    +    * @param tableName Name of the [[TableSink]] to which the [[Table]] is written.
    +    * @param conf The [[QueryConfig]] to use.
    +    * @tparam T The data type that the [[TableSink]] expects.
    +    */
    +  def insertInto[T](tableName: String, conf: QueryConfig): Unit = {
    +    require(tableName != null && !tableName.isEmpty, "tableSink must not be null or empty.")
    +    // validate that the tableSink is registered
    +    if (!tableEnv.isRegistered(tableName)) {
    +      throw TableException("tableSink must be registered.")
    +    }
    +    // look up the registered tableSink
    +    tableEnv.getTable(tableName) match {
    +      case sink: TableSinkTable[_] => {
    +        // get the row type info of the upstream table
    +        val rowType = getRelNode.getRowType
    +        val srcFieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
    +          .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
    +        // column count validation
    +        if (srcFieldTypes.length != sink.fieldTypes.length) {
    +          throw TableException(s"source column count doesn't match target table[$tableName]'s.")
    +        }
    +        // column type validation, no need to validate field names
    +        if (sink.fieldTypes.zipWithIndex.exists(f => f._1 != srcFieldTypes(f._2))) {
    --- End diff --
    
    simplify to `sink.fieldTypes.zip(srcFieldTypes).exists(t => t._1 != t._2)`
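The suggested rewrite is behavior-preserving here: the length check just above guarantees the two arrays line up, so pairing them with `zip` tests exactly the same positions as indexing via `zipWithIndex`. A self-contained sketch of the equivalence (the object and value names below are illustrative, not taken from the PR):
{code}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}

object ZipEquivalenceSketch {
  def main(args: Array[String]): Unit = {
    // Made-up field types: the second column differs between sink and source.
    val sinkFieldTypes: Array[TypeInformation[_]] =
      Array(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
    val srcFieldTypes: Array[TypeInformation[_]] =
      Array(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)

    // Form used in the PR: walk the sink types with their indices and
    // look up the corresponding source type by position.
    val mismatchByIndex =
      sinkFieldTypes.zipWithIndex.exists(f => f._1 != srcFieldTypes(f._2))

    // Form suggested in the review: pair the two arrays element-wise.
    val mismatchByZip =
      sinkFieldTypes.zip(srcFieldTypes).exists(t => t._1 != t._2)

    // Equivalent because the preceding length check guarantees both
    // arrays have the same number of elements.
    assert(mismatchByIndex == mismatchByZip)
    println(s"column type mismatch: $mismatchByZip")
  }
}
{code}
Besides being shorter, the `zip` form avoids the positional lookup and reads directly as an element-wise comparison.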
> Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in
> SQL
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-6442
>                 URL: https://issues.apache.org/jira/browse/FLINK-6442
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: lincoln.lee
>            Assignee: lincoln.lee
>            Priority: Minor
>
> Currently the Table API only offers a registration method for source
> tables, so when we write a streaming job in SQL we have to add an extra
> step for the sink, as the Table API does:
> {code}
> val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
> val t = StreamTestData.getSmall3TupleDataStream(env)
> tEnv.registerDataStream("MyTable", t)
> // one way: invoke the Table API's writeToSink method directly
> val result = tEnv.sql(sqlQuery)
> result.writeToSink(new YourStreamSink)
> // another way: convert to a DataStream first and then invoke addSink
> val result = tEnv.sql(sqlQuery).toDataStream[Row]
> result.addSink(new StreamITCase.StringSink)
> {code}
> From the API we can see that the sink table is always a derived table,
> because its schema is inferred from the result type of the upstream query.
> By contrast, a traditional RDBMS that supports DML syntax lets a query
> name its target output:
> {code}
> insert into table target_table_name
> [(column_name [ ,...n ])]
> query
> {code}
> The equivalent form of the example above is as follows:
> {code}
> tEnv.registerTableSink("targetTable", new YourSink)
> val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
> val result = tEnv.sql(sql)
> {code}
> This is supported by Calcite's grammar:
> {code}
> insert:( INSERT | UPSERT ) INTO tablePrimary
> [ '(' column [, column ]* ')' ]
> query
> {code}
> I'd like to extend the Flink Table API to support this feature. See the
> design doc: https://goo.gl/n3phK5
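For illustration, here is a minimal sketch of what the `YourSink` placeholder could look like: an append-only sink that prints each row. `PrintlnAppendSink` is a hypothetical name, and the sketch assumes the `TableSink`/`AppendStreamTableSink` interfaces roughly as they stood in the flink-table module around Flink 1.3:
{code}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.sinks.{AppendStreamTableSink, TableSink}
import org.apache.flink.types.Row

// Hypothetical append-only sink that prints rows. Field names and types
// are injected via configure(), which the framework calls with the
// schema of the query that writes to the registered sink.
class PrintlnAppendSink extends AppendStreamTableSink[Row] {

  private var fieldNames: Array[String] = _
  private var fieldTypes: Array[TypeInformation[_]] = _

  override def getOutputType: TypeInformation[Row] =
    new RowTypeInfo(fieldTypes, fieldNames)

  override def getFieldNames: Array[String] = fieldNames

  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes

  // Return a configured copy rather than mutating this instance.
  override def configure(
      fieldNames: Array[String],
      fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
    val configured = new PrintlnAppendSink
    configured.fieldNames = fieldNames
    configured.fieldTypes = fieldTypes
    configured
  }

  // Append-only: every incoming row is an insertion, so printing the
  // stream directly is enough for this sketch.
  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
    dataStream.print()
  }
}
{code}
With the registration method this issue proposes, such a sink would be wired up exactly as in the example above, e.g. tEnv.registerTableSink("targetTable", new PrintlnAppendSink).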