dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r283845430
########## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala ########## @@ -40,63 +40,67 @@ object ExternalTableUtil extends Logging { * @param externalTable the [[ExternalCatalogTable]] instance which to convert * @return converted [[TableSourceTable]] instance from the input catalog table */ - def fromExternalCatalogTable[T1, T2]( - tableEnv: TableEnvironment, - externalTable: ExternalCatalogTable) + def fromExternalCatalogTable[T1, T2](isBatch: Boolean, externalTable: ExternalCatalogTable) : TableSourceSinkTable[T1, T2] = { val statistics = new FlinkStatistic(toScala(externalTable.getTableStats)) val source: Option[TableSourceTable[T1]] = if (externalTable.isTableSource) { - Some(createTableSource(tableEnv, externalTable, statistics)) + Some(createTableSource(isBatch, externalTable, statistics)) } else { None } val sink: Option[TableSinkTable[T2]] = if (externalTable.isTableSink) { - Some(createTableSink(tableEnv, externalTable, statistics)) + Some(createTableSink(isBatch, externalTable, statistics)) } else { None } new TableSourceSinkTable[T1, T2](source, sink) } + def getTableSchema(externalTable: ExternalCatalogTable) : TableSchema = { + if (externalTable.isTableSource) { + TableFactoryUtil.findAndCreateTableSource[Any](externalTable).getTableSchema + } else { + val tableSink = TableFactoryUtil.findAndCreateTableSink(externalTable) + new TableSchema(tableSink.getFieldNames, tableSink.getFieldTypes) + } + } + private def createTableSource[T]( - tableEnv: TableEnvironment, + isBatch: Boolean, externalTable: ExternalCatalogTable, statistics: FlinkStatistic) - : TableSourceTable[T] = tableEnv match { - - case _: BatchTableEnvImpl if externalTable.isBatchTable => + : TableSourceTable[T] = { + if (isBatch && externalTable.isBatchTable) { val source = TableFactoryUtil.findAndCreateTableSource(externalTable) new BatchTableSourceTable[T](source.asInstanceOf[BatchTableSource[T]], statistics) - - case _: 
StreamTableEnvImpl if externalTable.isStreamTable => + } else if (!isBatch && externalTable.isStreamTable) { val source = TableFactoryUtil.findAndCreateTableSource(externalTable) new StreamTableSourceTable[T](source.asInstanceOf[StreamTableSource[T]], statistics) - - case _ => + } else { throw new ValidationException( "External catalog table does not support the current environment for a table source.") Review comment: Yes, but I think this message applies to both of those. BTW, the logic has not changed; only the type of the flag has changed. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services