This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 78ed055 [Improve](schemaChange)schema change support rename column (#206) 78ed055 is described below commit 78ed055fd2e7f7657dadfb1d3f4f4dd9e9caadf7 Author: DongLiang-0 <46414265+donglian...@users.noreply.github.com> AuthorDate: Fri Oct 27 16:02:32 2023 +0800 [Improve](schemaChange)schema change support rename column (#206) --- .../sink/writer/JsonDebeziumSchemaSerializer.java | 20 ++++++- .../flink/sink/writer/SchemaChangeHelper.java | 23 +++++++- .../flink/sink/writer/SchemaChangeHelperTest.java | 65 ++++++++++++++++++++++ .../writer/TestJsonDebeziumSchemaSerializer.java | 25 +++++++++ 4 files changed, 130 insertions(+), 3 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java index fd3c92a..bf7b81f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java @@ -82,6 +82,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s"; // alter table tbl add cloumn aca int private static final String addDropDDLRegex = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*"; + private static final Pattern renameDDLPattern = Pattern.compile( + "ALTER\\s+TABLE\\s+(\\w+)\\s+RENAME\\s+COLUMN\\s+(\\w+)\\s+TO\\s+(\\w+)", Pattern.CASE_INSENSITIVE); private final Pattern addDropDDLPattern; private DorisOptions dorisOptions; private ObjectMapper objectMapper = new ObjectMapper(); @@ -249,12 +251,24 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin sourceConnector = SourceConnector.valueOf(record.get("source").get("connector").asText().toUpperCase()); fillOriginSchema(columns); } + + // rename ddl + Matcher renameMatcher = renameDDLPattern.matcher(ddl); + if (renameMatcher.find()) { + String oldColumnName = renameMatcher.group(2); + String newColumnName = renameMatcher.group(3); + return SchemaChangeHelper.generateRenameDDLSql( + dorisOptions.getTableIdentifier(), oldColumnName, newColumnName, originFieldSchemaMap); + } + + // add/drop ddl Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>(); for (JsonNode column : columns) { buildFieldSchema(updateFiledSchema, column); } SchemaChangeHelper.compareSchema(updateFiledSchema, originFieldSchemaMap); - // In order to avoid operations such as rename or change, which may lead to the accidental deletion of the doris column. + // In order to avoid other source table column change operations other than add/drop/rename, + // which may lead to the accidental deletion of the doris column. Matcher matcher = addDropDDLPattern.matcher(ddl); if (!matcher.find()) { return null; @@ -262,6 +276,10 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin return SchemaChangeHelper.generateDDLSql(dorisOptions.getTableIdentifier()); } + @VisibleForTesting + public void setOriginFieldSchemaMap(Map<String, FieldSchema> originFieldSchemaMap) { + this.originFieldSchemaMap = originFieldSchemaMap; + } @VisibleForTesting public boolean schemaChange(JsonNode recordRoot) { boolean status = false; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java index dc8d83b..8e6307b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java @@ -32,8 +32,9 @@ public class SchemaChangeHelper { private static final List<FieldSchema> addFieldSchemas = Lists.newArrayList(); // Used to determine whether the doris table supports ddl private static final List<DDLSchema> ddlSchemas = Lists.newArrayList(); - public static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s"; - public static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s"; + private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s"; + private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s"; + private static final String RENAME_DDL = "ALTER TABLE %s RENAME COLUMN %s %s"; public static void compareSchema(Map<String, FieldSchema> updateFiledSchemaMap, Map<String, FieldSchema> originFieldSchemaMap) { @@ -57,6 +58,24 @@ public class SchemaChangeHelper { } } + public static List<String> generateRenameDDLSql(String table, String oldColumnName, String newColumnName, + Map<String, FieldSchema> originFieldSchemaMap) { + ddlSchemas.clear(); + List<String> ddlList = Lists.newArrayList(); + FieldSchema fieldSchema = null; + for (Entry<String, FieldSchema> originFieldSchema : originFieldSchemaMap.entrySet()) { + if (originFieldSchema.getKey().equals(oldColumnName)) { + fieldSchema = originFieldSchema.getValue(); + String renameSQL = String.format(RENAME_DDL, table, oldColumnName, newColumnName); + ddlList.add(renameSQL); + ddlSchemas.add(new DDLSchema(oldColumnName, false)); + } + } + originFieldSchemaMap.remove(oldColumnName); + originFieldSchemaMap.put(newColumnName, fieldSchema); + return ddlList; + } + public static List<String> generateDDLSql(String table) { ddlSchemas.clear(); List<String> ddlList = Lists.newArrayList(); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java new file mode 100644 index 0000000..62906df --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.writer; + +import org.apache.doris.flink.catalog.doris.FieldSchema; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Maps; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +public class SchemaChangeHelperTest { + + private final Map<String, FieldSchema> originFieldSchemaMap = Maps.newHashMap(); + private final Map<String, FieldSchema> updateFieldSchemaMap = Maps.newHashMap(); + + @Before + public void setUp() { + originFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", "")); + originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", "")); + originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", "")); + + updateFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", "")); + updateFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", "")); + updateFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", "")); + updateFieldSchemaMap.put("c4", new FieldSchema("c4", "BIGINT", "", "")); + updateFieldSchemaMap.put("c5", new FieldSchema("c5", "DATETIMEV2(0)", "", "")); + } + + @Test + public void testGenerateRenameDDLSql() { + String table = "test.test_sink"; + String oldColumnName = "c3"; + String newColumnName = "c33"; + List<String> ddlSqls = SchemaChangeHelper.generateRenameDDLSql(table, oldColumnName, newColumnName, + originFieldSchemaMap); + Assert.assertEquals(ddlSqls.get(0), "ALTER TABLE test.test_sink RENAME COLUMN c3 c33"); + } + + @Test + public void testGenerateDDLSql() { + SchemaChangeHelper.compareSchema(updateFieldSchemaMap, originFieldSchemaMap); + List<String> ddlSqls = SchemaChangeHelper.generateDDLSql("test.test_sink"); + Assert.assertEquals(ddlSqls.get(0), "ALTER TABLE test.test_sink ADD COLUMN c4 BIGINT"); + Assert.assertEquals(ddlSqls.get(1), "ALTER TABLE test.test_sink ADD COLUMN c5 DATETIMEV2(0)"); + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java index c4bdbc0..59bfe44 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java @@ -31,6 +31,7 @@ import org.apache.doris.flink.rest.models.Field; import org.apache.doris.flink.rest.models.Schema; import org.apache.commons.collections.CollectionUtils; +import org.apache.flink.shaded.guava30.com.google.common.collect.Maps; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; @@ -286,6 +287,30 @@ public class TestJsonDebeziumSchemaSerializer { Assert.assertEquals(dorisTypeName, "VARCHAR(384)"); } + @Test + public void testExtractDDLListRename() throws IOException { + String columnInfo + = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1698314781975,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000046\",\"pos\":5197,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000046\\\",\\\"pos\\\":5197,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_se [...] + Map<String, FieldSchema> originFieldSchemaMap = Maps.newHashMap(); + JsonNode record = objectMapper.readTree(columnInfo); + + DorisOptions dorisOptions = DorisOptions.builder().setFenodes("127.0.0.1:8030") + .setTableIdentifier("test.t1") + .setUsername("root") + .setPassword("").build(); + JsonDebeziumSchemaSerializer serializer = JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions) + .build(); + serializer.setSourceConnector("mysql"); + + originFieldSchemaMap.put("id", new FieldSchema("id", "INT", "", "")); + originFieldSchemaMap.put("c2", new FieldSchema("c2", "INT", "", "")); + originFieldSchemaMap.put("c3", new FieldSchema("c3", "VARCHAR(30)", "", "")); + serializer.setOriginFieldSchemaMap(originFieldSchemaMap); + + List<String> ddlList = serializer.extractDDLList(record); + Assert.assertEquals("ALTER TABLE test.t1 RENAME COLUMN c3 c333", ddlList.get(0)); + } + @Ignore @Test public void testSerializeAddColumn() throws IOException, DorisException { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org