sandynz commented on code in PR #30498:
URL: https://github.com/apache/shardingsphere/pull/30498#discussion_r1527914474


##########
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java:
##########
@@ -91,6 +97,21 @@ private AbstractWALEvent decodeDataWithTX(final String dataText) {
         return result;
     }
     
+    private AbstractWALEvent decodeVersionGreaterThan3(final String dataText) {
+        AbstractWALEvent result = new PlaceholderEvent();
+        if (dataText.startsWith("BEGIN")) {
+            int beginIndex = dataText.indexOf("CSN:") + "CSN:".length() + 1;
+            int endIndex = dataText.indexOf("first_lsn") - 1;
+            result = new BeginTXEvent(null, Long.parseLong(dataText.substring(beginIndex, endIndex)));

Review Comment:
   Need to check whether `first_lsn` is actually present. If it is missing, `indexOf` returns -1 and the `substring` call will throw.
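A minimal sketch of the check suggested above, assuming the BEGIN line looks roughly like `BEGIN CSN: <number> first_lsn: ...` (inferred from the index arithmetic in the hunk, not from captured output). The class name `BeginLineCsnParser` and the method `parseCsn` are hypothetical and not part of the PR; the sketch falls back to the end of the line when `first_lsn` is absent.

```java
import java.util.OptionalLong;

// Hypothetical helper, not part of the PR: extracts the CSN from a BEGIN line
// without assuming that "first_lsn" is always present.
final class BeginLineCsnParser {

    private static final String CSN_PREFIX = "CSN:";

    private static final String FIRST_LSN = "first_lsn";

    static OptionalLong parseCsn(final String dataText) {
        int csnIndex = dataText.indexOf(CSN_PREFIX);
        if (csnIndex < 0) {
            // No CSN marker at all, nothing to parse.
            return OptionalLong.empty();
        }
        int beginIndex = csnIndex + CSN_PREFIX.length();
        int firstLsnIndex = dataText.indexOf(FIRST_LSN, beginIndex);
        // When "first_lsn" is missing, read up to the end of the line instead.
        int endIndex = firstLsnIndex < 0 ? dataText.length() : firstLsnIndex;
        String csnText = dataText.substring(beginIndex, endIndex).trim();
        try {
            return OptionalLong.of(Long.parseLong(csnText));
        } catch (final NumberFormatException ex) {
            // Text between the markers was not a plain number.
            return OptionalLong.empty();
        }
    }
}
```

Returning `OptionalLong` leaves the decision about a missing CSN (skip, warn, or fail) to the caller; the PR may prefer a different contract.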



##########
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java:
##########
@@ -60,6 +67,10 @@
 @Slf4j
 public final class OpenGaussWALDumper extends AbstractPipelineLifecycleRunnable implements IncrementalDumper {
     
+    private static final Pattern VERSION_PATTERN = Pattern.compile("(\\d+)");

Review Comment:
   It seems the `openGauss ` prefix should not be removed from VERSION_PATTERN
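A sketch of why the prefix matters, assuming the `select version()` text can contain another number before the openGauss version (for example a PostgreSQL compatibility version). The sample strings in the usage note are illustrative, not captured output, and `OpenGaussVersionParser` is a hypothetical name.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the PR: anchors the digits to the
// "openGauss " prefix so that an earlier number in the text is not picked up.
final class OpenGaussVersionParser {

    private static final Pattern VERSION_PATTERN = Pattern.compile("openGauss\\s+(\\d+)");

    static OptionalInt parseMajorVersion(final String versionText) {
        Matcher matcher = VERSION_PATTERN.matcher(versionText);
        if (matcher.find()) {
            return OptionalInt.of(Integer.parseInt(matcher.group(1)));
        }
        // Prefix not found: let the caller decide on a default.
        return OptionalInt.empty();
    }
}
```

With a bare `(\\d+)`, a text such as `PostgreSQL 9.2.4 (openGauss 2.1.0 build ...)` would yield 9; with the prefix it yields 2.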



##########
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java:
##########
@@ -57,9 +57,7 @@ public final class MppdbDecodingPlugin implements DecodingPlugin {
     
     private final boolean decodeWithTX;
     
-    public MppdbDecodingPlugin(final BaseTimestampUtils timestampUtils) {
-        this(timestampUtils, false);
-    }
+    private final int majorVersion;

Review Comment:
   Could we replace `int majorVersion` with `boolean decodeParallelly`? It's 
error-prone to pass `majorVersion` (e.g. 2, 3 or any other integer) on 
invocation.



##########
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java:
##########
@@ -91,6 +97,21 @@ private AbstractWALEvent decodeDataWithTX(final String dataText) {
         return result;
     }
     
+    private AbstractWALEvent decodeVersionGreaterThan3(final String dataText) {
+        AbstractWALEvent result = new PlaceholderEvent();
+        if (dataText.startsWith("BEGIN")) {
+            int beginIndex = dataText.indexOf("CSN:") + "CSN:".length() + 1;
+            int endIndex = dataText.indexOf("first_lsn") - 1;
+            result = new BeginTXEvent(null, Long.parseLong(dataText.substring(beginIndex, endIndex)));
+        } else if (dataText.startsWith("commit")) {

Review Comment:
   It's better to be compatible with `COMMIT` as well, since the casing 
differs from `BEGIN`, even though it's defined as `commit` in the document.
   e.g. Use `dataText.startsWith("commit") || dataText.startsWith("COMMIT")`
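One possible alternative to the two `startsWith` calls is a case-insensitive prefix check via `String.regionMatches`. This is only a sketch: `TxKeywordMatcher` is a hypothetical name, and the check is slightly broader than the suggestion above because it also accepts mixed case such as `Commit`.

```java
// Hypothetical helper, not part of the PR: case-insensitive prefix test.
final class TxKeywordMatcher {

    static boolean startsWithIgnoreCase(final String text, final String prefix) {
        // regionMatches returns false (rather than throwing) when text is shorter than prefix.
        return text.regionMatches(true, 0, prefix, 0, prefix.length());
    }
}
```

Usage would look like `TxKeywordMatcher.startsWithIgnoreCase(dataText, "commit")`.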



##########
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java:
##########
@@ -138,28 +152,61 @@ private void dump() throws SQLException {
         }
     }
     
+    private int getMajorVersion() throws SQLException {
+        StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig();
+        try (
+                Connection connection = DriverManager.getConnection(dataSourceConfig.getUrl(), dataSourceConfig.getUsername(), dataSourceConfig.getPassword());
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery("SELECT version()")) {
+            resultSet.next();
+            String versionText = resultSet.getString(1);
+            return parseMajorVersion(versionText);
+        }
+    }
+    
+    private int parseMajorVersion(final String versionText) {
+        Matcher matcher = VERSION_PATTERN.matcher(versionText);
+        if (matcher.find()) {
+            log.info("openGauss major version={}, `select version()`={}", matcher.group(1), versionText);
+            return Integer.parseInt(matcher.group(1));
+        }

Review Comment:
   Print the log whether or not `matcher.find()` returns true
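A rough sketch of logging on both paths. To stay self-contained it uses `java.util.logging` rather than the Slf4j `log` field in the PR, and the value of `DEFAULT_VERSION` here is an assumption for illustration only; `VersionLoggingSketch` is a hypothetical name.

```java
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch, not part of the PR: the version is logged whether or
// not the pattern matched, so a fallback to the default stays visible.
final class VersionLoggingSketch {

    private static final Logger LOG = Logger.getLogger(VersionLoggingSketch.class.getName());

    private static final Pattern VERSION_PATTERN = Pattern.compile("(\\d+)");

    // Assumed value for illustration; the real default is defined elsewhere in the PR.
    private static final int DEFAULT_VERSION = 2;

    static int parseMajorVersion(final String versionText) {
        Matcher matcher = VERSION_PATTERN.matcher(versionText);
        int result = matcher.find() ? Integer.parseInt(matcher.group(1)) : DEFAULT_VERSION;
        // Single log statement after the branch, covering both outcomes.
        LOG.info(String.format("openGauss major version=%d, `select version()`=%s", result, versionText));
        return result;
    }
}
```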



##########
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java:
##########
@@ -77,10 +75,18 @@ public AbstractWALEvent decode(final ByteBuffer data, final BaseLogSequenceNumbe
     }
     
     private AbstractWALEvent decodeDataWithTX(final String dataText) {
+        if (majorVersion < 3) {
+            return decodeVersionLessThan3(dataText);
+        } else {
+            return decodeVersionGreaterThan3(dataText);
+        }

Review Comment:
   `decodeVersionLessThan3` could be renamed to `decodeSerially`, and 
`decodeVersionGreaterThan3` to `decodeParallelly`



##########
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java:
##########
@@ -138,28 +152,61 @@ private void dump() throws SQLException {
         }
     }
     
+    private int getMajorVersion() throws SQLException {
+        StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig();
+        try (
+                Connection connection = DriverManager.getConnection(dataSourceConfig.getUrl(), dataSourceConfig.getUsername(), dataSourceConfig.getPassword());
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery("SELECT version()")) {
+            resultSet.next();
+            String versionText = resultSet.getString(1);
+            return parseMajorVersion(versionText);
+        }
+    }
+    
+    private int parseMajorVersion(final String versionText) {
+        Matcher matcher = VERSION_PATTERN.matcher(versionText);
+        if (matcher.find()) {
+            log.info("openGauss major version={}, `select version()`={}", matcher.group(1), versionText);
+            return Integer.parseInt(matcher.group(1));
+        }
+        return DEFAULT_VERSION;
+    }
+    
     private PgConnection getReplicationConnectionUnwrap() throws SQLException {
         return logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig()).unwrap(PgConnection.class);
     }
     
-    private void processEventWithTX(final AbstractWALEvent event) {
+    private void processEventWithTX(final AbstractWALEvent event, final int majorVersion) {
         if (event instanceof BeginTXEvent) {
+            if (majorVersion < 3) {
+                return;
+            }
+            if (!rowEvents.isEmpty()) {
+                channel.push(rowEvents.stream().map(walEventConverter::convert).collect(Collectors.toList()));
+                rowEvents = new LinkedList<>();
+            }

Review Comment:
   `rowEvents` belongs to the previous transaction and `BeginTXEvent` belongs 
to the next one, so it looks like `rowEvents` should never be non-empty here if 
`CommitTXEvent` parsing works.
   
   Could we remove this block of code and just `log.warn` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
