GitHub user Aegeaner commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5065#discussion_r155740555

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---
    @@ -98,7 +98,9 @@ abstract class BatchTableEnvironment(
         tableSource match {
           case batchTableSource: BatchTableSource[_] =>
    -        registerTableInternal(name, new BatchTableSourceTable(batchTableSource))
    +        val table = new BatchTableSourceTable(batchTableSource)
    +        checkValidTableSourceType(tableSource)
    +        registerTableInternal(name, table)
    --- End diff --

    `registerTableInternal` takes a parameter of type `AbstractTable`, whereas when registering a `TableSource` we should pass the `TableSource` type.
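
To illustrate the type mismatch described in the comment, here is a minimal, self-contained sketch. All types below are hypothetical stand-ins that only borrow the names from the diff; they are not the real Flink classes, and the body of `checkValidTableSourceType` is an assumed placeholder rule, not the check from the pull request. The point is only that the check needs the `TableSource`, while `registerTableInternal` sees just an `AbstractTable`.

```scala
// Hypothetical stand-ins for the types named in the diff; NOT the real Flink API.
trait TableSource[T] { def returnTypeName: String }
trait BatchTableSource[T] extends TableSource[T]
abstract class AbstractTable
class BatchTableSourceTable[T](val tableSource: BatchTableSource[T]) extends AbstractTable

object RegistrationSketch {
  private val registry = scala.collection.mutable.Map.empty[String, AbstractTable]

  // Assumed placeholder validation: it inspects the TableSource itself.
  def checkValidTableSourceType(source: TableSource[_]): Unit =
    require(source.returnTypeName.nonEmpty, "table source must declare a return type")

  // Receives only an AbstractTable, so it has no typed access to the source.
  def registerTableInternal(name: String, table: AbstractTable): Unit =
    registry(name) = table

  // Mirrors the order in the diff: wrap the source, validate it, then register.
  def registerTableSource(name: String, source: TableSource[_]): Unit = source match {
    case batch: BatchTableSource[_] =>
      val table = new BatchTableSourceTable(batch)
      checkValidTableSourceType(source)
      registerTableInternal(name, table)
    case _ =>
      throw new IllegalArgumentException("only a BatchTableSource can be registered")
  }

  def isRegistered(name: String): Boolean = registry.contains(name)
}
```

Under these assumptions, moving the check into `registerTableInternal` would require either changing its parameter type or unwrapping the source from the table, which is why the check sits at the call site.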
---