xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r283025643
########## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala ########## @@ -40,63 +40,67 @@ object ExternalTableUtil extends Logging { * @param externalTable the [[ExternalCatalogTable]] instance which to convert * @return converted [[TableSourceTable]] instance from the input catalog table */ - def fromExternalCatalogTable[T1, T2]( - tableEnv: TableEnvironment, - externalTable: ExternalCatalogTable) + def fromExternalCatalogTable[T1, T2](isBatch: Boolean, externalTable: ExternalCatalogTable) : TableSourceSinkTable[T1, T2] = { val statistics = new FlinkStatistic(toScala(externalTable.getTableStats)) val source: Option[TableSourceTable[T1]] = if (externalTable.isTableSource) { - Some(createTableSource(tableEnv, externalTable, statistics)) + Some(createTableSource(isBatch, externalTable, statistics)) } else { None } val sink: Option[TableSinkTable[T2]] = if (externalTable.isTableSink) { - Some(createTableSink(tableEnv, externalTable, statistics)) + Some(createTableSink(isBatch, externalTable, statistics)) } else { None } new TableSourceSinkTable[T1, T2](source, sink) } + def getTableSchema(externalTable: ExternalCatalogTable) : TableSchema = { + if (externalTable.isTableSource) { + TableFactoryUtil.findAndCreateTableSource[Any](externalTable).getTableSchema + } else { + val tableSink = TableFactoryUtil.findAndCreateTableSink(externalTable) + new TableSchema(tableSink.getFieldNames, tableSink.getFieldTypes) + } + } + private def createTableSource[T]( - tableEnv: TableEnvironment, + isBatch: Boolean, externalTable: ExternalCatalogTable, statistics: FlinkStatistic) - : TableSourceTable[T] = tableEnv match { - - case _: BatchTableEnvImpl if externalTable.isBatchTable => + : TableSourceTable[T] = { + if (isBatch && externalTable.isBatchTable) { val source = TableFactoryUtil.findAndCreateTableSource(externalTable) new BatchTableSourceTable[T](source.asInstanceOf[BatchTableSource[T]], statistics) - - case _: 
StreamTableEnvImpl if externalTable.isStreamTable => + } else if (!isBatch && externalTable.isStreamTable) { val source = TableFactoryUtil.findAndCreateTableSource(externalTable) new StreamTableSourceTable[T](source.asInstanceOf[StreamTableSource[T]], statistics) - - case _ => + } else { throw new ValidationException( "External catalog table does not support the current environment for a table source.") Review comment: This error message might need to change because the else branch now covers two cases: isBatch==true && externalTable.isStreamTable, and isBatch==false && externalTable.isBatchTable. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services