This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 6aebdc0384 [Improve][Oracle-CDC] Support ReadOnlyLogWriterFlushStrategy (#8912) 6aebdc0384 is described below commit 6aebdc0384a47140a38211f4b4e6aeeeb6dec18a Author: hailin0 <wanghai...@apache.org> AuthorDate: Mon Mar 10 14:10:09 2025 +0800 [Improve][Oracle-CDC] Support ReadOnlyLogWriterFlushStrategy (#8912) --- .../LogMinerStreamingChangeEventSource.java | 10 +++- .../logwriter/ReadOnlyLogWriterFlushStrategy.java | 38 +++++++++++++++ .../oracle/config/OracleSourceConfigFactory.java | 2 + .../ReadOnlyLogWriterFlushStrategyTest.java | 54 ++++++++++++++++++++++ 4 files changed, 103 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java index a77da296fc..9ad42f1b6b 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java @@ -17,6 +17,8 @@ package io.debezium.connector.oracle.logminer; +import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfigFactory; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +35,7 @@ import io.debezium.connector.oracle.Scn; import io.debezium.connector.oracle.logminer.logwriter.CommitLogWriterFlushStrategy; import io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy; import io.debezium.connector.oracle.logminer.logwriter.RacCommitLogWriterFlushStrategy; +import io.debezium.connector.oracle.logminer.logwriter.ReadOnlyLogWriterFlushStrategy; import 
io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor; import io.debezium.jdbc.JdbcConfiguration; import io.debezium.pipeline.ErrorHandler; @@ -1012,7 +1015,12 @@ public class LogMinerStreamingChangeEventSource * * @return the strategy to be used to flush Oracle's LGWR process, never {@code null}. */ - private LogWriterFlushStrategy resolveFlushStrategy() { + public LogWriterFlushStrategy resolveFlushStrategy() { + if (connectorConfig + .getConfig() + .getBoolean(OracleSourceConfigFactory.LOG_MINING_READONLY_KEY, false)) { + return new ReadOnlyLogWriterFlushStrategy(); + } if (connectorConfig.isRacSystem()) { return new RacCommitLogWriterFlushStrategy( connectorConfig, jdbcConfiguration, streamingMetrics); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/logwriter/ReadOnlyLogWriterFlushStrategy.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/logwriter/ReadOnlyLogWriterFlushStrategy.java new file mode 100644 index 0000000000..6e80421a33 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/logwriter/ReadOnlyLogWriterFlushStrategy.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.debezium.connector.oracle.logminer.logwriter; + +import io.debezium.DebeziumException; +import io.debezium.connector.oracle.Scn; + +public class ReadOnlyLogWriterFlushStrategy implements LogWriterFlushStrategy { + @Override + public String getHost() { + throw new DebeziumException("Not applicable when using read-only flushing strategy"); + } + + @Override + public void flush(Scn currentScn) throws InterruptedException { + // no operation + } + + @Override + public void close() throws Exception { + // no operation + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java index d7d6c70061..fa240bc158 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java @@ -41,6 +41,7 @@ public class OracleSourceConfigFactory extends JdbcSourceConfigFactory { public static final String SCHEMA_CHANGE_KEY = "include.schema.changes"; public static final String LOG_MINING_STRATEGY_KEY = "log.mining.strategy"; public static final String LOG_MINING_STRATEGY_DEFAULT = "online_catalog"; 
+ public static final String LOG_MINING_READONLY_KEY = "log.mining.read.only"; private List<String> schemaList; @@ -106,6 +107,7 @@ public class OracleSourceConfigFactory extends JdbcSourceConfigFactory { props.setProperty("connect.timeout.ms", String.valueOf(connectTimeoutMillis)); // disable tombstones props.setProperty("tombstones.on.delete", String.valueOf(false)); + props.setProperty(LOG_MINING_READONLY_KEY, "true"); if (originUrl != null) { props.setProperty("database.url", originUrl); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/test/java/io/debezium/connector/oracle/logminer/logwriter/ReadOnlyLogWriterFlushStrategyTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/test/java/io/debezium/connector/oracle/logminer/logwriter/ReadOnlyLogWriterFlushStrategyTest.java new file mode 100644 index 0000000000..c0d5a7e68b --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/test/java/io/debezium/connector/oracle/logminer/logwriter/ReadOnlyLogWriterFlushStrategyTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.debezium.connector.oracle.logminer.logwriter; + +import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfigFactory; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.debezium.DebeziumException; +import io.debezium.config.Configuration; +import io.debezium.connector.oracle.OracleConnectorConfig; +import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ReadOnlyLogWriterFlushStrategyTest { + + @Test + void returnsReadOnlyLogWriterFlushStrategyWhenReadOnlyKeyIsTrue() throws Exception { + OracleConnectorConfig config = mock(OracleConnectorConfig.class); + Configuration configuration = mock(Configuration.class); + when(config.getConfig()).thenReturn(configuration); + when(configuration.getBoolean(OracleSourceConfigFactory.LOG_MINING_READONLY_KEY, false)) + .thenReturn(true); + + LogMinerStreamingChangeEventSource source = + new LogMinerStreamingChangeEventSource( + config, null, null, null, null, null, null, null); + LogWriterFlushStrategy strategy = source.resolveFlushStrategy(); + assertTrue(strategy instanceof ReadOnlyLogWriterFlushStrategy); + + Assertions.assertThrows(DebeziumException.class, () -> strategy.getHost()); + strategy.flush(null); + strategy.close(); + } +}