This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 75d0a5d31 [INLONG-6256][Sort] Support debezium-json format with schema parse for DebeziumJsonDynamicSchemaFormat (#6259) 75d0a5d31 is described below commit 75d0a5d3142e6583f63f08a00826885933533a21 Author: Charles <44659300+yunqingmo...@users.noreply.github.com> AuthorDate: Mon Oct 24 10:32:44 2022 +0800 [INLONG-6256][Sort] Support debezium-json format with schema parse for DebeziumJsonDynamicSchemaFormat (#6259) --- .../format/DebeziumJsonDynamicSchemaFormat.java | 215 ++++++++++++---- .../DebeziumJsonDynamicSchemaFormatTest.java | 7 + ...eziumJsonDynamicSchemaFormatWithSchemaTest.java | 275 +++++++++++++++++++++ 3 files changed, 448 insertions(+), 49 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java index f3bc603f1..8c9fe2d66 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java @@ -18,13 +18,25 @@ package org.apache.inlong.sort.base.format; import org.apache.flink.formats.json.JsonToRowDataConverters.JsonToRowDataConverter; +import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; import 
org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.types.RowKind; - +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; /** * Debezium json dynamic format @@ -33,17 +45,46 @@ public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat { private static final String IDENTIFIER = "debezium-json"; private static final String DDL_FLAG = "ddl"; - private static final String SCHEMA = "sqlType"; + private static final String SCHEMA = "schema"; + private static final String SQL_TYPE = "sqlType"; private static final String AFTER = "after"; private static final String BEFORE = "before"; private static final String SOURCE = "source"; private static final String PK_NAMES = "pkNames"; private static final String OP_TYPE = "op"; - private static final String OP_READ = "r"; // snapshot read - private static final String OP_CREATE = "c"; // insert - private static final String OP_UPDATE = "u"; // update - private static final String OP_DELETE = "d"; // delete - + private static final String PAYLOAD = "payload"; + private static final String FIELDS = "fields"; + private static final String FIELD = "field"; + private static final String TYPE = "type"; + /** + * Snapshot read + */ + private static final String OP_READ = "r"; + /** + * Insert + */ + private static final String OP_CREATE = "c"; + /** + * Update + */ + private static final String OP_UPDATE = "u"; + /** + * Delete + */ + private static final String OP_DELETE = "d"; + + private static final Map<String, LogicalType> DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING = + ImmutableMap.<String, LogicalType>builder() + .put("BOOLEAN", new BooleanType()) + .put("INT8", new TinyIntType()) + .put("INT16", new SmallIntType()) + 
.put("INT32", new IntType()) + .put("INT64", new BigIntType()) + .put("FLOAT32", new FloatType()) + .put("FLOAT64", new DoubleType()) + .put("STRING", new VarCharType()) + .put("BYTES", new VarBinaryType()) + .build(); private static final DebeziumJsonDynamicSchemaFormat FORMAT = new DebeziumJsonDynamicSchemaFormat(); @@ -58,70 +99,138 @@ public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat { @Override public JsonNode getPhysicalData(JsonNode root) { - JsonNode physicalData = root.get(AFTER); - if (physicalData == null) { - physicalData = root.get(BEFORE); + JsonNode payload = root.get(PAYLOAD); + if (payload == null) { + JsonNode physicalData = root.get(AFTER); + if (physicalData == null) { + physicalData = root.get(BEFORE); + } + return physicalData; } - return physicalData; + return getPhysicalData(payload); } @Override public List<String> extractPrimaryKeyNames(JsonNode data) { List<String> pkNames = new ArrayList<>(); - JsonNode sourceNode = data.get(SOURCE); - if (sourceNode == null) { + JsonNode payload = data.get(PAYLOAD); + if (payload == null) { + JsonNode sourceNode = data.get(SOURCE); + if (sourceNode == null) { + return pkNames; + } + JsonNode pkNamesNode = sourceNode.get(PK_NAMES); + if (pkNamesNode != null && pkNamesNode.isArray()) { + for (int i = 0; i < pkNamesNode.size(); i++) { + pkNames.add(pkNamesNode.get(i).asText()); + } + } return pkNames; } - JsonNode pkNamesNode = sourceNode.get(PK_NAMES); - if (pkNamesNode != null && pkNamesNode.isArray()) { - for (int i = 0; i < pkNamesNode.size(); i++) { - pkNames.add(pkNamesNode.get(i).asText()); - } + return extractPrimaryKeyNames(payload); + } + + @Override + public String parse(JsonNode rootNode, String pattern) throws IOException { + JsonNode payload = rootNode.get(PAYLOAD); + if (payload == null) { + return super.parse(rootNode, pattern); } - return pkNames; + return super.parse(payload, pattern); } @Override public boolean extractDDLFlag(JsonNode data) { - return 
data.has(DDL_FLAG) ? data.get(DDL_FLAG).asBoolean(false) : false; + JsonNode payload = data.get(PAYLOAD); + if (payload == null) { + return data.has(DDL_FLAG) && data.get(DDL_FLAG).asBoolean(false); + } + return extractDDLFlag(payload); + } + + public RowType extractSchemaFromExtractInfo(JsonNode data, List<String> pkNames) { + JsonNode payload = data.get(PAYLOAD); + if (payload == null) { + JsonNode sourceNode = data.get(SOURCE); + if (sourceNode == null) { + throw new IllegalArgumentException(String.format("Error schema: %s.", data)); + } + JsonNode schemaNode = sourceNode.get(SQL_TYPE); + if (schemaNode == null) { + throw new IllegalArgumentException(String.format("Error schema: %s.", data)); + } + return super.extractSchemaNode(schemaNode, pkNames); + } + return extractSchemaFromExtractInfo(payload, pkNames); } @Override public RowType extractSchema(JsonNode data, List<String> pkNames) { - JsonNode schema = data.get(SCHEMA); - return extractSchemaNode(schema, pkNames); + // first get schema from 'sqlType', fallback to get it from 'schema' + try { + return extractSchemaFromExtractInfo(data, pkNames); + } catch (IllegalArgumentException e) { + JsonNode schema = data.get(SCHEMA); + for (JsonNode field : schema.get(FIELDS)) { + if (AFTER.equals(field.get(FIELD).asText())) { + return extractSchemaNode(field.get(FIELDS), pkNames); + } + } + throw new IllegalArgumentException(String.format("Error schema: %s.", schema)); + } } @Override - public List<RowData> extractRowData(JsonNode data, RowType rowType) { - JsonNode opNode = data.get(OP_TYPE); - JsonNode dataBeforeNode = data.get(BEFORE); - JsonNode dataAfterNode = data.get(AFTER); - if (opNode == null || (dataBeforeNode == null && dataAfterNode == null)) { - throw new IllegalArgumentException( - String.format("Error opNode: %s, or dataBeforeNode: %s, dataAfterNode", - opNode, dataBeforeNode, dataAfterNode)); + public RowType extractSchemaNode(JsonNode schema, List<String> pkNames) { + List<RowType.RowField> fields = 
new ArrayList<>(); + for (JsonNode field : schema) { + String name = field.get(FIELD).asText(); + LogicalType type = debeziumType2FlinkType(field.get(TYPE).asText()); + if (pkNames.contains(name)) { + type = type.copy(false); + } + fields.add(new RowType.RowField(name, type)); } - List<RowData> rowDataList = new ArrayList<>(); - JsonToRowDataConverter rowDataConverter = rowDataConverters.createConverter(rowType); - - String op = data.get(OP_TYPE).asText(); - if (OP_CREATE.equals(op) || OP_READ.equals(op)) { - RowData rowData = (RowData) rowDataConverter.convert(dataAfterNode); - rowData.setRowKind(RowKind.INSERT); - rowDataList.add(rowData); - } else if (OP_UPDATE.equals(op)) { - RowData rowData = (RowData) rowDataConverter.convert(dataAfterNode); - rowData.setRowKind(RowKind.UPDATE_AFTER); - rowDataList.add(rowData); - } else if (OP_DELETE.equals(op)) { - RowData rowData = (RowData) rowDataConverter.convert(dataBeforeNode); - rowData.setRowKind(RowKind.DELETE); - rowDataList.add(rowData); - } else { - throw new IllegalArgumentException("Unsupported op_type: " + op); + return new RowType(fields); + } + + @Override + public List<RowData> extractRowData(JsonNode data, RowType rowType) { + JsonNode payload = data.get(PAYLOAD); + if (payload == null) { + JsonNode opNode = data.get(OP_TYPE); + JsonNode dataBeforeNode = data.get(BEFORE); + JsonNode dataAfterNode = data.get(AFTER); + if (opNode == null || (dataBeforeNode == null && dataAfterNode == null)) { + throw new IllegalArgumentException( + String.format("Error opNode: %s, or dataBeforeNode: %s, dataAfterNode: %s", + opNode, dataBeforeNode, dataAfterNode)); + } + List<RowData> rowDataList = new ArrayList<>(); + JsonToRowDataConverter rowDataConverter = rowDataConverters.createConverter(rowType); + + String op = data.get(OP_TYPE).asText(); + if (OP_CREATE.equals(op) || OP_READ.equals(op)) { + RowData rowData = (RowData) rowDataConverter.convert(dataAfterNode); + rowData.setRowKind(RowKind.INSERT); + 
rowDataList.add(rowData); + } else if (OP_UPDATE.equals(op)) { + RowData rowData = (RowData) rowDataConverter.convert(dataBeforeNode); + rowData.setRowKind(RowKind.UPDATE_BEFORE); + rowDataList.add(rowData); + rowData = (RowData) rowDataConverter.convert(dataAfterNode); + rowData.setRowKind(RowKind.UPDATE_AFTER); + rowDataList.add(rowData); + } else if (OP_DELETE.equals(op)) { + RowData rowData = (RowData) rowDataConverter.convert(dataBeforeNode); + rowData.setRowKind(RowKind.DELETE); + rowDataList.add(rowData); + } else { + throw new IllegalArgumentException("Unsupported op_type: " + op); + } + return rowDataList; } - return rowDataList; + return extractRowData(payload, rowType); } /** @@ -133,4 +242,12 @@ public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat { public String identifier() { return IDENTIFIER; } + + private LogicalType debeziumType2FlinkType(String debeziumType) { + if (DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING.containsKey(debeziumType.toUpperCase())) { + return DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING.get(debeziumType.toUpperCase()); + } else { + throw new IllegalArgumentException("Unsupported debeziumType: " + debeziumType.toUpperCase()); + } + } } diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java index bbd51dac6..5e555a657 100644 --- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java +++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java @@ -88,6 +88,13 @@ public class DebeziumJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBase Assert.assertEquals(values, Arrays.asList("111", "scooter")); } + @Test + public void testExtractPhysicalData() throws IOException { + JsonNode 
rootNode = (JsonNode) getDynamicSchemaFormat() + .deserialize(getSource().getBytes(StandardCharsets.UTF_8)); + Assert.assertNotNull(((JsonDynamicSchemaFormat) getDynamicSchemaFormat()).getPhysicalData(rootNode)); + } + @SuppressWarnings({"unchecked", "rawtypes"}) @Override protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() { diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java new file mode 100644 index 000000000..fb268b9a7 --- /dev/null +++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.base.format; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Test for {@link DebeziumJsonDynamicSchemaFormat} + */ +public class DebeziumJsonDynamicSchemaFormatWithSchemaTest extends DynamicSchemaFormatBaseTest<JsonNode> { + + @Override + protected String getSource() { + return "{\n" + + " \"schema\": { \n" + + " \"type\": \"struct\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"type\": \"struct\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"type\": \"int32\",\n" + + " \"optional\": false,\n" + + " \"field\": \"id\"\n" + + " },\n" + + " {\n" + + " \"type\": \"string\",\n" + + " \"optional\": false,\n" + + " \"field\": \"first_name\"\n" + + " },\n" + + " {\n" + + " \"type\": \"string\",\n" + + " \"optional\": false,\n" + + " \"field\": \"last_name\"\n" + + " },\n" + + " {\n" + + " \"type\": \"string\",\n" + + " \"optional\": false,\n" + + " \"field\": \"email\"\n" + + " }\n" + + " ],\n" + + " \"optional\": true,\n" + + " \"name\": \"mysql-server-1.inventory.customers.Value\", \n" + + " \"field\": \"before\"\n" + + " },\n" + + " {\n" + + " \"type\": \"struct\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"type\": \"int32\",\n" + + " \"optional\": false,\n" + + " \"field\": \"id\"\n" + + " },\n" + + " {\n" + + " \"type\": \"string\",\n" + + " \"optional\": false,\n" + + " \"field\": \"first_name\"\n" + + " },\n" + + " {\n" + + " \"type\": \"string\",\n" + + " \"optional\": false,\n" + + " \"field\": \"last_name\"\n" + + " },\n" + + " {\n" + + 
" \"type\": \"string\",\n" + + " \"optional\": false,\n" + + " \"field\": \"email\"\n" + + " }\n" + + " ],\n" + + " \"optional\": true,\n" + + " \"name\": \"mysql-server-1.inventory.customers.Value\",\n" + + " \"field\": \"after\"\n" + + " },\n" + + " {\n" + + " \"type\": \"struct\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"type\": \"string\",\n" + + " \"optional\": false,\n" + + " \"field\": \"version\"\n" + + " },\n" + + " {\n" + + " \"type\": \"string\",\n" + + " \"optional\": false,\n" + + " \"field\": \"connector\"\n" + + " },\n" + + " {\n" + + " \"type\": \"string\",\n" + + " \"optional\": false,\n" + + " \"field\": \"name\"\n" + + " },\n" + + " {\n" + + " \"type\": \"int64\",\n" + + " \"optional\": false,\n" + + " \"field\": \"ts_ms\"\n" + + " },\n" + + " {\n" + + " \"type\": \"boolean\",\n" + + " \"optional\": true,\n" + + " \"default\": false,\n" + + " \"field\": \"snapshot\"\n" + + " },\n" + + " {\n" + + " \"type\": \"string\",\n" + + " \"optional\": false,\n" + + " \"field\": \"db\"\n" + + " },\n" + + " {\n" + + " \"type\": \"string\",\n" + + " \"optional\": true,\n" + + " \"field\": \"table\"\n" + + " },\n" + + " {\n" + + " \"type\": \"int64\",\n" + + " \"optional\": false,\n" + + " \"field\": \"server_id\"\n" + + " },\n" + + " {\n" + + " \"type\": \"string\",\n" + + " \"optional\": true,\n" + + " \"field\": \"gtid\"\n" + + " },\n" + + " {\n" + + " \"type\": \"string\",\n" + + " \"optional\": false,\n" + + " \"field\": \"file\"\n" + + " },\n" + + " {\n" + + " \"type\": \"int64\",\n" + + " \"optional\": false,\n" + + " \"field\": \"pos\"\n" + + " },\n" + + " {\n" + + " \"type\": \"int32\",\n" + + " \"optional\": false,\n" + + " \"field\": \"row\"\n" + + " },\n" + + " {\n" + + " \"type\": \"int64\",\n" + + " \"optional\": true,\n" + + " \"field\": \"thread\"\n" + + " },\n" + + " {\n" + + " \"type\": \"string\",\n" + + " \"optional\": true,\n" + + " \"field\": \"query\"\n" + + " }\n" + + " ],\n" + + " \"optional\": false,\n" + + " \"name\": 
\"io.debezium.connector.mysql.Source\", \n" + + " \"field\": \"source\"\n" + + " },\n" + + " {\n" + + " \"type\": \"string\",\n" + + " \"optional\": false,\n" + + " \"field\": \"op\"\n" + + " },\n" + + " {\n" + + " \"type\": \"int64\",\n" + + " \"optional\": true,\n" + + " \"field\": \"ts_ms\"\n" + + " }\n" + + " ],\n" + + " \"optional\": false,\n" + + " \"name\": \"mysql-server-1.inventory.customers.Envelope\" \n" + + " },\n" + + " \"payload\": { \n" + + " \"op\": \"c\", \n" + + " \"ts_ms\": 1465491411815, \n" + + " \"before\": null, \n" + + " \"after\": { \n" + + " \"id\": 1004,\n" + + " \"first_name\": \"Anne\",\n" + + " \"last_name\": \"Kretchmar\",\n" + + " \"email\": \"annek@noanswer.org\"\n" + + " },\n" + + " \"source\": { \n" + + " \"version\": \"1.9.6.Final\",\n" + + " \"connector\": \"mysql\",\n" + + " \"name\": \"mysql-server-1\",\n" + + " \"ts_ms\": 0,\n" + + " \"pkNames\":[\"id\", \"first_name\"]," + + " \"snapshot\": false,\n" + + " \"db\": \"inventory\",\n" + + " \"table\": \"customers\",\n" + + " \"server_id\": 0,\n" + + " \"gtid\": null,\n" + + " \"file\": \"mysql-bin.000003\",\n" + + " \"pos\": 154,\n" + + " \"row\": 0,\n" + + " \"thread\": 7,\n" + + " \"query\": \"INSERT INTO customers (first_name, last_name, email)" + + " VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')\"\n" + + " }\n" + + " }\n" + + "}"; + } + + @Override + protected Map<String, String> getExpectedValues() { + Map<String, String> expectedValues = new HashMap<>(); + expectedValues.put("${source.db}${source.table}", "inventorycustomers"); + expectedValues.put("${source.db}_${source.table}", "inventory_customers"); + expectedValues.put("prefix_${source.db}_${source.table}_suffix", "prefix_inventory_customers_suffix"); + expectedValues.put("${ \t source.db \t }${ source.table }", "inventorycustomers"); + expectedValues.put("${source.db}_${source.table}_${id}_${first_name}", "inventory_customers_1004_Anne"); + return expectedValues; + } + + @Test + @SuppressWarnings({"unchecked"}) 
+ public void testExtractPrimaryKey() throws IOException { + JsonNode rootNode = (JsonNode) getDynamicSchemaFormat() + .deserialize(getSource().getBytes(StandardCharsets.UTF_8)); + List<String> primaryKeys = getDynamicSchemaFormat().extractPrimaryKeyNames(rootNode); + List<String> values = getDynamicSchemaFormat().extractValues(rootNode, primaryKeys.toArray(new String[]{})); + Assert.assertEquals(values, Arrays.asList("1004", "Anne")); + } + + @Test + public void testExtractPhysicalData() throws IOException { + JsonNode rootNode = (JsonNode) getDynamicSchemaFormat() + .deserialize(getSource().getBytes(StandardCharsets.UTF_8)); + Assert.assertNotNull(((JsonDynamicSchemaFormat) getDynamicSchemaFormat()).getPhysicalData(rootNode)); + } + + @Test + public void testExtractRowType() throws IOException { + JsonNode rootNode = (JsonNode) getDynamicSchemaFormat() + .deserialize(getSource().getBytes(StandardCharsets.UTF_8)); + String[] names = new String[]{"id", "first_name", "last_name", "email"}; + LogicalType[] types = new LogicalType[]{ + new IntType(false), + new VarCharType(false, 1), + new VarCharType(), + new VarCharType() + }; + RowType rowType = RowType.of(true, types, names); + Assert.assertEquals(getDynamicSchemaFormat().extractSchema(rootNode), rowType); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() { + return DebeziumJsonDynamicSchemaFormat.getInstance(); + } +}