This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c4553ddc46 [INLONG-9202][Sort] Fix audit report error when running 
pulsar -> iceberg in flink1.15 (#9206)
c4553ddc46 is described below

commit c4553ddc46f455e8efa8a3b286cf2783dfcc58bd
Author: Sting <zpen...@connect.ust.hk>
AuthorDate: Fri Nov 3 15:21:01 2023 +0800

    [INLONG-9202][Sort] Fix audit report error when running pulsar -> iceberg 
in flink1.15 (#9206)
---
 .../org/apache/inlong/audit/AuditOperator.java     |  4 ++-
 .../protocol/node/extract/PulsarExtractNode.java   | 31 +++++++++++++++++++++-
 .../sort/protocol/node/load/IcebergLoadNode.java   | 16 ++++++++++-
 inlong-sort/sort-core/pom.xml                      |  6 +++++
 .../org/apache/inlong/sort/base/Constants.java     |  8 ++++++
 .../sort-connectors/pulsar/pom.xml                 |  2 --
 .../table/PulsarTableDeserializationSchema.java    | 10 +++++--
 .../PulsarTableDeserializationSchemaFactory.java   |  6 +----
 .../formats/inlongmsg/InLongMsgDecodingFormat.java | 13 +++++++++
 .../sort/formats/inlongmsg/InLongMsgUtils.java     |  3 +++
 10 files changed, 87 insertions(+), 12 deletions(-)

diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
index a89c30f6c0..e720a75cb4 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
@@ -26,6 +26,7 @@ import org.apache.inlong.audit.util.StatInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.HashMap;
@@ -42,8 +43,9 @@ import static 
org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDIT_R
 /**
  * Audit operator, which is singleton.
  */
-public class AuditOperator {
+public class AuditOperator implements Serializable {
 
+    private static final long serialVersionUID = 1L;
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AuditOperator.class);
     private static final String FIELD_SEPARATORS = ":";
     private static final String DEFAULT_AUDIT_TAG = "-1";
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
index cba35f8683..d6634013da 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
@@ -17,8 +17,10 @@
 
 package org.apache.inlong.sort.protocol.node.extract;
 
+import org.apache.inlong.common.enums.MetaField;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.InlongMetric;
+import org.apache.inlong.sort.protocol.Metadata;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.node.format.Format;
 import org.apache.inlong.sort.protocol.transformation.WatermarkField;
@@ -34,13 +36,15 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 @EqualsAndHashCode(callSuper = true)
 @JsonTypeName("pulsarExtract")
 @Data
-public class PulsarExtractNode extends ExtractNode implements InlongMetric {
+public class PulsarExtractNode extends ExtractNode implements InlongMetric, 
Metadata {
 
     private static final long serialVersionUID = 1L;
 
@@ -136,4 +140,29 @@ public class PulsarExtractNode extends ExtractNode 
implements InlongMetric {
     public List<FieldInfo> getPartitionFields() {
         return super.getPartitionFields();
     }
+
+    @Override
+    public String getMetadataKey(MetaField metaField) {
+        String metadataKey;
+        switch (metaField) {
+            case AUDIT_DATA_TIME:
+                metadataKey = "value.data-time";
+                break;
+            default:
+                throw new 
UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
+                        this.getClass().getSimpleName(), metaField));
+        }
+        return metadataKey;
+    }
+
+    @Override
+    public boolean isVirtual(MetaField metaField) {
+        return true;
+    }
+
+    @Override
+    public Set<MetaField> supportedMetaFields() {
+        return EnumSet.of(MetaField.AUDIT_DATA_TIME);
+    }
+
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
index d39114c3b7..6ffaf5f2da 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
@@ -132,9 +132,23 @@ public class IcebergLoadNode extends LoadNode implements 
InlongMetric, Metadata,
         return super.getPartitionFields();
     }
 
+    @Override
+    public String getMetadataKey(MetaField metaField) {
+        String metadataKey;
+        switch (metaField) {
+            case AUDIT_DATA_TIME:
+                metadataKey = "audit_data_time";
+                break;
+            default:
+                throw new 
UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
+                        this.getClass().getSimpleName(), metaField));
+        }
+        return metadataKey;
+    }
+
     @Override
     public boolean isVirtual(MetaField metaField) {
-        return true;
+        return false;
     }
 
     @Override
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index 7228bfb59c..998905c038 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -293,6 +293,12 @@
                     <version>${project.version}</version>
                     <scope>test</scope>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.inlong</groupId>
+                    <artifactId>sort-connector-iceberg-v1.15</artifactId>
+                    <version>${project.version}</version>
+                    <scope>test</scope>
+                </dependency>
             </dependencies>
             <build>
                 <plugins>
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 5065c78c1f..ac7baac4dd 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -421,4 +421,12 @@ public final class Constants {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("Whether supporting auto create table 
when snapshot, default value is 'false'");
+
+    public static final ConfigOption<String> INNER_FORMAT =
+            ConfigOptions.key("inlong-msg.inner.format")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Inner format");
+
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
index d2b24382a6..7f4afdd848 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
@@ -89,8 +89,6 @@
         <dependency>
             <groupId>com.google.protobuf</groupId>
             <artifactId>protobuf-java</artifactId>
-            <scope>provided</scope>
-            <optional>true</optional>
         </dependency>
 
         <!-- Pulsar Client -->
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
index 237de45b4d..4792d7b287 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.pulsar.table;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricsCollector;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 
@@ -58,13 +59,15 @@ public class PulsarTableDeserializationSchema implements 
PulsarDeserializationSc
 
     private SourceMetricData sourceMetricData;
 
+    private MetricOption metricOption;
+
     public PulsarTableDeserializationSchema(
             @Nullable DeserializationSchema<RowData> keyDeserialization,
             DeserializationSchema<RowData> valueDeserialization,
             TypeInformation<RowData> producedTypeInfo,
             PulsarRowDataConverter rowDataConverter,
             boolean upsertMode,
-            SourceMetricData sourceMetricData) {
+            MetricOption metricOption) {
         if (upsertMode) {
             checkNotNull(keyDeserialization, "upsert mode must specify a key 
format");
         }
@@ -73,7 +76,7 @@ public class PulsarTableDeserializationSchema implements 
PulsarDeserializationSc
         this.rowDataConverter = checkNotNull(rowDataConverter);
         this.producedTypeInfo = checkNotNull(producedTypeInfo);
         this.upsertMode = upsertMode;
-        this.sourceMetricData = sourceMetricData;
+        this.metricOption = metricOption;
     }
 
     @Override
@@ -82,6 +85,9 @@ public class PulsarTableDeserializationSchema implements 
PulsarDeserializationSc
         if (keyDeserialization != null) {
             keyDeserialization.open(context);
         }
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption);
+        }
         valueDeserialization.open(context);
     }
 
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
index 360b07aa69..c063e26539 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
@@ -172,17 +172,13 @@ public class PulsarTableDeserializationSchemaFactory 
implements Serializable {
                 .withAuditKeys(auditKeys)
                 .build();
 
-        if (metricOption != null) {
-            sourceMetricData = new SourceMetricData(metricOption);
-        }
-
         return new PulsarTableDeserializationSchema(
                 keyDeserialization,
                 valueDeserialization,
                 producedTypeInfo,
                 rowDataConverter,
                 upsertMode,
-                sourceMetricData);
+                metricOption);
     }
 
     public void setProducedDataType(DataType producedDataType) {
diff --git 
a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
 
b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
index 4645290c1b..0f67bbc072 100644
--- 
a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
+++ 
b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
@@ -139,6 +139,19 @@ public class InLongMsgDecodingFormat implements 
DecodingFormat<DeserializationSc
 
     enum ReadableMetadata {
 
+        DATA_TIME(
+                "data-time",
+                DataTypes.BIGINT().notNull(),
+                new MetadataConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(InLongMsgHead head) {
+                        return head.getTime().getTime();
+                    }
+                }),
+
         CREATE_TIME(
                 "create-time",
                 DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE().notNull(),
diff --git 
a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
 
b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
index 16568d00c3..371ce84808 100644
--- 
a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
+++ 
b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
@@ -54,6 +54,7 @@ public class InLongMsgUtils {
 
     // keys in attributes
     public static final String INLONGMSG_ATTR_STREAM_ID = "streamId";
+    public static final String INLONGMSG_ATTR_TID = "tid";
     public static final String INLONGMSG_ATTR_TIME_T = "t";
     public static final String INLONGMSG_ATTR_TIME_DT = "dt";
     public static final String INLONGMSG_ATTR_ADD_COLUMN_PREFIX = "__addcol";
@@ -118,6 +119,8 @@ public class InLongMsgUtils {
 
         if (attributes.containsKey(INLONGMSG_ATTR_STREAM_ID)) {
             streamId = attributes.get(INLONGMSG_ATTR_STREAM_ID);
+        } else if (attributes.containsKey(INLONGMSG_ATTR_TID)) {
+            streamId = attributes.get(INLONGMSG_ATTR_TID);
         } else {
             throw new IllegalArgumentException("Could not find " + 
INLONGMSG_ATTR_STREAM_ID + " in attributes!");
         }

Reply via email to