This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ad11d899 [INLONG-6594][Sort] Fix ClickHouse connector throw exception when source is change log stream (#6595)
9ad11d899 is described below

commit 9ad11d8996d9148c932d7f08a79c7a77911f7cd5
Author: Xin Gong <genzhedangd...@gmail.com>
AuthorDate: Mon Nov 28 11:52:52 2022 +0800

    [INLONG-6594][Sort] Fix ClickHouse connector throw exception when source is change log stream (#6595)
---
 .../sort/jdbc/dialect/ClickHouseDialect.java       | 95 +++++++++++++++++++++-
 .../sort/parser/ClickHouseSqlParserTest.java       |  2 +-
 2 files changed, 94 insertions(+), 3 deletions(-)

diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/ClickHouseDialect.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/ClickHouseDialect.java
index b9d7f48b0..016f994c1 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/ClickHouseDialect.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/ClickHouseDialect.java
@@ -18,11 +18,14 @@
 
 package org.apache.inlong.sort.jdbc.dialect;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.inlong.sort.jdbc.converter.clickhouse.ClickHouseRowConverter;
 import org.apache.inlong.sort.jdbc.table.AbstractJdbcDialect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.List;
@@ -36,6 +39,8 @@ import static java.lang.String.format;
  */
 public class ClickHouseDialect extends AbstractJdbcDialect {
 
+    public static final Logger LOG = 
LoggerFactory.getLogger(ClickHouseDialect.class);
+
     // Define MAX/MIN precision of TIMESTAMP type according to ClickHouse docs:
     // https://clickhouse.com/docs/zh/sql-reference/data-types/datetime64
     private static final int MAX_TIMESTAMP_PRECISION = 8;
@@ -45,6 +50,7 @@ public class ClickHouseDialect extends AbstractJdbcDialect {
     // https://clickhouse.com/docs/zh/sql-reference/data-types/decimal/
     private static final int MAX_DECIMAL_PRECISION = 128;
     private static final int MIN_DECIMAL_PRECISION = 32;
+    private static final String POINT = ".";
 
     @Override
     public String dialectName() {
@@ -129,22 +135,46 @@ public class ClickHouseDialect extends AbstractJdbcDialect {
     @Override
     public String getUpdateStatement(
             String tableName, String[] fieldNames, String[] conditionFields) {
+        List<String> conditionFieldList = Arrays.asList(conditionFields);
         String setClause =
                 Arrays.stream(fieldNames)
+                        .filter(fieldName -> 
!conditionFieldList.contains(fieldName))
                         .map(f -> format("%s = :%s", quoteIdentifier(f), f))
                         .collect(Collectors.joining(", "));
+
         String conditionClause =
                 Arrays.stream(conditionFields)
                         .map(f -> format("%s = :%s", quoteIdentifier(f), f))
                         .collect(Collectors.joining(" AND "));
+        Pair<String, String> databaseAndTableName = 
getDatabaseAndTableName(tableName);
         return "ALTER TABLE "
-                + quoteIdentifier(tableName)
+                + quoteIdentifier(databaseAndTableName.getLeft())
+                + POINT
+                + quoteIdentifier(databaseAndTableName.getRight())
                 + " UPDATE "
                 + setClause
                 + " WHERE "
                 + conditionClause;
     }
 
+    /**
+     * ClickHouse throw exception "Table default.test_user doesn't exist". But jdbc-url have database name.
+     * So we specify database when exec query. This method parse tableName to database and table.
+     * @param tableName include database.table
+     * @return pair left is database, right is table
+     */
+    private Pair<String, String> getDatabaseAndTableName(String tableName) {
+        String databaseName = "default";
+        if (tableName.contains(POINT)) {
+            String[] tableNameArray = tableName.split("\\.");
+            databaseName = tableNameArray[0];
+            tableName = tableNameArray[1];
+        } else {
+            LOG.warn("TableName doesn't include database name, so using 
default as database name");
+        }
+        return Pair.of(databaseName, tableName);
+    }
+
     /**
      * Get delete one row statement by condition fields
      */
@@ -154,6 +184,67 @@ public class ClickHouseDialect extends AbstractJdbcDialect {
                 Arrays.stream(conditionFields)
                         .map(f -> format("%s = :%s", quoteIdentifier(f), f))
                         .collect(Collectors.joining(" AND "));
-        return "ALTER TABLE " + quoteIdentifier(tableName) + " DELETE WHERE " 
+ conditionClause;
+        Pair<String, String> databaseAndTableName = 
getDatabaseAndTableName(tableName);
+        return "ALTER TABLE "
+                + quoteIdentifier(databaseAndTableName.getLeft())
+                + POINT
+                + quoteIdentifier(databaseAndTableName.getRight())
+                + " DELETE WHERE " + conditionClause;
+    }
+
+    @Override
+    public String getInsertIntoStatement(String tableName, String[] 
fieldNames) {
+        String columns =
+                Arrays.stream(fieldNames)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+        String placeholders =
+                Arrays.stream(fieldNames).map(f -> ":" + 
f).collect(Collectors.joining(", "));
+        Pair<String, String> databaseAndTableName = 
getDatabaseAndTableName(tableName);
+        return "INSERT INTO "
+                + quoteIdentifier(databaseAndTableName.getLeft())
+                + POINT
+                + quoteIdentifier(databaseAndTableName.getRight())
+                + "("
+                + columns
+                + ")"
+                + " VALUES ("
+                + placeholders
+                + ")";
+    }
+
+    @Override
+    public String getSelectFromStatement(
+            String tableName, String[] selectFields, String[] conditionFields) 
{
+        String selectExpressions =
+                Arrays.stream(selectFields)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+        String fieldExpressions =
+                Arrays.stream(conditionFields)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(" AND "));
+        Pair<String, String> databaseAndTableName = 
getDatabaseAndTableName(tableName);
+        return "SELECT "
+                + selectExpressions
+                + " FROM "
+                + quoteIdentifier(databaseAndTableName.getLeft())
+                + POINT
+                + quoteIdentifier(databaseAndTableName.getRight())
+                + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : 
"");
+    }
+
+    @Override
+    public String getRowExistsStatement(String tableName, String[] 
conditionFields) {
+        String fieldExpressions =
+                Arrays.stream(conditionFields)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(" AND "));
+        Pair<String, String> pair = getDatabaseAndTableName(tableName);
+        return "SELECT 1 FROM "
+                + quoteIdentifier(pair.getLeft())
+                + POINT
+                + quoteIdentifier(pair.getRight())
+                + " WHERE " + fieldExpressions;
     }
 }
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java
index ac35d491e..09e050e0e 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java
@@ -77,7 +77,7 @@ public class ClickHouseSqlParserTest {
                 null,
                 1,
                 null,
-                "ck_demo",
+                "demo.ck_demo",
                 "jdbc:clickhouse://localhost:8123/demo",
                 "default",
                 "",

Reply via email to