This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 222ce60  [Improve](schemaChange)schema change type adapts to other connectors (#205)
222ce60 is described below

commit 222ce60833ae114f54497e2404dfee8d93993357
Author: DongLiang-0 <46414265+donglian...@users.noreply.github.com>
AuthorDate: Wed Oct 11 10:05:03 2023 +0800

    [Improve](schemaChange)schema change type adapts to other connectors (#205)
---
 .../sink/writer/JsonDebeziumSchemaSerializer.java  | 156 +++++++++++++--------
 .../flink/sink/writer/SchemaChangeHelper.java      |   2 +
 .../doris/flink/tools/cdc/SourceConnector.java     |  36 +++++
 .../writer/TestJsonDebeziumSchemaSerializer.java   | 138 ++++++++++++++----
 4 files changed, 250 insertions(+), 82 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index b2d88c6..fd3c92a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.collections.CollectionUtils;
+
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
@@ -33,7 +34,12 @@ import org.apache.doris.flink.exception.IllegalArgumentException;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.sink.HttpGetWithEntity;
 import org.apache.doris.flink.sink.writer.SchemaChangeHelper.DDLSchema;
+import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
+import org.apache.doris.flink.tools.cdc.oracle.OracleType;
+import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
+import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
+
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.StringUtils;
 import org.apache.http.HttpHeaders;
@@ -73,14 +79,15 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     private static final String OP_UPDATE = "u"; // update
     private static final String OP_DELETE = "d"; // delete
-    public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s"; //alter table tbl add cloumn aca int
-    private static final String addDropDDLRegex = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
+    public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s"; // alter table tbl add column aca int
+    private static final String addDropDDLRegex
+            = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
     private final Pattern addDropDDLPattern;
     private DorisOptions dorisOptions;
     private ObjectMapper objectMapper = new ObjectMapper();
     private String database;
     private String table;
-    //table name of the cdc upstream, format is db.tbl
+    // table name of the cdc upstream, format is db.tbl
     private String sourceTableName;
     private boolean firstLoad;
     private boolean firstSchemaChange;
@@ -88,11 +95,12 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     private final boolean newSchemaChange;
     private String lineDelimiter = LINE_DELIMITER_DEFAULT;
     private boolean ignoreUpdateBefore = true;
+    private SourceConnector sourceConnector;
 
     public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
-                                        Pattern pattern,
-                                        String sourceTableName,
-                                        boolean newSchemaChange) {
+            Pattern pattern,
+            String sourceTableName,
+            boolean newSchemaChange) {
         this.dorisOptions = dorisOptions;
         this.addDropDDLPattern = pattern == null ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern;
         String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
@@ -109,13 +117,14 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     }
 
     public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
-                                        Pattern pattern,
-                                        String sourceTableName,
-                                        boolean newSchemaChange,
-                                        DorisExecutionOptions executionOptions){
+            Pattern pattern,
+            String sourceTableName,
+            boolean newSchemaChange,
+            DorisExecutionOptions executionOptions) {
         this(dorisOptions, pattern, sourceTableName, newSchemaChange);
-        if(executionOptions != null){
-            this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
+        if (executionOptions != null) {
+            this.lineDelimiter = executionOptions.getStreamLoadProp()
+                    .getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
             this.ignoreUpdateBefore = executionOptions.getIgnoreUpdateBefore();
         }
     }
@@ -126,7 +135,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
         String op = extractJsonNode(recordRoot, "op");
         if (Objects.isNull(op)) {
-            //schema change ddl
+            // schema change ddl
             if (newSchemaChange) {
                 schemaChangeV2(recordRoot);
             } else {
@@ -160,20 +169,21 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
 
     /**
      * Change the update event into two
+     *
      * @param recordRoot
     * @return
      */
     private byte[] extractUpdate(JsonNode recordRoot) throws JsonProcessingException {
         StringBuilder updateRow = new StringBuilder();
-        if(!ignoreUpdateBefore){
-            //convert delete
+        if (!ignoreUpdateBefore) {
+            // convert delete
             Map<String, Object> beforeRow = extractBeforeRow(recordRoot);
             addDeleteSign(beforeRow, true);
             updateRow.append(objectMapper.writeValueAsString(beforeRow))
                     .append(this.lineDelimiter);
         }
-        //convert insert
+        // convert insert
         Map<String, Object> afterRow = extractAfterRow(recordRoot);
         addDeleteSign(afterRow, false);
         updateRow.append(objectMapper.writeValueAsString(afterRow));
@@ -207,14 +217,15 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     }
 
     private boolean checkSchemaChange(DDLSchema ddlSchema) throws IOException, IllegalArgumentException {
-        String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
-        Map<String,Object> param = buildRequestParam(ddlSchema);
+        String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API,
+                RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
+        Map<String, Object> param = buildRequestParam(ddlSchema);
         HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
         httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
         httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
         boolean success = handleResponse(httpGet);
         if (!success) {
-            LOG.warn("schema change can not do table {}.{}",database,table);
+            LOG.warn("schema change cannot be done for table {}.{}", database, table);
         }
         return success;
     }
@@ -224,18 +235,18 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         JsonNode historyRecord = extractHistoryRecord(record);
         JsonNode tableChanges = historyRecord.get("tableChanges");
         String ddl = extractJsonNode(historyRecord, "ddl");
-        if(Objects.isNull(tableChanges) || Objects.isNull(ddl)){
+        if (Objects.isNull(tableChanges) || Objects.isNull(ddl)) {
             return new ArrayList<>();
         }
         LOG.debug("received debezium ddl :{}", ddl);
         JsonNode tableChange = tableChanges.get(0);
-        Matcher matcher = addDropDDLPattern.matcher(ddl);
-        if (Objects.isNull(tableChange)|| !tableChange.get("type").asText().equals("ALTER") || !matcher.find()) {
+        if (Objects.isNull(tableChange) || !tableChange.get("type").asText().equals("ALTER")) {
             return null;
         }
         JsonNode columns = tableChange.get("table").get("columns");
         if (firstSchemaChange) {
+            sourceConnector = SourceConnector.valueOf(record.get("source").get("connector").asText().toUpperCase());
             fillOriginSchema(columns);
         }
         Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>();
@@ -243,25 +254,30 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
             buildFieldSchema(updateFiledSchema, column);
         }
         SchemaChangeHelper.compareSchema(updateFiledSchema, originFieldSchemaMap);
+        // Match only ADD/DROP statements here, so that rename or change operations
+        // cannot accidentally drop a Doris column.
+        Matcher matcher = addDropDDLPattern.matcher(ddl);
+        if (!matcher.find()) {
+            return null;
+        }
         return SchemaChangeHelper.generateDDLSql(dorisOptions.getTableIdentifier());
     }
 
     @VisibleForTesting
     public boolean schemaChange(JsonNode recordRoot) {
         boolean status = false;
-        try{
-            if(!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)){
+        try {
+            if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
                 return false;
             }
             String ddl = extractDDL(recordRoot);
-            if(StringUtils.isNullOrWhitespaceOnly(ddl)){
+            if (StringUtils.isNullOrWhitespaceOnly(ddl)) {
                 LOG.info("ddl can not do schema change:{}", recordRoot);
                 return false;
             }
             boolean doSchemaChange = checkSchemaChange(ddl);
             status = doSchemaChange && execSchemaChange(ddl);
             LOG.info("schema change status:{}", status);
-        }catch (Exception ex){
+        } catch (Exception ex) {
             LOG.warn("schema change error :", ex);
         }
         return status;
@@ -278,17 +294,18 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     }
 
     private void addDeleteSign(Map<String, Object> valueMap, boolean delete) {
-        if(delete){
+        if (delete) {
             valueMap.put(DORIS_DELETE_SIGN, "1");
-        }else{
+        } else {
             valueMap.put(DORIS_DELETE_SIGN, "0");
         }
     }
 
     private boolean checkSchemaChange(String ddl) throws IOException, IllegalArgumentException {
-        String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
-        Map<String,Object> param = buildRequestParam(ddl);
-        if(param.size() != 2){
+        String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API,
+                RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
+        Map<String, Object> param = buildRequestParam(ddl);
+        if (param.size() != 2) {
            return false;
         }
         HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
@@ -296,7 +313,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
         boolean success = handleResponse(httpGet);
         if (!success) {
-            LOG.warn("schema change can not do table {}.{}",database,table);
+            LOG.warn("schema change cannot be done for table {}.{}", database, table);
         }
         return success;
     }
@@ -316,9 +333,9 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
      * }
      */
     protected Map<String, Object> buildRequestParam(String ddl) {
-        Map<String,Object> params = new HashMap<>();
+        Map<String, Object> params = new HashMap<>();
         Matcher matcher = addDropDDLPattern.matcher(ddl);
-        if(matcher.find()){
+        if (matcher.find()) {
             String op = matcher.group(1);
             String col = matcher.group(3);
             params.put("isDropColumn", op.equalsIgnoreCase("DROP"));
@@ -330,7 +347,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     private boolean execSchemaChange(String ddl) throws IOException, IllegalArgumentException {
         Map<String, String> param = new HashMap<>();
         param.put("stmt", ddl);
-        String requestUrl = String.format(SCHEMA_CHANGE_API, RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database);
+        String requestUrl = String.format(SCHEMA_CHANGE_API,
+                RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database);
         HttpPost httpPost = new HttpPost(requestUrl);
         httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
         httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
@@ -340,10 +358,10 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     }
 
     protected String extractDatabase(JsonNode record) {
-        if(record.get("source").has("schema")){
-            //compatible with schema
+        if (record.get("source").has("schema")) {
+            // compatible with schema
             return extractJsonNode(record.get("source"), "schema");
-        }else{
+        } else {
             return extractJsonNode(record.get("source"), "db");
         }
     }
@@ -366,7 +384,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
                     LOG.error("schema change response:{}", loadResult);
                 }
             }
-        }catch(Exception e){
+        } catch (Exception e) {
             LOG.error("http request error,", e);
         }
         return false;
@@ -392,10 +410,10 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     }
 
     private JsonNode extractHistoryRecord(JsonNode record) throws JsonProcessingException {
-        if(record.has("historyRecord")){
+        if (record.has("historyRecord")) {
             return objectMapper.readTree(record.get("historyRecord").asText());
         }
-        //The ddl passed by some scenes will not be included in the historyRecord, such as DebeziumSourceFunction
+        // In some scenarios (e.g. DebeziumSourceFunction) the ddl is not wrapped in historyRecord
         return record;
     }
@@ -404,9 +422,9 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         String ddl = extractJsonNode(historyRecord, "ddl");
         LOG.debug("received debezium ddl :{}", ddl);
         if (!Objects.isNull(ddl)) {
-            //filter add/drop operation
+            // filter add/drop operation
             Matcher matcher = addDropDDLPattern.matcher(ddl);
-            if(matcher.find()){
+            if (matcher.find()) {
                 String op = matcher.group(1);
                 String col = matcher.group(3);
                 String type = matcher.group(5);
@@ -420,7 +438,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     }
 
     private String authHeader() {
-        return "Basic " + new String(Base64.encodeBase64((dorisOptions.getUsername() + ":" + dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
+        return "Basic " + new String(Base64.encodeBase64(
+                (dorisOptions.getUsername() + ":" + dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
     }
 
     @VisibleForTesting
@@ -429,16 +448,12 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         for (JsonNode column : columns) {
             String fieldName = column.get("name").asText();
             if (originFieldSchemaMap.containsKey(fieldName)) {
-                JsonNode length = column.get("length");
-                JsonNode scale = column.get("scale");
-                String type = MysqlType.toDorisType(column.get("typeName").asText(),
-                        length == null ? 0 : length.asInt(),
-                        scale == null ? 0 : scale.asInt());
+                String dorisTypeName = buildDorisTypeName(column);
                 String defaultValue = handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
                 String comment = extractJsonNode(column, "comment");
                 FieldSchema fieldSchema = originFieldSchemaMap.get(fieldName);
                 fieldSchema.setName(fieldName);
-                fieldSchema.setTypeString(type);
+                fieldSchema.setTypeString(dorisTypeName);
                 fieldSchema.setComment(comment);
                 fieldSchema.setDefaultValue(defaultValue);
             }
@@ -453,13 +468,36 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
 
     private void buildFieldSchema(Map<String, FieldSchema> filedSchemaMap, JsonNode column) {
         String fieldName = column.get("name").asText();
-        JsonNode length = column.get("length");
-        JsonNode scale = column.get("scale");
-        String type = MysqlType.toDorisType(column.get("typeName").asText(),
-                length == null ? 0 : length.asInt(), scale == null ? 0 : scale.asInt());
+        String dorisTypeName = buildDorisTypeName(column);
         String defaultValue = handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
         String comment = extractJsonNode(column, "comment");
-        filedSchemaMap.put(fieldName, new FieldSchema(fieldName, type, defaultValue, comment));
+        filedSchemaMap.put(fieldName, new FieldSchema(fieldName, dorisTypeName, defaultValue, comment));
+    }
+
+    @VisibleForTesting
+    public String buildDorisTypeName(JsonNode column) {
+        int length = column.get("length") == null ? 0 : column.get("length").asInt();
+        int scale = column.get("scale") == null ? 0 : column.get("scale").asInt();
+        String sourceTypeName = column.get("typeName").asText();
+        String dorisTypeName;
+        switch (sourceConnector) {
+            case MYSQL:
+                dorisTypeName = MysqlType.toDorisType(sourceTypeName, length, scale);
+                break;
+            case ORACLE:
+                dorisTypeName = OracleType.toDorisType(sourceTypeName, length, scale);
+                break;
+            case POSTGRES:
+                dorisTypeName = PostgresType.toDorisType(sourceTypeName, length, scale);
+                break;
+            case SQLSERVER:
+                dorisTypeName = SqlServerType.toDorisType(sourceTypeName, length, scale);
+                break;
+            default:
+                String errMsg = "Unsupported connector " + sourceConnector + " for schema change.";
+                throw new UnsupportedOperationException(errMsg);
+        }
+        return dorisTypeName;
     }
 
     private String handleDefaultValue(String defaultValue) {
@@ -493,6 +531,11 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         return originFieldSchemaMap;
     }
 
+    @VisibleForTesting
+    public void setSourceConnector(String sourceConnector) {
+        this.sourceConnector = SourceConnector.valueOf(sourceConnector.toUpperCase());
+    }
+
     public static JsonDebeziumSchemaSerializer.Builder builder() {
         return new JsonDebeziumSchemaSerializer.Builder();
     }
@@ -533,7 +576,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     }
 
     public JsonDebeziumSchemaSerializer build() {
-        return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName, newSchemaChange, executionOptions);
+        return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName, newSchemaChange,
+                executionOptions);
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
index 302770c..dc8d83b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
@@ -37,6 +37,8 @@ public class SchemaChangeHelper {
 
     public static void compareSchema(Map<String, FieldSchema> updateFiledSchemaMap,
             Map<String, FieldSchema> originFieldSchemaMap) {
+        dropFieldSchemas.clear();
+        addFieldSchemas.clear();
         for (Entry<String, FieldSchema> updateFieldSchema : updateFiledSchemaMap.entrySet()) {
             String columName = updateFieldSchema.getKey();
             if (!originFieldSchemaMap.containsKey(columName)) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java
new file mode 100644
index 0000000..a404dea
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc;
+
+public enum SourceConnector {
+
+    MYSQL("mysql"),
+    ORACLE("oracle"),
+    POSTGRES("postgres"),
+    SQLSERVER("sqlserver");
+
+    public final String connectorName;
+
+    SourceConnector(String connectorName) {
+        this.connectorName = connectorName;
+    }
+
+    public String getConnectorName() {
+        return connectorName;
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index 44816d0..c4bdbc0 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -67,9 +67,12 @@ public class TestJsonDebeziumSchemaSerializer {
 
     @Test
     public void testSerializeInsert() throws IOException {
-        //insert into t1 VALUES(1,"doris",'2022-01-01','2022-01-01 10:01:02','2022-01-01 10:01:03');
-        byte[] serializedValue = serializer.serialize("{\"before\":null,\"after\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663923840000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":11834,\"row\":0,\"thread\":null,\ [...]
-        Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){});
+        // insert into t1 VALUES(1,"doris",'2022-01-01','2022-01-01 10:01:02','2022-01-01 10:01:03');
+        byte[] serializedValue = serializer.serialize(
+                "{\"before\":null,\"after\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663923840000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":11834,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"c\",\"ts_ms\": [...]
+        Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8),
+                new TypeReference<Map<String, String>>() {
+                });
         Assert.assertEquals("1", valueMap.get("id"));
         Assert.assertEquals("doris", valueMap.get("name"));
         Assert.assertEquals("2022-01-01", valueMap.get("dt"));
@@ -82,9 +85,12 @@ public class TestJsonDebeziumSchemaSerializer {
 
     @Test
     public void testSerializeUpdate() throws IOException {
-        //update t1 set name='doris-update' WHERE id =1;
-        byte[] serializedValue = serializer.serialize("{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":n [...]
-        Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){});
+        // update t1 set name='doris-update' WHERE id =1;
+        byte[] serializedValue = serializer.serialize(
+                "{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\ [...]
+        Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8),
+                new TypeReference<Map<String, String>>() {
+                });
         Assert.assertEquals("1", valueMap.get("id"));
         Assert.assertEquals("doris-update", valueMap.get("name"));
         Assert.assertEquals("2022-01-01", valueMap.get("dt"));
@@ -97,12 +103,15 @@ public class TestJsonDebeziumSchemaSerializer {
     @Test
     public void testSerializeUpdateBefore() throws IOException {
         serializer = JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions)
-                .setExecutionOptions(DorisExecutionOptions.builderDefaults().setIgnoreUpdateBefore(false).build()).build();
-        //update t1 set name='doris-update' WHERE id =1;
-        byte[] serializedValue = serializer.serialize("{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":n [...]
+                .setExecutionOptions(DorisExecutionOptions.builderDefaults().setIgnoreUpdateBefore(false).build())
+                .build();
+        // update t1 set name='doris-update' WHERE id =1;
+        byte[] serializedValue = serializer.serialize(
+                "{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\ [...]
         String row = new String(serializedValue, StandardCharsets.UTF_8);
         String[] split = row.split("\n");
-        Map<String, String> valueMap = objectMapper.readValue(split[1], new TypeReference<Map<String, String>>(){});
+        Map<String, String> valueMap = objectMapper.readValue(split[1], new TypeReference<Map<String, String>>() {
+        });
         Assert.assertEquals("1", valueMap.get("id"));
         Assert.assertEquals("doris-update", valueMap.get("name"));
         Assert.assertEquals("2022-01-01", valueMap.get("dt"));
@@ -111,14 +120,18 @@ public class TestJsonDebeziumSchemaSerializer {
         Assert.assertEquals("0", valueMap.get("__DORIS_DELETE_SIGN__"));
         Assert.assertEquals(6, valueMap.size());
 
-        Map<String, String> beforeMap = objectMapper.readValue(split[0], new TypeReference<Map<String, String>>(){});
+        Map<String, String> beforeMap = objectMapper.readValue(split[0], new TypeReference<Map<String, String>>() {
+        });
         Assert.assertEquals("doris", beforeMap.get("name"));
     }
 
     @Test
     public void testSerializeDelete() throws IOException {
-        byte[] serializedValue = serializer.serialize("{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\" [...]
+        byte[] serializedValue = serializer.serialize(
+                "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"t [...]
+        Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8),
+                new TypeReference<Map<String, String>>() {
+                });
         Assert.assertEquals("1", valueMap.get("id"));
         Assert.assertEquals("doris-update", valueMap.get("name"));
         Assert.assertEquals("2022-01-01", valueMap.get("dt"));
@@ -131,13 +144,15 @@ public class TestJsonDebeziumSchemaSerializer {
     @Test
     public void testExtractDDL() throws IOException {
         String srcDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(600)";
-        String record = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\ [...]
+        String record
+                = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_ [...]
         JsonNode recordRoot = objectMapper.readTree(record);
         String ddl = serializer.extractDDL(recordRoot);
         Assert.assertEquals(srcDDL, ddl);
 
         String targetDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(65533)";
-        String record1 = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null, [...]
+        String record1
+                = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_ [...]
         JsonNode recordRoot1 = objectMapper.readTree(record1);
         String ddl1 = serializer.extractDDL(recordRoot1);
         Assert.assertEquals(targetDDL, ddl1);
@@ -156,9 +171,10 @@ public class TestJsonDebeziumSchemaSerializer {
         String sql7 = "ALTER TABLE test.t1 DROP COLUMN test_time";
         String sql8 = "ALTER TABLE test.t1 DROP COLUMN c1";
         String sql9 = "ALTER TABLE test.t1 DROP COLUMN cc";
-        List<String> srcSqlList = Arrays.asList(sql0, sql1, sql2, sql3, sql4,sql5,sql6,sql7,sql8,sql9);
+        List<String> srcSqlList = Arrays.asList(sql0, sql1, sql2, sql3, sql4, sql5, sql6, sql7, sql8, sql9);
 
-        String record = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\ [...]
+        String record
+                = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_ [...]
         JsonNode recordRoot = objectMapper.readTree(record);
         List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
         for (int i = 0; i < ddlSQLList.size(); i++) {
@@ -168,9 +184,55 @@ public class TestJsonDebeziumSchemaSerializer {
         }
     }
 
+    @Test
+    public void testExtractDDLListCreateTable() throws IOException {
+        String record
+                = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"oracle\",\"name\":\"oracle_logminer\",\"ts_ms\":1696945825065,\"snapshot\":\"true\",\"db\":\"HELOWIN\",\"sequence\":null,\"schema\":\"ADMIN\",\"table\":\"PERSONS\",\"txId\":null,\"scn\":\"1199617\",\"commit_scn\":null,\"lcr_position\":null,\"rs_id\":null,\"ssn\":0,\"redo_thread\":null},\"databaseName\":\"HELOWIN\",\"schemaName\":\"ADMIN\",\"ddl\":\"\\n  CREATE TABLE \\\"ADMIN\\\".\\\"PERSONS\\\" \\n   (\\t\\\"ID\ [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
+        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+    }
+
+    @Test
+    public void testExtractDDLListTruncateTable() throws IOException {
+        String record
+                = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696944601264,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink11\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":5719,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":5719,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null, [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
+        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+    }
+
+    @Test
+    public void testExtractDDLListDropTable() throws IOException {
+        String record
+                = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696944747956,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink11\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":5901,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":5901,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null, [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
+        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+    }
+
+    @Test
+    public void testExtractDDLListChangeColumn() throws IOException {
+        String record
+                = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696945030603,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":6521,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6521,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\ [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
+        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+    }
+
+    @Test
+    public void testExtractDDLListModifyColumn() throws IOException {
+        String record
+                = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696945306941,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":6738,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6738,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\ [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
+        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+    }
+
     @Test
     public void testExtractDDLListRenameColumn() throws IOException {
-        String record = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691034519226,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23752,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23752,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\ [...]
+        String record
+                = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691034519226,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23752,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23752,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_ [...]
         JsonNode recordRoot = objectMapper.readTree(record);
         List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
         Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
@@ -184,7 +246,9 @@ public class TestJsonDebeziumSchemaSerializer {
         srcFiledSchemaMap.put("test_time", new FieldSchema("test_time", "DATETIMEV2(0)", null, null));
         srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "'100'", null));
 
-        String columnsString = "[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"co [...]
+        serializer.setSourceConnector("mysql");
+        String columnsString
+                = "[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null, [...]
         JsonNode columns = objectMapper.readTree(columnsString);
         serializer.fillOriginSchema(columns);
         Map<String, FieldSchema> originFieldSchemaMap = serializer.getOriginFieldSchemaMap();
@@ -202,11 +266,32 @@ public class TestJsonDebeziumSchemaSerializer {
         }
     }
 
+    @Test
+    public void testBuildMysql2DorisTypeName() throws IOException {
+        String columnInfo
+                = "{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"10\",\"enumValues\":[]}";
+        serializer.setSourceConnector("mysql");
+        JsonNode columns = objectMapper.readTree(columnInfo);
+        String dorisTypeName = serializer.buildDorisTypeName(columns);
+        Assert.assertEquals(dorisTypeName, "INT");
+    }
+
+    @Test
+    public void testBuildOracle2DorisTypeName() throws IOException {
+        String columnInfo
+                = "{\"name\":\"NAME\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":128,\"scale\":null,\"position\":2,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null}";
+        serializer.setSourceConnector("oracle");
+        JsonNode columns = objectMapper.readTree(columnInfo);
+        String dorisTypeName = serializer.buildDorisTypeName(columns);
+        Assert.assertEquals(dorisTypeName, "VARCHAR(384)");
+    }
+
     @Ignore
     @Test
     public void testSerializeAddColumn() throws IOException, DorisException {
         // alter table t1 add column c_1 varchar(200)
-        String record = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\ [...]
+        String record
+                = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_ [...]
         JsonNode recordRoot = objectMapper.readTree(record);
         boolean flag = serializer.schemaChange(recordRoot);
         Assert.assertEquals(true, flag);
@@ -220,8 +305,9 @@ public class TestJsonDebeziumSchemaSerializer {
 
     @Ignore
     @Test
     public void testSerializeDropColumn() throws IOException, DorisException {
-        //alter table t1 drop column c_1;
-        String ddl = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663925897321,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13298,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13298,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\" [...]
+        // alter table t1 drop column c_1;
+        String ddl
+                = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663925897321,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13298,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13298,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_ [...]
         JsonNode recordRoot = objectMapper.readTree(ddl);
         boolean flag = serializer.schemaChange(recordRoot);
         Assert.assertEquals(true, flag);
@@ -230,13 +316,13 @@ public class TestJsonDebeziumSchemaSerializer {
         Assert.assertNull(targetField);
     }
 
-    private static Field getField(String column) throws DorisException{
-        //get table schema
+    private static Field getField(String column) throws DorisException {
+        // get table schema
         Schema schema = RestService.getSchema(dorisOptions, DorisReadOptions.builder().build(), LOG);
         List<Field> properties = schema.getProperties();
         Field targetField = null;
-        for(Field field : properties){
-            if(column.equals(field.getName())){
+        for (Field field : properties) {
+            if (column.equals(field.getName())) {
                 targetField = field;
                 break;
             }
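For reference, a minimal standalone sketch of the connector-aware type mapping this commit
introduces. It is not part of the patch; it mirrors the new buildDorisTypeName() dispatch
shown above. The column JSON, the class name TypeMappingSketch, and the hard-coded "oracle"
connector value are hypothetical; it assumes the doris-flink-connector and Jackson jars are
on the classpath.

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.doris.flink.tools.cdc.SourceConnector;
    import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
    import org.apache.doris.flink.tools.cdc.oracle.OracleType;
    import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
    import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;

    public class TypeMappingSketch {
        public static void main(String[] args) throws Exception {
            // A Debezium-style column definition, as found under tableChanges[0].table.columns.
            String columnJson = "{\"name\":\"NAME\",\"typeName\":\"VARCHAR2\",\"length\":128,\"scale\":null}";
            JsonNode column = new ObjectMapper().readTree(columnJson);

            // Same handling as buildDorisTypeName(): an absent field yields 0,
            // and a JSON null also ends up as 0 via NullNode.asInt().
            int length = column.get("length") == null ? 0 : column.get("length").asInt();
            int scale = column.get("scale") == null ? 0 : column.get("scale").asInt();
            String typeName = column.get("typeName").asText();

            // Debezium reports the source in source.connector (e.g. "oracle"); the serializer
            // upper-cases that value to select the SourceConnector enum constant.
            SourceConnector connector = SourceConnector.valueOf("oracle".toUpperCase());
            String dorisType;
            switch (connector) {
                case MYSQL:
                    dorisType = MysqlType.toDorisType(typeName, length, scale);
                    break;
                case ORACLE:
                    dorisType = OracleType.toDorisType(typeName, length, scale);
                    break;
                case POSTGRES:
                    dorisType = PostgresType.toDorisType(typeName, length, scale);
                    break;
                case SQLSERVER:
                    dorisType = SqlServerType.toDorisType(typeName, length, scale);
                    break;
                default:
                    throw new UnsupportedOperationException("Unsupported connector " + connector);
            }
            // Per testBuildOracle2DorisTypeName above, Oracle VARCHAR2(128) maps to VARCHAR(384).
            System.out.println(dorisType);
        }
    }

Resolving the connector once, from the first change record's source.connector field, is what
lets the serializer pick the matching type converter for Oracle, Postgres, and SQL Server
instead of assuming a MySQL source as before.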