libenchao commented on code in PR #79:
URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1445947248

##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##########
@@ -92,9 +103,22 @@ public JdbcRowDataLookupFunction(
                         })
                         .toArray(DataType[]::new);
         this.maxRetryTimes = maxRetryTimes;
-        this.query =
+
+        final String baseSelectStatement =
                 options.getDialect()
                         .getSelectFromStatement(options.getTableName(), fieldNames, keyNames);
+        if (conditions == null || conditions.length == 0) {
+            this.query = baseSelectStatement;
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Issuing look up select {}", this.query);
+            }
+        } else {
+            this.query = baseSelectStatement + " AND " + String.join(" AND ", conditions);

Review Comment:
   What if the `baseSelectStatement` does not contain a `WHERE` clause?



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##########
@@ -72,6 +72,17 @@ public JdbcRowDataLookupFunction(
             DataType[] fieldTypes,
             String[] keyNames,
             RowType rowType) {
+        this(options, maxRetryTimes, fieldNames, fieldTypes, keyNames, rowType, null);
+    }
+
+    public JdbcRowDataLookupFunction(

Review Comment:
   This is an `Internal` class, so why not just change the original constructor?



##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##########
@@ -78,6 +79,8 @@ public class JdbcDynamicTableSource
     private List<String> resolvedPredicates = new ArrayList<>();
     private Serializable[] pushdownParams = new Serializable[0];
+    private List<ParameterizedPredicate> pushdownParameterizedPredicates = new ArrayList<>();

Review Comment:
   I'm not sure why we need another list of parameters in addition to `pushdownParams`. Can we just reuse that?


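   On the first comment above (the missing `WHERE` clause), here is a minimal sketch of one way the concatenation could be made safe. It is not the PR's code: the class `LookupQuerySketch` and the helper `appendConditions` are hypothetical names, and the sketch assumes that the dialect's select statement contains a `WHERE` clause exactly when `keyNames` is non-empty, which would need to be checked against the dialect implementation.

```java
final class LookupQuerySketch {

    /**
     * Appends extra filter conditions to a base SELECT statement.
     *
     * Assumption (not confirmed by the PR): the base statement already ends
     * with a WHERE clause if and only if keyNames is non-empty.
     */
    public static String appendConditions(
            String baseSelectStatement, String[] keyNames, String[] conditions) {
        // Nothing to append: keep the statement produced by the dialect.
        if (conditions == null || conditions.length == 0) {
            return baseSelectStatement;
        }
        String joined = String.join(" AND ", conditions);
        boolean hasWhere = keyNames != null && keyNames.length > 0;
        // Start a WHERE clause when there is none, otherwise extend it.
        return baseSelectStatement + (hasWhere ? " AND " : " WHERE ") + joined;
    }

    public static void main(String[] args) {
        // No lookup keys: the conditions open the WHERE clause.
        System.out.println(
                appendConditions("SELECT id FROM t", new String[0], new String[] {"age > 18"}));
        // With a lookup key: the conditions are chained with AND.
        System.out.println(
                appendConditions(
                        "SELECT id FROM t WHERE id = ?",
                        new String[] {"id"},
                        new String[] {"age > 18"}));
    }
}
```

   Deciding from `keyNames` rather than searching the statement text for `WHERE` avoids false matches on identifiers or literals that happen to contain that word.
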
##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##########
@@ -92,9 +103,22 @@ public JdbcRowDataLookupFunction(
                         })
                         .toArray(DataType[]::new);
         this.maxRetryTimes = maxRetryTimes;
-        this.query =
+
+        final String baseSelectStatement =
                 options.getDialect()
                         .getSelectFromStatement(options.getTableName(), fieldNames, keyNames);
+        if (conditions == null || conditions.length == 0) {
+            this.query = baseSelectStatement;
+            if (LOG.isDebugEnabled()) {

Review Comment:
   I'm not sure the `if` is necessary. Why not just use `LOG.debug("Issuing look up select {}", this.query);`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org