This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 43f9e014 [Fix](cdc) fix enable-delete option not work (#455)
43f9e014 is described below
commit 43f9e014c4742949e9625106252a375a1e51b82c
Author: Petrichor <[email protected]>
AuthorDate: Mon Aug 5 18:19:53 2024 +0800
[Fix](cdc) fix enable-delete option not work (#455)
---
.../serializer/JsonDebeziumSchemaSerializer.java | 5 +-
.../jsondebezium/JsonDebeziumChangeContext.java | 9 +-
.../jsondebezium/JsonDebeziumDataChange.java | 4 +-
.../MongoDBJsonDebeziumSchemaSerializer.java | 5 +-
.../serializer/MongoJsonDebeziumDataChange.java | 4 +-
.../jsondebezium/TestJsonDebeziumDataChange.java | 6 +-
.../TestJsonDebeziumSchemaChangeImpl.java | 3 +-
.../TestJsonDebeziumSchemaChangeImplV2.java | 3 +-
.../jsondebezium/TestSQLParserSchemaChange.java | 3 +-
.../doris/flink/tools/cdc/MySQLDorisE2ECase.java | 116 +++++++++++++++++++++
10 files changed, 148 insertions(+), 10 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 83dec58b..9c89fce3 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -70,6 +70,7 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
private final boolean newSchemaChange;
private String lineDelimiter = LINE_DELIMITER_DEFAULT;
private boolean ignoreUpdateBefore = true;
+ private boolean enableDelete = true;
// <cdc db.schema.table, doris db.table>
private Map<String, String> tableMapping;
// create table properties
@@ -111,6 +112,7 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
.getStreamLoadProp()
.getProperty(LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT);
this.ignoreUpdateBefore = executionOptions.getIgnoreUpdateBefore();
+ this.enableDelete = executionOptions.getDeletable();
}
}
@@ -149,7 +151,8 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
lineDelimiter,
ignoreUpdateBefore,
targetTablePrefix,
- targetTableSuffix);
+ targetTableSuffix,
+ enableDelete);
initSchemaChangeInstance(changeContext);
this.dataChange = new JsonDebeziumDataChange(changeContext);
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
index d1326c72..2f7764f3 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
@@ -41,6 +41,7 @@ public class JsonDebeziumChangeContext implements
Serializable {
private final Pattern pattern;
private final String lineDelimiter;
private final boolean ignoreUpdateBefore;
+ private final boolean enableDelete;
private final String targetTablePrefix;
private final String targetTableSuffix;
@@ -55,7 +56,8 @@ public class JsonDebeziumChangeContext implements
Serializable {
String lineDelimiter,
boolean ignoreUpdateBefore,
String targetTablePrefix,
- String targetTableSuffix) {
+ String targetTableSuffix,
+ boolean enableDelete) {
this.dorisOptions = dorisOptions;
this.tableMapping = tableMapping;
this.sourceTableName = sourceTableName;
@@ -65,6 +67,7 @@ public class JsonDebeziumChangeContext implements
Serializable {
this.pattern = pattern;
this.lineDelimiter = lineDelimiter;
this.ignoreUpdateBefore = ignoreUpdateBefore;
+ this.enableDelete = enableDelete;
this.targetTablePrefix = targetTablePrefix;
this.targetTableSuffix = targetTableSuffix;
}
@@ -116,6 +119,10 @@ public class JsonDebeziumChangeContext implements
Serializable {
return targetTableSuffix;
}
+ public boolean enableDelete() {
+ return enableDelete;
+ }
+
public DorisTableConfig getDorisTableConf() {
return dorisTableConfig;
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
index 5075adf8..298cfb95 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
@@ -50,6 +50,7 @@ public class JsonDebeziumDataChange extends CdcDataChange {
private final ObjectMapper objectMapper;
private final DorisOptions dorisOptions;
private final boolean ignoreUpdateBefore;
+ private final boolean enableDelete;
private final String lineDelimiter;
private final JsonDebeziumChangeContext changeContext;
@@ -59,6 +60,7 @@ public class JsonDebeziumDataChange extends CdcDataChange {
this.objectMapper = changeContext.getObjectMapper();
this.ignoreUpdateBefore = changeContext.isIgnoreUpdateBefore();
this.lineDelimiter = changeContext.getLineDelimiter();
+ this.enableDelete = changeContext.enableDelete();
}
public DorisRecord serialize(String record, JsonNode recordRoot, String
op) throws IOException {
@@ -87,7 +89,7 @@ public class JsonDebeziumDataChange extends CdcDataChange {
return DorisRecord.of(dorisTableIdentifier,
extractUpdate(recordRoot));
case OP_DELETE:
valueMap = extractBeforeRow(recordRoot);
- addDeleteSign(valueMap, true);
+ addDeleteSign(valueMap, enableDelete);
break;
default:
LOG.error("parse record fail, unknown op {} in {}", op,
record);
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
index b7faec2f..d4a87ff8 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
@@ -50,6 +50,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements
DorisRecordSerialize
private final String sourceTableName;
private String lineDelimiter = LINE_DELIMITER_DEFAULT;
private boolean ignoreUpdateBefore = true;
+ private boolean enableDelete = true;
// <cdc db.schema.table, doris db.table>
private Map<String, String> tableMapping;
// create table properties
@@ -90,6 +91,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements
DorisRecordSerialize
.getStreamLoadProp()
.getProperty(LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT);
this.ignoreUpdateBefore = executionOptions.getIgnoreUpdateBefore();
+ this.enableDelete = executionOptions.getDeletable();
}
init();
}
@@ -107,7 +109,8 @@ public class MongoDBJsonDebeziumSchemaSerializer implements
DorisRecordSerialize
lineDelimiter,
ignoreUpdateBefore,
targetTablePrefix,
- targetTableSuffix);
+ targetTableSuffix,
+ enableDelete);
this.dataChange = new MongoJsonDebeziumDataChange(changeContext);
this.schemaChange = new MongoJsonDebeziumSchemaChange(changeContext);
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java
index 8048e38a..9dbe7ffe 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java
@@ -61,6 +61,7 @@ public class MongoJsonDebeziumDataChange extends
CdcDataChange implements Change
public JsonDebeziumChangeContext changeContext;
public ObjectMapper objectMapper;
public Map<String, String> tableMapping;
+ private final boolean enableDelete;
public MongoJsonDebeziumDataChange(JsonDebeziumChangeContext
changeContext) {
this.changeContext = changeContext;
@@ -68,6 +69,7 @@ public class MongoJsonDebeziumDataChange extends
CdcDataChange implements Change
this.objectMapper = changeContext.getObjectMapper();
this.lineDelimiter = changeContext.getLineDelimiter();
this.tableMapping = changeContext.getTableMapping();
+ this.enableDelete = changeContext.enableDelete();
}
@Override
@@ -93,7 +95,7 @@ public class MongoJsonDebeziumDataChange extends
CdcDataChange implements Change
break;
case OP_DELETE:
valueMap = extractDeleteRow(recordRoot);
- addDeleteSign(valueMap, true);
+ addDeleteSign(valueMap, enableDelete);
break;
default:
LOG.error("parse record fail, unknown op {} in {}", op,
record);
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
index 4891d820..f8098ccc 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java
@@ -50,7 +50,8 @@ public class TestJsonDebeziumDataChange extends
TestJsonDebeziumChangeBase {
lineDelimiter,
ignoreUpdateBefore,
"",
- "");
+ "",
+ true);
dataChange = new JsonDebeziumDataChange(changeContext);
}
@@ -113,7 +114,8 @@ public class TestJsonDebeziumDataChange extends
TestJsonDebeziumChangeBase {
lineDelimiter,
false,
"",
- "");
+ "",
+ true);
dataChange = new JsonDebeziumDataChange(changeContext);
// update t1 set name='doris-update' WHERE id =1;
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
index caf5542c..e66ecaab 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java
@@ -63,7 +63,8 @@ public class TestJsonDebeziumSchemaChangeImpl extends
TestJsonDebeziumChangeBase
lineDelimiter,
ignoreUpdateBefore,
"",
- "");
+ "",
+ true);
schemaChange = new JsonDebeziumSchemaChangeImpl(changeContext);
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index 68239618..3eec0fb6 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -83,7 +83,8 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends
TestJsonDebeziumChangeBa
lineDelimiter,
ignoreUpdateBefore,
"",
- "");
+ "",
+ true);
schemaChange = new JsonDebeziumSchemaChangeImplV2(changeContext);
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
index d3194f81..a7958b70 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
@@ -46,7 +46,8 @@ public class TestSQLParserSchemaChange extends
TestJsonDebeziumChangeBase {
lineDelimiter,
ignoreUpdateBefore,
"",
- "");
+ "",
+ true);
schemaChange = new SQLParserSchemaChange(changeContext);
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
index 6a613841..aeb17c29 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -503,6 +503,122 @@ public class MySQLDorisE2ECase extends DorisTestBase {
jobClient.cancel().get();
}
+ @Test
+ public void testMySQL2DorisEnableDelete() throws Exception {
+ printClusterStatus();
+ initializeMySQLTable();
+ initializeDorisTable();
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ Map<String, String> flinkMap = new HashMap<>();
+ flinkMap.put("execution.checkpointing.interval", "10s");
+ flinkMap.put("pipeline.operator-chaining", "false");
+ flinkMap.put("parallelism.default", "1");
+
+ Configuration configuration = Configuration.fromMap(flinkMap);
+ env.configure(configuration);
+
+ String database = DATABASE;
+ Map<String, String> mysqlConfig = new HashMap<>();
+ mysqlConfig.put("database-name", DATABASE);
+ mysqlConfig.put("hostname", MYSQL_CONTAINER.getHost());
+ mysqlConfig.put("port", MYSQL_CONTAINER.getMappedPort(3306) + "");
+ mysqlConfig.put("username", MYSQL_USER);
+ mysqlConfig.put("password", MYSQL_PASSWD);
+ mysqlConfig.put("server-time-zone", "Asia/Shanghai");
+ Configuration config = Configuration.fromMap(mysqlConfig);
+
+ Map<String, String> sinkConfig = new HashMap<>();
+ sinkConfig.put("fenodes", getFenodes());
+ sinkConfig.put("username", USERNAME);
+ sinkConfig.put("password", PASSWORD);
+ sinkConfig.put("jdbc-url", String.format(DorisTestBase.URL,
DORIS_CONTAINER.getHost()));
+ sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+ sinkConfig.put("sink.check-interval", "5000");
+ sinkConfig.put("sink.enable-delete", "false");
+ Configuration sinkConf = Configuration.fromMap(sinkConfig);
+
+ Map<String, String> tableConfig = new HashMap<>();
+ tableConfig.put("replication_num", "1");
+
+ String includingTables = "tbl1|tbl2|tbl3|tbl5";
+ String excludingTables = "";
+ DatabaseSync databaseSync = new MysqlDatabaseSync();
+ databaseSync
+ .setEnv(env)
+ .setDatabase(database)
+ .setConfig(config)
+ .setIncludingTables(includingTables)
+ .setExcludingTables(excludingTables)
+ .setIgnoreDefaultValue(false)
+ .setSinkConfig(sinkConf)
+ .setTableConfig(tableConfig)
+ .setCreateTableOnly(false)
+ .setNewSchemaChange(true)
+ // no single sink
+ .setSingleSink(false)
+ .create();
+ databaseSync.build();
+ JobClient jobClient = env.executeAsync();
+ waitForJobStatus(
+ jobClient,
+ Collections.singletonList(RUNNING),
+ Deadline.fromNow(Duration.ofSeconds(10)));
+
+ // wait 2 times checkpoint
+ Thread.sleep(20000);
+ List<String> expected = Arrays.asList("doris_1,1", "doris_2,2",
"doris_3,3", "doris_5,5");
+ String sql =
+ "select * from ( select * from %s.%s union all select * from
%s.%s union all select * from %s.%s union all select * from %s.%s) res order by
1";
+ String query1 =
+ String.format(
+ sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE,
TABLE_3, DATABASE,
+ TABLE_5);
+ checkResult(expected, query1, 2);
+
+ // add incremental data
+ try (Connection connection =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER,
MYSQL_PASSWD);
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format("insert into %s.%s values
('doris_1_1',10)", DATABASE, TABLE_1));
+ statement.execute(
+ String.format("insert into %s.%s values
('doris_2_1',11)", DATABASE, TABLE_2));
+ statement.execute(
+ String.format("insert into %s.%s values
('doris_3_1',12)", DATABASE, TABLE_3));
+
+ statement.execute(
+ String.format(
+ "update %s.%s set age=18 where name='doris_1'",
DATABASE, TABLE_1));
+ statement.execute(
+ String.format("delete from %s.%s where name='doris_2'",
DATABASE, TABLE_2));
+ statement.execute(
+ String.format("delete from %s.%s where name='doris_3'",
DATABASE, TABLE_3));
+ statement.execute(
+ String.format("delete from %s.%s where name='doris_5'",
DATABASE, TABLE_5));
+ }
+
+ Thread.sleep(20000);
+ List<String> expected2 =
+ Arrays.asList(
+ "doris_1,18",
+ "doris_1_1,10",
+ "doris_2,2",
+ "doris_2_1,11",
+ "doris_3,3",
+ "doris_3_1,12",
+ "doris_5,5");
+ sql =
+ "select * from ( select * from %s.%s union all select * from
%s.%s union all select * from %s.%s union all select * from %s.%s) res order by
1";
+ String query2 =
+ String.format(
+ sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE,
TABLE_3, DATABASE,
+ TABLE_5);
+ checkResult(expected2, query2, 2);
+ jobClient.cancel().get();
+ }
+
private void initializeDorisTable() throws Exception {
try (Connection connection =
DriverManager.getConnection(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]