This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 6936c319fd [INLONG-9075][Sort] TubeMQSource support InlongMsg format (#9076) 6936c319fd is described below commit 6936c319fdfe60adb9aeb7bc60d2adbd7c26697b Author: vernedeng <verned...@apache.org> AuthorDate: Fri Oct 20 14:02:20 2023 +0800 [INLONG-9075][Sort] TubeMQSource support InlongMsg format (#9076) * [INLONG-9075][Sort] TubeMQSource support InlongMsg format --- .../manager/pojo/sort/node/provider/TubeMqProvider.java | 3 ++- .../inlong/manager/pojo/source/tubemq/TubeMQSource.java | 3 +++ .../inlong/sort/protocol/constant/TubeMQConstant.java | 3 ++- .../apache/inlong/sort/protocol/node/ExtractNode.java | 2 ++ .../sort/protocol/node/extract/TubeMQExtractNode.java | 16 ++++++++++++++-- .../protocol/node/extract/TubeMQExtractNodeTest.java | 2 +- .../inlong/sort/parser/TubeMQNodeSqlParseTest.java | 2 +- .../sort/tubemq/table/TubeMQDynamicTableFactory.java | 2 +- .../sort/tubemq/table/TubeMQDynamicTableFactory.java | 8 +++----- .../apache/inlong/sort/tubemq/table/TubeMQOptions.java | 7 +++++++ 10 files changed, 36 insertions(+), 12 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java index 72399221f8..64b0c0a71f 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java @@ -55,6 +55,7 @@ public class TubeMqProvider implements ExtractNodeProvider { source.getSerializationType(), source.getGroupId(), source.getSessionKey(), - source.getTid()); + source.getTid(), + source.getInnerFormat()); } } \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java index 6e6720208c..0a0e49127f 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java @@ -57,6 +57,9 @@ public class TubeMQSource extends StreamSource { @ApiModelProperty("Session key of the TubeMQ") private String sessionKey; + @ApiModelProperty("inlong-msg.inner.format") + private String innerFormat; + /** * The TubeMQ consumers use this tid set to filter records reading from server. */ diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java index e5b575d053..871e3eba41 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java @@ -28,11 +28,12 @@ public class TubeMQConstant { public static final String CONNECTOR = "connector"; - public static final String TUBEMQ = "tubemq"; + public static final String TUBEMQ = "tubemq-inlong"; public static final String MASTER_RPC = "master.rpc"; public static final String FORMAT = "format"; + public static final String INNER_FORMAT = "inlong-msg.inner.format"; public static final String SESSION_KEY = "session.key"; diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java index 09e67d573d..b9d649d621 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java @@ -71,6 +71,8 @@ import java.util.Map; @NoArgsConstructor public abstract class ExtractNode implements Node { + public static final String INLONG_MSG = "inlong-msg"; + @JsonProperty("id") private String id; @JsonInclude(Include.NON_NULL) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java index 71a5347e67..bbcdbbecd5 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java @@ -17,6 +17,7 @@ package org.apache.inlong.sort.protocol.node.extract; +import org.apache.inlong.sort.formats.util.StringUtils; import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.constant.TubeMQConstant; import org.apache.inlong.sort.protocol.node.ExtractNode; @@ -72,6 +73,9 @@ public class TubeMQExtractNode extends ExtractNode implements Serializable { @JsonProperty("tid") private TreeSet<String> tid; + @JsonProperty("inlong-msg.inner.format") + private String innerFormat; + @JsonCreator public TubeMQExtractNode( @JsonProperty("id") String id, @@ -84,13 +88,15 @@ public class TubeMQExtractNode extends ExtractNode implements Serializable { @Nonnull @JsonProperty("format") String format, @Nonnull @JsonProperty("groupId") String groupId, @JsonProperty("sessionKey") String sessionKey, - @JsonProperty("tid") TreeSet<String> tid) { + @JsonProperty("tid") TreeSet<String> tid, + @JsonProperty("inlong-msg.inner.format") String innerFormat) { super(id, name, fields, waterMarkField, properties); this.masterRpc = Preconditions.checkNotNull(masterRpc, "TubeMQ masterRpc is null"); this.topic = Preconditions.checkNotNull(topic, "TubeMQ topic is null"); this.format = Preconditions.checkNotNull(format, "Format is null"); this.groupId = Preconditions.checkNotNull(groupId, "Group id is null"); this.sessionKey = sessionKey; + this.innerFormat = innerFormat; this.tid = tid; } @@ -103,9 +109,15 @@ public class TubeMQExtractNode extends ExtractNode implements Serializable { map.put(TubeMQConstant.GROUP_ID, groupId); map.put(TubeMQConstant.FORMAT, format); map.put(TubeMQConstant.SESSION_KEY, sessionKey); + if (format.startsWith(INLONG_MSG)) { + map.put(TubeMQConstant.INNER_FORMAT, innerFormat); + } + if (null != tid && !tid.isEmpty()) { - map.put(TubeMQConstant.TID, tid.toString()); + map.put(TubeMQConstant.TID, StringUtils.concatCsv(tid.toArray(new String[0]), + ',', null, null)); } + return map; } diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNodeTest.java index 3943bdbfe5..3b824e0280 100644 --- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNodeTest.java +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNodeTest.java @@ -40,7 +40,7 @@ public class TubeMQExtractNodeTest extends SerializeBaseTest<TubeMQExtractNode> new FieldInfo("salary", new FloatFormatInfo())); return new TubeMQExtractNode("1", "tubeMQ_input", fields, null, null, - "127.0.0.1:8715", "inlong", "json", "test", null, null); + "127.0.0.1:8715", "inlong", "json", "test", null, null, null); } } diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java index 9d6cb5336c..d9421e9e6d 100644 --- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java +++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TubeMQNodeSqlParseTest.java @@ -59,7 +59,7 @@ public class TubeMQNodeSqlParseTest extends AbstractTestBase { new FieldInfo("salary", new FloatFormatInfo())); return new TubeMQExtractNode(id, "tubeMQ_input", fields, null, null, - "127.0.0.1:8715", "inlong", "json", "test", null, null); + "127.0.0.1:8715", "inlong", "json", "test", null, null, null); } /** diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java index 28a538c22a..7101a475a7 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java @@ -58,7 +58,7 @@ import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.getTubeMQPropert */ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory { - public static final String IDENTIFIER = "tubemq"; + public static final String IDENTIFIER = "tubemq-inlong"; public static final List<String> INNERFORMATTYPE = Arrays.asList("inlong-msg"); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java index 17275d8d11..f6130b5288 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java @@ -40,9 +40,7 @@ import org.apache.flink.table.factories.SerializationFormatFactory; import org.apache.flink.table.types.DataType; import org.apache.flink.types.RowKind; -import java.util.Arrays; import java.util.HashSet; -import java.util.List; import java.util.Optional; import java.util.Set; import java.util.TreeSet; @@ -63,9 +61,9 @@ import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.getTubeMQPropert */ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { - public static final String IDENTIFIER = "tubemq"; + public static final String IDENTIFIER = "tubemq-inlong"; - public static final List<String> INNERFORMATTYPE = Arrays.asList("inlong-msg"); + public static final String INNERFORMATTYPE = "inlong-msg"; public static boolean innerFormat = false; @@ -87,7 +85,6 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn && format.getChangelogMode().containsOnly(RowKind.INSERT)) { Configuration options = Configuration.fromMap(catalogTable.getOptions()); String formatName = options.getOptional(FORMAT).orElse(options.get(FORMAT)); - innerFormat = INNERFORMATTYPE.contains(formatName); throw new ValidationException(String.format( "The TubeMQ table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint" + " on the table, because it can't guarantee the semantic of primary key.", @@ -123,6 +120,7 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn helper.validate(); validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat); + innerFormat = INNERFORMATTYPE.equals(tableOptions.get(FORMAT)); final Configuration properties = getTubeMQProperties(context.getCatalogTable().getOptions()); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java index 0085100c87..76f85f0563 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java @@ -77,6 +77,13 @@ public class TubeMQOptions { // TubeMQ specific options // -------------------------------------------------------------------------------------------- + public static final ConfigOption<String> INNER_FORMAT = + ConfigOptions.key("inlong-msg.inner.format") + .stringType() + .noDefaultValue() + .withDescription( + "Inner format"); + public static final ConfigOption<String> TOPIC = ConfigOptions.key("topic") .stringType()