sandynz commented on code in PR #30498: URL: https://github.com/apache/shardingsphere/pull/30498#discussion_r1527914474
########## kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java: ########## @@ -91,6 +97,21 @@ private AbstractWALEvent decodeDataWithTX(final String dataText) { return result; } + private AbstractWALEvent decodeVersionGreaterThan3(final String dataText) { + AbstractWALEvent result = new PlaceholderEvent(); + if (dataText.startsWith("BEGIN")) { + int beginIndex = dataText.indexOf("CSN:") + "CSN:".length() + 1; + int endIndex = dataText.indexOf("first_lsn") - 1; + result = new BeginTXEvent(null, Long.parseLong(dataText.substring(beginIndex, endIndex))); Review Comment: Need to check whether `first_lsn` is present or not; if it is missing, `indexOf` returns -1 and the `substring` call will throw. ########## kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java: ########## @@ -60,6 +67,10 @@ @Slf4j public final class OpenGaussWALDumper extends AbstractPipelineLifecycleRunnable implements IncrementalDumper { + private static final Pattern VERSION_PATTERN = Pattern.compile("(\\d+)"); Review Comment: It seems the `openGauss ` prefix should not be removed from VERSION_PATTERN ########## kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java: ########## @@ -57,9 +57,7 @@ public final class MppdbDecodingPlugin implements DecodingPlugin { private final boolean decodeWithTX; - public MppdbDecodingPlugin(final BaseTimestampUtils timestampUtils) { - this(timestampUtils, false); - } + private final int majorVersion; Review Comment: Could we replace `int majorVersion` with `boolean decodeParallelly`? It's error-prone to pass majorVersion (e.g. 2, 3 or any other integer) on invocation. 
########## kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java: ########## @@ -91,6 +97,21 @@ private AbstractWALEvent decodeDataWithTX(final String dataText) { return result; } + private AbstractWALEvent decodeVersionGreaterThan3(final String dataText) { + AbstractWALEvent result = new PlaceholderEvent(); + if (dataText.startsWith("BEGIN")) { + int beginIndex = dataText.indexOf("CSN:") + "CSN:".length() + 1; + int endIndex = dataText.indexOf("first_lsn") - 1; + result = new BeginTXEvent(null, Long.parseLong(dataText.substring(beginIndex, endIndex))); + } else if (dataText.startsWith("commit")) { Review Comment: It's better to be compatible with `COMMIT` as well, since its letter case differs from that of `BEGIN`, even though it's defined as `commit` in the documentation. For example, use `dataText.startsWith("commit") || dataText.startsWith("COMMIT")` ########## kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java: ########## @@ -138,28 +152,61 @@ private void dump() throws SQLException { } } + private int getMajorVersion() throws SQLException { + StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig(); + try ( + Connection connection = DriverManager.getConnection(dataSourceConfig.getUrl(), dataSourceConfig.getUsername(), dataSourceConfig.getPassword()); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery("SELECT version()")) { + resultSet.next(); + String versionText = resultSet.getString(1); + return parseMajorVersion(versionText); + } + } + + private int parseMajorVersion(final String versionText) { + Matcher matcher = VERSION_PATTERN.matcher(versionText); + if (matcher.find()) { + log.info("openGauss major version={}, `select version()`={}", matcher.group(1), versionText); + return 
Integer.parseInt(matcher.group(1)); + } Review Comment: Print the log regardless of whether `matcher.find()` returns true or not ########## kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java: ########## @@ -77,10 +75,18 @@ public AbstractWALEvent decode(final ByteBuffer data, final BaseLogSequenceNumbe } private AbstractWALEvent decodeDataWithTX(final String dataText) { + if (majorVersion < 3) { + return decodeVersionLessThan3(dataText); + } else { + return decodeVersionGreaterThan3(dataText); + } Review Comment: `decodeVersionLessThan3` could be renamed to `decodeSerially`, and `decodeVersionGreaterThan3` could be renamed to `decodeParallelly` ########## kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java: ########## @@ -138,28 +152,61 @@ private void dump() throws SQLException { } } + private int getMajorVersion() throws SQLException { + StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig(); + try ( + Connection connection = DriverManager.getConnection(dataSourceConfig.getUrl(), dataSourceConfig.getUsername(), dataSourceConfig.getPassword()); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery("SELECT version()")) { + resultSet.next(); + String versionText = resultSet.getString(1); + return parseMajorVersion(versionText); + } + } + + private int parseMajorVersion(final String versionText) { + Matcher matcher = VERSION_PATTERN.matcher(versionText); + if (matcher.find()) { + log.info("openGauss major version={}, `select version()`={}", matcher.group(1), versionText); + return Integer.parseInt(matcher.group(1)); + } + return DEFAULT_VERSION; + } + private PgConnection getReplicationConnectionUnwrap() throws SQLException { return 
logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig()).unwrap(PgConnection.class); } - private void processEventWithTX(final AbstractWALEvent event) { + private void processEventWithTX(final AbstractWALEvent event, final int majorVersion) { if (event instanceof BeginTXEvent) { + if (majorVersion < 3) { + return; + } + if (!rowEvents.isEmpty()) { + channel.push(rowEvents.stream().map(walEventConverter::convert).collect(Collectors.toList())); + rowEvents = new LinkedList<>(); + } Review Comment: `rowEvents` belongs to the previous transaction, and BeginTXEvent belongs to the next transaction; it looks like this case should not occur if CommitTXEvent parsing works. Could we remove this block of code and just `log.warn`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@shardingsphere.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org