wuchong commented on a change in pull request #11538: [FLINK-16813][jdbc] JDBCInputFormat doesn't correctly map Short
URL: https://github.com/apache/flink/pull/11538#discussion_r399142816
##########
File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
##########
@@ -127,7 +134,7 @@ public JDBCInputFormat() {
 	@Override
 	public RowTypeInfo getProducedType() {
-		return rowTypeInfo;
+		return (RowTypeInfo) fromDataTypeToLegacyInfo(fromLogicalToDataType(rowType));

Review comment:
As far as I can see, the `rowType` is only used for `JdbcDialect#setRow`.

IMO, `JdbcDialect` shouldn't be mixed into a runtime class. Its responsibility is to construct a correct runtime class based on the dialect. So I would suggest adding `JdbcDialect#getJdbcRowConverter(RowType)` and having `JdbcInputFormat` accept a `JdbcRowConverter`. The runtime converters would then already be in place and simply perform the conversion, without going through big switch/case statements over the types at runtime.

In this way, we can still keep `rowTypeInfo` as the constructor parameter and don't need `rowType`.

```java
interface JdbcRowConverter {
	Row convertToRow(ResultSet resultSet, Row reuse);
}
```
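
To make the suggestion a bit more concrete, here is a minimal sketch of how such a converter could look. It is only an illustration, not the proposed implementation: `Row` is a tiny stand-in for Flink's `Row`, and `FieldConverter`, `SimpleRowConverter`, `TO_SHORT`, `IDENTITY` and `fakeResultSet` are hypothetical names that do not exist in the PR. The `throws SQLException` clause is also an assumption added so the `ResultSet` calls compile. The idea is that the dialect picks one converter per column once, up front, and the runtime loop just applies them.

```java
import java.lang.reflect.Proxy;
import java.sql.ResultSet;
import java.sql.SQLException;

public class JdbcRowConverterSketch {

    /** Stand-in for Flink's Row (assumption): a fixed-arity array of fields. */
    static final class Row {
        private final Object[] fields;
        Row(int arity) { this.fields = new Object[arity]; }
        void setField(int pos, Object value) { fields[pos] = value; }
        Object getField(int pos) { return fields[pos]; }
    }

    /** Interface from the review comment; the throws clause is an added assumption. */
    interface JdbcRowConverter {
        Row convertToRow(ResultSet resultSet, Row reuse) throws SQLException;
    }

    /** Hypothetical per-column converter, chosen once when the converter is built. */
    @FunctionalInterface
    interface FieldConverter {
        Object convert(Object jdbcValue);
    }

    /** Hypothetical converter: applies one pre-selected FieldConverter per column. */
    static final class SimpleRowConverter implements JdbcRowConverter {
        private final FieldConverter[] converters;

        SimpleRowConverter(FieldConverter... converters) {
            this.converters = converters;
        }

        @Override
        public Row convertToRow(ResultSet resultSet, Row reuse) throws SQLException {
            for (int i = 0; i < converters.length; i++) {
                // JDBC columns are 1-based, row fields are 0-based.
                reuse.setField(i, converters[i].convert(resultSet.getObject(i + 1)));
            }
            return reuse;
        }
    }

    /** SMALLINT column: getObject may hand back an Integer, so narrow it to Short. */
    static final FieldConverter TO_SHORT =
            v -> v == null ? null : ((Number) v).shortValue();

    /** Columns whose JDBC value already has the expected Java type. */
    static final FieldConverter IDENTITY = v -> v;

    /** Demo-only fake ResultSet that answers nothing but getObject(int). */
    static ResultSet fakeResultSet(Object... columns) {
        return (ResultSet) Proxy.newProxyInstance(
                JdbcRowConverterSketch.class.getClassLoader(),
                new Class<?>[] {ResultSet.class},
                (proxy, method, args) -> {
                    if (method.getName().equals("getObject")
                            && args != null
                            && args.length == 1
                            && args[0] instanceof Integer) {
                        return columns[((Integer) args[0]) - 1];
                    }
                    throw new UnsupportedOperationException(method.getName());
                });
    }

    public static void main(String[] args) throws SQLException {
        // A dialect would build this once from the RowType; here it is hard-coded.
        JdbcRowConverter converter = new SimpleRowConverter(TO_SHORT, IDENTITY);
        Row row = converter.convertToRow(
                fakeResultSet(Integer.valueOf(7), "flink"), new Row(2));
        System.out.println(
                row.getField(0).getClass().getSimpleName() + " " + row.getField(1));
    }
}
```

With something along these lines, `JDBCInputFormat` would only hold a `JdbcRowConverter` and call `convertToRow` per record, so neither the dialect nor the `RowType` has to be carried into the runtime class.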