This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new be72511f [improve] auto create Doris table for new MongoDB collections
in Data (#573)
be72511f is described below
commit be72511f15f1fcfe2ec9fb110c76d0920dab7cee
Author: kwonder0926 <[email protected]>
AuthorDate: Thu Mar 27 18:57:18 2025 +0800
[improve] auto create Doris table for new MongoDB collections in Data (#573)
---
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 46 ++------
.../tools/cdc/mongodb/MongoDBDatabaseSync.java | 3 +
.../flink/tools/cdc/mongodb/MongoDBSchema.java | 23 ++++
.../MongoDBJsonDebeziumSchemaSerializer.java | 29 +++++-
.../serializer/MongoJsonDebeziumDataChange.java | 19 +---
.../serializer/MongoJsonDebeziumSchemaChange.java | 56 +++++++++-
.../flink/tools/cdc/utils/DorisTableUtil.java | 109 +++++++++++++++++++
.../flink/tools/cdc/utils/JsonNodeExtractUtil.java | 62 +++++++++++
.../tools/cdc/mongodb/MongoDBCreateTableTest.java | 116 +++++++++++++++++++++
9 files changed, 404 insertions(+), 59 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 06e49e24..326b4e87 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -28,14 +28,12 @@ import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
-import org.apache.doris.flink.catalog.doris.DorisSchemaFactory;
import org.apache.doris.flink.catalog.doris.DorisSystem;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisConnectionOptions;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.exception.DorisSystemException;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.schema.SchemaChangeMode;
import org.apache.doris.flink.sink.writer.WriteMode;
@@ -44,12 +42,12 @@ import
org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerialize
import org.apache.doris.flink.table.DorisConfigOptions;
import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync;
+import org.apache.doris.flink.tools.cdc.utils.DorisTableUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
-import java.sql.SQLSyntaxErrorException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -150,7 +148,13 @@ public abstract class DatabaseSync {
// Calculate the mapping relationship between upstream and
downstream tables
tableMapping.put(
schema.getTableIdentifier(), String.format("%s.%s",
targetDb, dorisTable));
- tryCreateTableIfAbsent(dorisSystem, targetDb, dorisTable, schema);
+ DorisTableUtil.tryCreateTableIfAbsent(
+ dorisSystem,
+ targetDb,
+ dorisTable,
+ schema,
+ dorisTableConfig,
+ ignoreIncompatible);
if (!dorisTables.contains(Tuple2.of(targetDb, dorisTable))) {
dorisTables.add(Tuple2.of(targetDb, dorisTable));
@@ -470,40 +474,6 @@ public abstract class DatabaseSync {
}
}
- private void tryCreateTableIfAbsent(
- DorisSystem dorisSystem, String targetDb, String dorisTable,
SourceSchema schema) {
- if (!dorisSystem.tableExists(targetDb, dorisTable)) {
- if (dorisTableConfig.isConvertUniqToPk()
- && CollectionUtil.isNullOrEmpty(schema.primaryKeys)
- && !CollectionUtil.isNullOrEmpty(schema.uniqueIndexs)) {
- schema.primaryKeys = new ArrayList<>(schema.uniqueIndexs);
- }
- TableSchema dorisSchema =
- DorisSchemaFactory.createTableSchema(
- targetDb,
- dorisTable,
- schema.getFields(),
- schema.getPrimaryKeys(),
- dorisTableConfig,
- schema.getTableComment());
- try {
- dorisSystem.createTable(dorisSchema);
- } catch (Exception ex) {
- handleTableCreationFailure(ex);
- }
- }
- }
-
- private void handleTableCreationFailure(Exception ex) throws
DorisSystemException {
- if (ignoreIncompatible && ex.getCause() instanceof
SQLSyntaxErrorException) {
- LOG.warn(
- "Doris schema and source table schema are not compatible.
Error: {} ",
- ex.getCause().toString());
- } else {
- throw new DorisSystemException("Failed to create table due to: ",
ex);
- }
- }
-
protected Properties getJdbcProperties() {
Properties jdbcProps = new Properties();
for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
index ca034ac3..b77ec0e3 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
@@ -240,6 +240,9 @@ public class MongoDBDatabaseSync extends DatabaseSync {
.setTableMapping(tableMapping)
.setTableConf(dorisTableConfig)
.setTargetDatabase(database)
+ .setTargetTablePrefix(tablePrefix)
+ .setTargetTableSuffix(tableSuffix)
+ .setTableNameConverter(converter)
.build();
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
index 984419bc..2e2788de 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.tools.cdc.mongodb;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
+import com.fasterxml.jackson.databind.JsonNode;
import org.apache.doris.flink.catalog.doris.DorisType;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.tools.cdc.SourceSchema;
@@ -64,6 +65,28 @@ public class MongoDBSchema extends SourceSchema {
primaryKeys.add("_id");
}
+ public MongoDBSchema(
+ JsonNode jsonData, String databaseName, String tableName, String
tableComment)
+ throws Exception {
+ super(databaseName, null, tableName, tableComment);
+ fields = new LinkedHashMap<>();
+ processSampleData(jsonData);
+
+ primaryKeys = new ArrayList<>();
+ primaryKeys.add("_id");
+ }
+
+ @VisibleForTesting
+ protected void processSampleData(JsonNode data) {
+ data.fieldNames()
+ .forEachRemaining(
+ fieldName -> {
+ JsonNode value = data.get(fieldName);
+ String dorisType =
MongoDBType.jsonNodeToDorisType(value);
+ fields.put(fieldName, new FieldSchema(fieldName,
dorisType, null));
+ });
+ }
+
@VisibleForTesting
protected void processSampleData(Document sampleData) {
for (Map.Entry<String, Object> entry : sampleData.entrySet()) {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
index 296a3727..dec06e91 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
@@ -29,6 +29,7 @@ import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcDataChange;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcSchemaChange;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
import org.apache.doris.flink.tools.cdc.DorisTableConfig;
+import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +63,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements
DorisRecordSerialize
private String targetTablePrefix;
private String targetTableSuffix;
+ private TableNameConverter tableNameConverter;
public MongoDBJsonDebeziumSchemaSerializer(
DorisOptions dorisOptions,
@@ -72,7 +74,8 @@ public class MongoDBJsonDebeziumSchemaSerializer implements
DorisRecordSerialize
DorisTableConfig dorisTableConfig,
String targetDatabase,
String targetTablePrefix,
- String targetTableSuffix) {
+ String targetTableSuffix,
+ TableNameConverter tableNameConverter) {
this.dorisOptions = dorisOptions;
this.pattern = pattern;
this.sourceTableName = sourceTableName;
@@ -85,6 +88,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements
DorisRecordSerialize
this.targetDatabase = targetDatabase;
this.targetTablePrefix = targetTablePrefix;
this.targetTableSuffix = targetTableSuffix;
+ this.tableNameConverter = tableNameConverter;
if (executionOptions != null) {
this.lineDelimiter =
executionOptions
@@ -111,6 +115,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements
DorisRecordSerialize
targetTablePrefix,
targetTableSuffix,
enableDelete);
+ changeContext.setTableNameConverter(tableNameConverter);
this.dataChange = new MongoJsonDebeziumDataChange(changeContext);
this.schemaChange = new MongoJsonDebeziumSchemaChange(changeContext);
}
@@ -143,6 +148,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements
DorisRecordSerialize
private String targetDatabase;
private String targetTablePrefix = "";
private String targetTableSuffix = "";
+ private TableNameConverter tableNameConverter;
public MongoDBJsonDebeziumSchemaSerializer.Builder setDorisOptions(
DorisOptions dorisOptions) {
@@ -192,6 +198,24 @@ public class MongoDBJsonDebeziumSchemaSerializer
implements DorisRecordSerialize
return this;
}
+ public MongoDBJsonDebeziumSchemaSerializer.Builder
setTargetTablePrefix(
+ String targetTablePrefix) {
+ this.targetTablePrefix = targetTablePrefix;
+ return this;
+ }
+
+ public MongoDBJsonDebeziumSchemaSerializer.Builder
setTargetTableSuffix(
+ String targetTableSuffix) {
+ this.targetTableSuffix = targetTableSuffix;
+ return this;
+ }
+
+ public MongoDBJsonDebeziumSchemaSerializer.Builder
setTableNameConverter(
+ TableNameConverter converter) {
+ this.tableNameConverter = converter;
+ return this;
+ }
+
public MongoDBJsonDebeziumSchemaSerializer build() {
return new MongoDBJsonDebeziumSchemaSerializer(
dorisOptions,
@@ -202,7 +226,8 @@ public class MongoDBJsonDebeziumSchemaSerializer implements
DorisRecordSerialize
dorisTableConfig,
targetDatabase,
targetTablePrefix,
- targetTableSuffix);
+ targetTableSuffix,
+ tableNameConverter);
}
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java
index 9dbe7ffe..12fa8581 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java
@@ -20,7 +20,6 @@ package org.apache.doris.flink.tools.cdc.mongodb.serializer;
import org.apache.flink.util.StringUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.NullNode;
@@ -30,6 +29,7 @@ import
org.apache.doris.flink.sink.writer.serializer.DorisRecord;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcDataChange;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.utils.JsonNodeExtractUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -127,16 +127,7 @@ public class MongoJsonDebeziumDataChange extends
CdcDataChange implements Change
@Override
public Map<String, Object> extractAfterRow(JsonNode recordRoot) {
JsonNode dataNode = recordRoot.get(FIELD_DATA);
- Map<String, Object> rowMap = extractRow(dataNode);
- String objectId;
- // if user specifies the `_id` field manually, the $oid field may not
exist
- if (rowMap.get(ID_FIELD) instanceof Map<?, ?>) {
- objectId = ((Map<?, ?>)
rowMap.get(ID_FIELD)).get(OID_FIELD).toString();
- } else {
- objectId = rowMap.get(ID_FIELD).toString();
- }
- rowMap.put(ID_FIELD, objectId);
- return rowMap;
+ return JsonNodeExtractUtil.extractAfterRow(dataNode, objectMapper);
}
private Map<String, Object> extractDeleteRow(JsonNode recordRoot)
@@ -154,10 +145,4 @@ public class MongoJsonDebeziumDataChange extends
CdcDataChange implements Change
row.put(ID_FIELD, objectId);
return row;
}
-
- private Map<String, Object> extractRow(JsonNode recordRow) {
- Map<String, Object> recordMap =
- objectMapper.convertValue(recordRow, new
TypeReference<Map<String, Object>>() {});
- return recordMap != null ? recordMap : new HashMap<>();
- }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
index c67e856e..cfcf53f2 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
@@ -17,10 +17,14 @@
package org.apache.doris.flink.tools.cdc.mongodb.serializer;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.StringUtils;
+
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.DorisSystem;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.cfg.DorisOptions;
@@ -30,8 +34,11 @@ import
org.apache.doris.flink.sink.schema.SchemaChangeManager;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcSchemaChange;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.mongodb.MongoDBSchema;
import org.apache.doris.flink.tools.cdc.mongodb.MongoDBType;
import org.apache.doris.flink.tools.cdc.mongodb.MongoDateConverter;
+import org.apache.doris.flink.tools.cdc.utils.DorisTableUtil;
+import org.apache.doris.flink.tools.cdc.utils.JsonNodeExtractUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,17 +67,19 @@ public class MongoJsonDebeziumSchemaChange extends
CdcSchemaChange {
private final Map<String, Map<String, String>> tableFields;
- private final SchemaChangeManager schemaChangeManager;
+ private SchemaChangeManager schemaChangeManager;
- private final DorisSystem dorisSystem;
+ private DorisSystem dorisSystem;
public Map<String, String> tableMapping;
private final DorisOptions dorisOptions;
+ public JsonDebeziumChangeContext changeContext;
private final Set<String> specialFields =
new HashSet<>(Arrays.asList(DATE_FIELD, TIMESTAMP_FIELD,
DECIMAL_FIELD, LONG_FIELD));
public MongoJsonDebeziumSchemaChange(JsonDebeziumChangeContext
changeContext) {
+ this.changeContext = changeContext;
this.objectMapper = changeContext.getObjectMapper();
this.dorisOptions = changeContext.getDorisOptions();
this.tableFields = new HashMap<>();
@@ -96,6 +105,35 @@ public class MongoJsonDebeziumSchemaChange extends
CdcSchemaChange {
String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
String dorisTableIdentifier =
getDorisTableIdentifier(cdcTableIdentifier, dorisOptions,
tableMapping);
+            // No Doris table mapped yet: create it; record the mapping only if creation succeeds.
+            if (StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)) {
+                // Cut at the first '.' only: MongoDB collection names may contain dots.
+                int dbEnd = cdcTableIdentifier.indexOf('.');
+                String sourceTable = cdcTableIdentifier.substring(dbEnd + 1);
+                String targetDb = changeContext.getTargetDatabase();
+                if (changeContext.getTableNameConverter() == null) {
+                    throw new IllegalStateException("TableNameConverter is not set");
+                }
+                String dorisTable = changeContext.getTableNameConverter().convert(sourceTable);
+                LOG.info(
+                        "The table [{}] does not exist. Attempting to create a new table named: {}.{}",
+                        cdcTableIdentifier,
+                        targetDb,
+                        dorisTable);
+                Map<String, Object> afterRow = extractAfterRow(logData);
+                JsonNode sample = objectMapper.valueToTree(afterRow);
+                MongoDBSchema mongoSchema = new MongoDBSchema(sample, targetDb, dorisTable, "");
+                mongoSchema.setModel(DataModel.UNIQUE);
+                DorisTableUtil.tryCreateTableIfAbsent(
+                        dorisSystem,
+                        targetDb,
+                        dorisTable,
+                        mongoSchema,
+                        changeContext.getDorisTableConf());
+                dorisTableIdentifier = String.format("%s.%s", targetDb, dorisTable);
+                tableMapping.put(cdcTableIdentifier, dorisTableIdentifier);
+            }
+
String[] tableInfo = dorisTableIdentifier.split("\\.");
if (tableInfo.length != 2) {
throw new DorisRuntimeException();
@@ -163,6 +201,10 @@ public class MongoJsonDebeziumSchemaChange extends
CdcSchemaChange {
}
}
+ public Map<String, Object> extractAfterRow(JsonNode recordRoot) {
+ return JsonNodeExtractUtil.extractAfterRow(recordRoot, objectMapper);
+ }
+
private void checkAndUpdateSchemaChange(
JsonNode logData, String dorisTableIdentifier, String database,
String table) {
Map<String, String> tableFieldMap =
tableFields.get(dorisTableIdentifier);
@@ -207,4 +249,14 @@ public class MongoJsonDebeziumSchemaChange extends
CdcSchemaChange {
String db = nameSpace.get(FIELD_DATABASE).asText();
return SourceSchema.getString(db, null, table);
}
+
+ @VisibleForTesting
+ public void setDorisSystem(DorisSystem dorisSystem) {
+ this.dorisSystem = dorisSystem;
+ }
+
+ @VisibleForTesting
+ public void setSchemaChangeManager(SchemaChangeManager
schemaChangeManager) {
+ this.schemaChangeManager = schemaChangeManager;
+ }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/utils/DorisTableUtil.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/utils/DorisTableUtil.java
new file mode 100644
index 00000000..114bf151
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/utils/DorisTableUtil.java
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.utils;
+
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.doris.flink.catalog.doris.DorisSchemaFactory;
+import org.apache.doris.flink.catalog.doris.DorisSystem;
+import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.exception.DorisSystemException;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLSyntaxErrorException;
+import java.util.ArrayList;
+
+/** Utility class for Doris table operations. */
+public class DorisTableUtil {
+    private static final Logger LOG = LoggerFactory.getLogger(DorisTableUtil.class);
+
+    /** Static utility; not instantiable. */
+    private DorisTableUtil() {}
+
+    /**
+     * Try to create a table in Doris if it doesn't exist.
+     *
+     * <p>If {@code tableConfig.isConvertUniqToPk()} is true and {@code schema} has no primary
+     * keys but does have unique index columns, {@code schema.primaryKeys} is overwritten in place
+     * with those columns before the Doris schema is built.
+     *
+     * @param dorisSystem Doris system instance
+     * @param targetDb Doris database name
+     * @param dorisTable Doris table name
+     * @param schema source table schema the Doris table is derived from
+     * @param tableConfig Table configuration
+     * @param ignoreIncompatible if true, a failure caused by {@link SQLSyntaxErrorException} is
+     *     logged as a warning instead of thrown
+     * @throws DorisSystemException if table creation fails and the failure is not ignored
+     */
+    public static void tryCreateTableIfAbsent(
+            DorisSystem dorisSystem,
+            String targetDb,
+            String dorisTable,
+            SourceSchema schema,
+            DorisTableConfig tableConfig,
+            boolean ignoreIncompatible)
+            throws DorisSystemException {
+        if (dorisSystem.tableExists(targetDb, dorisTable)) {
+            return;
+        }
+        if (tableConfig.isConvertUniqToPk()
+                && CollectionUtil.isNullOrEmpty(schema.primaryKeys)
+                && !CollectionUtil.isNullOrEmpty(schema.uniqueIndexs)) {
+            schema.primaryKeys = new ArrayList<>(schema.uniqueIndexs);
+        }
+
+        TableSchema dorisSchema =
+                DorisSchemaFactory.createTableSchema(
+                        targetDb,
+                        dorisTable,
+                        schema.getFields(),
+                        schema.getPrimaryKeys(),
+                        tableConfig,
+                        schema.getTableComment());
+        try {
+            dorisSystem.createTable(dorisSchema);
+        } catch (Exception ex) {
+            handleTableCreationFailure(ex, ignoreIncompatible);
+        }
+    }
+
+    /**
+     * Same as {@link #tryCreateTableIfAbsent(DorisSystem, String, String, SourceSchema,
+     * DorisTableConfig, boolean)} with {@code ignoreIncompatible = false}, so any creation
+     * failure is thrown.
+     */
+    public static void tryCreateTableIfAbsent(
+            DorisSystem dorisSystem,
+            String targetDb,
+            String dorisTable,
+            SourceSchema schema,
+            DorisTableConfig tableConfig)
+            throws DorisSystemException {
+        tryCreateTableIfAbsent(dorisSystem, targetDb, dorisTable, schema, tableConfig, false);
+    }
+
+    /**
+     * Handle table creation failure.
+     *
+     * @param ex Exception that occurred during table creation
+     * @param ignoreIncompatible Whether to ignore incompatible schema errors
+     * @throws DorisSystemException if table creation fails and errors should not be ignored
+     */
+    private static void handleTableCreationFailure(Exception ex, boolean ignoreIncompatible)
+            throws DorisSystemException {
+        if (ignoreIncompatible && ex.getCause() instanceof SQLSyntaxErrorException) {
+            LOG.warn(
+                    "Doris schema and source table schema are not compatible. Error: {} ",
+                    ex.getCause().toString());
+        } else {
+            throw new DorisSystemException("Failed to create table due to: ", ex);
+        }
+    }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/utils/JsonNodeExtractUtil.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/utils/JsonNodeExtractUtil.java
new file mode 100644
index 00000000..93aeca0b
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/utils/JsonNodeExtractUtil.java
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.utils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.ID_FIELD;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.OID_FIELD;
+
+/** Utility class for extracting data from JSON nodes. */
+public class JsonNodeExtractUtil {
+    private static final Logger LOG = LoggerFactory.getLogger(JsonNodeExtractUtil.class);
+
+    /** Static utility; not instantiable. */
+    private JsonNodeExtractUtil() {}
+
+    /**
+     * Extract row data from JsonNode and convert to Map.
+     *
+     * @param recordRow JsonNode containing row data
+     * @param objectMapper ObjectMapper instance for JSON conversion
+     * @return Map containing extracted row data, or an empty map if the conversion yields null
+     */
+    public static Map<String, Object> extractRow(JsonNode recordRow, ObjectMapper objectMapper) {
+        Map<String, Object> recordMap =
+                objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {});
+        return recordMap != null ? recordMap : new HashMap<>();
+    }
+
+    /**
+     * Extract a MongoDB document as a row map and normalize its {@code _id} value to a string.
+     *
+     * <ul>
+     *   <li>an {@code _id} of the form {@code {"$oid": "..."}} becomes the {@code $oid} value;
+     *   <li>any other document-valued {@code _id} (e.g. an extended-JSON wrapper other than
+     *       {@code $oid}) becomes its JSON text instead of failing;
+     *   <li>a scalar (user-specified) {@code _id} becomes its {@code toString()};
+     *   <li>a row without {@code _id} is returned unchanged.
+     * </ul>
+     *
+     * @param recordRoot JsonNode containing the document fields
+     * @param objectMapper ObjectMapper instance for JSON conversion
+     * @return map of the document fields with {@code _id} normalized
+     */
+    public static Map<String, Object> extractAfterRow(
+            JsonNode recordRoot, ObjectMapper objectMapper) {
+        Map<String, Object> rowMap = extractRow(recordRoot, objectMapper);
+        Object id = rowMap.get(ID_FIELD);
+        if (id instanceof Map<?, ?> && ((Map<?, ?>) id).containsKey(OID_FIELD)) {
+            rowMap.put(ID_FIELD, String.valueOf(((Map<?, ?>) id).get(OID_FIELD)));
+        } else if (id instanceof Map<?, ?>) {
+            rowMap.put(ID_FIELD, objectMapper.valueToTree(id).toString());
+        } else if (id != null) {
+            rowMap.put(ID_FIELD, id.toString());
+        }
+        return rowMap;
+    }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBCreateTableTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBCreateTableTest.java
new file mode 100644
index 00000000..b395d3a7
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBCreateTableTest.java
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.doris.flink.catalog.doris.DorisSystem;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
+import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.TestJsonDebeziumChangeBase;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
+import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
+import
org.apache.doris.flink.tools.cdc.mongodb.serializer.MongoJsonDebeziumSchemaChange;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MongoDBCreateTableTest extends TestJsonDebeziumChangeBase {
+
+    private MongoJsonDebeziumSchemaChange schemaChange;
+
+    @Rule public MockitoRule mockitoRule = MockitoJUnit.rule();
+
+    @Mock private DorisSystem mockDorisSystem;
+
+    @Mock private DorisOptions mockDorisOptions;
+
+    @Mock private SchemaChangeManager mockSchemaManager;
+
+    private final String dbName = "test_db";
+    private final String prefix = "ods_";
+    private final String suffix = "_dt";
+
+    @Before
+    public void setUp() {
+        super.setUp();
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
+        tableConfig.put(DorisTableConfig.TABLE_BUCKETS, ".*:1");
+
+        JsonDebeziumChangeContext changeContext =
+                new JsonDebeziumChangeContext(
+                        mockDorisOptions,
+                        tableMapping,
+                        null,
+                        dbName,
+                        new DorisTableConfig(tableConfig),
+                        objectMapper,
+                        null,
+                        lineDelimiter,
+                        ignoreUpdateBefore,
+                        prefix,
+                        suffix,
+                        true,
+                        new TableNameConverter(prefix, suffix));
+        schemaChange = new MongoJsonDebeziumSchemaChange(changeContext);
+    }
+
+    @Test
+    public void testAutoCreateTable() throws IOException {
+        String newTableName = "test_table";
+        String record =
+                "{"
+                        + "\"_id\":\"{\\\"_id\\\": {\\\"_id\\\": {\\\"$oid\\\": \\\"67d2d13807fe0c4336070cfd\\\"}}}\","
+                        + "\"operationType\":\"insert\","
+                        + "\"fullDocument\":\"{\\\"_id\\\": {\\\"$oid\\\": \\\"67d2d13807fe0c4336070cfd\\\"}, \\\"name\\\": \\\"John Doe\\\", \\\"age\\\": 30, \\\"city\\\": \\\"New York\\\"}\","
+                        + "\"fullDocumentBeforeChange\":null,"
+                        + "\"source\":{\"ts_ms\":1741869368000,\"snapshot\":\"false\"},"
+                        + "\"ts_ms\":1741869368365,"
+                        + "\"ns\":{\"db\":\"testDB\",\"coll\":\""
+                        + newTableName
+                        + "\"},"
+                        + "\"to\":null,"
+                        + "\"documentKey\":\"{\\\"_id\\\": {\\\"$oid\\\": \\\"67d2d13807fe0c4336070cfd\\\"}}\","
+                        + "\"updateDescription\":null,"
+                        + "\"clusterTime\":\"{\\\"$timestamp\\\": {\\\"t\\\": 1741869368, \\\"i\\\": 2}}\","
+                        + "\"txnNumber\":null,"
+                        + "\"lsid\":null"
+                        + "}";
+        schemaChange.setSchemaChangeManager(mockSchemaManager);
+        schemaChange.setDorisSystem(mockDorisSystem);
+        JsonNode recordRoot = objectMapper.readTree(record);
+        boolean result = schemaChange.schemaChange(recordRoot);
+        Assert.assertTrue(
+                tableMapping.containsValue(
+                        dbName
+                                + "."
+                                + new TableNameConverter(prefix, suffix).convert(newTableName)));
+        Assert.assertTrue(result);
+        // The table must really have been created, not only registered in tableMapping.
+        org.mockito.Mockito.verify(mockDorisSystem).createTable(org.mockito.ArgumentMatchers.any());
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]