Maxime Radigue created FLINK-32600:
--------------------------------------

             Summary: SQL Server JDBC Connector wrong quoteIdentifier + issue 
when there is no update statement in upsert statement
                 Key: FLINK-32600
                 URL: https://issues.apache.org/jira/browse/FLINK-32600
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: jdbc-3.1.1
            Reporter: Maxime Radigue


Hi, I've discovered two issues in the MS SQL Server JDBC connector. Both are 
located in the file SqlServerDialect.java:

 
 * The MS SQL Server quote identifier is [identifier], and the current 
quoteIdentifier does not use it.
 * Assume you want to upsert into this SQL table: MyTable(id1,id2), whose 
primary key is the compound (id1,id2). The generated upsert statement has a 
syntax error because there are no fields to update.
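
To make the second point concrete, here is a minimal, self-contained sketch of how the MATCHED branch ends up empty. It assumes the current dialect appends "WHEN MATCHED THEN UPDATE SET" unconditionally. The class and method names (UpsertClauseSketch, updateSetClause, unguardedMatchedBranch, guardedMatchedBranch) are illustrative only and are not part of the connector.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class UpsertClauseSketch {

    // Hypothetical helper: SET assignments for every column that is not part of the key.
    public static String updateSetClause(String[] fieldNames, String[] uniqueKeyFields) {
        List<String> keys = Arrays.asList(uniqueKeyFields);
        return Arrays.stream(fieldNames)
                .filter(f -> !keys.contains(f))
                .map(f -> "[TARGET].[" + f + "]=[SOURCE].[" + f + "]")
                .collect(Collectors.joining(", "));
    }

    // Assumed current behaviour: the branch is emitted even when there is nothing to set.
    public static String unguardedMatchedBranch(String setClause) {
        return " WHEN MATCHED THEN UPDATE SET " + setClause;
    }

    // Proposed behaviour: skip the branch entirely when the SET list is empty.
    public static String guardedMatchedBranch(String setClause) {
        return setClause.isEmpty() ? "" : " WHEN MATCHED THEN UPDATE SET " + setClause;
    }

    public static void main(String[] args) {
        // MyTable(id1,id2): every column belongs to the primary key.
        String[] fields = {"id1", "id2"};
        String set = updateSetClause(fields, fields);
        System.out.println("unguarded: '" + unguardedMatchedBranch(set) + "'");
        System.out.println("guarded:   '" + guardedMatchedBranch(set) + "'");
    }
}
```

With all columns in the key, the unguarded branch ends in a bare "UPDATE SET", which SQL Server rejects, while the guarded branch contributes nothing and leaves only the INSERT branch in the MERGE.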

 

These are the fixes I've tested for the two methods involved:

 

    @Override
    public String quoteIdentifier(String identifier) {
        return "[" + identifier + "]";
    }
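
One case the method above does not cover is an identifier that itself contains a closing bracket. In T-SQL, a "]" inside a bracket-delimited identifier is escaped by doubling it. The following is only a sketch of a possible hardening, not something I have tested against the connector; the class name QuoteIdentifierSketch is made up for illustration.

```java
public class QuoteIdentifierSketch {

    // Hypothetical variant: double any ']' so it cannot close the bracket early.
    public static String quoteIdentifier(String identifier) {
        return "[" + identifier.replace("]", "]]") + "]";
    }

    public static void main(String[] args) {
        System.out.println(quoteIdentifier("id1"));      // prints [id1]
        System.out.println(quoteIdentifier("odd]name")); // prints [odd]]name]
    }
}
```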


    @Override
    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        List<String> nonUniqueKeyFields =
                Arrays.stream(fieldNames)
                        .filter(f -> !Arrays.asList(uniqueKeyFields).contains(f))
                        .collect(Collectors.toList());
        String fieldsProjection =
                Arrays.stream(fieldNames)
                        .map(this::quoteIdentifier)
                        .collect(Collectors.joining(", "));

        String valuesBinding =
                Arrays.stream(fieldNames)
                        .map(f -> ":" + f + " " + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));

        String usingClause = String.format("SELECT %s", valuesBinding);
        String onConditions =
                Arrays.stream(uniqueKeyFields)
                        .map(
                                f ->
                                        "[TARGET]."
                                                + quoteIdentifier(f)
                                                + "=[SOURCE]."
                                                + quoteIdentifier(f))
                        .collect(Collectors.joining(" AND "));
        String updateSetClause =
                nonUniqueKeyFields.stream()
                        .map(
                                f ->
                                        "[TARGET]."
                                                + quoteIdentifier(f)
                                                + "=[SOURCE]."
                                                + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));

        String insertValues =
                Arrays.stream(fieldNames)
                        .map(f -> "[SOURCE]." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));

        StringBuilder sb = new StringBuilder();
        sb.append(
                String.format(
                        "MERGE INTO %s AS [TARGET] USING (%s) AS [SOURCE] ON (%s)",
                        quoteIdentifier(tableName), usingClause, onConditions));
        // Emit the MATCHED branch only when there is at least one non-key column to update.
        if (StringUtils.isNotEmpty(updateSetClause)) {
            sb.append(String.format(" WHEN MATCHED THEN UPDATE SET %s", updateSetClause));
        }

        sb.append(
                String.format(
                        " WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s);",
                        fieldsProjection, insertValues));

        return Optional.of(sb.toString());
    }

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
