This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a2eb1aa42e2c4fa2f72ddc0db7f861556e334e60 Author: Enrique Fernández <[email protected]> AuthorDate: Mon Sep 8 21:18:31 2025 +0200 [improve][io] Upgrade to Debezium 3.2.2 (#24712) Co-authored-by: Lari Hotari <[email protected]> --- pom.xml | 13 +++--- .../apache/pulsar/io/debezium/DebeziumSource.java | 20 +++++---- ...tabaseHistory.java => PulsarSchemaHistory.java} | 48 +++++++++++----------- .../org/apache/pulsar/io/debezium/SerDeUtils.java | 2 +- ...storyTest.java => PulsarSchemaHistoryTest.java} | 40 +++++++++--------- .../io/debezium/mongodb/DebeziumMongoDbSource.java | 7 ++++ .../resources/debezium-mongodb-source-config.yaml | 5 ++- .../io/debezium/mssql/DebeziumMsSqlSource.java | 7 ++++ .../resources/debezium-mssql-source-config.yaml | 7 ++-- pulsar-io/debezium/mysql/pom.xml | 4 +- .../io/debezium/mysql/DebeziumMysqlSource.java | 7 ++++ .../resources/debezium-mysql-source-config.yaml | 7 ++-- .../io/debezium/oracle/DebeziumOracleSource.java | 7 ++++ .../resources/debezium-oracle-source-config.yaml | 5 ++- pulsar-io/debezium/pom.xml | 40 ------------------ .../debezium/postgres/DebeziumPostgresSource.java | 7 ++++ .../resources/debezium-postgres-source-config.yaml | 7 ++-- pulsar-io/mongo/pom.xml | 2 +- .../apache/pulsar/io/mongodb/MongoSinkTest.java | 19 ++++++++- .../apache/pulsar/io/mongodb/MongoSourceTest.java | 12 ++++-- src/owasp-dependency-check-suppressions.xml | 43 ------------------- tests/integration/pom.xml | 2 +- .../containers/DebeziumMongoDbContainer.java | 2 +- .../containers/DebeziumMySQLContainer.java | 2 +- .../containers/DebeziumPostgreSqlContainer.java | 2 +- .../io/sources/PulsarIOSourceRunner.java | 15 +++++++ .../tests/integration/io/sources/SourceTester.java | 2 + .../debezium/DebeziumMongoDbSourceTester.java | 22 +++++----- .../debezium/DebeziumMsSqlSourceTester.java | 14 ++++--- .../debezium/DebeziumMySqlSourceTester.java | 7 ++-- .../debezium/DebeziumOracleDbSourceTester.java | 6 ++- .../debezium/DebeziumPostgreSqlSourceTester.java | 10 ++--- .../debezium/PulsarDebeziumSourcesTest.java | 4 +- tests/scripts/pre-integ-tests.sh | 4 +- 34 files changed, 202 insertions(+), 199 deletions(-) diff --git a/pom.xml b/pom.xml index b397229ea37..8e1cf061bef 100644 --- a/pom.xml +++ b/pom.xml @@ -240,19 +240,16 @@ flexible messaging model and an intuitive client API.</description> <jclouds.version>2.6.0</jclouds.version> <guice.version>5.1.0</guice.version> <sqlite-jdbc.version>3.47.1.0</sqlite-jdbc.version> - <mysql-jdbc.version>8.0.11</mysql-jdbc.version> - <postgresql-jdbc.version>42.5.5</postgresql-jdbc.version> + <postgresql-jdbc.version>42.7.7</postgresql-jdbc.version> <clickhouse-jdbc.version>0.4.6</clickhouse-jdbc.version> - <mariadb-jdbc.version>2.7.5</mariadb-jdbc.version> + <mariadb-jdbc.version>3.5.5</mariadb-jdbc.version> <openmldb-jdbc.version>0.4.4-hotfix1</openmldb-jdbc.version> <json-smart.version>2.5.2</json-smart.version> <opensearch.version>2.16.0</opensearch.version> <elasticsearch-java.version>8.15.3</elasticsearch-java.version> - <debezium.version>1.9.7.Final</debezium.version> - <debezium.postgresql.version>42.5.5</debezium.postgresql.version> - <debezium.mysql.version>8.0.33</debezium.mysql.version> - <!-- Override version that brings CVE-2022-3143 with debezium --> - <wildfly-elytron.version>1.15.16.Final</wildfly-elytron.version> + <debezium.version>3.2.2.Final</debezium.version> + <debezium.postgresql.version>${postgresql-jdbc.version}</debezium.postgresql.version> + <debezium.mysql.version>9.4.0</debezium.mysql.version> <jsonwebtoken.version>0.11.1</jsonwebtoken.version> <opencensus.version>0.28.0</opencensus.version> <hadoop3.version>3.4.0</hadoop3.version> diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java index 6c422c4f036..f0eaf2938d8 100644 --- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java +++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java @@ -30,7 +30,7 @@ import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig; @Slf4j public abstract class DebeziumSource extends KafkaConnectSource { private static final String DEFAULT_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"; - private static final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"; + private static final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.PulsarSchemaHistory"; private static final String DEFAULT_OFFSET_TOPIC = "debezium-offset-topic"; private static final String DEFAULT_HISTORY_TOPIC = "debezium-history-topic"; @@ -76,6 +76,8 @@ public abstract class DebeziumSource extends KafkaConnectSource { public abstract void setDbConnectorTask(Map<String, Object> config) throws Exception; + public abstract void setDbConnectorClass(Map<String, Object> config) throws Exception; + @Override public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception { setDbConnectorTask(config); @@ -87,28 +89,28 @@ public abstract class DebeziumSource extends KafkaConnectSource { // value.converter setConfigIfNull(config, PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER); - // database.history : implementation class for database history. - setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name(), DEFAULT_HISTORY); + // schema.history : implementation class for schema history. + setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY.name(), DEFAULT_HISTORY); - // database.history.pulsar.service.url - String pulsarUrl = (String) config.get(PulsarDatabaseHistory.SERVICE_URL.name()); + // schema.history.internal.pulsar.service.url + String pulsarUrl = (String) config.get(PulsarSchemaHistory.SERVICE_URL.name()); String topicNamespace = topicNamespace(sourceContext); // topic.namespace setConfigIfNull(config, PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, topicNamespace); String sourceName = sourceContext.getSourceName(); - // database.history.pulsar.topic: history topic name - setConfigIfNull(config, PulsarDatabaseHistory.TOPIC.name(), + // schema.history.internal.pulsar.topic: history topic name + setConfigIfNull(config, PulsarSchemaHistory.TOPIC.name(), topicNamespace + "/" + sourceName + "-" + DEFAULT_HISTORY_TOPIC); // offset.storage.topic: offset topic name setConfigIfNull(config, PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC); - // pass pulsar.client.builder if database.history.pulsar.service.url is not provided + // pass pulsar.client.builder if schema.history.internal.pulsar.service.url is not provided if (StringUtils.isEmpty(pulsarUrl)) { String pulsarClientBuilder = SerDeUtils.serialize(sourceContext.getPulsarClientBuilder()); - config.put(PulsarDatabaseHistory.CLIENT_BUILDER.name(), pulsarClientBuilder); + config.put(PulsarSchemaHistory.CLIENT_BUILDER.name(), pulsarClientBuilder); } super.open(config, sourceContext); diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarSchemaHistory.java similarity index 86% rename from pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java rename to pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarSchemaHistory.java index 7ca0d309cf9..8ae8c663a8e 100644 --- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java +++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarSchemaHistory.java @@ -26,12 +26,12 @@ import io.debezium.annotation.ThreadSafe; import io.debezium.config.Configuration; import io.debezium.config.Field; import io.debezium.document.DocumentReader; -import io.debezium.relational.history.AbstractDatabaseHistory; -import io.debezium.relational.history.DatabaseHistory; -import io.debezium.relational.history.DatabaseHistoryException; -import io.debezium.relational.history.DatabaseHistoryListener; +import io.debezium.relational.history.AbstractSchemaHistory; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.SchemaHistory; +import io.debezium.relational.history.SchemaHistoryException; +import io.debezium.relational.history.SchemaHistoryListener; import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -52,15 +52,15 @@ import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; /** - * A {@link DatabaseHistory} implementation that records schema changes as normal pulsar messages on the specified + * A {@link SchemaHistory} implementation that records schema changes as normal pulsar messages on the specified * topic, and that recovers the history by establishing a Kafka Consumer re-processing all messages on that topic. */ @Slf4j @ThreadSafe -public final class PulsarDatabaseHistory extends AbstractDatabaseHistory { +public final class PulsarSchemaHistory extends AbstractSchemaHistory { public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.topic") - .withDisplayName("Database history topic name") + .withDisplayName("Schema history topic name") .withType(Type.STRING) .withWidth(Width.LONG) .withImportance(Importance.HIGH) @@ -97,7 +97,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory { TOPIC, SERVICE_URL, CLIENT_BUILDER, - DatabaseHistory.NAME, + SchemaHistory.NAME, READER_CONFIG); private final ObjectMapper mapper = new ObjectMapper(); @@ -113,7 +113,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory { public void configure( Configuration config, HistoryRecordComparator comparator, - DatabaseHistoryListener listener, + SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { super.configure(config, comparator, listener, useCatalogBeforeSchema); if (!config.validateAndRecord(ALL_FIELDS, logger::error)) { @@ -148,9 +148,9 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory { } // Copy the relevant portions of the configuration and add useful defaults ... - this.dbHistoryName = config.getString(DatabaseHistory.NAME, UUID.randomUUID().toString()); + this.dbHistoryName = config.getString(SchemaHistory.NAME, UUID.randomUUID().toString()); - log.info("Configure to store the debezium database history {} to pulsar topic {}", + log.info("Configure to store the debezium schema history {} to pulsar topic {}", dbHistoryName, topicName); } @@ -163,7 +163,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory { p.send(""); } catch (PulsarClientException pce) { log.error("Failed to initialize storage", pce); - throw new RuntimeException("Failed to initialize storage", pce); + throw new SchemaHistoryException("Failed to initialize storage", pce); } } @@ -172,7 +172,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory { try { pulsarClient = clientBuilder.build(); } catch (PulsarClientException e) { - throw new RuntimeException("Failed to create pulsar client to pulsar cluster", e); + throw new SchemaHistoryException("Failed to create pulsar client to pulsar cluster", e); } } } @@ -201,18 +201,18 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory { } @Override - protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException { + protected void storeRecord(HistoryRecord record) throws SchemaHistoryException { if (this.producer == null) { throw new IllegalStateException("No producer is available. Ensure that 'start()'" - + " is called before storing database history records."); + + " is called before storing schema history records."); } if (log.isTraceEnabled()) { - log.trace("Storing record into database history: {}", record); + log.trace("Storing record into schema history: {}", record); } try { producer.send(record.toString()); } catch (PulsarClientException e) { - throw new DatabaseHistoryException(e); + throw new SchemaHistoryException(e); } } @@ -242,7 +242,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory { protected void recoverRecords(Consumer<HistoryRecord> records) { setupClientIfNeeded(); try (Reader<String> historyReader = createHistoryReader()) { - log.info("Scanning the database history topic '{}'", topicName); + log.info("Scanning the schema history topic '{}'", topicName); // Read all messages in the topic ... MessageId lastProcessedMessageId = null; @@ -255,15 +255,15 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory { if (!isBlank(msg.getValue())) { HistoryRecord recordObj = new HistoryRecord(reader.read(msg.getValue())); if (log.isTraceEnabled()) { - log.trace("Recovering database history: {}", recordObj); + log.trace("Recovering schema history: {}", recordObj); } if (!recordObj.isValid()) { - log.warn("Skipping invalid database history record '{}'. This is often not an issue," + log.warn("Skipping invalid schema history record '{}'. This is often not an issue," + " but if it happens repeatedly please check the '{}' topic.", recordObj, topicName); } else { records.accept(recordObj); - log.trace("Recovered database history: {}", recordObj); + log.trace("Recovered schema history: {}", recordObj); } } lastProcessedMessageId = msg.getMessageId(); @@ -274,7 +274,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory { throw e; } } - log.info("Successfully completed scanning the database history topic '{}'", topicName); + log.info("Successfully completed scanning the schema history topic '{}'", topicName); } catch (IOException ioe) { log.error("Encountered issues on recovering history records", ioe); throw new RuntimeException("Encountered issues on recovering history records", ioe); @@ -287,8 +287,8 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory { try (Reader<String> historyReader = createHistoryReader()) { return historyReader.hasMessageAvailable(); } catch (IOException e) { - log.error("Encountered issues on checking existence of database history", e); - throw new RuntimeException("Encountered issues on checking existence of database history", e); + log.error("Encountered issues on checking existence of schema history", e); + throw new RuntimeException("Encountered issues on checking existence of schema history", e); } } diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/SerDeUtils.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/SerDeUtils.java index 3d8dd3bd08d..156b19b3f76 100644 --- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/SerDeUtils.java +++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/SerDeUtils.java @@ -37,7 +37,7 @@ public class SerDeUtils { return ois.readObject(); } catch (Exception e) { throw new RuntimeException( - "Failed to initialize the pulsar client to store debezium database history", e); + "Failed to initialize the pulsar client to store debezium schema history", e); } } diff --git a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarSchemaHistoryTest.java similarity index 87% rename from pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java rename to pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarSchemaHistoryTest.java index 4d076ebbc0d..c593b464c4a 100644 --- a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java +++ b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarSchemaHistoryTest.java @@ -26,8 +26,8 @@ import io.debezium.config.Configuration; import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; import io.debezium.relational.Tables; import io.debezium.relational.ddl.DdlParser; -import io.debezium.relational.history.DatabaseHistory; -import io.debezium.relational.history.DatabaseHistoryListener; +import io.debezium.relational.history.SchemaHistory; +import io.debezium.relational.history.SchemaHistoryListener; import io.debezium.text.ParsingException; import io.debezium.util.Collect; import java.io.ByteArrayOutputStream; @@ -46,11 +46,11 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; /** - * Test the implementation of {@link PulsarDatabaseHistory}. + * Test the implementation of {@link PulsarSchemaHistory}. */ -public class PulsarDatabaseHistoryTest extends ProducerConsumerBase { +public class PulsarSchemaHistoryTest extends ProducerConsumerBase { - private PulsarDatabaseHistory history; + private PulsarSchemaHistory history; private Map<String, Object> position; private Map<String, String> source; private String topicName; @@ -65,7 +65,7 @@ public class PulsarDatabaseHistoryTest extends ProducerConsumerBase { source = Collect.hashMapOf("server", "my-server"); setLogPosition(0); this.topicName = "persistent://my-property/my-ns/schema-changes-topic"; - this.history = new PulsarDatabaseHistory(); + this.history = new PulsarSchemaHistory(); } @AfterMethod(alwaysRun = true) @@ -78,9 +78,9 @@ public class PulsarDatabaseHistoryTest extends ProducerConsumerBase { private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWithClientBuilder, boolean testWithReaderConfig) throws Exception { Configuration.Builder configBuidler = Configuration.create() - .with(PulsarDatabaseHistory.TOPIC, topicName) - .with(DatabaseHistory.NAME, "my-db-history") - .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL); + .with(PulsarSchemaHistory.TOPIC, topicName) + .with(SchemaHistory.NAME, "my-db-history") + .with(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL); if (testWithClientBuilder) { ClientBuilder builder = PulsarClient.builder().serviceUrl(brokerUrl.toString()); @@ -89,18 +89,18 @@ public class PulsarDatabaseHistoryTest extends ProducerConsumerBase { oos.writeObject(builder); oos.flush(); byte[] data = bao.toByteArray(); - configBuidler.with(PulsarDatabaseHistory.CLIENT_BUILDER, Base64.getEncoder().encodeToString(data)); + configBuidler.with(PulsarSchemaHistory.CLIENT_BUILDER, Base64.getEncoder().encodeToString(data)); } } else { - configBuidler.with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString()); + configBuidler.with(PulsarSchemaHistory.SERVICE_URL, brokerUrl.toString()); } if (testWithReaderConfig) { - configBuidler.with(PulsarDatabaseHistory.READER_CONFIG, "{\"subscriptionName\":\"my-subscription\"}"); + configBuidler.with(PulsarSchemaHistory.READER_CONFIG, "{\"subscriptionName\":\"my-subscription\"}"); } // Start up the history ... - history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true); + history.configure(configBuidler.build(), null, SchemaHistoryListener.NOOP, true); history.start(); // Should be able to call start more than once ... @@ -159,8 +159,8 @@ public class PulsarDatabaseHistoryTest extends ProducerConsumerBase { // Stop the history (which should stop the producer) ... history.stop(); - history = new PulsarDatabaseHistory(); - history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true); + history = new PulsarSchemaHistory(); + history.configure(configBuidler.build(), null, SchemaHistoryListener.NOOP, true); // no need to start // Recover from the very beginning to just past the first change ... @@ -244,13 +244,13 @@ public class PulsarDatabaseHistoryTest extends ProducerConsumerBase { // Set history to use dummy topic Configuration config = Configuration.create() - .with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString()) - .with(PulsarDatabaseHistory.TOPIC, "persistent://my-property/my-ns/dummytopic") - .with(DatabaseHistory.NAME, "my-db-history") - .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true) + .with(PulsarSchemaHistory.SERVICE_URL, brokerUrl.toString()) + .with(PulsarSchemaHistory.TOPIC, "persistent://my-property/my-ns/dummytopic") + .with(SchemaHistory.NAME, "my-db-history") + .with(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true) .build(); - history.configure(config, null, DatabaseHistoryListener.NOOP, true); + history.configure(config, null, SchemaHistoryListener.NOOP, true); history.start(); // dummytopic should not exist yet diff --git a/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java b/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java index 88acbc61c77..996aa5ac725 100644 --- a/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java +++ b/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java @@ -19,6 +19,7 @@ package org.apache.pulsar.io.debezium.mongodb; import java.util.Map; +import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.debezium.DebeziumSource; @@ -27,8 +28,14 @@ import org.apache.pulsar.io.debezium.DebeziumSource; * A pulsar source that runs debezium mongodb source. */ public class DebeziumMongoDbSource extends DebeziumSource { + private static final String DEFAULT_CONNECTOR = "io.debezium.connector.mongodb.MongoDbConnector"; private static final String DEFAULT_TASK = "io.debezium.connector.mongodb.MongoDbConnectorTask"; + @Override + public void setDbConnectorClass(Map<String, Object> config) throws Exception { + throwExceptionIfConfigNotMatch(config, ConnectorConfig.CONNECTOR_CLASS_CONFIG, DEFAULT_CONNECTOR); + } + @Override public void setDbConnectorTask(Map<String, Object> config) throws Exception { throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK); diff --git a/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml b/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml index 8c1564bf4e6..54e0bbd7580 100644 --- a/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml +++ b/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml @@ -28,10 +28,11 @@ parallelism: 1 configs: ## config for pg, docker image: debezium/example-mongodb:0.8 mongodb.hosts: "rs0/mongodb:27017" - mongodb.name: "dbserver1" mongodb.user: "debezium" mongodb.password: "dbz" mongodb.task.id: "1" database.whitelist: "inventory" + topic.prefix: "dbserver1" - database.history.pulsar.service.url: "pulsar://127.0.0.1:6650" + schema.history.internal.pulsar.service.url: "pulsar://127.0.0.1:6650" + connector.class: "io.debezium.connector.mongodb.MongoDbConnector" diff --git a/pulsar-io/debezium/mssql/src/main/java/org/apache/pulsar/io/debezium/mssql/DebeziumMsSqlSource.java b/pulsar-io/debezium/mssql/src/main/java/org/apache/pulsar/io/debezium/mssql/DebeziumMsSqlSource.java index f60653e7aae..0df5eb6444a 100644 --- a/pulsar-io/debezium/mssql/src/main/java/org/apache/pulsar/io/debezium/mssql/DebeziumMsSqlSource.java +++ b/pulsar-io/debezium/mssql/src/main/java/org/apache/pulsar/io/debezium/mssql/DebeziumMsSqlSource.java @@ -19,6 +19,7 @@ package org.apache.pulsar.io.debezium.mssql; import java.util.Map; +import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.pulsar.io.debezium.DebeziumSource; @@ -27,8 +28,14 @@ import org.apache.pulsar.io.debezium.DebeziumSource; * A pulsar source that runs debezium mssql source. */ public class DebeziumMsSqlSource extends DebeziumSource { + private static final String DEFAULT_CONNECTOR = "io.debezium.connector.sqlserver.SqlServerConnector"; private static final String DEFAULT_TASK = "io.debezium.connector.sqlserver.SqlServerConnectorTask"; + @Override + public void setDbConnectorClass(Map<String, Object> config) throws Exception { + throwExceptionIfConfigNotMatch(config, ConnectorConfig.CONNECTOR_CLASS_CONFIG, DEFAULT_CONNECTOR); + } + @Override public void setDbConnectorTask(Map<String, Object> config) throws Exception { throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK); diff --git a/pulsar-io/debezium/mssql/src/main/resources/debezium-mssql-source-config.yaml b/pulsar-io/debezium/mssql/src/main/resources/debezium-mssql-source-config.yaml index c99fb48eab0..de09fedcb59 100644 --- a/pulsar-io/debezium/mssql/src/main/resources/debezium-mssql-source-config.yaml +++ b/pulsar-io/debezium/mssql/src/main/resources/debezium-mssql-source-config.yaml @@ -30,7 +30,8 @@ configs: database.port: "1521" database.user: "sa" database.password: "MyP@ssword1" - database.dbname: "MyDB" - database.server.name: "mssql" + database.names: "MyDB" + topic.prefix: "mssql" - database.history.pulsar.service.url: "pulsar://127.0.0.1:6650" + schema.history.internal.pulsar.service.url: "pulsar://127.0.0.1:6650" + connector.class: "io.debezium.connector.sqlserver.SqlServerConnector" diff --git a/pulsar-io/debezium/mysql/pom.xml b/pulsar-io/debezium/mysql/pom.xml index e884b906f8f..b609dec0fe3 100644 --- a/pulsar-io/debezium/mysql/pom.xml +++ b/pulsar-io/debezium/mysql/pom.xml @@ -55,8 +55,8 @@ <dependencyManagement> <dependencies> <dependency> - <groupId>mysql</groupId> - <artifactId>mysql-connector-java</artifactId> + <groupId>com.mysql</groupId> + <artifactId>mysql-connector-j</artifactId> <version>${debezium.mysql.version}</version> </dependency> </dependencies> diff --git a/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java b/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java index 633535512d2..7a89fa13193 100644 --- a/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java +++ b/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java @@ -19,6 +19,7 @@ package org.apache.pulsar.io.debezium.mysql; import java.util.Map; +import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.pulsar.io.debezium.DebeziumSource; @@ -26,8 +27,14 @@ import org.apache.pulsar.io.debezium.DebeziumSource; * A pulsar source that runs debezium mysql source. */ public class DebeziumMysqlSource extends DebeziumSource { + private static final String DEFAULT_CONNECTOR = "io.debezium.connector.mysql.MySqlConnector"; private static final String DEFAULT_TASK = "io.debezium.connector.mysql.MySqlConnectorTask"; + @Override + public void setDbConnectorClass(Map<String, Object> config) throws Exception { + throwExceptionIfConfigNotMatch(config, ConnectorConfig.CONNECTOR_CLASS_CONFIG, DEFAULT_CONNECTOR); + } + @Override public void setDbConnectorTask(Map<String, Object> config) throws Exception { throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK); diff --git a/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml b/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml index 7a51b0092ac..e278b2276aa 100644 --- a/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml +++ b/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml @@ -32,11 +32,12 @@ configs: database.user: "debezium" database.password: "dbz" database.server.id: "184054" - database.server.name: "dbserver1" database.whitelist: "inventory" + topic.prefix: "dbserver1" - database.history.pulsar.topic: "mysql-history-topic" - database.history.pulsar.service.url: "pulsar://127.0.0.1:6650" + schema.history.internal.pulsar.topic: "mysql-history-topic" + schema.history.internal.pulsar.service.url: "pulsar://127.0.0.1:6650" offset.storage.topic: "mysql-offset-topic" + connector.class: "io.debezium.connector.mysql.MySqlConnector" diff --git a/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java index cb7363ac8cd..7b17203760b 100644 --- a/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java +++ b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java @@ -19,6 +19,7 @@ package org.apache.pulsar.io.debezium.oracle; import java.util.Map; +import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.pulsar.io.debezium.DebeziumSource; @@ -27,8 +28,14 @@ import org.apache.pulsar.io.debezium.DebeziumSource; * A pulsar source that runs debezium oracle source. */ public class DebeziumOracleSource extends DebeziumSource { + private static final String DEFAULT_CONNECTOR = "io.debezium.connector.oracle.OracleConnector"; private static final String DEFAULT_TASK = "io.debezium.connector.oracle.OracleConnectorTask"; + @Override + public void setDbConnectorClass(Map<String, Object> config) throws Exception { + throwExceptionIfConfigNotMatch(config, ConnectorConfig.CONNECTOR_CLASS_CONFIG, DEFAULT_CONNECTOR); + } + @Override public void setDbConnectorTask(Map<String, Object> config) throws Exception { throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK); diff --git a/pulsar-io/debezium/oracle/src/main/resources/debezium-oracle-source-config.yaml b/pulsar-io/debezium/oracle/src/main/resources/debezium-oracle-source-config.yaml index 94173d68a11..8955585979c 100644 --- a/pulsar-io/debezium/oracle/src/main/resources/debezium-oracle-source-config.yaml +++ b/pulsar-io/debezium/oracle/src/main/resources/debezium-oracle-source-config.yaml @@ -33,6 +33,7 @@ configs: database.user: "sysdba" database.password: "oracle" database.dbname: "XE" - database.server.name: "XE" + topic.prefix: "XE" - database.history.pulsar.service.url: "pulsar://127.0.0.1:6650" + schema.history.internal.pulsar.service.url: "pulsar://127.0.0.1:6650" + connector.class: "io.debezium.connector.oracle.OracleConnector" diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml index af34475ff3b..8e668d808d2 100644 --- a/pulsar-io/debezium/pom.xml +++ b/pulsar-io/debezium/pom.xml @@ -31,46 +31,6 @@ <artifactId>pulsar-io-debezium</artifactId> <name>Pulsar IO :: Debezium</name> - <dependencyManagement> - <dependencies> - <dependency> - <groupId>org.wildfly.security</groupId> - <artifactId>wildfly-elytron-sasl-digest</artifactId> - <version>${wildfly-elytron.version}</version> - </dependency> - <dependency> - <groupId>org.wildfly.security</groupId> - <artifactId>wildfly-elytron-sasl-external</artifactId> - <version>${wildfly-elytron.version}</version> - </dependency> - <dependency> - <groupId>org.wildfly.security</groupId> - <artifactId>wildfly-elytron-sasl-gs2</artifactId> - <version>${wildfly-elytron.version}</version> - </dependency> - <dependency> - <groupId>org.wildfly.security</groupId> - <artifactId>wildfly-elytron-sasl-oauth2</artifactId> - <version>${wildfly-elytron.version}</version> - </dependency> - <dependency> - <groupId>org.wildfly.security</groupId> - <artifactId>wildfly-elytron-sasl-plain</artifactId> - <version>${wildfly-elytron.version}</version> - </dependency> - <dependency> - <groupId>org.wildfly.security</groupId> - <artifactId>wildfly-elytron-sasl-scram</artifactId> - <version>${wildfly-elytron.version}</version> - </dependency> - <dependency> - <groupId>org.wildfly.security</groupId> - <artifactId>wildfly-elytron-password-impl</artifactId> - <version>${wildfly-elytron.version}</version> - </dependency> - </dependencies> - </dependencyManagement> - <modules> <module>core</module> <module>mysql</module> diff --git a/pulsar-io/debezium/postgres/src/main/java/org/apache/pulsar/io/debezium/postgres/DebeziumPostgresSource.java b/pulsar-io/debezium/postgres/src/main/java/org/apache/pulsar/io/debezium/postgres/DebeziumPostgresSource.java index ef645aad6b1..54ab110edd0 100644 --- a/pulsar-io/debezium/postgres/src/main/java/org/apache/pulsar/io/debezium/postgres/DebeziumPostgresSource.java +++ b/pulsar-io/debezium/postgres/src/main/java/org/apache/pulsar/io/debezium/postgres/DebeziumPostgresSource.java @@ -19,6 +19,7 @@ package org.apache.pulsar.io.debezium.postgres; import java.util.Map; +import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.pulsar.io.debezium.DebeziumSource; @@ -27,8 +28,14 @@ import org.apache.pulsar.io.debezium.DebeziumSource; * A pulsar source that runs debezium postgres source. */ public class DebeziumPostgresSource extends DebeziumSource { + private static final String DEFAULT_CONNECTOR = "io.debezium.connector.postgresql.PostgresConnector"; private static final String DEFAULT_TASK = "io.debezium.connector.postgresql.PostgresConnectorTask"; + @Override + public void setDbConnectorClass(Map<String, Object> config) throws Exception { + throwExceptionIfConfigNotMatch(config, ConnectorConfig.CONNECTOR_CLASS_CONFIG, DEFAULT_CONNECTOR); + } + @Override public void setDbConnectorTask(Map<String, Object> config) throws Exception { throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK); diff --git a/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml b/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml index 3f6b7eaaba2..4048c73e53b 100644 --- a/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml +++ b/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml @@ -32,7 +32,8 @@ configs: database.user: "postgres" database.password: "postgres" database.dbname: "postgres" - database.server.name: "dbserver1" - schema.whitelist: "inventory" + schema.allow.list: "inventory" + topic.prefix: "dbserver1" - database.history.pulsar.service.url: "pulsar://127.0.0.1:6650" + schema.history.internal.pulsar.service.url: "pulsar://127.0.0.1:6650" + connector.class: "io.debezium.connector.postgresql.PostgresConnector" diff --git a/pulsar-io/mongo/pom.xml b/pulsar-io/mongo/pom.xml index ab268f3f841..90d4c88b1c8 100644 --- a/pulsar-io/mongo/pom.xml +++ b/pulsar-io/mongo/pom.xml @@ -33,7 +33,7 @@ <name>Pulsar IO :: MongoDB</name> <properties> - <mongo-reactivestreams.version>4.1.2</mongo-reactivestreams.version> + <mongo-reactivestreams.version>5.2.0</mongo-reactivestreams.version> </properties> <dependencies> diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java index e442ed0bcad..ea62e546abc 100644 --- a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java +++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java @@ -26,13 +26,22 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.mongodb.MongoBulkWriteException; +import com.mongodb.ServerAddress; import com.mongodb.bulk.BulkWriteError; +import com.mongodb.bulk.BulkWriteInsert; +import com.mongodb.bulk.BulkWriteResult; +import com.mongodb.bulk.BulkWriteUpsert; +import com.mongodb.bulk.WriteConcernError; +import com.mongodb.internal.bulk.WriteRequest; import com.mongodb.reactivestreams.client.MongoClient; import com.mongodb.reactivestreams.client.MongoCollection; import com.mongodb.reactivestreams.client.MongoDatabase; import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.core.SinkContext; import org.bson.BsonDocument; @@ -97,7 +106,15 @@ public class MongoSinkTest { if (throwBulkError) { List<BulkWriteError> writeErrors = Arrays.asList( new BulkWriteError(0, "error", new BsonDocument(), 1)); - exc = new MongoBulkWriteException(null, writeErrors, null, null); + BulkWriteResult result = BulkWriteResult.acknowledged( + WriteRequest.Type.INSERT, 1, 0, + Collections.<BulkWriteUpsert>emptyList(), + Collections.<BulkWriteInsert>emptyList()); + WriteConcernError writeConcernError = null; + ServerAddress serverAddress = new ServerAddress("localhost", 27017); + Set<String> errorLabels = new HashSet<>(); + exc = new MongoBulkWriteException(result, writeErrors, writeConcernError, + serverAddress, errorLabels); } subscriber.onError(exc); return null; diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java index 6da4b1123fd..7243b365e88 100644 --- a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java +++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java @@ -110,17 +110,21 @@ public class MongoSourceTest { source.open(map, mockSourceContext); - subscriber.onNext(new ChangeStreamDocument<>( - OperationType.INSERT, + subscriber.onNext(new ChangeStreamDocument<Document>( + OperationType.INSERT.getValue(), BsonDocument.parse("{token: true}"), BsonDocument.parse("{db: \"hello\", coll: \"pulsar\"}"), BsonDocument.parse("{db: \"hello2\", coll: \"pulsar2\"}"), new Document("hello", "pulsar"), + new Document("hello", "pulsar"), // documentKey BsonDocument.parse("{_id: 1}"), new BsonTimestamp(1234, 2), - null, + null, // UpdateDescription new BsonInt64(1), - BsonDocument.parse("{id: 1, uid: 1}"))); + BsonDocument.parse("{id: 1, uid: 1}"), + null, // BsonDateTime + null, // SplitEvent + BsonDocument.parse("{extra: 1}"))); Record<byte[]> record = source.read(); diff --git a/src/owasp-dependency-check-suppressions.xml b/src/owasp-dependency-check-suppressions.xml index 1ce7392a489..f302c251f96 100644 --- a/src/owasp-dependency-check-suppressions.xml +++ b/src/owasp-dependency-check-suppressions.xml @@ -364,49 +364,6 @@ <cpe>cpe:/a:apache:solr</cpe> </suppress> - <!-- debezium-related misdetections --> - <suppress> - <notes><![CDATA[ - file name: debezium-connector-mysql-1.9.7.Final.jar - ]]></notes> - <sha1>74c623b4a9b231e2f0e8f6053b38abd3bc487ce2</sha1> - <cve>CVE-2017-15945</cve> - </suppress> - <suppress> - <notes><![CDATA[ - file name: mysql-binlog-connector-java-0.27.2.jar - ]]></notes> - <sha1>23294cd730e29c00b8ddfbde517dfc955aba090f</sha1> - <cve>CVE-2017-15945</cve> - </suppress> - <suppress> - <notes><![CDATA[ - file name: debezium-connector-postgres-1.9.7.Final.jar - ]]></notes> - <sha1>300ff0bbf795643e914b7c8a6d6ba812a8354d62</sha1> - <cve>CVE-2015-0241</cve> - <cve>CVE-2015-0242</cve> - <cve>CVE-2015-0243</cve> - <cve>CVE-2015-0244</cve> - <cve>CVE-2015-3166</cve> - <cve>CVE-2015-3167</cve> - <cve>CVE-2016-0766</cve> - <cve>CVE-2016-0768</cve> - <cve>CVE-2016-0773</cve> - <cve>CVE-2016-5423</cve> - <cve>CVE-2016-5424</cve> - <cve>CVE-2016-7048</cve> - <cve>CVE-2017-14798</cve> - <cve>CVE-2017-7484</cve> - <cve>CVE-2018-1115</cve> - <cve>CVE-2019-10127</cve> - <cve>CVE-2019-10128</cve> - <cve>CVE-2019-10210</cve> - <cve>CVE-2019-10211</cve> - <cve>CVE-2020-25694</cve> - <cve>CVE-2020-25695</cve> - <cve>CVE-2021-23214</cve> - </suppress> <suppress> <notes><![CDATA[ file name: protostream-types-4.4.1.Final.jar diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml index 1d0ac102f13..031e070a3c5 100644 --- a/tests/integration/pom.xml +++ b/tests/integration/pom.xml @@ -34,7 +34,7 @@ <properties> <integrationTestSuiteFile>pulsar.xml</integrationTestSuiteFile> - <mongo-reactivestreams.version>4.1.2</mongo-reactivestreams.version> + <mongo-reactivestreams.version>5.2.0</mongo-reactivestreams.version> </properties> <dependencies> diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java index 481725d145b..6fa2e9ff471 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java @@ -25,7 +25,7 @@ public class DebeziumMongoDbContainer extends ChaosContainer<DebeziumMongoDbCont public static final String NAME = "debezium-mongodb-example"; public static final Integer[] PORTS = { 27017 }; - private static final String IMAGE_NAME = "debezium/example-mongodb:0.10"; + private static final String IMAGE_NAME = "debezium/example-mongodb:3.0.0.Final"; public DebeziumMongoDbContainer(String clusterName) { super(clusterName, IMAGE_NAME); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java index 27d624a6c82..cf59cda868d 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java @@ -26,7 +26,7 @@ public class DebeziumMySQLContainer extends ChaosContainer<DebeziumMySQLContaine public static final String NAME = "debezium-mysql-example"; static final Integer[] PORTS = { 3306 }; - private static final String IMAGE_NAME = "debezium/example-mysql:0.8"; + private static final String IMAGE_NAME = "debezium/example-mysql:3.0.0.Final"; public DebeziumMySQLContainer(String clusterName) { super(clusterName, IMAGE_NAME); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java index 479869c4183..4fd391fd926 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java @@ -26,7 +26,7 @@ public class DebeziumPostgreSqlContainer extends ChaosContainer<DebeziumPostgreS public static final String NAME = "debezium-postgresql-example"; static final Integer[] PORTS = { 5432 }; - private static final String IMAGE_NAME = "debezium/example-postgres:0.10"; + private static final String IMAGE_NAME = "debezium/example-postgres:3.0.0.Final"; public DebeziumPostgreSqlContainer(String clusterName) { super(clusterName, IMAGE_NAME); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/PulsarIOSourceRunner.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/PulsarIOSourceRunner.java index 92918a14006..7cd1c3ebf66 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/PulsarIOSourceRunner.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/PulsarIOSourceRunner.java @@ -272,6 +272,21 @@ public class PulsarIOSourceRunner extends PulsarIOTestRunner { result.getStdout() ); result.assertNoStderr(); + + final String[] packageCommands = { + PulsarCluster.ADMIN_SCRIPT, + "packages", + "delete", + "source://" + tenant + "/" + namespace + "/" + sourceName + "@0" + }; + + try { + ContainerExecResult packageResult = pulsarCluster.getAnyWorker().execCmd(packageCommands); + log.info("Package metadata deletion result: {}", packageResult.getStdout()); + } catch (Exception e) { + log.warn("Failed to delete package metadata for source://{}/{}/{}@0: {}", + tenant, namespace, sourceName, e.getMessage()); + } } protected void getSourceInfoNotFound(String tenant, String namespace, String sourceName) throws Exception { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java index 95b075e660a..bd2c95b5772 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java @@ -58,6 +58,8 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> i add("source"); add("op"); add("ts_ms"); + add("ts_us"); + add("ts_ns"); add("transaction"); }}; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java index 4c562f3e244..1826d78f351 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java @@ -42,15 +42,17 @@ public class DebeziumMongoDbSourceTester extends SourceTester<DebeziumMongoDbCon this.pulsarCluster = cluster; pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT; - sourceConfig.put("mongodb.hosts", "rs0/" + DebeziumMongoDbContainer.NAME + ":27017"); - sourceConfig.put("mongodb.name", "dbserver1"); + sourceConfig.put("mongodb.connection.string", + "mongodb://debezium:dbz@" + DebeziumMongoDbContainer.NAME + ":27017/admin?replicaSet=rs0"); sourceConfig.put("mongodb.user", "debezium"); sourceConfig.put("mongodb.password", "dbz"); sourceConfig.put("mongodb.task.id", "1"); + sourceConfig.put("topic.prefix", "dbserver1"); sourceConfig.put("database.include.list", "inventory"); - sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl); + sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl); sourceConfig.put("topic.namespace", "debezium/mongodb"); - sourceConfig.put("capture.mode", "oplog"); + sourceConfig.put("capture.mode", "change_streams_update_full"); + sourceConfig.put("connector.class", "io.debezium.connector.mongodb.MongoDbConnector"); } @Override @@ -69,10 +71,10 @@ public class DebeziumMongoDbSourceTester extends SourceTester<DebeziumMongoDbCon @Override public void prepareInsertEvent() throws Exception { this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c", - "mongo -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + "mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + "--eval 'db.products.find()'"); this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c", - "mongo -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + "mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + "--eval 'db.products.insert({ " + "_id : NumberLong(\"110\")," + "name : \"test-debezium\"," @@ -84,20 +86,20 @@ public class DebeziumMongoDbSourceTester extends SourceTester<DebeziumMongoDbCon @Override public void prepareDeleteEvent() throws Exception { this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c", - "mongo -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + "mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + "--eval 'db.products.find()'"); this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c", - "mongo -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + "mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + "--eval 'db.products.deleteOne({name : \"test-debezium-update\"})'"); } @Override public void prepareUpdateEvent() throws Exception { this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c", - "mongo -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + "mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + "--eval 'db.products.find()'"); this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c", - "mongo -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + "mongosh -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory " + "--eval 'db.products.update({" + "_id : 110}," + "{$set:{name:\"test-debezium-update\", description: \"this is update description\"}})'"); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java index 25a7544f52b..2295a670660 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java @@ -58,11 +58,15 @@ public class DebeziumMsSqlSourceTester extends SourceTester<DebeziumMsSqlContain sourceConfig.put("database.port", "1433"); sourceConfig.put("database.user", "sa"); sourceConfig.put("database.password", DebeziumMsSqlContainer.SA_PASSWORD); - sourceConfig.put("database.server.name", "mssql"); - sourceConfig.put("database.dbname", "TestDB"); + sourceConfig.put("database.names", "TestDB"); + sourceConfig.put("database.encrypt", "false"); sourceConfig.put("snapshot.mode", "schema_only"); - sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl); + sourceConfig.put("schema.history.internal.pulsar.topic", "debezium-schema-history-mssql"); + sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl); + sourceConfig.put("topic.prefix", "mssql"); sourceConfig.put("topic.namespace", "debezium/mssql"); + sourceConfig.put("task.id", "1"); + sourceConfig.put("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector"); } @Override @@ -145,12 +149,12 @@ public class DebeziumMsSqlSourceTester extends SourceTester<DebeziumMsSqlContain @Override public String keyContains() { - return "mssql.dbo.customers.Key"; + return "TestDB"; } @Override public String valueContains() { - return "mssql.dbo.customers.Value"; + return "TestDB"; } @Override diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java index 2c457cd2fb9..7f9ccbf5d20 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java @@ -58,15 +58,16 @@ public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContain sourceConfig.put("database.user", "debezium"); sourceConfig.put("database.password", "dbz"); sourceConfig.put("database.server.id", "184054"); - sourceConfig.put("database.server.name", "dbserver1"); - sourceConfig.put("database.whitelist", "inventory"); + sourceConfig.put("topic.prefix", "dbserver1"); + sourceConfig.put("database.include.list", "inventory"); if (!testWithClientBuilder) { - sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl); + sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl); } sourceConfig.put("key.converter", converterClassName); sourceConfig.put("value.converter", converterClassName); sourceConfig.put("topic.namespace", "debezium/mysql-" + (converterClassName.endsWith("AvroConverter") ? "avro" : "json")); + sourceConfig.put("connector.class", "io.debezium.connector.mysql.MySqlConnector"); } @Override diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java index 41db0a7cc18..39b4361ee8a 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java @@ -58,13 +58,15 @@ public class DebeziumOracleDbSourceTester extends SourceTester<DebeziumOracleDbC sourceConfig.put("database.port", "1521"); sourceConfig.put("database.user", "dbzuser"); sourceConfig.put("database.password", "dbz"); - sourceConfig.put("database.server.name", "XE"); + sourceConfig.put("topic.prefix", "XE"); sourceConfig.put("database.dbname", "XE"); sourceConfig.put("snapshot.mode", "schema_only"); sourceConfig.put("schema.include.list", "inv"); - sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl); + sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl); sourceConfig.put("topic.namespace", "debezium/oracle"); + + sourceConfig.put("connector.class", "io.debezium.connector.oracle.OracleConnector"); } @Override diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java index 75b071c0bd8..e23a5f1ad50 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java @@ -70,13 +70,13 @@ public class DebeziumPostgreSqlSourceTester extends SourceTester<DebeziumPostgre sourceConfig.put("database.port", "5432"); sourceConfig.put("database.user", "postgres"); sourceConfig.put("database.password", "postgres"); - sourceConfig.put("database.server.id", "184055"); - sourceConfig.put("database.server.name", "dbserver1"); + sourceConfig.put("topic.prefix", "dbserver1"); sourceConfig.put("database.dbname", "postgres"); - sourceConfig.put("schema.whitelist", "inventory"); - sourceConfig.put("table.blacklist", "inventory.spatial_ref_sys,inventory.geom"); - sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl); + sourceConfig.put("schema.include.list", "inventory"); + sourceConfig.put("table.exclude.list", "inventory.spatial_ref_sys,inventory.geom"); + sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl); sourceConfig.put("topic.namespace", "debezium/postgresql"); + sourceConfig.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector"); } @Override diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java index 50160d94eef..eb229410391 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java @@ -91,7 +91,7 @@ public class PulsarDebeziumSourcesTest extends PulsarIOTestBase { + "-" + functionRuntimeType + "-" + randomName(8); // This is the binlog count that contained in mysql container. - final int numMessages = 47; + final int numMessages = 52; @Cleanup PulsarClient client = PulsarClient.builder() @@ -214,7 +214,7 @@ public class PulsarDebeziumSourcesTest extends PulsarIOTestBase { final String tenant = TopicName.PUBLIC_TENANT; final String namespace = TopicName.DEFAULT_NAMESPACE; final String outputTopicName = "debe-output-topic-name-" + testId.getAndIncrement(); - final String consumeTopicName = "debezium/mssql/mssql.dbo.customers"; + final String consumeTopicName = "debezium/mssql/mssql.TestDB.dbo.customers"; final String sourceName = "test-source-debezium-mssql-" + functionRuntimeType + "-" + randomName(8); final int numMessages = 1; diff --git a/tests/scripts/pre-integ-tests.sh b/tests/scripts/pre-integ-tests.sh index 9f564a3f5f9..f8ba1bdc701 100755 --- a/tests/scripts/pre-integ-tests.sh +++ b/tests/scripts/pre-integ-tests.sh @@ -30,5 +30,5 @@ docker pull apachepulsar/s3mock:latest docker pull alpine/socat:latest docker pull cassandra:3 docker pull confluentinc/cp-kafka:4.0.0 -docker pull debezium/example-mysql:0.8 -docker pull mysql:5.7.22 +docker pull debezium/example-mysql:3.0.0.Final +docker pull mysql:9.1.0
