This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 54d7e9c90b9 Improve Postgres generate slot name at scaling (#19864)
54d7e9c90b9 is described below
commit 54d7e9c90b91fa404c3dc88fbb8c0e7e89c2b62e
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Aug 4 18:24:22 2022 +0800
Improve Postgres generate slot name at scaling (#19864)
---
.../ingest/OpenGaussPositionInitializer.java | 5 +++-
.../ingest/PostgreSQLPositionInitializer.java | 5 +++-
.../postgresql/ingest/PostgreSQLWalDumperTest.java | 1 +
.../data/pipeline/cases/base/BaseITCase.java | 27 ++++++++--------------
4 files changed, 19 insertions(+), 19 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
index e30e6d844ce..c05d33c4e3c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.opengauss.ingest;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.digest.DigestUtils;
import
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
@@ -127,7 +128,9 @@ public final class OpenGaussPositionInitializer implements
PositionInitializer {
* @throws SQLException failed when getCatalog
*/
public static String getUniqueSlotName(final Connection connection) throws
SQLException {
- return String.format("%s_%s", SLOT_NAME_PREFIX,
connection.getCatalog());
+ // same as PostgreSQL, but length over 64 will throw an exception
directly
+ String slotName = DigestUtils.md5Hex(connection.getCatalog());
+ return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
}
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
index 34275e4aef4..178f4e5c3c4 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.digest.DigestUtils;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
@@ -128,7 +129,9 @@ public final class PostgreSQLPositionInitializer implements
PositionInitializer
* @throws SQLException failed when getCatalog
*/
public static String getUniqueSlotName(final Connection connection) throws
SQLException {
- return String.format("%s_%s", SLOT_NAME_PREFIX,
connection.getCatalog());
+ // PostgreSQL slot name maximum length can't exceed 64,automatic
truncation when the length exceeds the limit
+ String slotName = DigestUtils.md5Hex(connection.getCatalog());
+ return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
}
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
index 1c18128a0a6..8c0912d1874 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
@@ -115,6 +115,7 @@ public final class PostgreSQLWalDumperTest {
ReflectionUtil.setFieldValue(walDumper, "logicalReplication",
logicalReplication);
when(logicalReplication.createConnection(dataSourceConfig)).thenReturn(pgConnection);
when(pgConnection.unwrap(PgConnection.class)).thenReturn(pgConnection);
+ when(pgConnection.getCatalog()).thenReturn("test_db");
when(logicalReplication.createReplicationStream(pgConnection,
PostgreSQLPositionInitializer.getUniqueSlotName(pgConnection),
position.getLogSequenceNumber()))
.thenReturn(pgReplicationStream);
ByteBuffer data = ByteBuffer.wrap("table public.t_order_0: DELETE:
order_id[integer]:1".getBytes());
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index f0cf37a191d..318e2c96829 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -23,6 +23,7 @@ import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
@@ -64,7 +65,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -355,24 +356,16 @@ public abstract class BaseITCase {
}
log.info("jobId: {}", jobId);
Set<String> actualStatus = null;
- for (int i = 0; i < 15; i++) {
- actualStatus = new HashSet<>();
+ for (int i = 0; i < 20; i++) {
List<Map<String, Object>> showScalingStatusResult =
showScalingStatus(jobId);
log.info("show scaling status result: {}",
showScalingStatusResult);
- boolean finished = true;
- for (Map<String, Object> each : showScalingStatusResult) {
- String status = each.get("status").toString();
- assertThat(status, not(JobStatus.PREPARING_FAILURE.name()));
- assertThat(status,
not(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name()));
- assertThat(status,
not(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name()));
- actualStatus.add(status);
- if (!Objects.equals(status,
JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
- log.info("scaling status before increment, status: {}",
status);
- finished = false;
- break;
- }
- }
- if (finished) {
+ actualStatus = showScalingStatusResult.stream().map(each ->
each.get("status").toString()).collect(Collectors.toSet());
+ assertFalse(CollectionUtils.containsAny(actualStatus,
Arrays.asList(JobStatus.PREPARING_FAILURE.name(),
JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
+ JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
+ if (actualStatus.size() == 1 &&
actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
+ break;
+ } else if (actualStatus.size() >= 1 &&
actualStatus.containsAll(new HashSet<>(Arrays.asList("",
JobStatus.EXECUTE_INCREMENTAL_TASK.name())))) {
+ log.error("one of the shardingItem was not started correctly");
break;
}
ThreadUtil.sleep(2, TimeUnit.SECONDS);