This is an automated email from the ASF dual-hosted git repository.

pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 4627e1cab [INLONG-7121][Sort] Fix the parsing error of Flink SQL for MongoDB (#7122)
4627e1cab is described below

commit 4627e1cabf6f87fb22ef09556a9019a2660dd0ae
Author: kuansix <490305...@qq.com>
AuthorDate: Wed Jan 4 10:15:38 2023 +0800

    [INLONG-7121][Sort] Fix the parsing error of Flink SQL for MongoDB (#7122)
---
 .../protocol/node/extract/MongoExtractNode.java    | 39 +++++++++++++++++++++-
 .../node/extract/MongoExtractNodeTest.java         |  4 +++
 .../inlong/sort/parser/impl/FlinkSqlParser.java    | 33 +++++++++++++++---
 3 files changed, 70 insertions(+), 6 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNode.java
index 881ab472d..38daa67ef 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNode.java
@@ -120,6 +120,40 @@ public class MongoExtractNode extends ExtractNode implements InlongMetric, Metad
         return options;
     }
 
+    @Override
+    public String getMetadataKey(MetaField metaField) {
+        String metadataKey;
+        switch (metaField) {
+            case TABLE_NAME:
+                metadataKey = "table_name";
+                break;
+            case COLLECTION_NAME:
+                metadataKey = "collection_name";
+                break;
+            case SCHEMA_NAME:
+                metadataKey = "schema_name";
+                break;
+            case DATABASE_NAME:
+                metadataKey = "database_name";
+                break;
+            case OP_TS:
+                metadataKey = "op_ts";
+                break;
+            case DATA_DEBEZIUM:
+            case DATA_BYTES_DEBEZIUM:
+                metadataKey = "meta.data_debezium";
+                break;
+            case DATA_CANAL:
+            case DATA_BYTES_CANAL:
+                metadataKey = "meta.data_canal";
+                break;
+            default:
+                throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
+                        this.getClass().getSimpleName(), metaField));
+        }
+        return metadataKey;
+    }
+
     @Override
     public boolean isVirtual(MetaField metaField) {
         return true;
@@ -127,6 +161,9 @@ public class MongoExtractNode extends ExtractNode implements InlongMetric, Metad
 
     @Override
     public Set<MetaField> supportedMetaFields() {
-        return EnumSet.of(MetaField.PROCESS_TIME, MetaField.COLLECTION_NAME, MetaField.DATABASE_NAME, MetaField.OP_TS);
+        return EnumSet.of(MetaField.PROCESS_TIME, MetaField.COLLECTION_NAME,
+                MetaField.DATABASE_NAME, MetaField.OP_TS,
+                MetaField.DATA_DEBEZIUM, MetaField.DATA_BYTES_DEBEZIUM,
+                MetaField.DATA_CANAL, MetaField.DATA_BYTES_CANAL);
     }
 }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNodeTest.java
index ff4911e0b..db803122e 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MongoExtractNodeTest.java
@@ -51,6 +51,10 @@ public class MongoExtractNodeTest extends SerializeBaseTest<MongoExtractNode> {
         formatMap.put(MetaField.COLLECTION_NAME, "STRING METADATA FROM 'collection_name' VIRTUAL");
         formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM 'database_name' VIRTUAL");
         formatMap.put(MetaField.OP_TS, "TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL");
+        formatMap.put(MetaField.DATA_BYTES_DEBEZIUM, "BYTES METADATA FROM 'meta.data_debezium' VIRTUAL");
+        formatMap.put(MetaField.DATA_DEBEZIUM, "STRING METADATA FROM 'meta.data_debezium' VIRTUAL");
+        formatMap.put(MetaField.DATA_CANAL, "STRING METADATA FROM 'meta.data_canal' VIRTUAL");
+        formatMap.put(MetaField.DATA_BYTES_CANAL, "BYTES METADATA FROM 'meta.data_canal' VIRTUAL");
         MongoExtractNode node = getTestObject();
         boolean formatEquals = true;
         for (MetaField metaField : node.supportedMetaFields()) {
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 44cc679eb..22f3b968f 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -40,6 +40,7 @@ import org.apache.inlong.sort.protocol.enums.FilterStrategy;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.node.LoadNode;
 import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
 import org.apache.inlong.sort.protocol.node.transform.TransformNode;
@@ -75,6 +76,7 @@ public class FlinkSqlParser implements Parser {
 
     private static final Logger log = LoggerFactory.getLogger(FlinkSqlParser.class);
 
+    public static final String SOURCE_MULTIPLE_ENABLE_KEY = "source.multiple.enable";
     private final TableEnvironment tableEnv;
     private final GroupInfo groupInfo;
     private final Set<String> hasParsedSet = new HashSet<>();
@@ -742,8 +744,9 @@ public class FlinkSqlParser implements Parser {
         }
         StringBuilder sb = new StringBuilder("CREATE TABLE `");
         sb.append(node.genTableName()).append("`(\n");
-        sb.append(genPrimaryKey(node.getPrimaryKey()));
-        sb.append(parseFields(node.getFields(), node));
+        String filterPrimaryKey = getFilterPrimaryKey(node);
+        sb.append(genPrimaryKey(node.getPrimaryKey(), filterPrimaryKey));
+        sb.append(parseFields(node.getFields(), node, filterPrimaryKey));
         if (node instanceof ExtractNode) {
             ExtractNode extractNode = (ExtractNode) node;
             if (extractNode.getWatermarkField() != null) {
@@ -759,6 +762,19 @@ public class FlinkSqlParser implements Parser {
         return sb.toString();
     }
 
+    /**
+     * Get filter PrimaryKey for Mongo when multi-sink mode
+     */
+    private String getFilterPrimaryKey(Node node) {
+        if (node instanceof MongoExtractNode) {
+            if (null != node.getProperties().get(SOURCE_MULTIPLE_ENABLE_KEY)
+                    && node.getProperties().get(SOURCE_MULTIPLE_ENABLE_KEY).equals("true")) {
+                return node.getPrimaryKey();
+            }
+        }
+        return null;
+    }
+
     /**
      * Gen create table DDL for hbase load
      */
@@ -857,11 +873,15 @@ public class FlinkSqlParser implements Parser {
      *
      * @param fields The fields defined in node
      * @param node The abstract of extract, transform, load
+     * @param filterPrimaryKey filter PrimaryKey, use for mongo
      * @return Field formats in select sql
      */
-    private String parseFields(List<FieldInfo> fields, Node node) {
+    private String parseFields(List<FieldInfo> fields, Node node, String filterPrimaryKey) {
         StringBuilder sb = new StringBuilder();
         for (FieldInfo field : fields) {
+            if (StringUtils.isNotBlank(filterPrimaryKey) && field.getName().equals(filterPrimaryKey)) {
+                continue;
+            }
             sb.append(" `").append(field.getName()).append("` ");
             if (field instanceof MetaFieldInfo) {
                 if (!(node instanceof Metadata)) {
@@ -890,10 +910,13 @@ public class FlinkSqlParser implements Parser {
      * Generate primary key format in sql
      *
      * @param primaryKey The primary key of table
+     * @param filterPrimaryKey filter PrimaryKey, use for mongo
      * @return Primary key format in sql
      */
-    private String genPrimaryKey(String primaryKey) {
-        if (StringUtils.isNotBlank(primaryKey)) {
+    private String genPrimaryKey(String primaryKey, String filterPrimaryKey) {
+        boolean checkPrimaryKeyFlag = StringUtils.isNotBlank(primaryKey)
+                && (StringUtils.isBlank(filterPrimaryKey) || !primaryKey.equals(filterPrimaryKey));
+        if (checkPrimaryKeyFlag) {
             primaryKey = String.format(" PRIMARY KEY (%s) NOT ENFORCED,\n",
                     StringUtils.join(formatFields(primaryKey.split(",")), ","));
         } else {
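
For illustration, the new getMetadataKey mapping lets the raw Debezium- and Canal-formatted change records surface as virtual metadata columns in the generated source DDL. The sketch below is not produced by this commit: the METADATA declarations mirror the format strings asserted in MongoExtractNodeTest, while the table name, the physical column, and the WITH options (including the connector identifier) are assumptions for illustration only.

    -- Minimal sketch; only the METADATA column declarations are taken from the test expectations.
    CREATE TABLE `mongo_extract_node_1` (
        `name` STRING,
        `data_debezium` STRING METADATA FROM 'meta.data_debezium' VIRTUAL,
        `data_bytes_debezium` BYTES METADATA FROM 'meta.data_debezium' VIRTUAL,
        `data_canal` STRING METADATA FROM 'meta.data_canal' VIRTUAL,
        `data_bytes_canal` BYTES METADATA FROM 'meta.data_canal' VIRTUAL
    ) WITH (
        'connector' = 'mongodb-cdc-inlong',       -- assumed connector identifier
        'hosts' = 'localhost:27017',              -- assumed option names and values
        'database' = 'test_db',
        'collection' = 'test_collection',
        'source.multiple.enable' = 'true'
    );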
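The primary-key filtering is what changes the generated DDL: when a MongoExtractNode carries 'source.multiple.enable' = 'true', getFilterPrimaryKey returns the node's primary key, parseFields skips that field, and genPrimaryKey omits the PRIMARY KEY clause. A before/after sketch follows, assuming a hypothetical node whose primary key is `_id` and whose declared fields are `_id` and `name`; the WITH options are omitted.

    -- Before this change (multi-sink MongoDB source, hypothetical fields):
    CREATE TABLE `mongo_extract_node_1`(
     PRIMARY KEY (`_id`) NOT ENFORCED,
        `_id` STRING,
        `name` STRING
    );

    -- After this change: the `_id` column and the PRIMARY KEY clause are filtered out.
    CREATE TABLE `mongo_extract_node_1`(
        `name` STRING
    );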