snuyanzin commented on code in PR #79:
URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1443521093

##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##########
@@ -97,12 +147,14 @@ private Optional<ParameterizedPredicate> renderBinaryOperator(
         Optional<ParameterizedPredicate> leftOperandString = allOperands.get(0).accept(this);
         Optional<ParameterizedPredicate> rightOperandString = allOperands.get(1).accept(this);
-
-        return leftOperandString.flatMap(
-                left -> rightOperandString.map(right -> left.combine(operator, right)));
+        Optional<ParameterizedPredicate> renderedParameterizedPredicate =
+                leftOperandString.flatMap(
+                        left -> rightOperandString.map(right -> left.combine(operator, right)));
+        return renderedParameterizedPredicate;
     }
-    private Optional<ParameterizedPredicate> renderUnaryOperator(
+    @VisibleForTesting
+    protected Optional<ParameterizedPredicate> renderUnaryOperator(

Review Comment:
   Why is it marked as `VisibleForTesting` if it is not used in any test?

##########
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java:
##########
@@ -390,7 +390,7 @@ void testJdbcLookupPropertiesWithExcludeEmptyResult() {
         assertThat(actual).isEqualTo(expected);
     }
-    private Map<String, String> getAllOptions() {
+    protected static Map<String, String> getAllOptions() {

Review Comment:
   Why do we need this?

##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java:
##########
@@ -23,18 +23,35 @@
 import org.apache.commons.lang3.ArrayUtils;
 import java.io.Serializable;
+import java.util.ArrayList;
 /** A data class that model parameterized sql predicate. */
 @Experimental
 public class ParameterizedPredicate {
     private String predicate;
     private Serializable[] parameters;
+    private ArrayList<Integer> indexesOfPredicatePlaceHolders = new ArrayList<>();

Review Comment:
   ```suggestion
       private List<Integer> indexesOfPredicatePlaceHolders = new ArrayList<>();
   ```
   It is recommended to use interfaces rather than concrete implementations.

##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java:
##########
@@ -52,8 +69,26 @@ public void setPredicate(String predicate) {
     }
     public ParameterizedPredicate combine(String operator, ParameterizedPredicate that) {
+        int paramIndex = String.format("(%s %s ", this.predicate, operator).length();
+        if (!that.indexesOfPredicatePlaceHolders.isEmpty()) {
+            paramIndex = paramIndex + that.indexesOfPredicatePlaceHolders.get(0);
+        }
+
         this.predicate = String.format("(%s %s %s)", this.predicate, operator, that.predicate);
         this.parameters = ArrayUtils.addAll(this.parameters, that.parameters);
+
+        for (int i = 0; i < this.indexesOfPredicatePlaceHolders.size(); i++) {
+            // increment all the existing indexes to account for the new additional first begin
+            // bracket
+            this.indexesOfPredicatePlaceHolders.set(
+                    i, this.indexesOfPredicatePlaceHolders.get(i) + 1);
+        }
+        if (that.predicate.equals(
+                        JdbcFilterPushdownPreparedStatementVisitor.PUSHDOWN_PREDICATE_PLACEHOLDER)
+                || (!that.indexesOfPredicatePlaceHolders.isEmpty())) {
+            // add index if that is a placeholder or has a placeholder.
+            this.indexesOfPredicatePlaceHolders.add(paramIndex);
+        }

Review Comment:
   This will not work when a constant is on the left side, e.g.
   ```java
       @Test
       public void test123() {
           util.verifyExecPlan(
                   "SELECT * FROM a left join d FOR SYSTEM_TIME AS OF a.proctime on (50 = d.age OR 1 = d.type) and a.ip = d.ip");
       }
   ```
   This test will fail with
   ```
   Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
       at java.util.ArrayList.rangeCheck(ArrayList.java:659)
       at java.util.ArrayList.get(ArrayList.java:435)
       at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getResolvedConditions(JdbcDynamicTableSource.java:157)
       at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getLookupRuntimeProvider(JdbcDynamicTableSource.java:120)
       at org.apache.flink.table.planner.plan.utils.LookupJoinUtil.createLookupRuntimeProvider(LookupJoinUtil.java:626)
       at org.apache.flink.table.planner.plan.utils.LookupJoinUtil.isAsyncLookup(LookupJoinUtil.java:413)
   ```
   The method itself does some things that could be omitted; I would suggest replacing it with
   ```suggestion
           this.predicate = String.format("(%s %s %s)", this.predicate, operator, that.predicate);
           this.parameters = ArrayUtils.addAll(this.parameters, that.parameters);
           this.indexesOfPredicatePlaceHolders.add("?".equals(this.predicate) ? 0 : this.predicate.length() - 1);
   ```
   which is simpler and does not fail for the mentioned test.
   WDYT?

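   To make the suggested body easier to reason about, here is a minimal standalone sketch. `PredicateSketch` is a hypothetical stand-in written only for this illustration; it is not the connector's `ParameterizedPredicate`, and it leaves out the `parameters` array. It only shows what the suggested lines compute when a constant is on the left side: one index is appended per `combine` call, and the index list of `that` is never read.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical stand-in for illustration; not the connector's ParameterizedPredicate. */
   public class PredicateSketch {
       String predicate;
       final List<Integer> indexesOfPredicatePlaceHolders = new ArrayList<>();

       PredicateSketch(String predicate) {
           this.predicate = predicate;
       }

       /** Mirrors the suggested body: render the combined predicate, then record one index. */
       PredicateSketch combine(String operator, PredicateSketch that) {
           this.predicate = String.format("(%s %s %s)", this.predicate, operator, that.predicate);
           // After formatting, the predicate always ends with ')', so length() - 1
           // is the position of that closing bracket.
           this.indexesOfPredicatePlaceHolders.add(
                   "?".equals(this.predicate) ? 0 : this.predicate.length() - 1);
           return this;
       }

       public static void main(String[] args) {
           // Constant on the left side, loosely modelled on "50 = d.age".
           PredicateSketch combined =
                   new PredicateSketch("50").combine("=", new PredicateSketch("age"));
           System.out.println(combined.predicate); // (50 = age)
           System.out.println(combined.indexesOfPredicatePlaceHolders); // [9]
       }
   }
   ```

   Whether the recorded index is the one `getResolvedConditions` needs is a separate question that this sketch does not answer.
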
##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java:
##########
@@ -92,9 +104,25 @@ public JdbcRowDataLookupFunction(
                         })
                 .toArray(DataType[]::new);
         this.maxRetryTimes = maxRetryTimes;
-        this.query =
+
+        final String baseSelectStatement =
                 options.getDialect()
                         .getSelectFromStatement(options.getTableName(), fieldNames, keyNames);
+        if (conditions == null || conditions.length == 0) {
+            this.query = baseSelectStatement;
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Issuing look up select {}", this.query);
+            }
+        } else {
+            this.query =
+                    baseSelectStatement
+                            + " AND "
+                            + Arrays.stream(conditions).collect(Collectors.joining(" AND "));

Review Comment:
   ```suggestion
                               + String.join(" AND ", conditions);
   ```
   This could be simplified.

##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##########
@@ -46,8 +50,53 @@
 public class JdbcFilterPushdownPreparedStatementVisitor
         extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+    protected static final String PUSHDOWN_PREDICATE_PLACEHOLDER = "?";
+
+    protected static final String OPERATOR_EQUALS = "=";
+
+    protected static final String OPERATOR_LESS_THAN = "<";
+
+    protected static final String OPERATOR_LESS_THAN_OR_EQUAL = "<=";
+
+    protected static final String OPERATOR_GREATER_THAN = ">";
+
+    protected static final String OPERATOR_GREATER_THAN_OR_EQUAL = ">=";
+
+    protected static final String OPERATOR_NOT_EQUALS = "<>";
+
+    protected static final String OPERATOR_OR = "OR";
+
+    protected static final String OPERATOR_AND = "AND";
+
+    protected static final String OPERATOR_LIKE = "LIKE";
+
+    protected static final String OPERATOR_IS_NULL = "IS NULL";
+
+    protected static final String OPERATOR_IS_NOT_NULL = "IS NOT NULL";
+
+    private static String[] simpleBinaryOperatorValues =
+            new String[] {
+                OPERATOR_EQUALS,
+                OPERATOR_LESS_THAN,
+                OPERATOR_LESS_THAN_OR_EQUAL,
+                OPERATOR_GREATER_THAN,
+                OPERATOR_GREATER_THAN_OR_EQUAL,
+                OPERATOR_NOT_EQUALS,
+                OPERATOR_LIKE
+            };
+    private static String[] unaryOperatorValues =
+            new String[] {OPERATOR_IS_NULL, OPERATOR_IS_NOT_NULL};
+

Review Comment:
   What is the reason for these constants? Why can't we live with what was there before?
   P.S. Some of this data could be retrieved from `BuiltInFunctionDefinitions`, so there is no need to create duplicate constants.

##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java:
##########
@@ -23,18 +23,35 @@
 import org.apache.commons.lang3.ArrayUtils;
 import java.io.Serializable;
+import java.util.ArrayList;
 /** A data class that model parameterized sql predicate. */
 @Experimental
 public class ParameterizedPredicate {
     private String predicate;
     private Serializable[] parameters;
+    private ArrayList<Integer> indexesOfPredicatePlaceHolders = new ArrayList<>();
+
+    public ArrayList<Integer> getIndexesOfPredicatePlaceHolders() {
+        return indexesOfPredicatePlaceHolders;
+    }
+
+    public void setIndexesOfPredicatePlaceHolders(
+            ArrayList<Integer> indexesOfPredicatePlaceHolders) {

Review Comment:
   ```suggestion
               List<Integer> indexesOfPredicatePlaceHolders) {
   ```
   It is recommended to use interfaces rather than concrete implementations.

##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##########
@@ -97,12 +147,14 @@ private Optional<ParameterizedPredicate> renderBinaryOperator(
         Optional<ParameterizedPredicate> leftOperandString = allOperands.get(0).accept(this);
         Optional<ParameterizedPredicate> rightOperandString = allOperands.get(1).accept(this);
-
-        return leftOperandString.flatMap(
-                left -> rightOperandString.map(right -> left.combine(operator, right)));
+        Optional<ParameterizedPredicate> renderedParameterizedPredicate =
+                leftOperandString.flatMap(
+                        left -> rightOperandString.map(right -> left.combine(operator, right)));
+        return renderedParameterizedPredicate;

Review Comment:
   Is this change intentional?

##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java:
##########
@@ -23,18 +23,35 @@
 import org.apache.commons.lang3.ArrayUtils;
 import java.io.Serializable;
+import java.util.ArrayList;
 /** A data class that model parameterized sql predicate. */
 @Experimental
 public class ParameterizedPredicate {
     private String predicate;
     private Serializable[] parameters;
+    private ArrayList<Integer> indexesOfPredicatePlaceHolders = new ArrayList<>();
+
+    public ArrayList<Integer> getIndexesOfPredicatePlaceHolders() {

Review Comment:
   ```suggestion
       public List<Integer> getIndexesOfPredicatePlaceHolders() {
   ```
   It is recommended to use interfaces rather than concrete implementations.
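
   As a minimal sketch of the interface recommendation above: `PlaceholderIndexes` and its member names are hypothetical, chosen only for this illustration. Declaring the field, getter and setter against `List` keeps `ArrayList` as an internal choice, so callers can pass any `List` implementation.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   /** Hypothetical holder class, for illustration only. */
   public class PlaceholderIndexes {
       // Declared against the List interface; ArrayList is only the default implementation.
       private List<Integer> indexes = new ArrayList<>();

       public List<Integer> getIndexes() {
           return indexes;
       }

       public void setIndexes(List<Integer> indexes) {
           this.indexes = indexes;
       }

       public static void main(String[] args) {
           PlaceholderIndexes holder = new PlaceholderIndexes();
           // Arrays.asList returns a List that is not an ArrayList from java.util;
           // the setter accepts it because it is typed against the interface.
           holder.setIndexes(Arrays.asList(3, 7));
           System.out.println(holder.getIndexes()); // [3, 7]
       }
   }
   ```
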