GitHub user Aegeaner commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5065#discussion_r155740555
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---
    @@ -98,7 +98,9 @@ abstract class BatchTableEnvironment(
     
         tableSource match {
           case batchTableSource: BatchTableSource[_] =>
    -        registerTableInternal(name, new BatchTableSourceTable(batchTableSource))
    +        val table = new BatchTableSourceTable(batchTableSource)
    +        checkValidTableSourceType(tableSource)
    +        registerTableInternal(name, table)
    --- End diff --
    
    `registerTableInternal` takes a parameter of type `AbstractTable`, whereas 
    when registering a `TableSource` we should pass the `TableSource` type.
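
    To illustrate the type distinction above, here is a minimal, self-contained sketch. The traits and classes are hypothetical stand-ins that only mirror the names in the diff; they are not the real Flink classes, and the body of `checkValidTableSourceType` is a placeholder assumption. It shows why the check is applied to the `TableSource` while only the wrapping `AbstractTable` is handed to `registerTableInternal`.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the Flink types named in the diff (not the real classes).
trait TableSource[T]
trait BatchTableSource[T] extends TableSource[T]
abstract class AbstractTable
class BatchTableSourceTable[T](val tableSource: BatchTableSource[T]) extends AbstractTable

object RegistrationSketch {
  // Stand-in for the environment's table registry.
  val registered: mutable.Map[String, AbstractTable] = mutable.Map.empty

  // Validation inspects the source itself, so it accepts a TableSource.
  // The real check is not shown in the diff; this body is a placeholder.
  def checkValidTableSourceType(tableSource: TableSource[_]): Unit =
    require(tableSource != null, "table source must not be null")

  // Registration only ever sees the wrapped AbstractTable.
  def registerTableInternal(name: String, table: AbstractTable): Unit =
    registered(name) = table

  def registerTableSource(name: String, tableSource: TableSource[_]): Unit =
    tableSource match {
      case batchTableSource: BatchTableSource[_] =>
        val table = new BatchTableSourceTable(batchTableSource)
        checkValidTableSourceType(tableSource) // needs the TableSource type
        registerTableInternal(name, table)     // needs the AbstractTable type
      case _ =>
        throw new IllegalArgumentException("Only BatchTableSource can be registered.")
    }
}
```

    Under these assumptions, the check cannot live inside `registerTableInternal` without changing its signature, because by that point only the `AbstractTable` wrapper is available.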

