This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fd224816e [INLONG-11674][Sort] Pulsar Source supports InlongMsg 
metadata (#11691)
5fd224816e is described below

commit 5fd224816e0af2c4dca2109cc4d8a34f80b2ae60
Author: vernedeng <verned...@apache.org>
AuthorDate: Wed Jan 22 15:27:09 2025 +0800

    [INLONG-11674][Sort] Pulsar Source supports InlongMsg metadata (#11691)
---
 .../src/main/java/org/apache/inlong/common/enums/MetaField.java    | 7 ++++++-
 .../java/org/apache/inlong/sort/protocol/node/ExtractNode.java     | 2 ++
 .../inlong/sort/protocol/node/extract/PulsarExtractNode.java       | 5 ++++-
 3 files changed, 12 insertions(+), 2 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
index e6d0bbc6cb..e80d006433 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
@@ -168,7 +168,12 @@ public enum MetaField {
     /**
      * Inlong data time for audit.
      */
-    AUDIT_DATA_TIME;
+    AUDIT_DATA_TIME,
+
+    /**
+     * Inlong properties in InlongMsg.
+     */
+    INLONG_PROPERTIES;
 
     public static MetaField forName(String name) {
         for (MetaField metaField : values()) {
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
index fb0f9dcd80..c61b12ab32 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
@@ -78,6 +78,8 @@ public abstract class ExtractNode implements Node {
 
     public static final String INLONG_MSG_AUDIT_TIME = "value.data-time";
 
+    public static final String INLONG_MSG_PROPERTIES = 
"value.inlong-msg-properties";
+
     public static final String CONSUME_AUDIT_TIME = "consume_time";
 
     @JsonProperty("id")
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
index ab90ba7d19..1887aca145 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
@@ -195,6 +195,9 @@ public class PulsarExtractNode extends ExtractNode 
implements InlongMetric, Meta
                     metadataKey = CONSUME_AUDIT_TIME;
                 }
                 break;
+            case INLONG_PROPERTIES:
+                metadataKey = INLONG_MSG_PROPERTIES;
+                break;
             default:
                 throw new 
UnsupportedOperationException(String.format("Unsupported meta field for %s: %s",
                         this.getClass().getSimpleName(), metaField));
@@ -209,7 +212,7 @@ public class PulsarExtractNode extends ExtractNode 
implements InlongMetric, Meta
 
     @Override
     public Set<MetaField> supportedMetaFields() {
-        return EnumSet.of(MetaField.AUDIT_DATA_TIME);
+        return EnumSet.of(MetaField.AUDIT_DATA_TIME, 
MetaField.INLONG_PROPERTIES);
     }
 
     @Override

Reply via email to