This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6aebdc0384 [Improve][Oracle-CDC] Support 
ReadOnlyLogWriterFlushStrategy (#8912)
6aebdc0384 is described below

commit 6aebdc0384a47140a38211f4b4e6aeeeb6dec18a
Author: hailin0 <wanghai...@apache.org>
AuthorDate: Mon Mar 10 14:10:09 2025 +0800

    [Improve][Oracle-CDC] Support ReadOnlyLogWriterFlushStrategy (#8912)
---
 .../LogMinerStreamingChangeEventSource.java        | 10 +++-
 .../logwriter/ReadOnlyLogWriterFlushStrategy.java  | 38 +++++++++++++++
 .../oracle/config/OracleSourceConfigFactory.java   |  2 +
 .../ReadOnlyLogWriterFlushStrategyTest.java        | 54 ++++++++++++++++++++++
 4 files changed, 103 insertions(+), 1 deletion(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
index a77da296fc..9ad42f1b6b 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
@@ -17,6 +17,8 @@
 
 package io.debezium.connector.oracle.logminer;
 
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfigFactory;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +35,7 @@ import io.debezium.connector.oracle.Scn;
 import 
io.debezium.connector.oracle.logminer.logwriter.CommitLogWriterFlushStrategy;
 import io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy;
 import 
io.debezium.connector.oracle.logminer.logwriter.RacCommitLogWriterFlushStrategy;
+import 
io.debezium.connector.oracle.logminer.logwriter.ReadOnlyLogWriterFlushStrategy;
 import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
 import io.debezium.jdbc.JdbcConfiguration;
 import io.debezium.pipeline.ErrorHandler;
@@ -1012,7 +1015,12 @@ public class LogMinerStreamingChangeEventSource
      *
      * @return the strategy to be used to flush Oracle's LGWR process, never 
{@code null}.
      */
-    private LogWriterFlushStrategy resolveFlushStrategy() {
+    public LogWriterFlushStrategy resolveFlushStrategy() {
+        if (connectorConfig
+                .getConfig()
+                .getBoolean(OracleSourceConfigFactory.LOG_MINING_READONLY_KEY, 
false)) {
+            return new ReadOnlyLogWriterFlushStrategy();
+        }
         if (connectorConfig.isRacSystem()) {
             return new RacCommitLogWriterFlushStrategy(
                     connectorConfig, jdbcConfiguration, streamingMetrics);
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/logwriter/ReadOnlyLogWriterFlushStrategy.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/logwriter/ReadOnlyLogWriterFlushStrategy.java
new file mode 100644
index 0000000000..6e80421a33
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/logwriter/ReadOnlyLogWriterFlushStrategy.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.oracle.logminer.logwriter;
+
+import io.debezium.DebeziumException;
+import io.debezium.connector.oracle.Scn;
+
+public class ReadOnlyLogWriterFlushStrategy implements LogWriterFlushStrategy {
+    @Override
+    public String getHost() {
+        throw new DebeziumException("Not applicable when using read-only 
flushing strategy");
+    }
+
+    @Override
+    public void flush(Scn currentScn) throws InterruptedException {
+        // no operation
+    }
+
+    @Override
+    public void close() throws Exception {
+        // no operation
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
index d7d6c70061..fa240bc158 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
@@ -41,6 +41,7 @@ public class OracleSourceConfigFactory extends 
JdbcSourceConfigFactory {
     public static final String SCHEMA_CHANGE_KEY = "include.schema.changes";
     public static final String LOG_MINING_STRATEGY_KEY = "log.mining.strategy";
     public static final String LOG_MINING_STRATEGY_DEFAULT = "online_catalog";
+    public static final String LOG_MINING_READONLY_KEY = 
"log.mining.read.only";
 
     private List<String> schemaList;
 
@@ -106,6 +107,7 @@ public class OracleSourceConfigFactory extends 
JdbcSourceConfigFactory {
         props.setProperty("connect.timeout.ms", 
String.valueOf(connectTimeoutMillis));
         // disable tombstones
         props.setProperty("tombstones.on.delete", String.valueOf(false));
+        props.setProperty(LOG_MINING_READONLY_KEY, "true");
 
         if (originUrl != null) {
             props.setProperty("database.url", originUrl);
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/test/java/io/debezium/connector/oracle/logminer/logwriter/ReadOnlyLogWriterFlushStrategyTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/test/java/io/debezium/connector/oracle/logminer/logwriter/ReadOnlyLogWriterFlushStrategyTest.java
new file mode 100644
index 0000000000..c0d5a7e68b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/test/java/io/debezium/connector/oracle/logminer/logwriter/ReadOnlyLogWriterFlushStrategyTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.oracle.logminer.logwriter;
+
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfigFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.DebeziumException;
+import io.debezium.config.Configuration;
+import io.debezium.connector.oracle.OracleConnectorConfig;
+import 
io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ReadOnlyLogWriterFlushStrategyTest {
+
+    @Test
+    void returnsReadOnlyLogWriterFlushStrategyWhenReadOnlyKeyIsTrue() throws 
Exception {
+        OracleConnectorConfig config = mock(OracleConnectorConfig.class);
+        Configuration configuration = mock(Configuration.class);
+        when(config.getConfig()).thenReturn(configuration);
+        
when(configuration.getBoolean(OracleSourceConfigFactory.LOG_MINING_READONLY_KEY,
 false))
+                .thenReturn(true);
+
+        LogMinerStreamingChangeEventSource source =
+                new LogMinerStreamingChangeEventSource(
+                        config, null, null, null, null, null, null, null);
+        LogWriterFlushStrategy strategy = source.resolveFlushStrategy();
+        assertTrue(strategy instanceof ReadOnlyLogWriterFlushStrategy);
+
+        Assertions.assertThrows(DebeziumException.class, () -> 
strategy.getHost());
+        strategy.flush(null);
+        strategy.close();
+    }
+}

Reply via email to