This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 78ed055  [Improve](schemaChange)schema change support rename column 
(#206)
78ed055 is described below

commit 78ed055fd2e7f7657dadfb1d3f4f4dd9e9caadf7
Author: DongLiang-0 <46414265+donglian...@users.noreply.github.com>
AuthorDate: Fri Oct 27 16:02:32 2023 +0800

    [Improve](schemaChange)schema change support rename column (#206)
---
 .../sink/writer/JsonDebeziumSchemaSerializer.java  | 20 ++++++-
 .../flink/sink/writer/SchemaChangeHelper.java      | 23 +++++++-
 .../flink/sink/writer/SchemaChangeHelperTest.java  | 65 ++++++++++++++++++++++
 .../writer/TestJsonDebeziumSchemaSerializer.java   | 25 +++++++++
 4 files changed, 130 insertions(+), 3 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index fd3c92a..bf7b81f 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -82,6 +82,8 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
     public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s"; 
// alter table tbl add column aca int
     private static final String addDropDDLRegex
             = 
"ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
+    private static final Pattern renameDDLPattern = Pattern.compile(
+            
"ALTER\\s+TABLE\\s+(\\w+)\\s+RENAME\\s+COLUMN\\s+(\\w+)\\s+TO\\s+(\\w+)", 
Pattern.CASE_INSENSITIVE);
     private final Pattern addDropDDLPattern;
     private DorisOptions dorisOptions;
     private ObjectMapper objectMapper = new ObjectMapper();
@@ -249,12 +251,24 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
             sourceConnector = 
SourceConnector.valueOf(record.get("source").get("connector").asText().toUpperCase());
             fillOriginSchema(columns);
         }
+
+        // rename ddl
+        Matcher renameMatcher = renameDDLPattern.matcher(ddl);
+        if (renameMatcher.find()) {
+            String oldColumnName = renameMatcher.group(2);
+            String newColumnName = renameMatcher.group(3);
+            return SchemaChangeHelper.generateRenameDDLSql(
+                    dorisOptions.getTableIdentifier(), oldColumnName, 
newColumnName, originFieldSchemaMap);
+        }
+
+        // add/drop ddl
         Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>();
         for (JsonNode column : columns) {
             buildFieldSchema(updateFiledSchema, column);
         }
         SchemaChangeHelper.compareSchema(updateFiledSchema, 
originFieldSchemaMap);
-        // In order to avoid operations such as rename or change, which may 
lead to the accidental deletion of the doris column.
+        // Ignore source table column changes other than add/drop/rename,
+        // because applying them may lead to the accidental deletion of the doris column.
         Matcher matcher = addDropDDLPattern.matcher(ddl);
         if (!matcher.find()) {
             return null;
@@ -262,6 +276,10 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         return 
SchemaChangeHelper.generateDDLSql(dorisOptions.getTableIdentifier());
     }
 
+    @VisibleForTesting // test hook: replaces the origin schema map (column name -> FieldSchema)
+    public void setOriginFieldSchemaMap(Map<String, FieldSchema> 
originFieldSchemaMap) {
+        this.originFieldSchemaMap = originFieldSchemaMap; // keeps the caller's map instance (no copy)
+    }
     @VisibleForTesting
     public boolean schemaChange(JsonNode recordRoot) {
         boolean status = false;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
index dc8d83b..8e6307b 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
@@ -32,8 +32,9 @@ public class SchemaChangeHelper {
     private static final List<FieldSchema> addFieldSchemas = 
Lists.newArrayList();
     // Used to determine whether the doris table supports ddl
     private static final List<DDLSchema> ddlSchemas = Lists.newArrayList();
-    public static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
-    public static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
+    private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
+    private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
+    private static final String RENAME_DDL = "ALTER TABLE %s RENAME COLUMN %s 
%s";
 
     public static void compareSchema(Map<String, FieldSchema> 
updateFiledSchemaMap,
             Map<String, FieldSchema> originFieldSchemaMap) {
@@ -57,6 +58,24 @@ public class SchemaChangeHelper {
         }
     }
 
+    /**
+     * Builds the DDL that renames a column and keeps the cached origin schema in sync.
+     *
+     * <p>Always resets {@code ddlSchemas}. When {@code oldColumnName} is a key of
+     * {@code originFieldSchemaMap}, one {@code RENAME_DDL} statement is produced, a
+     * {@code DDLSchema} for the old column is recorded, and the column's {@link FieldSchema}
+     * is re-registered in the map under {@code newColumnName}. The {@link FieldSchema} object
+     * is moved as-is; this method does not modify it.
+     *
+     * @param table target table identifier, inserted verbatim into the statement
+     * @param oldColumnName current name of the column
+     * @param newColumnName name the column is renamed to
+     * @param originFieldSchemaMap cached schema keyed by column name; modified in place
+     * @return a list holding the single rename statement, or an empty list when
+     *         {@code oldColumnName} is not present in {@code originFieldSchemaMap}
+     */
+    public static List<String> generateRenameDDLSql(String table, String oldColumnName, String newColumnName,
+            Map<String, FieldSchema> originFieldSchemaMap) {
+        ddlSchemas.clear();
+        List<String> ddlList = Lists.newArrayList();
+        if (!originFieldSchemaMap.containsKey(oldColumnName)) {
+            // Unknown column: emit nothing and, in particular, do not register a
+            // null schema under the new name, which would corrupt the cached schema.
+            return ddlList;
+        }
+
+        ddlList.add(String.format(RENAME_DDL, table, oldColumnName, newColumnName));
+        ddlSchemas.add(new DDLSchema(oldColumnName, false));
+
+        // Move the schema entry from the old key to the new key.
+        FieldSchema fieldSchema = originFieldSchemaMap.remove(oldColumnName);
+        originFieldSchemaMap.put(newColumnName, fieldSchema);
+        return ddlList;
+    }
+
     public static List<String> generateDDLSql(String table) {
         ddlSchemas.clear();
         List<String> ddlList = Lists.newArrayList();
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java
new file mode 100644
index 0000000..62906df
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+/** Unit tests for the DDL statements generated by {@link SchemaChangeHelper}. */
+public class SchemaChangeHelperTest {
+
+    private static final String TABLE = "test.test_sink";
+
+    private final Map<String, FieldSchema> originFieldSchemaMap = Maps.newHashMap();
+    private final Map<String, FieldSchema> updateFieldSchemaMap = Maps.newHashMap();
+
+    @Before
+    public void setUp() {
+        // Origin schema: three columns.
+        originFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", ""));
+        originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", ""));
+        originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", ""));
+
+        // Updated schema: the origin columns plus the new columns c4 and c5.
+        updateFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", ""));
+        updateFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", ""));
+        updateFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", ""));
+        updateFieldSchemaMap.put("c4", new FieldSchema("c4", "BIGINT", "", ""));
+        updateFieldSchemaMap.put("c5", new FieldSchema("c5", "DATETIMEV2(0)", "", ""));
+    }
+
+    /** Renaming a known column yields one statement and re-keys the origin schema map. */
+    @Test
+    public void testGenerateRenameDDLSql() {
+        String oldColumnName = "c3";
+        String newColumnName = "c33";
+        List<String> ddlSqls = SchemaChangeHelper.generateRenameDDLSql(TABLE, oldColumnName, newColumnName,
+                originFieldSchemaMap);
+
+        // JUnit's contract is assertEquals(expected, actual).
+        Assert.assertEquals(1, ddlSqls.size());
+        Assert.assertEquals("ALTER TABLE test.test_sink RENAME COLUMN c3 c33", ddlSqls.get(0));
+
+        // The cached schema must follow the rename.
+        Assert.assertFalse(originFieldSchemaMap.containsKey(oldColumnName));
+        Assert.assertTrue(originFieldSchemaMap.containsKey(newColumnName));
+    }
+
+    /** Columns present only in the updated schema are turned into ADD COLUMN statements. */
+    @Test
+    public void testGenerateDDLSql() {
+        SchemaChangeHelper.compareSchema(updateFieldSchemaMap, originFieldSchemaMap);
+        List<String> ddlSqls = SchemaChangeHelper.generateDDLSql(TABLE);
+
+        // Check membership rather than position: the fixture maps are HashMaps,
+        // which guarantee no iteration order.
+        Assert.assertTrue(ddlSqls.contains("ALTER TABLE test.test_sink ADD COLUMN c4 BIGINT"));
+        Assert.assertTrue(ddlSqls.contains("ALTER TABLE test.test_sink ADD COLUMN c5 DATETIMEV2(0)"));
+    }
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index c4bdbc0..59bfe44 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -31,6 +31,7 @@ import org.apache.doris.flink.rest.models.Field;
 import org.apache.doris.flink.rest.models.Schema;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -286,6 +287,30 @@ public class TestJsonDebeziumSchemaSerializer {
         Assert.assertEquals(dorisTypeName, "VARCHAR(384)");
     }
 
+    @Test
+    public void testExtractDDLListRename() throws IOException {
+        String columnInfo
+                = 
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1698314781975,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000046\",\"pos\":5197,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000046\\\",\\\"pos\\\":5197,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_se
 [...]
+        Map<String, FieldSchema> originFieldSchemaMap = Maps.newHashMap();
+        JsonNode record = objectMapper.readTree(columnInfo);
+
+        DorisOptions dorisOptions = 
DorisOptions.builder().setFenodes("127.0.0.1:8030")
+                .setTableIdentifier("test.t1")
+                .setUsername("root")
+                .setPassword("").build();
+        JsonDebeziumSchemaSerializer serializer = 
JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions)
+                .build();
+        serializer.setSourceConnector("mysql");
+
+        originFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", ""));
+        originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", ""));
+        originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", 
"", ""));
+        serializer.setOriginFieldSchemaMap(originFieldSchemaMap);
+
+        List<String> ddlList = serializer.extractDDLList(record);
+        Assert.assertEquals("ALTER TABLE test.t1 RENAME COLUMN c3 c333", 
ddlList.get(0));
+    }
+
     @Ignore
     @Test
     public void testSerializeAddColumn() throws IOException, DorisException {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to