Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3829#discussion_r139621820 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -130,6 +130,72 @@ abstract class StreamTableEnvironment( } /** + * Registers an external [[TableSink]] with given field names and types in this + * [[TableEnvironment]]'s catalog. Registered sink tables can be referenced in SQL DML clause. + * + * Examples: + * + * - predefine a table sink and its field names and types + * {{{ + * val fieldNames: Array[String] = Array("a", "b", "c") + * val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG) + * val tableSink: TableSink = new YourTableSinkImpl(...) + * }}} + * + * - register an alias for this table sink to catalog + * {{{ + * tableEnv.registerTableSink("example_sink_table", fieldNames, fieldsTypes, tableSink) + * }}} + * + * - use the registered sink in SQL directly + * {{{ + * tableEnv.sqlInsert("INSERT INTO example_sink_table SELECT a, b, c FROM sourceTable") + * }}} + * + * @param name The name under which the [[TableSink]] is registered. + * @param fieldNames The field names related to the [[TableSink]]. + * @param fieldTypes The field types related to the [[TableSink]]. + * @param tableSink The [[TableSink]] to register. + */ + def registerTableSink( + name: String, + fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]], + tableSink: TableSink[_]): Unit = { + checkValidTableName(name) + if (null == fieldNames || null == fieldTypes) { + throw new TableException("fieldNames and fieldTypes should not be empty!") + } + if (fieldNames.length != fieldTypes.length) { + val errorMsg = "Field names and types should have same length! 
Passing fieldNames " + + s"length: ${fieldNames.length} but fieldTypes length: ${fieldTypes.length}" + throw new TableException(errorMsg) + } + + tableSink match { + case streamTableSink@(_: AppendStreamTableSink[_] | _: UpsertStreamTableSink[_] | + _: RetractStreamTableSink[_]) => + val configuredSink = streamTableSink.configure(fieldNames, fieldTypes) + registerTableInternal(name, new TableSinkTable(configuredSink)) + case _ => + throw new TableException( + "Only AppendStreamTableSink, UpsertStreamTableSink and RetractStreamTableSink can be " + + "registered in StreamTableEnvironment.") + } + } + + /** + * Registers a [[Table]] under a unique name in the TableEnvironment's catalog. + * Registered tables can be referenced in SQL queries. + * + * @param name The name under which the table will be registered. + * @param table The table to register. + */ + override def registerTable( --- End diff -- Thanks for the confirmation.
---