This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5d3e77914e7 Pipeline support parallel decoding plugin of openGauss (#30498)
5d3e77914e7 is described below
commit 5d3e77914e788061528a5b7ca0156baa505789b9
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Mar 18 19:16:36 2024 +0800
Pipeline support parallel decoding plugin of openGauss (#30498)
* Support parallel decoding plugin
* Add enmotech/opengauss:3.0.0
* Improve
* Rename method
---
.github/workflows/e2e-operation.yml | 4 +-
.../opengauss/ingest/OpenGaussWALDumper.java | 66 ++++++++++++++++++----
.../ingest/wal/OpenGaussLogicalReplication.java | 16 +++++-
.../ingest/wal/decode/MppdbDecodingPlugin.java | 30 ++++++++--
.../opengauss/ingest/OpenGaussWALDumperTest.java | 41 ++++++++++++++
.../ingest/wal/decode/MppdbDecodingPluginTest.java | 59 +++++++++++++------
.../ingest/wal/decode/TestDecodingPlugin.java | 2 +-
.../postgresql/ingest/wal/event/BeginTXEvent.java | 4 +-
.../ingest/wal/WALEventConverterTest.java | 2 +-
9 files changed, 184 insertions(+), 40 deletions(-)
diff --git a/.github/workflows/e2e-operation.yml b/.github/workflows/e2e-operation.yml
index 8b32ceb6b18..372ed5d1d74 100644
--- a/.github/workflows/e2e-operation.yml
+++ b/.github/workflows/e2e-operation.yml
@@ -60,12 +60,12 @@ jobs:
fail-fast: false
matrix:
operation: [ transaction, pipeline, showprocesslist ]
- image: [ { type: "it.docker.mysql.version", version: "mysql:5.7" }, {
type: "it.docker.postgresql.version", version: "postgres:12-alpine" }, { type:
"it.docker.opengauss.version", version: "enmotech/opengauss:2.1.0" } ]
+ image: [ { type: "it.docker.mysql.version", version: "mysql:5.7" }, {
type: "it.docker.postgresql.version", version: "postgres:12-alpine" }, { type:
"it.docker.opengauss.version", version:
"enmotech/opengauss:2.1.0,enmotech/opengauss:3.1.0" } ]
exclude:
- operation: showprocesslist
image: { type: "it.docker.postgresql.version", version:
"postgres:12-alpine" }
- operation: showprocesslist
- image: { type: "it.docker.opengauss.version", version:
"enmotech/opengauss:2.1.0" }
+ image: { type: "it.docker.opengauss.version", version:
"enmotech/opengauss:2.1.0,enmotech/opengauss:3.1.0" }
steps:
- env:
changed_operations: ${{
needs.detect-changed-files.outputs.changed_operations }}
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index e8437ad7970..3aeac60402c 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -19,15 +19,15 @@ package
org.apache.shardingsphere.data.pipeline.opengauss.ingest;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
+import
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
import
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLogicalReplication;
import
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.MppdbDecodingPlugin;
import
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
@@ -46,12 +46,18 @@ import org.opengauss.jdbc.PgConnection;
import org.opengauss.replication.PGReplicationStream;
import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
* WAL dumper of openGauss.
@@ -60,6 +66,10 @@ import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public final class OpenGaussWALDumper extends
AbstractPipelineLifecycleRunnable implements IncrementalDumper {
+ private static final Pattern VERSION_PATTERN =
Pattern.compile("^\\(openGauss (\\d)");
+
+ private static final int DEFAULT_VERSION = 2;
+
private final IncrementalDumperContext dumperContext;
private final AtomicReference<WALPosition> walPosition;
@@ -74,6 +84,8 @@ public final class OpenGaussWALDumper extends
AbstractPipelineLifecycleRunnable
private List<AbstractRowEvent> rowEvents = new LinkedList<>();
+ private final AtomicReference<Long> currentCsn = new AtomicReference<>();
+
public OpenGaussWALDumper(final IncrementalDumperContext dumperContext,
final IngestPosition position,
final PipelineChannel channel, final
PipelineTableMetaDataLoader metaDataLoader) {
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getCommonContext().getDataSourceConfig().getClass()),
@@ -110,10 +122,11 @@ public final class OpenGaussWALDumper extends
AbstractPipelineLifecycleRunnable
@SneakyThrows(InterruptedException.class)
private void dump() throws SQLException {
PGReplicationStream stream = null;
+ int majorVersion = getMajorVersion();
try (PgConnection connection = getReplicationConnectionUnwrap()) {
stream = logicalReplication.createReplicationStream(connection,
walPosition.get().getLogSequenceNumber(),
-
OpenGaussIngestPositionManager.getUniqueSlotName(connection,
dumperContext.getJobId()));
- DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new
OpenGaussTimestampUtils(connection.getTimestampUtils()), decodeWithTX);
+
OpenGaussIngestPositionManager.getUniqueSlotName(connection,
dumperContext.getJobId()), majorVersion);
+ DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new
OpenGaussTimestampUtils(connection.getTimestampUtils()), decodeWithTX,
majorVersion >= 3);
while (isRunning()) {
ByteBuffer message = stream.readPending();
if (null == message) {
@@ -122,7 +135,7 @@ public final class OpenGaussWALDumper extends
AbstractPipelineLifecycleRunnable
}
AbstractWALEvent event = decodingPlugin.decode(message, new
OpenGaussLogSequenceNumber(stream.getLastReceiveLSN()));
if (decodeWithTX) {
- processEventWithTX(event);
+ processEventWithTX(event, majorVersion);
} else {
processEventIgnoreTX(event);
}
@@ -138,28 +151,61 @@ public final class OpenGaussWALDumper extends
AbstractPipelineLifecycleRunnable
}
}
+ private int getMajorVersion() throws SQLException {
+ StandardPipelineDataSourceConfiguration dataSourceConfig =
(StandardPipelineDataSourceConfiguration)
dumperContext.getCommonContext().getDataSourceConfig();
+ try (
+ Connection connection =
DriverManager.getConnection(dataSourceConfig.getUrl(),
dataSourceConfig.getUsername(), dataSourceConfig.getPassword());
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery("SELECT
version()")) {
+ resultSet.next();
+ String versionText = resultSet.getString(1);
+ return parseMajorVersion(versionText);
+ }
+ }
+
+ private int parseMajorVersion(final String versionText) {
+ Matcher matcher = VERSION_PATTERN.matcher(versionText);
+ boolean isFind = matcher.find();
+ log.info("openGauss major version={}, `select version()`={}", isFind ?
matcher.group(1) : DEFAULT_VERSION, versionText);
+ if (isFind) {
+ return Integer.parseInt(matcher.group(1));
+ }
+ return DEFAULT_VERSION;
+ }
+
private PgConnection getReplicationConnectionUnwrap() throws SQLException {
return
logicalReplication.createConnection((StandardPipelineDataSourceConfiguration)
dumperContext.getCommonContext().getDataSourceConfig()).unwrap(PgConnection.class);
}
- private void processEventWithTX(final AbstractWALEvent event) {
+ private void processEventWithTX(final AbstractWALEvent event, final int
majorVersion) {
if (event instanceof BeginTXEvent) {
+ if (majorVersion < 3) {
+ return;
+ }
+ if (!rowEvents.isEmpty()) {
+ log.warn("Commit event parse have problem, there still has
uncommitted row events size={}, ", rowEvents.size());
+ }
+ currentCsn.set(((BeginTXEvent) event).getCsn());
return;
}
if (event instanceof AbstractRowEvent) {
- rowEvents.add((AbstractRowEvent) event);
+ AbstractRowEvent rowEvent = (AbstractRowEvent) event;
+ rowEvent.setCsn(currentCsn.get());
+ rowEvents.add(rowEvent);
return;
}
if (event instanceof CommitTXEvent) {
- Long csn = ((CommitTXEvent) event).getCsn();
List<Record> records = new LinkedList<>();
for (AbstractRowEvent each : rowEvents) {
- each.setCsn(csn);
+ if (majorVersion < 3) {
+ each.setCsn(((CommitTXEvent) event).getCsn());
+ }
records.add(walEventConverter.convert(each));
}
records.add(walEventConverter.convert(event));
channel.push(records);
rowEvents = new LinkedList<>();
+ currentCsn.set(null);
}
}
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
index 082134dc0f2..cc8a37d9389 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java
@@ -28,6 +28,7 @@ import org.opengauss.PGProperty;
import org.opengauss.jdbc.PgConnection;
import org.opengauss.replication.LogSequenceNumber;
import org.opengauss.replication.PGReplicationStream;
+import org.opengauss.replication.fluent.logical.ChainedLogicalStreamBuilder;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -86,17 +87,26 @@ public final class OpenGaussLogicalReplication {
* @param connection connection
* @param startPosition start position
* @param slotName slot name
+ * @param majorVersion version
* @return replication stream
* @throws SQLException SQL exception
*/
- public PGReplicationStream createReplicationStream(final PgConnection
connection, final BaseLogSequenceNumber startPosition, final String slotName)
throws SQLException {
- return connection.getReplicationAPI()
+ public PGReplicationStream createReplicationStream(final PgConnection
connection, final BaseLogSequenceNumber startPosition, final String slotName,
+ final int majorVersion)
throws SQLException {
+ ChainedLogicalStreamBuilder logicalStreamBuilder =
connection.getReplicationAPI()
.replicationStream()
.logical()
.withSlotName(slotName)
.withSlotOption("include-xids", true)
.withSlotOption("skip-empty-xacts", true)
- .withStartPosition((LogSequenceNumber) startPosition.get())
+ .withStartPosition((LogSequenceNumber) startPosition.get());
+ if (majorVersion < 3) {
+ return logicalStreamBuilder.start();
+ }
+ return logicalStreamBuilder
+ .withSlotOption("parallel-decode-num", 10)
+ .withSlotOption("decode-style", "j")
+ .withSlotOption("sending-batch", 0)
.start();
}
}
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
index 8f7396a5182..f15ce626cad 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
@@ -57,9 +57,7 @@ public final class MppdbDecodingPlugin implements
DecodingPlugin {
private final boolean decodeWithTX;
- public MppdbDecodingPlugin(final BaseTimestampUtils timestampUtils) {
- this(timestampUtils, false);
- }
+ private final boolean decodeParallelly;
@Override
public AbstractWALEvent decode(final ByteBuffer data, final
BaseLogSequenceNumber logSequenceNumber) {
@@ -77,10 +75,18 @@ public final class MppdbDecodingPlugin implements
DecodingPlugin {
}
private AbstractWALEvent decodeDataWithTX(final String dataText) {
+ if (decodeParallelly) {
+ return decodeParallelly(dataText);
+ } else {
+ return decodeSerially(dataText);
+ }
+ }
+
+ private AbstractWALEvent decodeSerially(final String dataText) {
AbstractWALEvent result = new PlaceholderEvent();
if (dataText.startsWith("BEGIN")) {
int beginIndex = dataText.indexOf("BEGIN") + "BEGIN".length() + 1;
- result = new
BeginTXEvent(Long.parseLong(dataText.substring(beginIndex)));
+ result = new
BeginTXEvent(Long.parseLong(dataText.substring(beginIndex)), null);
} else if (dataText.startsWith("COMMIT")) {
int commitBeginIndex = dataText.indexOf("COMMIT") +
"COMMIT".length() + 1;
int csnBeginIndex = dataText.indexOf("CSN") + "CSN".length() + 1;
@@ -91,6 +97,22 @@ public final class MppdbDecodingPlugin implements
DecodingPlugin {
return result;
}
+ private AbstractWALEvent decodeParallelly(final String dataText) {
+ AbstractWALEvent result = new PlaceholderEvent();
+ if (dataText.startsWith("BEGIN")) {
+ int beginIndex = dataText.indexOf("CSN:") + "CSN:".length() + 1;
+ int firstLsnIndex = dataText.indexOf("first_lsn");
+ long csn = firstLsnIndex > 0 ?
Long.parseLong(dataText.substring(beginIndex, firstLsnIndex - 1)) : 0L;
+ result = new BeginTXEvent(null, csn);
+ } else if (dataText.startsWith("commit") ||
dataText.startsWith("COMMIT")) {
+ int beginIndex = dataText.indexOf("xid:") + "xid:".length() + 1;
+ result = new
CommitTXEvent(Long.parseLong(dataText.substring(beginIndex)), null);
+ } else if (dataText.startsWith("{")) {
+ result = readTableEvent(dataText);
+ }
+ return result;
+ }
+
private AbstractWALEvent decodeDataIgnoreTX(final String dataText) {
return dataText.startsWith("{") ? readTableEvent(dataText) : new
PlaceholderEvent();
}
diff --git a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumperTest.java b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumperTest.java
new file mode 100644
index 00000000000..21abeb075b4
--- /dev/null
+++ b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumperTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.opengauss.ingest;
+
+import org.apache.shardingsphere.infra.util.reflection.ReflectionUtils;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+
+class OpenGaussWALDumperTest {
+
+ @Test
+ void assertGetVersion() throws NoSuchMethodException {
+ OpenGaussWALDumper dumper = mock(OpenGaussWALDumper.class);
+ int version =
ReflectionUtils.invokeMethod(OpenGaussWALDumper.class.getDeclaredMethod("parseMajorVersion",
String.class), dumper,
+ "(openGauss 3.1.0 build ) compiled at 2023-02-17 16:13:51
commit 0 last mr on x86_64-unknown-linux-gnu, compiled by g++ (GCC) 7.3.0,
64-bit");
+ assertThat(version, is(3));
+ OpenGaussWALDumper mock = mock(OpenGaussWALDumper.class);
+ version =
ReflectionUtils.invokeMethod(OpenGaussWALDumper.class.getDeclaredMethod("parseMajorVersion",
String.class), mock, "(openGauss 5.0.1 build )");
+ assertThat(version, is(5));
+ version =
ReflectionUtils.invokeMethod(OpenGaussWALDumper.class.getDeclaredMethod("parseMajorVersion",
String.class), mock, "not match");
+ assertThat(version, is(2));
+ }
+}
diff --git a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
index 221add916f7..3e7726d598a 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
@@ -45,6 +45,7 @@ import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -65,7 +66,7 @@ class MppdbDecodingPluginTest {
tableData.setColumnsName(IntStream.range(0,
insertTypes.length).mapToObj(idx -> "data" + idx).toArray(String[]::new));
tableData.setColumnsVal(IntStream.range(0,
insertTypes.length).mapToObj(idx -> "'1 2 3'").toArray(String[]::new));
ByteBuffer data =
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
- WriteRowEvent actual = (WriteRowEvent) new
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+ WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null,
false, false).decode(data, logSequenceNumber);
assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
IntStream.range(0, insertTypes.length).forEach(each ->
assertThat(actual.getAfterRow().get(each), is("1 2 3")));
@@ -80,7 +81,7 @@ class MppdbDecodingPluginTest {
tableData.setColumnsType(new String[]{"character varying"});
tableData.setColumnsVal(new String[]{"'1 2 3'"});
ByteBuffer data =
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
- UpdateRowEvent actual = (UpdateRowEvent) new
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+ UpdateRowEvent actual = (UpdateRowEvent) new MppdbDecodingPlugin(null,
false, false).decode(data, logSequenceNumber);
assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
assertThat(actual.getAfterRow().get(0), is("1 2 3"));
@@ -97,7 +98,7 @@ class MppdbDecodingPluginTest {
tableData.setOldKeysName(IntStream.range(0,
deleteTypes.length).mapToObj(idx -> "data" + idx).toArray(String[]::new));
tableData.setOldKeysVal(deleteValues);
ByteBuffer data =
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
- DeleteRowEvent actual = (DeleteRowEvent) new
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+ DeleteRowEvent actual = (DeleteRowEvent) new MppdbDecodingPlugin(null,
false, false).decode(data, logSequenceNumber);
assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
IntStream.range(0, deleteTypes.length).forEach(each ->
assertThat(actual.getPrimaryKeys().get(each).toString(),
is(deleteValues[each])));
@@ -112,7 +113,7 @@ class MppdbDecodingPluginTest {
tableData.setColumnsType(new String[]{"money"});
tableData.setColumnsVal(new String[]{"'$1.08'"});
ByteBuffer data =
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
- WriteRowEvent actual = (WriteRowEvent) new
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+ WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null,
false, false).decode(data, logSequenceNumber);
assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
Object byteaObj = actual.getAfterRow().get(0);
@@ -128,7 +129,7 @@ class MppdbDecodingPluginTest {
tableData.setColumnsType(new String[]{"boolean"});
tableData.setColumnsVal(new String[]{Boolean.TRUE.toString()});
ByteBuffer data =
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
- WriteRowEvent actual = (WriteRowEvent) new
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+ WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null,
false, false).decode(data, logSequenceNumber);
assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
Object byteaObj = actual.getAfterRow().get(0);
@@ -155,7 +156,7 @@ class MppdbDecodingPluginTest {
when(timestampUtils.toTimestamp(null,
"2010-12-12")).thenReturn(Timestamp.valueOf("2010-12-12 00:00:00.0"));
when(timestampUtils.toTimestamp(null, "2013-12-11
pst")).thenReturn(Timestamp.valueOf("2013-12-11 16:00:00.0"));
when(timestampUtils.toTimestamp(null, "2003-04-12
04:05:06")).thenReturn(Timestamp.valueOf("2003-04-12 04:05:00.0"));
- WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(new
OpenGaussTimestampUtils(timestampUtils)).decode(data, logSequenceNumber);
+ WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(new
OpenGaussTimestampUtils(timestampUtils), false, false).decode(data,
logSequenceNumber);
assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
IntStream.range(0, insertTypes.length).forEach(each ->
assertThat(actual.getAfterRow().get(each).toString(), is(compareValues[each])));
@@ -170,7 +171,7 @@ class MppdbDecodingPluginTest {
tableData.setColumnsType(new String[]{"bytea"});
tableData.setColumnsVal(new String[]{"'\\xff00ab'"});
ByteBuffer data =
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
- WriteRowEvent actual = (WriteRowEvent) new
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+ WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null,
false, false).decode(data, logSequenceNumber);
assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
Object byteaObj = actual.getAfterRow().get(0);
@@ -187,7 +188,7 @@ class MppdbDecodingPluginTest {
tableData.setColumnsType(new String[]{"raw"});
tableData.setColumnsVal(new String[]{"'7D'"});
ByteBuffer data =
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
- WriteRowEvent actual = (WriteRowEvent) new
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+ WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null,
false, false).decode(data, logSequenceNumber);
assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
Object byteaObj = actual.getAfterRow().get(0);
@@ -198,7 +199,7 @@ class MppdbDecodingPluginTest {
@Test
void assertDecodeUnknownTableType() {
ByteBuffer data = ByteBuffer.wrap("unknown".getBytes());
- assertThat(new MppdbDecodingPlugin(null).decode(data,
logSequenceNumber), instanceOf(PlaceholderEvent.class));
+ assertThat(new MppdbDecodingPlugin(null, false, false).decode(data,
logSequenceNumber), instanceOf(PlaceholderEvent.class));
}
@Test
@@ -210,7 +211,7 @@ class MppdbDecodingPluginTest {
tableData.setColumnsType(new String[]{"character varying"});
tableData.setColumnsVal(new String[]{"1 2 3"});
ByteBuffer data =
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
- assertThrows(IngestException.class, () -> new
MppdbDecodingPlugin(null).decode(data, logSequenceNumber));
+ assertThrows(IngestException.class, () -> new
MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber));
}
@Test
@@ -224,11 +225,11 @@ class MppdbDecodingPluginTest {
TimestampUtils timestampUtils = mock(TimestampUtils.class);
when(timestampUtils.toTime(null, "1 2 3")).thenThrow(new
SQLException(""));
ByteBuffer data =
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
- assertThrows(DecodingException.class, () -> new
MppdbDecodingPlugin(new OpenGaussTimestampUtils(timestampUtils),
true).decode(data, logSequenceNumber));
+ assertThrows(DecodingException.class, () -> new
MppdbDecodingPlugin(new OpenGaussTimestampUtils(timestampUtils), true,
false).decode(data, logSequenceNumber));
}
@Test
- void assertDecodeWithXid() {
+ void assertDecodeWithTx() {
MppTableData tableData = new MppTableData();
tableData.setTableName("public.test");
tableData.setOpType("INSERT");
@@ -237,7 +238,7 @@ class MppdbDecodingPluginTest {
tableData.setColumnsVal(new String[]{"'7D'"});
List<String> dataList = Arrays.asList("BEGIN 1",
JsonUtils.toJsonString(tableData), JsonUtils.toJsonString(tableData),
"COMMIT 1 (at 2022-10-27 04:19:39.476261+00) CSN 3468");
- MppdbDecodingPlugin mppdbDecodingPlugin = new
MppdbDecodingPlugin(null, true);
+ MppdbDecodingPlugin mppdbDecodingPlugin = new
MppdbDecodingPlugin(null, true, false);
List<AbstractWALEvent> expectedEvent = new LinkedList<>();
for (String each : dataList) {
expectedEvent.add(mppdbDecodingPlugin.decode(ByteBuffer.wrap(each.getBytes()),
logSequenceNumber));
@@ -245,13 +246,35 @@ class MppdbDecodingPluginTest {
assertThat(expectedEvent.size(), is(4));
AbstractWALEvent actualFirstEvent = expectedEvent.get(0);
assertInstanceOf(BeginTXEvent.class, actualFirstEvent);
- assertThat(((BeginTXEvent) actualFirstEvent).getXid(), is(1L));
AbstractWALEvent actualLastEvent =
expectedEvent.get(expectedEvent.size() - 1);
assertInstanceOf(CommitTXEvent.class, actualLastEvent);
assertThat(((CommitTXEvent) actualLastEvent).getCsn(), is(3468L));
assertThat(((CommitTXEvent) actualLastEvent).getXid(), is(1L));
}
+ @Test
+ void assertParallelDecodeWithTx() {
+ MppTableData tableData = new MppTableData();
+ tableData.setTableName("public.test");
+ tableData.setOpType("INSERT");
+ tableData.setColumnsName(new String[]{"data"});
+ tableData.setColumnsType(new String[]{"raw"});
+ tableData.setColumnsVal(new String[]{"'7D'"});
+ List<String> dataList = Arrays.asList("BEGIN CSN: 951909 first_lsn:
5/59825858", JsonUtils.toJsonString(tableData),
JsonUtils.toJsonString(tableData), "commit xid: 1006076");
+ MppdbDecodingPlugin mppdbDecodingPlugin = new
MppdbDecodingPlugin(null, true, true);
+ List<AbstractWALEvent> actual = new LinkedList<>();
+ for (String each : dataList) {
+
actual.add(mppdbDecodingPlugin.decode(ByteBuffer.wrap(each.getBytes()),
logSequenceNumber));
+ }
+ assertThat(actual.size(), is(4));
+ assertInstanceOf(BeginTXEvent.class, actual.get(0));
+ assertThat(((BeginTXEvent) actual.get(0)).getCsn(), is(951909L));
+ assertThat(((WriteRowEvent)
actual.get(1)).getAfterRow().get(0).toString(), is("7D"));
+ assertThat(((WriteRowEvent)
actual.get(2)).getAfterRow().get(0).toString(), is("7D"));
+ assertThat(((CommitTXEvent) actual.get(3)).getXid(), is(1006076L));
+ assertNull(((CommitTXEvent) actual.get(3)).getCsn());
+ }
+
@Test
void assertDecodeWithTsrange() {
MppTableData tableData = new MppTableData();
@@ -261,7 +284,7 @@ class MppdbDecodingPluginTest {
tableData.setColumnsType(new String[]{"tsrange"});
tableData.setColumnsVal(new String[]{"'[\"2020-01-01
00:00:00\",\"2021-01-01 00:00:00\")'"});
ByteBuffer data =
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
- WriteRowEvent actual = (WriteRowEvent) new
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+ WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null,
false, false).decode(data, logSequenceNumber);
Object byteaObj = actual.getAfterRow().get(0);
assertThat(byteaObj, instanceOf(PGobject.class));
assertThat(byteaObj.toString(), is("[\"2020-01-01
00:00:00\",\"2021-01-01 00:00:00\")"));
@@ -276,7 +299,7 @@ class MppdbDecodingPluginTest {
tableData.setColumnsType(new String[]{"daterange"});
tableData.setColumnsVal(new String[]{"'[2020-01-02,2021-01-02)'"});
ByteBuffer data =
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
- WriteRowEvent actual = (WriteRowEvent) new
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+ WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null,
false, false).decode(data, logSequenceNumber);
Object byteaObj = actual.getAfterRow().get(0);
assertThat(byteaObj, instanceOf(PGobject.class));
assertThat(byteaObj.toString(), is("[2020-01-02,2021-01-02)"));
@@ -291,7 +314,7 @@ class MppdbDecodingPluginTest {
tableData.setColumnsType(new String[]{"tsquery"});
tableData.setColumnsVal(new String[]{"'''fff'' | ''faa'''"});
ByteBuffer data =
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
- WriteRowEvent actual = (WriteRowEvent) new
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+ WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null,
false, false).decode(data, logSequenceNumber);
Object byteaObj = actual.getAfterRow().get(0);
assertThat(byteaObj.toString(), is("'fff' | 'faa'"));
}
@@ -305,7 +328,7 @@ class MppdbDecodingPluginTest {
tableData.setColumnsType(new String[]{"tinyint"});
tableData.setColumnsVal(new String[]{"255"});
ByteBuffer data =
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
- WriteRowEvent actual = (WriteRowEvent) new
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+ WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null,
false, false).decode(data, logSequenceNumber);
Object byteaObj = actual.getAfterRow().get(0);
assertThat(byteaObj, is(255));
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
index e02bd7a5d4f..0f819728867 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
@@ -53,7 +53,7 @@ public final class TestDecodingPlugin implements
DecodingPlugin {
AbstractWALEvent result;
String type = readEventType(data);
if (type.startsWith("BEGIN")) {
- result = new BeginTXEvent(Long.parseLong(readNextSegment(data)));
+ result = new BeginTXEvent(Long.parseLong(readNextSegment(data)),
null);
} else if (type.startsWith("COMMIT")) {
result = new CommitTXEvent(Long.parseLong(readNextSegment(data)),
null);
} else {
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/BeginTXEvent.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/BeginTXEvent.java
index b608e04e5c5..825b5725d79 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/BeginTXEvent.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/BeginTXEvent.java
@@ -27,5 +27,7 @@ import lombok.RequiredArgsConstructor;
@Getter
public final class BeginTXEvent extends AbstractWALEvent {
- private final long xid;
+ private final Long xid;
+
+ private final Long csn;
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index dab22d7bd3d..af0c4de1014 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -144,7 +144,7 @@ class WALEventConverterTest {
@Test
void assertConvertBeginTXEvent() {
- BeginTXEvent beginTXEvent = new BeginTXEvent(100);
+ BeginTXEvent beginTXEvent = new BeginTXEvent(100L, null);
beginTXEvent.setLogSequenceNumber(new
PostgreSQLLogSequenceNumber(logSequenceNumber));
Record record = walEventConverter.convert(beginTXEvent);
assertInstanceOf(PlaceholderRecord.class, record);