This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d67ba69b5b4 Add test fields for openGauss at pipeline E2E (#25608)
d67ba69b5b4 is described below

commit d67ba69b5b4e38af3fbd7c0c849c875ee3d97f07
Author: Xinze Guo <[email protected]>
AuthorDate: Mon May 15 10:23:57 2023 +0800

    Add test fields for openGauss at pipeline E2E (#25608)
    
    * Add test fields for openGauss/PostgreSQL at pipeline E2E
    
    * Add jdbcUrl parameter to fix bit(n) column migration
    
    * Add array equals method to data match
    
    * Improve column names
---
 ...DataMatchDataConsistencyCalculateAlgorithm.java |  3 +
 .../OpenGaussJdbcQueryPropertiesExtension.java     |  2 +
 .../ingest/wal/OpenGaussColumnValueReader.java}    | 48 ++++++-------
 ...ta.pipeline.spi.ingest.dumper.ColumnValueReader | 18 +++++
 .../OpenGaussJdbcQueryPropertiesExtensionTest.java |  2 +-
 .../ingest/PostgreSQLColumnValueReader.java        |  9 ---
 .../pipeline/cases/PipelineContainerComposer.java  | 54 +++++++-------
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |  6 +-
 .../cases/migration/AbstractMigrationE2EIT.java    |  8 +--
 .../general/PostgreSQLMigrationGeneralE2EIT.java   |  8 +--
 .../pipeline/cases/task/E2EIncrementalTask.java    | 29 ++++++--
 .../framework/helper/PipelineCaseHelper.java       | 22 +++++-
 .../resources/env/scenario/general/opengauss.xml   | 82 ++++++++++++++++++++++
 13 files changed, 210 insertions(+), 81 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index c6ea5031b44..01b30ae060f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -42,6 +42,7 @@ import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 
 import java.math.BigDecimal;
+import java.sql.Array;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -296,6 +297,8 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
                         matched = ((SQLXML) thisResult).getString().equals(((SQLXML) thatResult).getString());
                     } else if (thisResult instanceof BigDecimal && thatResult instanceof BigDecimal) {
                         matched = DataConsistencyCheckUtils.isBigDecimalEquals((BigDecimal) thisResult, (BigDecimal) thatResult);
+                    } else if (thisResult instanceof Array && thatResult instanceof Array) {
+                        matched = Objects.deepEquals(((Array) thisResult).getArray(), ((Array) thatResult).getArray());
                     } else {
                         matched = equalsBuilder.append(thisResult, thatResult).isEquals();
                     }
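
The new branch above compares java.sql.Array values by content rather than by reference. A minimal standalone sketch of the same comparison, assuming a reachable PostgreSQL-compatible server (the URL and credentials are illustrative only):

import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Objects;

public final class ArrayDeepEqualsSketch {
    
    public static void main(final String[] args) throws Exception {
        // Hypothetical connection; any driver supporting createArrayOf behaves the same way.
        try (Connection connection = DriverManager.getConnection("jdbc:postgresql://localhost:5432/demo", "demo", "demo")) {
            Array thisResult = connection.createArrayOf("integer", new Object[]{1, 2, 3});
            Array thatResult = connection.createArrayOf("integer", new Object[]{1, 2, 3});
            // Array.getArray() returns Object (an Integer[] here); Objects.deepEquals compares contents, not references.
            System.out.println(Objects.deepEquals(thisResult.getArray(), thatResult.getArray()));
        }
    }
}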
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtension.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtension.java
index c2952349a32..616631728d9 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtension.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtension.java
@@ -30,6 +30,8 @@ public final class OpenGaussJdbcQueryPropertiesExtension implements JdbcQueryPro
     
     @Override
     public Properties extendQueryProperties() {
+        queryProps.setProperty("stringtype", "unspecified");
+        queryProps.setProperty("bitToString", "true");
         return queryProps;
     }
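
The two properties added above reach the openGauss driver through the JDBC URL query string. A hedged sketch of the equivalent URL construction in plain Java (the URL shape is illustrative; the pipeline itself routes these properties through JdbcUrlAppender, as in PipelineContainerComposer below):

import java.util.Properties;
import java.util.TreeSet;

public final class QueryPropertiesSketch {
    
    public static void main(final String[] args) {
        Properties queryProps = new Properties();
        queryProps.setProperty("stringtype", "unspecified");
        queryProps.setProperty("bitToString", "true");
        StringBuilder url = new StringBuilder("jdbc:opengauss://localhost:15432/pipeline_db");
        char separator = '?';
        // Sorted only for a deterministic printout; parameter order does not matter to the driver.
        for (String each : new TreeSet<>(queryProps.stringPropertyNames())) {
            url.append(separator).append(each).append('=').append(queryProps.getProperty(each));
            separator = '&';
        }
        // Prints: jdbc:opengauss://localhost:15432/pipeline_db?bitToString=true&stringtype=unspecified
        System.out.println(url);
    }
}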
     
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussColumnValueReader.java
similarity index 54%
copy from kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
copy to kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussColumnValueReader.java
index a84844018bb..b299be37ac0 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussColumnValueReader.java
@@ -15,62 +15,58 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
+package org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal;
 
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractColumnValueReader;
-import org.postgresql.util.PGobject;
 
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Types;
-import java.util.Collection;
-import java.util.Collections;
 
 /**
- * Column value reader for PostgreSQL.
+ * Column value reader for openGauss.
  */
-public final class PostgreSQLColumnValueReader extends AbstractColumnValueReader {
+public final class OpenGaussColumnValueReader extends AbstractColumnValueReader {
     
-    private static final Collection<String> TYPE_ALIASES = Collections.singletonList("openGauss");
+    private static final String MONEY_TYPE = "money";
     
-    private static final String PG_MONEY_TYPE = "money";
+    private static final String BIT_TYPE = "bit";
     
-    private static final String PG_BIT_TYPE = "bit";
+    private static final String BOOL_TYPE = "bool";
     
     @Override
     protected Object doReadValue(final ResultSet resultSet, final ResultSetMetaData metaData, final int columnIndex) throws SQLException {
-        if (isPgMoneyType(metaData, columnIndex)) {
+        if (isMoneyType(metaData, columnIndex)) {
             return resultSet.getBigDecimal(columnIndex);
         }
-        if (isPgBitType(metaData, columnIndex)) {
-            PGobject result = new PGobject();
-            result.setType("bit");
-            Object bitValue = resultSet.getObject(columnIndex);
-            result.setValue(null == bitValue ? null : (Boolean) bitValue ? "1" : "0");
-            return result;
+        if (isBitType(metaData, columnIndex)) {
+            // The openGauss JDBC driver can't parse bit(n) correctly when n > 1, so the JDBC URL already adds bitToString=true; just return the string here
+            return resultSet.getString(columnIndex);
+        }
+        if (isBoolType(metaData, columnIndex)) {
+            return resultSet.getBoolean(columnIndex);
         }
         return super.defaultDoReadValue(resultSet, metaData, columnIndex);
     }
     
-    private boolean isPgMoneyType(final ResultSetMetaData resultSetMetaData, final int index) throws SQLException {
-        return PG_MONEY_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
+    private boolean isMoneyType(final ResultSetMetaData resultSetMetaData, final int index) throws SQLException {
+        return MONEY_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
+    }
+    
+    private boolean isBoolType(final ResultSetMetaData resultSetMetaData, final int index) throws SQLException {
+        return BOOL_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
     }
     
-    private boolean isPgBitType(final ResultSetMetaData resultSetMetaData, final int index) throws SQLException {
+    private boolean isBitType(final ResultSetMetaData resultSetMetaData, final int index) throws SQLException {
         if (Types.BIT == resultSetMetaData.getColumnType(index)) {
-            return PG_BIT_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
+            return BIT_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
         }
         return false;
     }
     
     @Override
     public String getType() {
-        return "PostgreSQL";
-    }
-    
-    @Override
-    public Collection<String> getTypeAliases() {
-        return TYPE_ALIASES;
+        return "openGauss";
     }
 }
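
With bitToString=true on the connection URL (the workaround this commit wires in), a bit(n) column arrives as a plain string of 0s and 1s, which is what the isBitType branch above returns. A hedged usage sketch; URL, credentials, and table are illustrative, mirroring the opengauss.xml scenario below:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public final class BitToStringSketch {
    
    public static void main(final String[] args) throws Exception {
        String url = "jdbc:opengauss://localhost:15432/demo?bitToString=true&stringtype=unspecified";
        try (Connection connection = DriverManager.getConnection(url, "demo", "demo");
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("SELECT c_bit FROM test.t_order")) {
            while (resultSet.next()) {
                // Without bitToString the driver mis-parses bit(n) for n > 1; with it, getString yields e.g. "1010".
                System.out.println(resultSet.getString(1));
            }
        }
    }
}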
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
new file mode 100644
index 00000000000..c2616b6fa02
--- /dev/null
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussColumnValueReader
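
This file follows the standard JDK service-provider layout: each non-comment line names one implementation of the interface given by the file name. A minimal sketch of the discovery mechanism, using a hypothetical stand-in interface (ShardingSphere itself resolves the reader through its typed SPI loader keyed by getType(), here "openGauss"):

import java.util.ServiceLoader;

public final class ServiceDiscoverySketch {
    
    // Hypothetical stand-in for org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader.
    public interface ColumnValueReaderSpi {
        
        String getType();
    }
    
    public static void main(final String[] args) {
        // ServiceLoader instantiates every class listed in META-INF/services/<interface FQN> on the class path.
        for (ColumnValueReaderSpi each : ServiceLoader.load(ColumnValueReaderSpi.class)) {
            System.out.println(each.getType());
        }
    }
}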
diff --git a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtensionTest.java b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtensionTest.java
index 97a7227bd65..9968d05953d 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtensionTest.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtensionTest.java
@@ -40,6 +40,6 @@ class OpenGaussJdbcQueryPropertiesExtensionTest {
     private void assertExtension(final JdbcQueryPropertiesExtension actual) {
         assertThat(actual, instanceOf(OpenGaussJdbcQueryPropertiesExtension.class));
         assertThat(actual.getType(), equalTo("openGauss"));
-        assertTrue(actual.extendQueryProperties().isEmpty());
+        assertThat(actual.extendQueryProperties().size(), equalTo(2));
     }
 }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
index a84844018bb..0cfdbdb6cdb 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
@@ -24,16 +24,12 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Types;
-import java.util.Collection;
-import java.util.Collections;
 
 /**
  * Column value reader for PostgreSQL.
  */
 public final class PostgreSQLColumnValueReader extends AbstractColumnValueReader {
     
-    private static final Collection<String> TYPE_ALIASES = Collections.singletonList("openGauss");
-    
     private static final String PG_MONEY_TYPE = "money";
     
     private static final String PG_BIT_TYPE = "bit";
@@ -68,9 +64,4 @@ public final class PostgreSQLColumnValueReader extends AbstractColumnValueReader
     public String getType() {
         return "PostgreSQL";
     }
-    
-    @Override
-    public Collection<String> getTypeAliases() {
-        return TYPE_ALIASES;
-    }
 }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 19b903cf565..99a8fb2fe11 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -123,7 +123,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
         }
         extraSQLCommand = JAXB.unmarshal(Objects.requireNonNull(PipelineContainerComposer.class.getClassLoader().getResource(testParam.getScenario())), ExtraSQLCommand.class);
         containerComposer.start();
-        sourceDataSource = StorageContainerUtils.generateDataSource(appendExtraParameter(getActualJdbcUrlTemplate(DS_0, false)), username, password);
+        sourceDataSource = StorageContainerUtils.generateDataSource(getActualJdbcUrlTemplate(DS_0, false), username, password);
         proxyDataSource = StorageContainerUtils.generateDataSource(
                 appendExtraParameter(containerComposer.getProxyJdbcUrl(PROXY_DATABASE)), ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD);
         init(jobType);
@@ -198,7 +198,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Append extra parameter.
-     * 
+     *
      * @param jdbcUrl JDBC URL
      * @return appended JDBC URL
      */
@@ -207,14 +207,14 @@ public final class PipelineContainerComposer implements AutoCloseable {
             return new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("rewriteBatchedStatements", Boolean.TRUE.toString())));
         }
         if (DatabaseTypeUtils.isPostgreSQL(databaseType) || DatabaseTypeUtils.isOpenGauss(databaseType)) {
-            return new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("stringtype", "unspecified")));
+            return new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("stringtype", "unspecified"), new Property("bitToString", Boolean.TRUE.toString())));
         }
         return jdbcUrl;
     }
     
     /**
      * Register storage unit.
-     * 
+     *
      * @param storageUnitName storage unit name
      * @throws SQLException SQL exception
      */
@@ -222,13 +222,13 @@ public final class PipelineContainerComposer implements AutoCloseable {
         String registerStorageUnitTemplate = "REGISTER STORAGE UNIT ${ds} ( URL='${url}', USER='${user}', PASSWORD='${password}')".replace("${ds}", storageUnitName)
                 .replace("${user}", getUsername())
                 .replace("${password}", getPassword())
-                .replace("${url}", appendExtraParameter(getActualJdbcUrlTemplate(storageUnitName, true)));
+                .replace("${url}", getActualJdbcUrlTemplate(storageUnitName, true));
         proxyExecuteWithLog(registerStorageUnitTemplate, 2);
     }
     
     /**
      * Add resource.
-     * 
+     *
      * @param distSQL dist SQL
      * @throws SQLException SQL exception
      */
@@ -239,7 +239,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Get actual JDBC URL template.
-     * 
+     *
      * @param databaseName database name
      * @param isInContainer is in container
      * @param storageContainerIndex storage container index
@@ -257,18 +257,18 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Get actual JDBC URL template.
-     * 
+     *
      * @param databaseName database name
      * @param isInContainer is in container
      * @return actual JDBC URL template
      */
     public String getActualJdbcUrlTemplate(final String databaseName, final boolean isInContainer) {
-        return getActualJdbcUrlTemplate(databaseName, isInContainer, 0);
+        return appendExtraParameter(getActualJdbcUrlTemplate(databaseName, isInContainer, 0));
     }
     
     /**
      * Create schema.
-     * 
+     *
      * @param connection connection
      * @param sleepSeconds sleep seconds
      * @throws SQLException SQL exception
@@ -286,8 +286,8 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Create source order table.
-     * 
-     * @param sourceTableName source table name 
+     *
+     * @param sourceTableName source table name
      * @throws SQLException SQL exception
      */
     public void createSourceOrderTable(final String sourceTableName) throws SQLException {
@@ -296,7 +296,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Create source table index list.
-     * 
+     *
      * @param schema schema
      * @param sourceTableName source table name
      * @throws SQLException SQL exception
@@ -311,7 +311,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Create source comment on list.
-     * 
+     *
      * @param schema schema
      * @param sourceTableName source table name
      * @throws SQLException SQL exception
@@ -322,7 +322,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Create source order item table.
-     * 
+     *
      * @throws SQLException SQL exception
      */
     public void createSourceOrderItemTable() throws SQLException {
@@ -331,7 +331,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Source execute with log.
-     * 
+     *
      * @param sql SQL
      * @throws SQLException SQL exception
      */
@@ -344,7 +344,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Proxy execute with log.
-     * 
+     *
      * @param sql SQL
      * @param sleepSeconds sleep seconds
      * @throws SQLException SQL exception
@@ -360,7 +360,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Wait job prepare success.
-     * 
+     *
      * @param distSQL dist SQL
      */
     @SneakyThrows(InterruptedException.class)
@@ -376,7 +376,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Query for list with log.
-     * 
+     *
      * @param sql SQL
      * @return query result
      * @throws RuntimeException runtime exception
@@ -399,7 +399,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Transform result set to list.
-     * 
+     *
      * @param resultSet result set
      * @return transformed result
      * @throws SQLException SQL exception
@@ -420,7 +420,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Start increment task.
-     * 
+     *
      * @param baseIncrementTask base increment task
      */
     public void startIncrementTask(final BaseIncrementTask baseIncrementTask) {
@@ -430,7 +430,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Wait increment task finished.
-     * 
+     *
      * @param distSQL dist SQL
      * @return result
      * @throws InterruptedException interrupted exception
@@ -464,7 +464,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Assert proxy order record exist.
-     * 
+     *
      * @param tableName table name
      * @param orderId order id
      */
@@ -480,7 +480,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Assert proxy order record exist.
-     * 
+     *
      * @param sql SQL
      */
     @SneakyThrows(InterruptedException.class)
@@ -499,7 +499,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Get target table records count.
-     * 
+     *
      * @param tableName table name
      * @return target table records count
      */
@@ -511,7 +511,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Assert greater than order table init rows.
-     * 
+     *
      * @param tableInitRows table init rows
      * @param schema schema
      */
@@ -523,7 +523,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Generate ShardingSphere data source from proxy.
-     * 
+     *
      * @return ShardingSphere data source
      * @throws SQLException SQL exception
      */
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 133bd0e8495..1967d7d97be 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -79,7 +79,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 @PipelineE2ESettings(database = {
         @PipelineE2EDatabaseSettings(type = "MySQL", scenarioFiles = "env/scenario/general/mysql.xml"),
-        @PipelineE2EDatabaseSettings(type = "openGauss", scenarioFiles = "env/scenario/general/postgresql.xml")})
+        @PipelineE2EDatabaseSettings(type = "openGauss", scenarioFiles = "env/scenario/general/opengauss.xml")})
 @Slf4j
 class CDCE2EIT {
     
@@ -108,7 +108,7 @@ class CDCE2EIT {
             }
             createOrderTableRule(containerComposer);
             try (Connection connection = containerComposer.getProxyDataSource().getConnection()) {
-                initSchemaAndTable(containerComposer, connection, 2);
+                initSchemaAndTable(containerComposer, connection, 3);
             }
             DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
             Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(), PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
@@ -170,7 +170,7 @@ class CDCE2EIT {
     }
     
     private void startCDCClient(final PipelineContainerComposer containerComposer) {
-        DataSource dataSource = StorageContainerUtils.generateDataSource(containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false)),
+        DataSource dataSource = StorageContainerUtils.generateDataSource(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
                 containerComposer.getUsername(), containerComposer.getPassword());
         StartCDCClientParameter parameter = new StartCDCClientParameter();
         parameter.setAddress("localhost");
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index a3fa80535d2..60d7960e9ed 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -61,16 +61,16 @@ public abstract class AbstractMigrationE2EIT {
         }
         String addSourceResource = migrationDistSQL.getRegisterMigrationSourceStorageUnitTemplate().replace("${user}", containerComposer.getUsername())
                 .replace("${password}", containerComposer.getPassword())
-                .replace("${ds0}", containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_0, true)));
+                .replace("${ds0}", containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_0, true));
         containerComposer.addResource(addSourceResource);
     }
     
     protected void addMigrationTargetResource(final PipelineContainerComposer containerComposer) throws SQLException {
         String addTargetResource = migrationDistSQL.getRegisterMigrationTargetStorageUnitTemplate().replace("${user}", containerComposer.getUsername())
                 .replace("${password}", containerComposer.getPassword())
-                .replace("${ds2}", containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_2, true)))
-                .replace("${ds3}", containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_3, true)))
-                .replace("${ds4}", containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, true)));
+                .replace("${ds2}", containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_2, true))
+                .replace("${ds3}", containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_3, true))
+                .replace("${ds4}", containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, true));
         containerComposer.addResource(addTargetResource);
         List<Map<String, Object>> resources = containerComposer.queryForListWithLog("SHOW STORAGE UNITS from sharding_db");
         assertThat(resources.size(), is(3));
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index f518c43e58d..51fff681a5c 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -47,11 +47,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @PipelineE2ESettings(database = {
         @PipelineE2EDatabaseSettings(type = "PostgreSQL", scenarioFiles = "env/scenario/general/postgresql.xml"),
-        @PipelineE2EDatabaseSettings(type = "openGauss", scenarioFiles = "env/scenario/general/postgresql.xml")})
+        @PipelineE2EDatabaseSettings(type = "openGauss", scenarioFiles = "env/scenario/general/opengauss.xml")})
 @Slf4j
 class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
     
-    private static final String SOURCE_TABLE_NAME = "t_order_copy";
+    private static final String SOURCE_TABLE_NAME = "t_order";
     
     private static final String TARGET_TABLE_NAME = "t_order";
     
@@ -97,12 +97,12 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
     private void checkOrderMigration(final PipelineContainerComposer containerComposer, final String jobId) throws SQLException, InterruptedException {
         containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         stopMigrationByJobId(containerComposer, jobId);
+        // must refresh firstly, otherwise proxy can't get schema and table info
+        containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
         long recordId = new SnowflakeKeyGenerateAlgorithm().generateKey();
         containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')",
                 String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME), recordId, 1, "afterStop"));
         startMigrationByJobId(containerComposer, jobId);
-        // must refresh firstly, otherwise proxy can't get schema and table info
-        containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
         Awaitility.await().atMost(5L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog(
                 String.format("SELECT * FROM %s WHERE order_id = %s", String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId)).isEmpty());
         containerComposer.assertProxyOrderRecordExist(String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
index 0e211b4e7e6..72049d85340 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
@@ -19,9 +19,11 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases.task;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.type.SchemaSupportedDatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.BaseIncrementTask;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
@@ -40,6 +42,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
 @RequiredArgsConstructor
@@ -53,6 +56,10 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
     private static final List<String> POSTGRESQL_COLUMN_NAMES = Arrays.asList("order_id", "user_id", "status", "t_int2", "t_numeric", "t_bool", "t_bytea", "t_char", "t_varchar", "t_float",
             "t_double", "t_json", "t_jsonb", "t_text", "t_date", "t_time", "t_timestamp", "t_timestamptz");
     
+    private static final List<String> OPENGAUSS_COLUMN_NAMES = Arrays.asList("order_id", "user_id", "status", "c_int", "c_smallint", "c_float", "c_double", "c_numeric", "c_boolean", "c_char",
+            "c_text", "c_bytea", "c_date", "c_time", "c_smalldatetime", "c_timestamp", "c_timestamptz", "c_interval", "c_array", "c_json", "c_jsonb", "c_uuid", "c_hash32", "c_tsvector", "c_bit",
+            "c_int4range", "c_reltime", "c_abstime", "c_point", "c_lseg", "c_box", "c_circle", "c_bitvarying", "c_cidr", "c_inet", "c_macaddr", "c_hll");
+    
     private final DataSource dataSource;
     
     private final String orderTableName;
@@ -88,8 +95,10 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
         String sql;
         if (databaseType instanceof MySQLDatabaseType) {
             sql = SQLBuilderUtils.buildInsertSQL(MYSQL_COLUMN_NAMES, orderTableName);
-        } else if (databaseType instanceof SchemaSupportedDatabaseType) {
+        } else if (databaseType instanceof PostgreSQLDatabaseType) {
             sql = SQLBuilderUtils.buildInsertSQL(POSTGRESQL_COLUMN_NAMES, orderTableName);
+        } else if (databaseType instanceof OpenGaussDatabaseType) {
+            sql = SQLBuilderUtils.buildInsertSQL(OPENGAUSS_COLUMN_NAMES, orderTableName);
         } else {
             throw new UnsupportedOperationException();
         }
@@ -110,13 +119,25 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
             DataSourceExecuteUtils.execute(dataSource, sql, parameters);
             return;
         }
-        if (databaseType instanceof SchemaSupportedDatabaseType) {
+        if (databaseType instanceof PostgreSQLDatabaseType) {
             String sql = SQLBuilderUtils.buildUpdateSQL(ignoreShardingColumns(POSTGRESQL_COLUMN_NAMES), orderTableName, "?");
-            Object[] parameters = {"中文测试", randomInt, BigDecimal.valueOf(10000), true, new byte[]{}, "char", "varchar", PipelineCaseHelper.generateFloat(),
+            Object[] parameters = {"中文测试", randomInt, BigDecimal.valueOf(10000), random.nextBoolean(), new byte[]{}, "char", "varchar", PipelineCaseHelper.generateFloat(),
                     PipelineCaseHelper.generateDouble(), PipelineCaseHelper.generateJsonString(10, true), PipelineCaseHelper.generateJsonString(20, true), "text-update", LocalDate.now(),
                     LocalTime.now(), Timestamp.valueOf(LocalDateTime.now()), OffsetDateTime.now(), orderId};
             log.info("update sql: {}, params: {}", sql, parameters);
             DataSourceExecuteUtils.execute(dataSource, sql, parameters);
+            return;
+        }
+        if (databaseType instanceof OpenGaussDatabaseType) {
+            LocalDateTime now = LocalDateTime.now();
+            String sql = SQLBuilderUtils.buildUpdateSQL(ignoreShardingColumns(OPENGAUSS_COLUMN_NAMES), orderTableName, "?");
+            Object[] parameters = {"中文测试", randomInt, random.nextInt(-999, 999), PipelineCaseHelper.generateFloat(), PipelineCaseHelper.generateDouble(), BigDecimal.valueOf(10000),
+                    random.nextBoolean(), "update-char", "update-text", "update-bytea".getBytes(), now.toLocalDate().plusDays(1), now.toLocalTime().plusHours(6), "2023-03-01", now,
+                    OffsetDateTime.now(), "1 years 1 mons 1 days 1 hours 1 mins 1 secs", "{4, 5, 6}", PipelineCaseHelper.generateJsonString(1, true), PipelineCaseHelper.generateJsonString(1, false),
+                    UUID.randomUUID().toString(), DigestUtils.md5Hex(now.toString()), null, "1111", "[1,10000)", "2 years 2 mons 2 days 06:00:00", "2023-01-01 00:00:00+00", "(2.0,2.0)",
+                    "[(0.0,0.0),(3.0,3.0)]", "(1.0,1.0),(3.0,3.0)", "<(5.0,5.0),1.0>", "1010", "192.168.0.0/24", "192.168.1.1", "08:00:3b:01:02:03", null, orderId};
+            log.info("update sql: {}, params: {}", sql, parameters);
+            DataSourceExecuteUtils.execute(dataSource, sql, parameters);
         }
     }
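
Note that the openGauss branch above binds interval, range, geometric, and network values as plain strings; that only works because stringtype=unspecified (extended earlier in this commit) lets the server infer each parameter's target type. A hedged sketch of that behavior in isolation (URL, credentials, and table are illustrative):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public final class StringTypeUnspecifiedSketch {
    
    public static void main(final String[] args) throws Exception {
        String url = "jdbc:opengauss://localhost:15432/demo?stringtype=unspecified";
        try (Connection connection = DriverManager.getConnection(url, "demo", "demo");
             PreparedStatement statement = connection.prepareStatement("UPDATE test.t_order SET c_inet = ?, c_point = ? WHERE order_id = ?")) {
            // With stringtype=unspecified the server casts these strings to inet and point itself;
            // with the default varchar binding, the same statement would typically fail with a type mismatch.
            statement.setString(1, "192.168.1.1");
            statement.setString(2, "(2.0,2.0)");
            statement.setLong(3, 1L);
            statement.executeUpdate();
        }
    }
}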
     
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
index c6b9cf1ec82..04a3400cb46 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
@@ -21,11 +21,13 @@ import com.google.common.base.Strings;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.type.SchemaSupportedDatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
 
@@ -41,6 +43,7 @@ import java.time.OffsetDateTime;
 import java.time.Year;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
@@ -91,7 +94,7 @@ public final class PipelineCaseHelper {
             }
             return result;
         }
-        if (databaseType instanceof SchemaSupportedDatabaseType) {
+        if (databaseType instanceof PostgreSQLDatabaseType) {
             for (int i = 0; i < insertRows; i++) {
                 Object orderId = keyGenerateAlgorithm.generateKey();
                 result.add(new Object[]{orderId, generateInt(0, 100), generateString(6), generateInt(-128, 127),
@@ -101,6 +104,19 @@ public final class PipelineCaseHelper {
             }
             return result;
         }
+        if (databaseType instanceof OpenGaussDatabaseType) {
+            for (int i = 0; i < insertRows; i++) {
+                Object orderId = keyGenerateAlgorithm.generateKey();
+                // TODO openGauss mpp plugin parses single quotes incorrectly
+                result.add(new Object[]{orderId, generateInt(0, 1000), "status" + i, generateInt(-1000, 9999), generateInt(0, 100), generateFloat(), generateDouble(),
+                        BigDecimal.valueOf(generateDouble()), false, generateString(6), "texts", "bytea".getBytes(), LocalDate.now(), LocalTime.now(), "2001-10-01",
+                        Timestamp.valueOf(LocalDateTime.now()), OffsetDateTime.now(), "0 years 0 mons 1 days 2 hours 3 mins 4 secs", "{1, 2, 3}", generateJsonString(8, false),
+                        generateJsonString(8, true), UUID.randomUUID().toString(), DigestUtils.md5Hex(orderId.toString()), null, "0000", "[1,1000)",
+                        "1 years 1 mons 10 days -06:00:00", "2000-01-02 00:00:00+00", "(1.0,1.0)", "[(0.0,0.0),(2.0,2.0)]", "(3.0,3.0),(1.0,1.0)", "<(5.0,5.0),5.0>", "1111",
+                        "192.168.0.0/16", "192.168.1.1", "08:00:2b:01:02:03", "\\x484c4c00000000002b05000000000000000000000000000000000000"});
+            }
+            return result;
+        }
         throw new UnsupportedOperationException("now support generate %s insert data");
     }
     
@@ -109,7 +125,7 @@ public final class PipelineCaseHelper {
     
     private static String generateString(final int strLength) {
-        return RandomStringUtils.randomAlphabetic(strLength);
+        return RandomStringUtils.randomAlphanumeric(strLength);
     }
     
     /**
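
The new openGauss rows rely on a few small value generators; all three calls below come from commons-codec, commons-lang3, and the JDK, exactly as imported above (output varies per run):

import java.util.UUID;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.RandomStringUtils;

public final class GeneratedValuesSketch {
    
    public static void main(final String[] args) {
        // c_uuid column: random UUID string.
        System.out.println(UUID.randomUUID());
        // c_hash32 column: 32-character hex MD5 digest of the order id.
        System.out.println(DigestUtils.md5Hex("10000"));
        // status column: randomAlphanumeric also emits digits, unlike the randomAlphabetic it replaces.
        System.out.println(RandomStringUtils.randomAlphanumeric(6));
    }
}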
diff --git a/test/e2e/operation/pipeline/src/test/resources/env/scenario/general/opengauss.xml b/test/e2e/operation/pipeline/src/test/resources/env/scenario/general/opengauss.xml
new file mode 100644
index 00000000000..c7da6ef3ab4
--- /dev/null
+++ b/test/e2e/operation/pipeline/src/test/resources/env/scenario/general/opengauss.xml
@@ -0,0 +1,82 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<command>
+    <create-table-order>
+        create table test.t_order (
+        order_id bigint,
+        user_id integer,
+        status character varying(50),
+        c_int integer,
+        c_smallint smallint,
+        c_float real,
+        c_double double precision,
+        c_numeric numeric(10,2),
+        c_boolean boolean,
+        c_char character(32),
+        c_text text,
+        c_bytea bytea,
+        c_date date,
+        c_time time without time zone,
+        c_smalldatetime smalldatetime,
+        c_timestamp timestamp without time zone,
+        c_timestamptz timestamp with time zone,
+        c_interval interval,
+        c_array integer[],
+        c_json json,
+        c_jsonb jsonb,
+        c_uuid uuid,
+        c_hash32 hash32,
+        c_tsvector tsvector,
+        c_bit bit(4),
+        c_int4range int4range,
+        c_reltime reltime,
+        c_abstime abstime,
+        c_point point,
+        c_lseg lseg,
+        c_box box,
+        c_circle circle,
+        c_bitvarying bit varying(32),
+        c_cidr cidr,
+        c_inet inet,
+        c_macaddr macaddr,
+        c_hll hll(14,10,12,0),
+        PRIMARY KEY ( order_id )
+        );
+    </create-table-order>
+
+    <full-insert-order>
+        INSERT INTO test.t_order (
+        order_id, user_id, status, c_int, c_smallint, c_float, c_double, c_numeric, c_boolean, c_char, c_text, c_bytea, c_date, c_time,
+        c_smalldatetime, c_timestamp, c_timestamptz, c_interval, c_array, c_json, c_jsonb, c_uuid, c_hash32, c_tsvector, c_bit,
+        c_int4range, c_reltime, c_abstime, c_point, c_lseg, c_box, c_circle, c_bitvarying, c_cidr, c_inet, c_macaddr, c_hll
+        ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
+    </full-insert-order>
+
+    <create-table-order-item>
+        CREATE TABLE test.t_order_item (
+        item_id int8 NOT NULL,
+        order_id int8 NOT NULL,
+        user_id int4 NOT NULL,
+        status varchar(50),
+        PRIMARY KEY (item_id)
+        )
+    </create-table-order-item>
+
+    <full-insert-order-item>
+        INSERT INTO test.t_order_item(item_id,order_id,user_id,status) VALUES(?,?,?,?)
+    </full-insert-order-item>
+</command>

