Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202506574 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala --- @@ -44,32 +43,27 @@ object ExternalTableSourceUtil extends Logging { val properties = new DescriptorProperties() externalCatalogTable.addProperties(properties) val javaMap = properties.asMap - val source = TableFactoryService.find(classOf[TableSourceFactory[_]], javaMap) - .asInstanceOf[TableSourceFactory[_]] - .createTableSource(javaMap) tableEnv match { // check for a batch table source in this batch environment case _: BatchTableEnvironment => - source match { - case bts: BatchTableSource[_] => - new TableSourceSinkTable(Some(new BatchTableSourceTable( - bts, - new FlinkStatistic(externalCatalogTable.getTableStats))), None) - case _ => throw new TableException( - s"Found table source '${source.getClass.getCanonicalName}' is not applicable " + - s"in a batch environment.") - } + val source = TableFactoryService --- End diff -- Usually it is very uncommon to define both a batch and streaming source in the same factory. Your proposed change would require all future sources to implement a check for the environment before which is unnecessary in 80% of the cases. Separating by environment is a concept that can be find throughout the entire `flink-table` module because both sources/sinks behave quite different.
---