This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 61f9034d1d8 Refactor PipelineSQLBuilderEngine (#27199)
61f9034d1d8 is described below

commit 61f9034d1d8cb80f825a8f61df8d3ac133c2ce5b
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Jul 14 19:23:33 2023 +0800

    Refactor PipelineSQLBuilderEngine (#27199)
    
    * Refactor PipelineSQLBuilderEngine
    
    * Refactor PipelineSQLBuilderEngine
    
    * Refactor PipelineSQLBuilderEngine
---
 .../spi/sqlbuilder/DialectPipelineSQLBuilder.java  |  2 +-
 .../sqlbuilder/PipelineSQLBuilderEngine.java       | 35 ++++++++--------------
 .../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java  |  2 +-
 .../sqlbuilder/MySQLPipelineSQLBuilderTest.java    |  4 +--
 .../sqlbuilder/OpenGaussPipelineSQLBuilder.java    |  2 +-
 .../OpenGaussPipelineSQLBuilderTest.java           |  6 ++--
 .../sqlbuilder/PostgreSQLPipelineSQLBuilder.java   |  2 +-
 .../PostgreSQLPipelineSQLBuilderTest.java          |  2 +-
 8 files changed, 23 insertions(+), 32 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/DialectPipelineSQLBuilder.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/DialectPipelineSQLBuilder.java
index 0cd993db1b3..cf87727c563 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/DialectPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/DialectPipelineSQLBuilder.java
@@ -46,7 +46,7 @@ public interface DialectPipelineSQLBuilder extends DatabaseTypedSPI {
      * @param dataRecord data record
      * @return on duplicate clause of insert SQL
      */
-    default Optional<String> buildInsertSQLOnDuplicateClause(String schemaName, DataRecord dataRecord) {
+    default Optional<String> buildInsertOnDuplicateClause(String schemaName, DataRecord dataRecord) {
         return Optional.empty();
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
index 3d9e02b3a46..fd08c5d254e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineSQLBuilderEngine.java
@@ -78,22 +78,16 @@ public final class PipelineSQLBuilderEngine {
     public String buildInsertSQL(final String schemaName, final DataRecord dataRecord) {
         String sqlCacheKey = INSERT_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
         if (!sqlCacheMap.containsKey(sqlCacheKey)) {
-            sqlCacheMap.put(sqlCacheKey, buildInsertSQLInternal(schemaName, dataRecord.getTableName(), dataRecord.getColumns()));
+            String insertMainClause = buildInsertMainClause(schemaName, dataRecord.getTableName(), dataRecord.getColumns());
+            sqlCacheMap.put(sqlCacheKey, dialectSQLBuilder.buildInsertOnDuplicateClause(schemaName, dataRecord).map(optional -> insertMainClause + " " + optional).orElse(insertMainClause));
         }
-        String insertSQL = sqlCacheMap.get(sqlCacheKey);
-        return dialectSQLBuilder.buildInsertSQLOnDuplicateClause(schemaName, dataRecord).map(optional -> insertSQL + " " + optional).orElse(insertSQL);
+        return sqlCacheMap.get(sqlCacheKey);
     }
     
-    private String buildInsertSQLInternal(final String schemaName, final String tableName, final List<Column> columns) {
-        StringBuilder columnsLiteral = new StringBuilder();
-        StringBuilder holder = new StringBuilder();
-        for (Column each : columns) {
-            columnsLiteral.append(String.format("%s,", sqlSegmentBuilder.getEscapedIdentifier(each.getName())));
-            holder.append("?,");
-        }
-        columnsLiteral.setLength(columnsLiteral.length() - 1);
-        holder.setLength(holder.length() - 1);
-        return String.format("INSERT INTO %s(%s) VALUES(%s)", sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName), columnsLiteral, holder);
+    private String buildInsertMainClause(final String schemaName, final String tableName, final List<Column> columns) {
+        String columnsLiteral = columns.stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName())).collect(Collectors.joining(","));
+        String valuesLiteral = columns.stream().map(each -> "?").collect(Collectors.joining(","));
+        return String.format("INSERT INTO %s(%s) VALUES(%s)", sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName), columnsLiteral, valuesLiteral);
     }
     
     /**
@@ -107,18 +101,15 @@ public final class PipelineSQLBuilderEngine {
     public String buildUpdateSQL(final String schemaName, final DataRecord dataRecord, final Collection<Column> conditionColumns) {
         String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
         if (!sqlCacheMap.containsKey(sqlCacheKey)) {
-            sqlCacheMap.put(sqlCacheKey, buildUpdateSQLInternal(schemaName, dataRecord.getTableName(), conditionColumns));
+            String updateMainClause = buildUpdateMainClause(schemaName, dataRecord.getTableName(), extractUpdatedColumns(dataRecord), conditionColumns);
+            sqlCacheMap.put(sqlCacheKey, updateMainClause);
         }
-        StringBuilder updatedColumnString = new StringBuilder();
-        for (Column each : extractUpdatedColumns(dataRecord)) {
-            updatedColumnString.append(String.format("%s = ?,", sqlSegmentBuilder.getEscapedIdentifier(each.getName())));
-        }
-        updatedColumnString.setLength(updatedColumnString.length() - 1);
-        return String.format(sqlCacheMap.get(sqlCacheKey), updatedColumnString);
+        return sqlCacheMap.get(sqlCacheKey);
     }
     
-    private String buildUpdateSQLInternal(final String schemaName, final String tableName, final Collection<Column> conditionColumns) {
-        return String.format("UPDATE %s SET %%s WHERE %s", sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName), buildWhereSQL(conditionColumns));
+    private String buildUpdateMainClause(final String schemaName, final String tableName, final Collection<Column> setColumns, final Collection<Column> conditionColumns) {
+        String updateSetClause = setColumns.stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + " = ?").collect(Collectors.joining(","));
+        return String.format("UPDATE %s SET %s WHERE %s", sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName), updateSetClause, buildWhereSQL(conditionColumns));
     }
     
     /**
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index f99330bdfda..21593060770 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -33,7 +33,7 @@ import java.util.Optional;
 public final class MySQLPipelineSQLBuilder implements DialectPipelineSQLBuilder {
     
     @Override
-    public Optional<String> buildInsertSQLOnDuplicateClause(final String schemaName, final DataRecord dataRecord) {
+    public Optional<String> buildInsertOnDuplicateClause(final String schemaName, final DataRecord dataRecord) {
         StringBuilder result = new StringBuilder("ON DUPLICATE KEY UPDATE ");
         PipelineSQLSegmentBuilder sqlSegmentBuilder = new PipelineSQLSegmentBuilder(getType());
         for (int i = 0; i < dataRecord.getColumnCount(); i++) {
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index 50b725c6ccc..bedd29843f6 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -35,13 +35,13 @@ class MySQLPipelineSQLBuilderTest {
     
     @Test
     void assertBuildInsertSQLOnDuplicateClause() {
-        String actual = sqlBuilder.buildInsertSQLOnDuplicateClause(null, mockDataRecord("t1")).orElse(null);
+        String actual = sqlBuilder.buildInsertOnDuplicateClause(null, mockDataRecord("t1")).orElse(null);
         assertThat(actual, is("ON DUPLICATE KEY UPDATE c1=VALUES(c1),c2=VALUES(c2),c3=VALUES(c3)"));
     }
     
     @Test
     void assertBuildInsertSQLOnDuplicateClauseHasShardingColumn() {
-        String actual = sqlBuilder.buildInsertSQLOnDuplicateClause(null, mockDataRecord("t2")).orElse(null);
+        String actual = sqlBuilder.buildInsertOnDuplicateClause(null, mockDataRecord("t2")).orElse(null);
         assertThat(actual, is("ON DUPLICATE KEY UPDATE c1=VALUES(c1),c2=VALUES(c2),c3=VALUES(c3)"));
     }
     
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index bf4ddb6cfe6..e70c1f42c0d 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -38,7 +38,7 @@ public final class OpenGaussPipelineSQLBuilder implements DialectPipelineSQLBuil
     }
     
     @Override
-    public Optional<String> buildInsertSQLOnDuplicateClause(final String schemaName, final DataRecord dataRecord) {
+    public Optional<String> buildInsertOnDuplicateClause(final String schemaName, final DataRecord dataRecord) {
         StringBuilder result = new StringBuilder("ON DUPLICATE KEY UPDATE ");
         PipelineSQLSegmentBuilder sqlSegmentBuilder = new PipelineSQLSegmentBuilder(getType());
         for (int i = 0; i < dataRecord.getColumnCount(); i++) {
diff --git a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
index 22283a5e2f3..20112e39018 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
@@ -32,12 +32,12 @@ class OpenGaussPipelineSQLBuilderTest {
     
     @Test
     void assertBuildInsertSQLOnDuplicatePart() {
-        String actual = sqlBuilder.buildInsertSQLOnDuplicateClause(null, mockDataRecord("t1")).orElse(null);
+        String actual = sqlBuilder.buildInsertOnDuplicateClause(null, mockDataRecord()).orElse(null);
         assertThat(actual, is("ON DUPLICATE KEY UPDATE c0=EXCLUDED.c0,c1=EXCLUDED.c1,c2=EXCLUDED.c2,c3=EXCLUDED.c3"));
     }
     
-    private DataRecord mockDataRecord(final String tableName) {
-        DataRecord result = new DataRecord(IngestDataChangeType.INSERT, tableName, new PlaceholderPosition(), 4);
+    private DataRecord mockDataRecord() {
+        DataRecord result = new DataRecord(IngestDataChangeType.INSERT, "t1", new PlaceholderPosition(), 4);
         result.addColumn(new Column("id", "", false, true));
         result.addColumn(new Column("c0", "", false, false));
         result.addColumn(new Column("c1", "", true, false));
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index 65c6e954a73..fbc415c4736 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -39,7 +39,7 @@ public final class PostgreSQLPipelineSQLBuilder implements DialectPipelineSQLBui
     }
     
     @Override
-    public Optional<String> buildInsertSQLOnDuplicateClause(final String schemaName, final DataRecord dataRecord) {
+    public Optional<String> buildInsertOnDuplicateClause(final String schemaName, final DataRecord dataRecord) {
         // TODO without unique key, job has been interrupted, which may lead to data duplication
         if (dataRecord.getUniqueKeyValue().isEmpty()) {
             return Optional.empty();
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index 3c55505c0b4..a2d4e99b7e9 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -34,7 +34,7 @@ class PostgreSQLPipelineSQLBuilderTest {
     
     @Test
     void assertBuildInsertSQLOnDuplicateClause() {
-        String actual = sqlBuilder.buildInsertSQLOnDuplicateClause("schema1", mockDataRecord()).orElse(null);
+        String actual = sqlBuilder.buildInsertOnDuplicateClause("schema1", mockDataRecord()).orElse(null);
         assertThat(actual, is("ON CONFLICT (order_id) DO UPDATE SET user_id=EXCLUDED.user_id,status=EXCLUDED.status"));
     }
     

Reply via email to