This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ca0abf2438 [INLONG-9247][Sort] TubeMQ source support audit when the 
deserialized type is not InlongMsg (#9258)
ca0abf2438 is described below

commit ca0abf243863cac31fe9f89da2c01a812e347b65
Author: vernedeng <verned...@apache.org>
AuthorDate: Mon Nov 13 10:51:32 2023 +0800

    [INLONG-9247][Sort] TubeMQ source support audit when the deserialized type 
is not InlongMsg (#9258)
---
 .../inlong/sort/protocol/node/ExtractNode.java     |   4 +
 .../protocol/node/extract/TubeMQExtractNode.java   |   9 +-
 .../sort-connectors/tubemq/pom.xml                 |   5 +
 .../inlong/sort/tubemq/FlinkTubeMQConsumer.java    |  12 +-
 .../table/DynamicTubeMQDeserializationSchema.java  | 148 ++++-----------------
 ...> DynamicTubeMQTableDeserializationSchema.java} |  41 +++---
 .../tubemq/table/TubeMQDynamicTableFactory.java    |  10 +-
 .../sort/tubemq/table/TubeMQTableSource.java       |  13 +-
 8 files changed, 85 insertions(+), 157 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
index b9d649d621..fc68f0f356 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
@@ -73,6 +73,10 @@ public abstract class ExtractNode implements Node {
 
     public static final String INLONG_MSG = "inlong-msg";
 
+    public static final String INLONG_MSG_AUDIT_TIME = "value.data-time";
+
+    public static final String CONSUME_AUDIT_TIME = "consume_time";
+
     @JsonProperty("id")
     private String id;
     @JsonInclude(Include.NON_NULL)
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
index d67ef15a09..327cb522a2 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
@@ -25,6 +25,7 @@ import org.apache.inlong.sort.protocol.Metadata;
 import org.apache.inlong.sort.protocol.constant.TubeMQConstant;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
 import org.apache.inlong.sort.protocol.transformation.WatermarkField;
 
 import com.google.common.base.Preconditions;
@@ -129,10 +130,14 @@ public class TubeMQExtractNode extends ExtractNode 
implements Serializable, Inlo
         String metadataKey;
         switch (metaField) {
             case AUDIT_DATA_TIME:
-                metadataKey = "value.data-time";
+                if (format instanceof InLongMsgFormat) {
+                    metadataKey = INLONG_MSG_AUDIT_TIME;
+                } else {
+                    metadataKey = CONSUME_AUDIT_TIME;
+                }
                 break;
             default:
-                throw new 
UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
+                throw new 
UnsupportedOperationException(String.format("Unsupported meta field for %s: %s",
                         this.getClass().getSimpleName(), metaField));
         }
         return metadataKey;
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
index 5cc11b3783..898c56643c 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
@@ -58,6 +58,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
     </dependencies>
 
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index abd69f8ecb..b9fb6d1b0d 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.tubemq;
 
+import org.apache.inlong.sort.tubemq.table.DynamicTubeMQDeserializationSchema;
 import org.apache.inlong.sort.tubemq.table.TubeMQOptions;
 import org.apache.inlong.tubemq.client.config.ConsumerConfig;
 import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
@@ -27,7 +28,6 @@ import org.apache.inlong.tubemq.corebase.Message;
 import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
 
 import org.apache.flink.api.common.functions.util.ListCollector;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -92,7 +92,7 @@ public class FlinkTubeMQConsumer<T> extends 
RichParallelSourceFunction<T>
     /**
      * The deserializer for records.
      */
-    private final DeserializationSchema<T> deserializationSchema;
+    private final DynamicTubeMQDeserializationSchema<T> deserializationSchema;
 
     /**
      * The random key for TubeMQ consumer group when startup.
@@ -158,7 +158,7 @@ public class FlinkTubeMQConsumer<T> extends 
RichParallelSourceFunction<T>
             String topic,
             TreeSet<String> streamIdSet,
             String consumerGroup,
-            DeserializationSchema<T> deserializationSchema,
+            DynamicTubeMQDeserializationSchema<T> deserializationSchema,
             Configuration configuration,
             String sessionKey,
             Boolean innerFormat) {
@@ -208,7 +208,7 @@ public class FlinkTubeMQConsumer<T> extends 
RichParallelSourceFunction<T>
     @Override
     public void open(Configuration parameters) throws Exception {
 
-        deserializationSchema.open(null);
+        deserializationSchema.open();
         ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress, 
consumerGroup);
         consumerConfig.setConsumePosition(consumeFromMax
                 ? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET
@@ -292,14 +292,14 @@ public class FlinkTubeMQConsumer<T> extends 
RichParallelSourceFunction<T>
             lastConsumeInstant = Instant.now();
             if (!innerFormat) {
                 for (Message message : messageList) {
-                    T record = 
deserializationSchema.deserialize(message.getData());
+                    T record = deserializationSchema.deserialize(message);
                     records.add(record);
                 }
             } else {
                 List<RowData> rowDataList = new ArrayList<>();
                 ListCollector<RowData> out = new ListCollector<>(rowDataList);
                 for (Message message : messageList) {
-                    deserializationSchema.deserialize(message.getData(), 
(Collector<T>) out);
+                    deserializationSchema.deserialize(message, (Collector<T>) 
out);
                 }
                 rowDataList.forEach(data -> records.add((T) data));
             }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
index f68bc4cf5e..4c4eaac841 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
@@ -17,143 +17,45 @@
 
 package org.apache.inlong.sort.tubemq.table;
 
-import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.MetricsCollector;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.tubemq.corebase.Message;
 
-import com.google.common.base.Objects;
-import org.apache.flink.api.common.functions.util.ListCollector;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
 
-public class DynamicTubeMQDeserializationSchema implements 
DeserializationSchema<RowData> {
+public interface DynamicTubeMQDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(DynamicTubeMQDeserializationSchema.class);
-    /**
-     * data buffer message
-     */
-    private final DeserializationSchema<RowData> deserializationSchema;
-
-    /**
-     * {@link MetadataConverter} of how to produce metadata from message.
-     */
-    private final MetadataConverter[] metadataConverters;
+    @PublicEvolving
+    default void open() throws Exception {
+    }
 
     /**
-     * {@link TypeInformation} of the produced {@link RowData} (physical + 
meta data).
+     * Deserializes the byte message.
+     *
+     * @param message The message, as a byte array.
+     * @return The deserialized message as an object (null if the message 
cannot be deserialized).
      */
-    private final TypeInformation<RowData> producedTypeInfo;
+    T deserialize(Message message) throws IOException;
 
     /**
-     * status of error
+     * Deserializes the byte message.
+     *
+     * <p>Can output multiple records through the {@link Collector}. Note that 
number and size of
+     * the produced records should be relatively small. Depending on the 
source implementation
+     * records can be buffered in memory or collecting records might delay 
emitting checkpoint
+     * barrier.
+     *
+     * @param message The message, as a byte array.
+     * @param out The collector to put the resulting messages.
      */
-    private final boolean ignoreErrors;
-
-    private SourceMetricData sourceMetricData;
-
-    private MetricOption metricOption;
-
-    public DynamicTubeMQDeserializationSchema(
-            DeserializationSchema<RowData> schema,
-            MetadataConverter[] metadataConverters,
-            TypeInformation<RowData> producedTypeInfo,
-            boolean ignoreErrors,
-            MetricOption metricOption) {
-        this.deserializationSchema = schema;
-        this.metadataConverters = metadataConverters;
-        this.producedTypeInfo = producedTypeInfo;
-        this.ignoreErrors = ignoreErrors;
-        this.metricOption = metricOption;
-    }
-
-    @Override
-    public void open(InitializationContext context) {
-        if (metricOption != null) {
-            sourceMetricData = new SourceMetricData(metricOption);
+    @PublicEvolving
+    default void deserialize(Message message, Collector<T> out) throws 
IOException {
+        T deserialize = deserialize(message);
+        if (deserialize != null) {
+            out.collect(deserialize);
         }
     }
-
-    @Override
-    public RowData deserialize(byte[] bytes) throws IOException {
-        return deserializationSchema.deserialize(bytes);
-    }
-
-    @Override
-    public void deserialize(byte[] message, Collector<RowData> out) throws 
IOException {
-        List<RowData> rows = new ArrayList<>();
-        deserializationSchema.deserialize(message,
-                new MetricsCollector<>(new ListCollector<>(rows), 
sourceMetricData));
-        rows.forEach(out::collect);
-    }
-
-    @Override
-    public boolean isEndOfStream(RowData rowData) {
-        return false;
-    }
-
-    @Override
-    public TypeInformation<RowData> getProducedType() {
-        return producedTypeInfo;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof DynamicTubeMQDeserializationSchema)) {
-            return false;
-        }
-        DynamicTubeMQDeserializationSchema that = 
(DynamicTubeMQDeserializationSchema) o;
-        return ignoreErrors == that.ignoreErrors
-                && 
Objects.equal(Arrays.stream(metadataConverters).collect(Collectors.toList()),
-                        
Arrays.stream(that.metadataConverters).collect(Collectors.toList()))
-                && Objects.equal(deserializationSchema, 
that.deserializationSchema)
-                && Objects.equal(producedTypeInfo, that.producedTypeInfo);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hashCode(deserializationSchema, metadataConverters, 
producedTypeInfo, ignoreErrors);
-    }
-
-    /**
-     * add metadata column
-     */
-    private void emitRow(Message head, GenericRowData physicalRow, 
Collector<RowData> out) {
-        if (metadataConverters.length == 0) {
-            out.collect(physicalRow);
-            return;
-        }
-        final int physicalArity = physicalRow.getArity();
-        final int metadataArity = metadataConverters.length;
-        final GenericRowData producedRow =
-                new GenericRowData(physicalRow.getRowKind(), physicalArity + 
metadataArity);
-        for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
-            producedRow.setField(physicalPos, 
physicalRow.getField(physicalPos));
-        }
-        for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
-            producedRow.setField(
-                    physicalArity + metadataPos, 
metadataConverters[metadataPos].read(head));
-        }
-        out.collect(producedRow);
-    }
-
-    interface MetadataConverter extends Serializable {
-
-        Object read(Message head);
-    }
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
similarity index 81%
copy from 
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
copy to 
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
index f68bc4cf5e..8ee154c535 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
@@ -29,8 +29,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -39,9 +37,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class DynamicTubeMQDeserializationSchema implements 
DeserializationSchema<RowData> {
+public class DynamicTubeMQTableDeserializationSchema implements 
DynamicTubeMQDeserializationSchema<RowData> {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(DynamicTubeMQDeserializationSchema.class);
     /**
      * data buffer message
      */
@@ -62,46 +59,54 @@ public class DynamicTubeMQDeserializationSchema implements 
DeserializationSchema
      */
     private final boolean ignoreErrors;
 
+    private final boolean innerFormat;
+
     private SourceMetricData sourceMetricData;
 
     private MetricOption metricOption;
 
-    public DynamicTubeMQDeserializationSchema(
+    public DynamicTubeMQTableDeserializationSchema(
             DeserializationSchema<RowData> schema,
             MetadataConverter[] metadataConverters,
             TypeInformation<RowData> producedTypeInfo,
             boolean ignoreErrors,
+            boolean innerFormat,
             MetricOption metricOption) {
         this.deserializationSchema = schema;
         this.metadataConverters = metadataConverters;
         this.producedTypeInfo = producedTypeInfo;
         this.ignoreErrors = ignoreErrors;
+        this.innerFormat = innerFormat;
         this.metricOption = metricOption;
     }
 
     @Override
-    public void open(InitializationContext context) {
+    public void open() {
         if (metricOption != null) {
             sourceMetricData = new SourceMetricData(metricOption);
         }
     }
 
     @Override
-    public RowData deserialize(byte[] bytes) throws IOException {
-        return deserializationSchema.deserialize(bytes);
+    public RowData deserialize(Message message) throws IOException {
+        return deserializationSchema.deserialize(message.getData());
     }
 
     @Override
-    public void deserialize(byte[] message, Collector<RowData> out) throws 
IOException {
+    public void deserialize(Message message, Collector<RowData> out) throws 
IOException {
         List<RowData> rows = new ArrayList<>();
-        deserializationSchema.deserialize(message,
-                new MetricsCollector<>(new ListCollector<>(rows), 
sourceMetricData));
-        rows.forEach(out::collect);
-    }
 
-    @Override
-    public boolean isEndOfStream(RowData rowData) {
-        return false;
+        MetricsCollector<RowData> metricsCollector =
+                new MetricsCollector<>(new ListCollector<>(rows), 
sourceMetricData);
+
+        // reset time stamp if the deserialize schema has not inner format
+        if (!innerFormat) {
+            metricsCollector.resetTimestamp(System.currentTimeMillis());
+        }
+        deserializationSchema.deserialize(message.getData(), metricsCollector);
+
+        rows.forEach(row -> emitRow(message, (GenericRowData) row, out));
+
     }
 
     @Override
@@ -114,10 +119,10 @@ public class DynamicTubeMQDeserializationSchema 
implements DeserializationSchema
         if (this == o) {
             return true;
         }
-        if (!(o instanceof DynamicTubeMQDeserializationSchema)) {
+        if (!(o instanceof DynamicTubeMQTableDeserializationSchema)) {
             return false;
         }
-        DynamicTubeMQDeserializationSchema that = 
(DynamicTubeMQDeserializationSchema) o;
+        DynamicTubeMQTableDeserializationSchema that = 
(DynamicTubeMQTableDeserializationSchema) o;
         return ignoreErrors == that.ignoreErrors
                 && 
Objects.equal(Arrays.stream(metadataConverters).collect(Collectors.toList()),
                         
Arrays.stream(that.metadataConverters).collect(Collectors.toList()))
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
index e3a0a0c6d7..4962364a9c 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.tubemq.table;
 
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.ConfigOption;
@@ -66,8 +68,6 @@ public class TubeMQDynamicTableFactory implements 
DynamicTableSourceFactory, Dyn
 
     public static final String IDENTIFIER = "tubemq-inlong";
 
-    public static final String INNERFORMATTYPE = "inlong-msg";
-
     public static boolean innerFormat = false;
 
     private static DecodingFormat<DeserializationSchema<RowData>> 
getValueDecodingFormat(
@@ -120,10 +120,10 @@ public class TubeMQDynamicTableFactory implements 
DynamicTableSourceFactory, Dyn
         final DecodingFormat<DeserializationSchema<RowData>> 
valueDecodingFormat = getValueDecodingFormat(helper);
 
         // validate all options
-        helper.validateExcept(INNERFORMATTYPE);
+        helper.validateExcept(ExtractNode.INLONG_MSG);
 
         validatePKConstraints(context.getObjectIdentifier(), 
context.getCatalogTable(), valueDecodingFormat);
-        innerFormat = INNERFORMATTYPE.equals(tableOptions.get(FORMAT));
+        innerFormat = ExtractNode.INLONG_MSG.equals(tableOptions.get(FORMAT));
 
         final Configuration properties = 
getTubeMQProperties(context.getCatalogTable().getOptions());
 
@@ -156,7 +156,7 @@ public class TubeMQDynamicTableFactory implements 
DynamicTableSourceFactory, Dyn
         final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat 
= getValueEncodingFormat(helper);
 
         // validate all options
-        helper.validateExcept(INNERFORMATTYPE);
+        helper.validateExcept(ExtractNode.INLONG_MSG);
 
         validatePKConstraints(context.getObjectIdentifier(), 
context.getCatalogTable(), valueEncodingFormat);
 
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
index 4a1d332ca8..2d5ddbb3d5 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
@@ -18,8 +18,9 @@
 package org.apache.inlong.sort.tubemq.table;
 
 import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.tubemq.FlinkTubeMQConsumer;
-import 
org.apache.inlong.sort.tubemq.table.DynamicTubeMQDeserializationSchema.MetadataConverter;
+import 
org.apache.inlong.sort.tubemq.table.DynamicTubeMQTableDeserializationSchema.MetadataConverter;
 import org.apache.inlong.tubemq.corebase.Message;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -322,8 +323,9 @@ public class TubeMQTableSource implements ScanTableSource, 
SupportsReadingMetada
                 .withAuditKeys(auditKeys)
                 .build();
 
-        final DeserializationSchema<RowData> tubeMQDeserializer = new 
DynamicTubeMQDeserializationSchema(
-                deserialization, metadataConverters, producedTypeInfo, 
ignoreErrors, metricOption);
+        final DynamicTubeMQDeserializationSchema<RowData> tubeMQDeserializer =
+                new DynamicTubeMQTableDeserializationSchema(
+                        deserialization, metadataConverters, producedTypeInfo, 
ignoreErrors, innerFormat, metricOption);
 
         final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new 
FlinkTubeMQConsumer(masterAddress, topic, streamIdSet,
                 consumerGroup, tubeMQDeserializer, configuration, sessionKey, 
innerFormat);
@@ -336,6 +338,11 @@ public class TubeMQTableSource implements ScanTableSource, 
SupportsReadingMetada
 
     enum ReadableMetadata {
 
+        CONSUME_TIME(
+                ExtractNode.CONSUME_AUDIT_TIME,
+                DataTypes.BIGINT().notNull(),
+                m -> System.currentTimeMillis()),
+
         TOPIC(
                 "topic",
                 DataTypes.STRING().notNull(),

Reply via email to