This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new db8cda9  schema change triple varchar type length (#135)
db8cda9 is described below

commit db8cda9557660ded9e121aa1e6b0defbcee3d2cb
Author: gnehil <[email protected]>
AuthorDate: Tue Apr 25 23:20:51 2023 +0800

    schema change triple varchar type length (#135)
    
    * schema change varchar type length triple
    * varchar length less than 65533
---
 .../sink/writer/JsonDebeziumSchemaSerializer.java  | 47 ++++++++++++++++------
 .../writer/TestJsonDebeziumSchemaSerializer.java   |  9 ++++-
 2 files changed, 42 insertions(+), 14 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index 8458091..c7295e2 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -95,18 +95,20 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
             return null;
         }
         Map<String, String> valueMap;
-        if (OP_READ.equals(op) || OP_CREATE.equals(op)) {
-            valueMap = extractAfterRow(recordRoot);
-            addDeleteSign(valueMap,false);
-        } else if (OP_UPDATE.equals(op)) {
-            valueMap = extractAfterRow(recordRoot);
-            addDeleteSign(valueMap,false);
-        } else if (OP_DELETE.equals(op)) {
-            valueMap = extractBeforeRow(recordRoot);
-            addDeleteSign(valueMap,true);
-        } else {
-            LOG.error("parse record fail, unknown op {} in {}",op,record);
-            return null;
+        switch (op) {
+            case OP_READ:
+            case OP_CREATE:
+            case OP_UPDATE:
+                valueMap = extractAfterRow(recordRoot);
+                addDeleteSign(valueMap, false);
+                break;
+            case OP_DELETE:
+                valueMap = extractBeforeRow(recordRoot);
+                addDeleteSign(valueMap, true);
+                break;
+            default:
+                LOG.error("parse record fail, unknown op {} in {}", op, 
record);
+                return null;
         }
         return 
objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8);
     }
@@ -262,7 +264,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
                 String op = matcher.group(1);
                 String col = matcher.group(3);
                 String type = matcher.group(5);
-                type = type == null ? "" : type;
+                type = handleType(type);
                 ddl = String.format(EXECUTE_DDL, 
dorisOptions.getTableIdentifier(), op, col, type);
                 LOG.info("parse ddl:{}", ddl);
                 return ddl;
@@ -306,4 +308,23 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
             return new JsonDebeziumSchemaSerializer(dorisOptions, 
addDropDDLPattern, sourceTableName);
         }
     }
+
+    private String handleType(String type) {
+        // Adapts a column type string taken from a parsed DDL; null or "" is returned as "".
+        if (type == null || "".equals(type)) {
+            return "";
+        }
+        // A varchar(N) found anywhere in the type (any letter case, N > 0) is rewritten to
+        // varchar(min(N * 3, 65533)); only that rewritten varchar(...) text is returned.
+        Pattern pattern = Pattern.compile("varchar\\(([1-9][0-9]*)\\)", 
Pattern.CASE_INSENSITIVE);
+        Matcher matcher = pattern.matcher(type);
+        if (matcher.find()) {
+            String len = matcher.group(1); // NOTE(review): N >= 715827883 overflows the int multiply below; N beyond int range makes parseInt throw - confirm such lengths cannot reach here
+            return String.format("varchar(%d)", Math.min(Integer.parseInt(len) 
* 3, 65533));
+        }
+        // No varchar(N) in the type: it is returned unchanged.
+        return type;
+
+    }
+
 }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index ed5c37f..de5a4de 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -100,11 +100,18 @@ public class TestJsonDebeziumSchemaSerializer {
 
     @Test
     public void testExtractDDL() throws IOException {
-        String srcDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(200)";
+        String srcDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(600)";
         String record = 
"{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\
 [...]
         JsonNode recordRoot = objectMapper.readTree(record);
         String ddl = serializer.extractDDL(recordRoot);
         Assert.assertEquals(srcDDL, ddl);
+
+        String targetDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(65533)";
+        String record1 = 
"{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,
 [...]
+        JsonNode recordRoot1 = objectMapper.readTree(record1);
+        String ddl1 = serializer.extractDDL(recordRoot1);
+        Assert.assertEquals(targetDDL, ddl1);
+
     }
 
     @Ignore


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to