Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3829#discussion_r139530317 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala --- @@ -800,6 +798,68 @@ class Table( } /** + * Writes the [[Table]] to a [[TableSink]] specified by given name. The tableName + * represents a registered [[TableSink]] which defines an external storage location. + * + * A batch [[Table]] can only be written to a + * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a + * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a + * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an + * [[org.apache.flink.table.sinks.UpsertStreamTableSink]].* + * + * @param tableName Name of the [[TableSink]] to which the [[Table]] is written. + */ + def insertInto(tableName: String): Unit = { + insertInto(tableName, this.tableEnv.queryConfig) + } + + /** + * Writes the [[Table]] to a [[TableSink]] specified by given name. The tableName + * represents a registered [[TableSink]] which defines an external storage location. + * + * A batch [[Table]] can only be written to a + * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a + * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a + * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an + * [[org.apache.flink.table.sinks.UpsertStreamTableSink]].* + * + * @param tableName Name of the [[TableSink]] to which the [[Table]] is written. + * @param conf The [[QueryConfig]] to use. + */ + def insertInto(tableName: String, conf: QueryConfig): Unit = { + require(tableName != null && !tableName.isEmpty, "tableSink must not be null or empty.") + // validate if the tableSink is registered + if (!tableEnv.isRegistered(tableName)) { --- End diff -- I think it is better to move the implementation to a method `TableEnvironment.insertTableInto(table: Table, sinkTable: String, conf: QueryConfig)`. 
This would also allow us to make `TableEnvironment.getTable()` `private`.
---