This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 9ad11d899 [INLONG-6594][Sort] Fix ClickHouse connector throw exception when source is change log stream (#6595) 9ad11d899 is described below commit 9ad11d8996d9148c932d7f08a79c7a77911f7cd5 Author: Xin Gong <genzhedangd...@gmail.com> AuthorDate: Mon Nov 28 11:52:52 2022 +0800 [INLONG-6594][Sort] Fix ClickHouse connector throw exception when source is change log stream (#6595) --- .../sort/jdbc/dialect/ClickHouseDialect.java | 95 +++++++++++++++++++++- .../sort/parser/ClickHouseSqlParserTest.java | 2 +- 2 files changed, 94 insertions(+), 3 deletions(-) diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/ClickHouseDialect.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/ClickHouseDialect.java index b9d7f48b0..016f994c1 100644 --- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/ClickHouseDialect.java +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/ClickHouseDialect.java @@ -18,11 +18,14 @@ package org.apache.inlong.sort.jdbc.dialect; +import org.apache.commons.lang3.tuple.Pair; import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; import org.apache.inlong.sort.jdbc.converter.clickhouse.ClickHouseRowConverter; import org.apache.inlong.sort.jdbc.table.AbstractJdbcDialect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.List; @@ -36,6 +39,8 @@ import static java.lang.String.format; */ public class ClickHouseDialect extends AbstractJdbcDialect { + public static final Logger LOG = LoggerFactory.getLogger(ClickHouseDialect.class); + // Define MAX/MIN precision of TIMESTAMP type according to ClickHouse docs: // https://clickhouse.com/docs/zh/sql-reference/data-types/datetime64 private static final int MAX_TIMESTAMP_PRECISION = 8; @@ -45,6 +50,7 @@ public class ClickHouseDialect extends AbstractJdbcDialect { // https://clickhouse.com/docs/zh/sql-reference/data-types/decimal/ private static final int MAX_DECIMAL_PRECISION = 128; private static final int MIN_DECIMAL_PRECISION = 32; + private static final String POINT = "."; @Override public String dialectName() { @@ -129,22 +135,46 @@ public class ClickHouseDialect extends AbstractJdbcDialect { @Override public String getUpdateStatement( String tableName, String[] fieldNames, String[] conditionFields) { + List<String> conditionFieldList = Arrays.asList(conditionFields); String setClause = Arrays.stream(fieldNames) + .filter(fieldName -> !conditionFieldList.contains(fieldName)) .map(f -> format("%s = :%s", quoteIdentifier(f), f)) .collect(Collectors.joining(", ")); + String conditionClause = Arrays.stream(conditionFields) .map(f -> format("%s = :%s", quoteIdentifier(f), f)) .collect(Collectors.joining(" AND ")); + Pair<String, String> databaseAndTableName = getDatabaseAndTableName(tableName); return "ALTER TABLE " - + quoteIdentifier(tableName) + + quoteIdentifier(databaseAndTableName.getLeft()) + + POINT + + quoteIdentifier(databaseAndTableName.getRight()) + " UPDATE " + setClause + " WHERE " + conditionClause; } + /** + * ClickHouse throw exception "Table default.test_user doesn't exist". But jdbc-url have database name. + * So we specify database when exec query. This method parse tableName to database and table. + * @param tableName include database.table + * @return pair left is database, right is table + */ + private Pair<String, String> getDatabaseAndTableName(String tableName) { + String databaseName = "default"; + if (tableName.contains(POINT)) { + String[] tableNameArray = tableName.split("\\."); + databaseName = tableNameArray[0]; + tableName = tableNameArray[1]; + } else { + LOG.warn("TableName doesn't include database name, so using default as database name"); + } + return Pair.of(databaseName, tableName); + } + /** * Get delete one row statement by condition fields */ @@ -154,6 +184,67 @@ public class ClickHouseDialect extends AbstractJdbcDialect { Arrays.stream(conditionFields) .map(f -> format("%s = :%s", quoteIdentifier(f), f)) .collect(Collectors.joining(" AND ")); - return "ALTER TABLE " + quoteIdentifier(tableName) + " DELETE WHERE " + conditionClause; + Pair<String, String> databaseAndTableName = getDatabaseAndTableName(tableName); + return "ALTER TABLE " + + quoteIdentifier(databaseAndTableName.getLeft()) + + POINT + + quoteIdentifier(databaseAndTableName.getRight()) + + " DELETE WHERE " + conditionClause; + } + + @Override + public String getInsertIntoStatement(String tableName, String[] fieldNames) { + String columns = + Arrays.stream(fieldNames) + .map(this::quoteIdentifier) + .collect(Collectors.joining(", ")); + String placeholders = + Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", ")); + Pair<String, String> databaseAndTableName = getDatabaseAndTableName(tableName); + return "INSERT INTO " + + quoteIdentifier(databaseAndTableName.getLeft()) + + POINT + + quoteIdentifier(databaseAndTableName.getRight()) + + "(" + + columns + + ")" + + " VALUES (" + + placeholders + + ")"; + } + + @Override + public String getSelectFromStatement( + String tableName, String[] selectFields, String[] conditionFields) { + String selectExpressions = + Arrays.stream(selectFields) + .map(this::quoteIdentifier) + .collect(Collectors.joining(", ")); + String fieldExpressions = + Arrays.stream(conditionFields) + .map(f -> format("%s = :%s", quoteIdentifier(f), f)) + .collect(Collectors.joining(" AND ")); + Pair<String, String> databaseAndTableName = getDatabaseAndTableName(tableName); + return "SELECT " + + selectExpressions + + " FROM " + + quoteIdentifier(databaseAndTableName.getLeft()) + + POINT + + quoteIdentifier(databaseAndTableName.getRight()) + + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : ""); + } + + @Override + public String getRowExistsStatement(String tableName, String[] conditionFields) { + String fieldExpressions = + Arrays.stream(conditionFields) + .map(f -> format("%s = :%s", quoteIdentifier(f), f)) + .collect(Collectors.joining(" AND ")); + Pair<String, String> pair = getDatabaseAndTableName(tableName); + return "SELECT 1 FROM " + + quoteIdentifier(pair.getLeft()) + + POINT + + quoteIdentifier(pair.getRight()) + + " WHERE " + fieldExpressions; } } diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java index ac35d491e..09e050e0e 100644 --- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java +++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java @@ -77,7 +77,7 @@ public class ClickHouseSqlParserTest { null, 1, null, - "ck_demo", + "demo.ck_demo", "jdbc:clickhouse://localhost:8123/demo", "default", "",