xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283025643
 
 

 ##########
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
 ##########
 @@ -40,63 +40,67 @@ object ExternalTableUtil extends Logging {
     * @param externalTable the [[ExternalCatalogTable]] instance which to 
convert
     * @return converted [[TableSourceTable]] instance from the input catalog 
table
     */
-  def fromExternalCatalogTable[T1, T2](
-      tableEnv: TableEnvironment,
-      externalTable: ExternalCatalogTable)
+  def fromExternalCatalogTable[T1, T2](isBatch: Boolean, externalTable: 
ExternalCatalogTable)
     : TableSourceSinkTable[T1, T2] = {
 
     val statistics = new FlinkStatistic(toScala(externalTable.getTableStats))
 
     val source: Option[TableSourceTable[T1]] = if 
(externalTable.isTableSource) {
-      Some(createTableSource(tableEnv, externalTable, statistics))
+      Some(createTableSource(isBatch, externalTable, statistics))
     } else {
       None
     }
 
     val sink: Option[TableSinkTable[T2]] = if (externalTable.isTableSink) {
-      Some(createTableSink(tableEnv, externalTable, statistics))
+      Some(createTableSink(isBatch, externalTable, statistics))
     } else {
       None
     }
 
     new TableSourceSinkTable[T1, T2](source, sink)
   }
 
+  def getTableSchema(externalTable: ExternalCatalogTable) : TableSchema = {
+    if (externalTable.isTableSource) {
+      
TableFactoryUtil.findAndCreateTableSource[Any](externalTable).getTableSchema
+    } else {
+      val tableSink = TableFactoryUtil.findAndCreateTableSink(externalTable)
+      new TableSchema(tableSink.getFieldNames, tableSink.getFieldTypes)
+    }
+  }
+
   private def createTableSource[T](
-      tableEnv: TableEnvironment,
+      isBatch: Boolean,
       externalTable: ExternalCatalogTable,
       statistics: FlinkStatistic)
-    : TableSourceTable[T] = tableEnv match {
-
-    case _: BatchTableEnvImpl if externalTable.isBatchTable =>
+    : TableSourceTable[T] = {
+    if (isBatch && externalTable.isBatchTable) {
       val source = TableFactoryUtil.findAndCreateTableSource(externalTable)
       new BatchTableSourceTable[T](source.asInstanceOf[BatchTableSource[T]], 
statistics)
-
-    case _: StreamTableEnvImpl if externalTable.isStreamTable =>
+    } else if (!isBatch && externalTable.isStreamTable) {
       val source = TableFactoryUtil.findAndCreateTableSource(externalTable)
       new StreamTableSourceTable[T](source.asInstanceOf[StreamTableSource[T]], 
statistics)
-
-    case _ =>
+    } else {
       throw new ValidationException(
         "External catalog table does not support the current environment for a 
table source.")
 
 Review comment:
   This error message might need to change, because this else branch covers 
two distinct cases: isBatch==true && externalTable.isStreamTable, and 
isBatch==false && externalTable.isBatchTable.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to