Maxime Radigue created FLINK-32600: -------------------------------------- Summary: SQL Server JDBC Connector wrong quoteIdentifier + issue when there is no update statement in upsert statement Key: FLINK-32600 URL: https://issues.apache.org/jira/browse/FLINK-32600 Project: Flink Issue Type: Bug Components: Connectors / JDBC Affects Versions: jdbc-3.1.1 Reporter: Maxime Radigue
Hi, i've discovered 2 Issues in the MS SQL JDBC connector. They are located in the file SqlServerDialect.java * the MS SQL Server quote identifier is [identifier]. * Assuming you want to upsert in this sql table: MyTable(id1,id2) with primary key is the coumpound (id1,id2). There is an syntax error because there are no fields to update These are the fix I've tested against the 2 functions involved: *@Override* *public String quoteIdentifier(String identifier) {* return "[" + identifier + "]"; *}* *@Override* *public Optional<String> getUpsertStatement(* String tableName, String[] fieldNames, String[] uniqueKeyFields) { List<String> nonUniqueKeyFields = Arrays.stream(fieldNames) .filter(f -> !Arrays.asList(uniqueKeyFields).contains(f)) .collect(Collectors.toList()); String fieldsProjection = Arrays.stream(fieldNames) .map(this::quoteIdentifier) .collect(Collectors.joining(", ")); String valuesBinding = Arrays.stream(fieldNames) .map(f -> ":" + f + " " + quoteIdentifier(f)) .collect(Collectors.joining(", ")); String usingClause = String.format("SELECT %s", valuesBinding); String onConditions = Arrays.stream(uniqueKeyFields) .map( f -> "[TARGET]." + quoteIdentifier(f) + "=[SOURCE]." + quoteIdentifier(f)) .collect(Collectors.joining(" AND ")); String updateSetClause = nonUniqueKeyFields.stream() .map( f -> "[TARGET]." + quoteIdentifier(f) + "=[SOURCE]." + quoteIdentifier(f)) .collect(Collectors.joining(", ")); String insertValues = Arrays.stream(fieldNames) .map(f -> "[SOURCE]." + quoteIdentifier(f)) .collect(Collectors.joining(", ")); StringBuilder sb = new StringBuilder(); sb.append( String.format( "MERGE INTO %s AS [TARGET] USING (%s) AS [SOURCE] ON (%s)", quoteIdentifier(tableName), usingClause, onConditions)); if (StringUtils.isNotEmpty(updateSetClause)) { sb.append(String.format(" WHEN MATCHED THEN UPDATE SET %s", updateSetClause)); } sb.append( String.format( " WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s);", fieldsProjection, insertValues)); return Optional.of(sb.toString()); *}* -- This message was sent by Atlassian Jira (v8.20.10#820010)