This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 5fd224816e [INLONG-11674][Sort] Pulsar Source supports InlongMsg metadata (#11691)
5fd224816e is described below

commit 5fd224816e0af2c4dca2109cc4d8a34f80b2ae60
Author: vernedeng <verned...@apache.org>
AuthorDate: Wed Jan 22 15:27:09 2025 +0800

    [INLONG-11674][Sort] Pulsar Source supports InlongMsg metadata (#11691)
---
 .../src/main/java/org/apache/inlong/common/enums/MetaField.java | 7 ++++++-
 .../java/org/apache/inlong/sort/protocol/node/ExtractNode.java  | 2 ++
 .../inlong/sort/protocol/node/extract/PulsarExtractNode.java    | 5 ++++-
 3 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
index e6d0bbc6cb..e80d006433 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
@@ -168,7 +168,12 @@ public enum MetaField {
     /**
      * Inlong data time for audit.
      */
-    AUDIT_DATA_TIME;
+    AUDIT_DATA_TIME,
+
+    /**
+     * Inlong properties in InlongMsg.
+     */
+    INLONG_PROPERTIES;
 
     public static MetaField forName(String name) {
         for (MetaField metaField : values()) {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
index fb0f9dcd80..c61b12ab32 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
@@ -78,6 +78,8 @@ public abstract class ExtractNode implements Node {
 
     public static final String INLONG_MSG_AUDIT_TIME = "value.data-time";
 
+    public static final String INLONG_MSG_PROPERTIES = "value.inlong-msg-properties";
+
     public static final String CONSUME_AUDIT_TIME = "consume_time";
 
     @JsonProperty("id")
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
index ab90ba7d19..1887aca145 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
@@ -195,6 +195,9 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric, Meta
                     metadataKey = CONSUME_AUDIT_TIME;
                 }
                 break;
+            case INLONG_PROPERTIES:
+                metadataKey = INLONG_MSG_PROPERTIES;
+                break;
             default:
                 throw new UnsupportedOperationException(String.format("Unsupported meta field for %s: %s",
                         this.getClass().getSimpleName(), metaField));
@@ -209,7 +212,7 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric, Meta
 
     @Override
     public Set<MetaField> supportedMetaFields() {
-        return EnumSet.of(MetaField.AUDIT_DATA_TIME);
+        return EnumSet.of(MetaField.AUDIT_DATA_TIME, MetaField.INLONG_PROPERTIES);
     }
 
     @Override