This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ec6a8dadd6 [INLONG-9223][Sort] TubeMQ source support InlongAudit (#9236)
ec6a8dadd6 is described below

commit ec6a8dadd698d0733f545ea0d1cbee196b549c92
Author: vernedeng <verned...@apache.org>
AuthorDate: Thu Nov 9 10:22:19 2023 +0800

    [INLONG-9223][Sort] TubeMQ source support InlongAudit (#9236)
    
    * [INLONG-9223][Sort] TubeMQ source support InlongAudit
---
 .../protocol/node/extract/TubeMQExtractNode.java   | 31 +++++++++++++-
 .../sort-connectors/tubemq/pom.xml                 | 11 +++++
 .../inlong/sort/tubemq/FlinkTubeMQConsumer.java    | 31 +++++++-------
 .../table/DynamicTubeMQDeserializationSchema.java  | 29 ++++++++++++-
 .../tubemq/table/TubeMQDynamicTableFactory.java    | 43 +++++++++++++++----
 .../sort/tubemq/table/TubeMQTableSource.java       | 49 ++++++++++++++++------
 .../sort-connectors/tubemq/pom.xml                 |  6 +++
 .../inlong/sort/tubemq/FlinkTubeMQConsumer.java    |  1 +
 .../table/DynamicTubeMQDeserializationSchema.java  | 29 ++++++++++++-
 .../tubemq/table/TubeMQDynamicTableFactory.java    | 25 +++++++++--
 .../sort/tubemq/table/TubeMQTableSource.java       | 29 +++++++++++--
 11 files changed, 238 insertions(+), 46 deletions(-)
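
For orientation before the diff: a minimal sketch (not part of the commit) of how the new audit/metric wiring fits together, using only the builder and constructor calls that appear in the changes below. The class and method names of the sketch itself are illustrative, and it assumes the sort-connector-base module from this repository is on the classpath.

    import org.apache.inlong.sort.base.metric.MetricOption;
    import org.apache.inlong.sort.base.metric.SourceMetricData;

    public class TubeMQAuditWiringSketch {

        // Mirrors TubeMQTableSource#createDeserialization: the three new table options
        // (inlongMetric, auditHostAndPorts, auditKeys) are folded into one MetricOption.
        static MetricOption buildMetricOption(String inlongMetric, String auditHostAndPorts, String auditKeys) {
            return MetricOption.builder()
                    .withInlongLabels(inlongMetric)
                    .withAuditAddress(auditHostAndPorts)
                    .withAuditKeys(auditKeys)
                    .build();
        }

        // Mirrors DynamicTubeMQDeserializationSchema#open: metrics are reported only when a
        // MetricOption was supplied; each deserialized record then passes through a
        // MetricsCollector that updates this SourceMetricData.
        static SourceMetricData openMetrics(MetricOption metricOption) {
            return metricOption == null ? null : new SourceMetricData(metricOption);
        }
    }

These calls correspond to the three connector options the table factory now registers in optionalOptions(): AUDIT_KEYS, INLONG_METRIC and INLONG_AUDIT.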

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
index cc6ac2b5f1..d67ef15a09 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
@@ -17,8 +17,11 @@
 
 package org.apache.inlong.sort.protocol.node.extract;
 
+import org.apache.inlong.common.enums.MetaField;
 import org.apache.inlong.sort.formats.util.StringUtils;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.InlongMetric;
+import org.apache.inlong.sort.protocol.Metadata;
 import org.apache.inlong.sort.protocol.constant.TubeMQConstant;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.node.format.Format;
@@ -35,8 +38,10 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeSet;
 
 /**
@@ -45,7 +50,7 @@ import java.util.TreeSet;
 @EqualsAndHashCode(callSuper = true)
 @JsonTypeName("tubeMQExtract")
 @Data
-public class TubeMQExtractNode extends ExtractNode implements Serializable {
+public class TubeMQExtractNode extends ExtractNode implements Serializable, InlongMetric, Metadata {
 
     private static final long serialVersionUID = -2544747886429528474L;
 
@@ -119,4 +124,28 @@ public class TubeMQExtractNode extends ExtractNode implements Serializable {
         return String.format("table_%s", super.getId());
     }
 
+    @Override
+    public String getMetadataKey(MetaField metaField) {
+        String metadataKey;
+        switch (metaField) {
+            case AUDIT_DATA_TIME:
+                metadataKey = "value.data-time";
+                break;
+            default:
+                throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
+                        this.getClass().getSimpleName(), metaField));
+        }
+        return metadataKey;
+    }
+
+    @Override
+    public boolean isVirtual(MetaField metaField) {
+        return true;
+    }
+
+    @Override
+    public Set<MetaField> supportedMetaFields() {
+        return EnumSet.of(MetaField.AUDIT_DATA_TIME);
+    }
+
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/pom.xml
index d0cca1e07b..19dd75f435 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/pom.xml
@@ -46,6 +46,17 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>flink-connector-base</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index 96c27d2b9b..abd69f8ecb 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -68,8 +68,6 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
 
     private static final Logger LOG = LoggerFactory.getLogger(FlinkTubeMQConsumer.class);
     private static final String TUBE_OFFSET_STATE = "tube-offset-state";
-    private static final String SPLIT_COMMA = ",";
-    private static final String SPLIT_COLON = ":";
 
     /**
      * The address of TubeMQ master, format eg: 127.0.0.1:8715,127.0.0.2:8715.
@@ -82,9 +80,9 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
     private final String topic;
 
     /**
-     * The tubemq consumers use this tid set to filter records reading from server.
+     * The tubemq consumers use this streamId set to filter records reading from server.
      */
-    private final TreeSet<String> tidSet;
+    private final TreeSet<String> streamIdSet;
 
     /**
      * The consumer group name.
@@ -130,7 +128,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
     /**
      * The current offsets of partitions which are stored in {@link #offsetsState}
      * once a checkpoint is triggered.
-     *
+     * <p>
      * NOTE: The offsets are populated in the main thread and saved in the
      * checkpoint thread. Its usage must be guarded by the checkpoint lock.</p>
      */
@@ -147,18 +145,18 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
     /**
      * Build a TubeMQ source function
      *
-     * @param masterAddress the master address of TubeMQ
-     * @param topic the topic name
-     * @param tidSet the  topic's filter condition items
-     * @param consumerGroup the consumer group name
+     * @param masterAddress         the master address of TubeMQ
+     * @param topic                 the topic name
+     * @param streamIdSet                the  topic's filter condition items
+     * @param consumerGroup         the consumer group name
      * @param deserializationSchema the deserialize schema
-     * @param configuration the configure
-     * @param sessionKey the tube session key
+     * @param configuration         the configure
+     * @param sessionKey            the tube session key
      */
     public FlinkTubeMQConsumer(
             String masterAddress,
             String topic,
-            TreeSet<String> tidSet,
+            TreeSet<String> streamIdSet,
             String consumerGroup,
             DeserializationSchema<T> deserializationSchema,
             Configuration configuration,
@@ -166,14 +164,14 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
             Boolean innerFormat) {
         checkNotNull(masterAddress, "The master address must not be null.");
         checkNotNull(topic, "The topic must not be null.");
-        checkNotNull(tidSet, "The tid set must not be null.");
+        checkNotNull(streamIdSet, "The streamId set must not be null.");
         checkNotNull(consumerGroup, "The consumer group must not be null.");
         checkNotNull(deserializationSchema, "The deserialization schema must not be null.");
         checkNotNull(configuration, "The configuration must not be null.");
 
         this.masterAddress = masterAddress;
         this.topic = topic;
-        this.tidSet = tidSet;
+        this.streamIdSet = streamIdSet;
         this.consumerGroup = consumerGroup;
         this.deserializationSchema = deserializationSchema;
         this.sessionKey = sessionKey;
@@ -210,6 +208,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
     @Override
     public void open(Configuration parameters) throws Exception {
 
+        deserializationSchema.open(null);
         ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress, consumerGroup);
         consumerConfig.setConsumePosition(consumeFromMax
                 ? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET
@@ -220,7 +219,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
         final int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
         messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
         messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
-        messagePullConsumer.subscribe(topic, tidSet);
+        messagePullConsumer.subscribe(topic, streamIdSet);
         String jobId = getRuntimeContext().getJobId().toString();
         messagePullConsumer.completeSubscribe(sessionKey.concat(jobId), numTasks, true, currentOffsets);
 
@@ -305,7 +304,9 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
                 rowDataList.forEach(data -> records.add((T) data));
             }
         }
+
         return lastConsumeInstant;
+
     }
 
     @Override
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
index f5880f1a78..f68bc4cf5e 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
@@ -17,22 +17,31 @@
 
 package org.apache.inlong.sort.tubemq.table;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricsCollector;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.tubemq.corebase.Message;
 
 import com.google.common.base.Objects;
+import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.stream.Collectors;
 
 public class DynamicTubeMQDeserializationSchema implements DeserializationSchema<RowData> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(DynamicTubeMQDeserializationSchema.class);
     /**
      * data buffer message
      */
@@ -53,15 +62,28 @@ public class DynamicTubeMQDeserializationSchema implements DeserializationSchema
      */
     private final boolean ignoreErrors;
 
+    private SourceMetricData sourceMetricData;
+
+    private MetricOption metricOption;
+
     public DynamicTubeMQDeserializationSchema(
             DeserializationSchema<RowData> schema,
             MetadataConverter[] metadataConverters,
             TypeInformation<RowData> producedTypeInfo,
-            boolean ignoreErrors) {
+            boolean ignoreErrors,
+            MetricOption metricOption) {
         this.deserializationSchema = schema;
         this.metadataConverters = metadataConverters;
         this.producedTypeInfo = producedTypeInfo;
         this.ignoreErrors = ignoreErrors;
+        this.metricOption = metricOption;
+    }
+
+    @Override
+    public void open(InitializationContext context) {
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption);
+        }
     }
 
     @Override
@@ -71,7 +93,10 @@ public class DynamicTubeMQDeserializationSchema implements DeserializationSchema
 
     @Override
     public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
-        deserializationSchema.deserialize(message, out);
+        List<RowData> rows = new ArrayList<>();
+        deserializationSchema.deserialize(message,
+                new MetricsCollector<>(new ListCollector<>(rows), sourceMetricData));
+        rows.forEach(out::collect);
     }
 
     @Override
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
index 43fb3e198e..6af6f8f645 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sort.tubemq.table;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
@@ -25,6 +26,7 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.format.Format;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
@@ -32,6 +34,7 @@ import org.apache.flink.table.factories.DeserializationFormatFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.factories.SerializationFormatFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
 
@@ -41,6 +44,9 @@ import java.util.Set;
 import java.util.TreeSet;
 
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX;
 import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.CONSUME_GROUP;
 import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT;
@@ -68,13 +74,18 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory {
                 .orElseGet(() -> helper.discoverDecodingFormat(DeserializationFormatFactory.class, FORMAT));
     }
 
+    private static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
+            TableFactoryHelper helper) {
+        return helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, FORMAT)
+                .orElseGet(() -> helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT));
+    }
+
     private static void validatePKConstraints(
             ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
         if (catalogTable.getSchema().getPrimaryKey().isPresent()
                 && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
             Configuration options = Configuration.fromMap(catalogTable.getOptions());
             String formatName = options.getOptional(FORMAT).orElse(options.get(FORMAT));
-            innerFormat = INNERFORMATTYPE.equals(formatName);
             throw new ValidationException(String.format(
                     "The TubeMQ table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
                             + " on the table, because it can't guarantee the semantic of primary key.",
@@ -110,10 +121,15 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory {
         helper.validateExcept(INNERFORMATTYPE);
 
         validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
+        innerFormat = INNERFORMATTYPE.equals(tableOptions.get(FORMAT));
 
         final Configuration properties = getTubeMQProperties(context.getCatalogTable().getOptions());
 
-        final DataType physicalDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
+        final DataType physicalDataType = context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
+
+        String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
+        String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+        String auditKeys = tableOptions.get(AUDIT_KEYS);
 
         return createTubeMQTableSource(
                 physicalDataType,
@@ -123,7 +139,10 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory {
                 TubeMQOptions.getTiSet(tableOptions),
                 TubeMQOptions.getConsumerGroup(tableOptions),
                 TubeMQOptions.getSessionKey(tableOptions),
-                properties);
+                properties,
+                inlongMetric,
+                auditHostAndPorts,
+                auditKeys);
     }
 
     protected TubeMQTableSource createTubeMQTableSource(
@@ -131,23 +150,29 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory {
             DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
             String topic,
             String url,
-            TreeSet<String> tid,
+            TreeSet<String> streamId,
             String consumerGroup,
             String sessionKey,
-            Configuration properties) {
+            Configuration properties,
+            String inlongMetric,
+            String auditHostAndPorts,
+            String auditKeys) {
         return new TubeMQTableSource(
                 physicalDataType,
                 valueDecodingFormat,
                 url,
                 topic,
-                tid,
+                streamId,
                 consumerGroup,
                 sessionKey,
                 properties,
                 null,
                 null,
                 false,
-                innerFormat);
+                innerFormat,
+                inlongMetric,
+                auditHostAndPorts,
+                auditKeys);
     }
 
     @Override
@@ -172,6 +197,10 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory {
         options.add(SESSION_KEY);
         options.add(BOOTSTRAP_FROM_MAX);
         options.add(TOPIC_PATTERN);
+        options.add(AUDIT_KEYS);
+        options.add(INLONG_METRIC);
+        options.add(INLONG_AUDIT);
         return options;
     }
+
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
index c2642fd351..4a1d332ca8 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.tubemq.table;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.tubemq.FlinkTubeMQConsumer;
 import org.apache.inlong.sort.tubemq.table.DynamicTubeMQDeserializationSchema.MetadataConverter;
 import org.apache.inlong.tubemq.corebase.Message;
@@ -40,6 +41,8 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -62,6 +65,8 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
 
     private static final String VALUE_METADATA_PREFIX = "value.";
 
+    private static final Logger LOG = LoggerFactory.getLogger(TubeMQTableSource.class);
+
     // --------------------------------------------------------------------------------------------
     // Mutable attributes
     // --------------------------------------------------------------------------------------------
@@ -84,9 +89,9 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
      */
     private final String topic;
     /**
-     * The TubeMQ tid filter collection.
+     * The TubeMQ streamId filter collection.
      */
-    private final TreeSet<String> tidSet;
+    private final TreeSet<String> streamIdSet;
     /**
      * The TubeMQ consumer group name.
      */
@@ -120,6 +125,11 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
      * Metadata that is appended at the end of a physical source row.
      */
     protected List<String> metadataKeys;
+
+    private String inlongMetric;
+    private String auditHostAndPorts;
+    private String auditKeys;
+
     /**
      * Watermark strategy that is used to generate per-partition watermark.
      */
@@ -129,15 +139,16 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
     public TubeMQTableSource(DataType physicalDataType,
             DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
             String masterAddress, String topic,
-            TreeSet<String> tidSet, String consumerGroup, String sessionKey,
+            TreeSet<String> streamIdSet, String consumerGroup, String sessionKey,
             Configuration configuration, @Nullable WatermarkStrategy<RowData> watermarkStrategy,
-            Optional<String> proctimeAttribute, Boolean ignoreErrors, Boolean innerFormat) {
+            Optional<String> proctimeAttribute, Boolean ignoreErrors, Boolean innerFormat,
+            String inlongMetric, String auditHostAndPorts, String auditKeys) {
 
         Preconditions.checkNotNull(physicalDataType, "Physical data type must not be null.");
         Preconditions.checkNotNull(valueDecodingFormat, "The deserialization schema must not be null.");
         Preconditions.checkNotNull(masterAddress, "The master address must not be null.");
         Preconditions.checkNotNull(topic, "The topic must not be null.");
-        Preconditions.checkNotNull(tidSet, "The tid set must not be null.");
+        Preconditions.checkNotNull(streamIdSet, "The streamId set must not be null.");
         Preconditions.checkNotNull(consumerGroup, "The consumer group must not be null.");
         Preconditions.checkNotNull(configuration, "The configuration must not be null.");
 
@@ -147,7 +158,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
         this.valueDecodingFormat = valueDecodingFormat;
         this.masterAddress = masterAddress;
         this.topic = topic;
-        this.tidSet = tidSet;
+        this.streamIdSet = streamIdSet;
         this.consumerGroup = consumerGroup;
         this.sessionKey = sessionKey;
         this.configuration = configuration;
@@ -155,6 +166,9 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
         this.proctimeAttribute = proctimeAttribute;
         this.ignoreErrors = ignoreErrors;
         this.innerFormat = innerFormat;
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+        this.auditKeys = auditKeys;
     }
 
     @Override
@@ -167,6 +181,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
         final LogicalType physicalType = physicalDataType.getLogicalType();
         final int physicalFieldCount = LogicalTypeChecks.getFieldCount(physicalType);
         final IntStream physicalFields = IntStream.range(0, physicalFieldCount);
+
         final DeserializationSchema<RowData> deserialization = createDeserialization(context,
                 valueDecodingFormat, physicalFields.toArray(), null);
 
@@ -182,8 +197,9 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
     public DynamicTableSource copy() {
         return new TubeMQTableSource(
                 physicalDataType, valueDecodingFormat, masterAddress,
-                topic, tidSet, consumerGroup, sessionKey, configuration,
-                watermarkStrategy, proctimeAttribute, ignoreErrors, innerFormat);
+                topic, streamIdSet, consumerGroup, sessionKey, configuration,
+                watermarkStrategy, proctimeAttribute, ignoreErrors, innerFormat,
+                inlongMetric, auditHostAndPorts, auditKeys);
     }
 
     @Override
@@ -247,7 +263,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
                 && Objects.equals(valueDecodingFormat, that.valueDecodingFormat)
                 && Objects.equals(masterAddress, that.masterAddress)
                 && Objects.equals(topic, that.topic)
-                && Objects.equals(String.valueOf(tidSet), String.valueOf(that.tidSet))
+                && Objects.equals(String.valueOf(streamIdSet), String.valueOf(that.streamIdSet))
                 && Objects.equals(consumerGroup, that.consumerGroup)
                 && Objects.equals(proctimeAttribute, that.proctimeAttribute)
                 && Objects.equals(watermarkStrategy, that.watermarkStrategy);
@@ -260,7 +276,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
                 valueDecodingFormat,
                 masterAddress,
                 topic,
-                tidSet,
+                streamIdSet,
                 consumerGroup,
                 configuration,
                 watermarkStrategy,
@@ -273,7 +289,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
 
     @Nullable
     private DeserializationSchema<RowData> createDeserialization(
-            DynamicTableSource.Context context,
+            Context context,
             @Nullable DecodingFormat<DeserializationSchema<RowData>> format,
             int[] projection,
             @Nullable String prefix) {
@@ -299,10 +315,17 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
                                 .orElseThrow(IllegalStateException::new))
                         .map(m -> m.converter)
                         .toArray(MetadataConverter[]::new);
+
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
+                .build();
+
         final DeserializationSchema<RowData> tubeMQDeserializer = new DynamicTubeMQDeserializationSchema(
-                deserialization, metadataConverters, producedTypeInfo, ignoreErrors);
+                deserialization, metadataConverters, producedTypeInfo, ignoreErrors, metricOption);
 
-        final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new FlinkTubeMQConsumer(masterAddress, topic, tidSet,
+        final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new FlinkTubeMQConsumer(masterAddress, topic, streamIdSet,
                 consumerGroup, tubeMQDeserializer, configuration, sessionKey, innerFormat);
         return tubeMQConsumer;
     }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
index aec6e19919..5cc11b3783 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
@@ -51,6 +51,12 @@
             <groupId>org.apache.inlong</groupId>
             <artifactId>sort-connector-base</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>flink-connector-base</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
     </dependencies>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index cafa653f65..abd69f8ecb 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -208,6 +208,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
     @Override
     public void open(Configuration parameters) throws Exception {
 
+        deserializationSchema.open(null);
         ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress, consumerGroup);
         consumerConfig.setConsumePosition(consumeFromMax
                 ? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
index f5880f1a78..f68bc4cf5e 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
@@ -17,22 +17,31 @@
 
 package org.apache.inlong.sort.tubemq.table;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricsCollector;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.tubemq.corebase.Message;
 
 import com.google.common.base.Objects;
+import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.stream.Collectors;
 
 public class DynamicTubeMQDeserializationSchema implements DeserializationSchema<RowData> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(DynamicTubeMQDeserializationSchema.class);
     /**
      * data buffer message
      */
@@ -53,15 +62,28 @@ public class DynamicTubeMQDeserializationSchema implements DeserializationSchema
      */
     private final boolean ignoreErrors;
 
+    private SourceMetricData sourceMetricData;
+
+    private MetricOption metricOption;
+
     public DynamicTubeMQDeserializationSchema(
             DeserializationSchema<RowData> schema,
             MetadataConverter[] metadataConverters,
             TypeInformation<RowData> producedTypeInfo,
-            boolean ignoreErrors) {
+            boolean ignoreErrors,
+            MetricOption metricOption) {
         this.deserializationSchema = schema;
         this.metadataConverters = metadataConverters;
         this.producedTypeInfo = producedTypeInfo;
         this.ignoreErrors = ignoreErrors;
+        this.metricOption = metricOption;
+    }
+
+    @Override
+    public void open(InitializationContext context) {
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption);
+        }
     }
 
     @Override
@@ -71,7 +93,10 @@ public class DynamicTubeMQDeserializationSchema implements DeserializationSchema
 
     @Override
     public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
-        deserializationSchema.deserialize(message, out);
+        List<RowData> rows = new ArrayList<>();
+        deserializationSchema.deserialize(message,
+                new MetricsCollector<>(new ListCollector<>(rows), sourceMetricData));
+        rows.forEach(out::collect);
     }
 
     @Override
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
index 0472353037..e3a0a0c6d7 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -46,6 +46,9 @@ import java.util.Set;
 import java.util.TreeSet;
 
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX;
 import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.CONSUME_GROUP;
 import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT;
@@ -126,6 +129,10 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn
 
         final DataType physicalDataType = context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
 
+        String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
+        String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+        String auditKeys = tableOptions.get(AUDIT_KEYS);
+
         return createTubeMQTableSource(
                 physicalDataType,
                 valueDecodingFormat,
@@ -134,7 +141,10 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn
                 TubeMQOptions.getTiSet(tableOptions),
                 TubeMQOptions.getConsumerGroup(tableOptions),
                 TubeMQOptions.getSessionKey(tableOptions),
-                properties);
+                properties,
+                inlongMetric,
+                auditHostAndPorts,
+                auditKeys);
     }
 
     @Override
@@ -171,7 +181,10 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn
             TreeSet<String> streamId,
             String consumerGroup,
             String sessionKey,
-            Configuration properties) {
+            Configuration properties,
+            String inlongMetric,
+            String auditHostAndPorts,
+            String auditKeys) {
         return new TubeMQTableSource(
                 physicalDataType,
                 valueDecodingFormat,
@@ -184,7 +197,10 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn
                 null,
                 null,
                 false,
-                innerFormat);
+                innerFormat,
+                inlongMetric,
+                auditHostAndPorts,
+                auditKeys);
     }
 
     protected TubeMQTableSink createTubeMQTableSink(
@@ -225,6 +241,9 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn
         options.add(SESSION_KEY);
         options.add(BOOTSTRAP_FROM_MAX);
         options.add(TOPIC_PATTERN);
+        options.add(AUDIT_KEYS);
+        options.add(INLONG_METRIC);
+        options.add(INLONG_AUDIT);
         return options;
     }
 
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
index e79685ff70..4a1d332ca8 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.tubemq.table;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.tubemq.FlinkTubeMQConsumer;
 import org.apache.inlong.sort.tubemq.table.DynamicTubeMQDeserializationSchema.MetadataConverter;
 import org.apache.inlong.tubemq.corebase.Message;
@@ -40,6 +41,8 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -62,6 +65,8 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
 
     private static final String VALUE_METADATA_PREFIX = "value.";
 
+    private static final Logger LOG = LoggerFactory.getLogger(TubeMQTableSource.class);
+
     // --------------------------------------------------------------------------------------------
     // Mutable attributes
     // --------------------------------------------------------------------------------------------
@@ -120,6 +125,11 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
      * Metadata that is appended at the end of a physical source row.
      */
     protected List<String> metadataKeys;
+
+    private String inlongMetric;
+    private String auditHostAndPorts;
+    private String auditKeys;
+
     /**
      * Watermark strategy that is used to generate per-partition watermark.
      */
@@ -131,7 +141,8 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
             String masterAddress, String topic,
             TreeSet<String> streamIdSet, String consumerGroup, String sessionKey,
             Configuration configuration, @Nullable WatermarkStrategy<RowData> watermarkStrategy,
-            Optional<String> proctimeAttribute, Boolean ignoreErrors, Boolean innerFormat) {
+            Optional<String> proctimeAttribute, Boolean ignoreErrors, Boolean innerFormat,
+            String inlongMetric, String auditHostAndPorts, String auditKeys) {
 
         Preconditions.checkNotNull(physicalDataType, "Physical data type must not be null.");
         Preconditions.checkNotNull(valueDecodingFormat, "The deserialization schema must not be null.");
@@ -155,6 +166,9 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
         this.proctimeAttribute = proctimeAttribute;
         this.ignoreErrors = ignoreErrors;
         this.innerFormat = innerFormat;
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+        this.auditKeys = auditKeys;
     }
 
     @Override
@@ -167,6 +181,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
         final LogicalType physicalType = physicalDataType.getLogicalType();
         final int physicalFieldCount = LogicalTypeChecks.getFieldCount(physicalType);
         final IntStream physicalFields = IntStream.range(0, physicalFieldCount);
+
         final DeserializationSchema<RowData> deserialization = createDeserialization(context,
                 valueDecodingFormat, physicalFields.toArray(), null);
 
@@ -183,7 +198,8 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
         return new TubeMQTableSource(
                 physicalDataType, valueDecodingFormat, masterAddress,
                 topic, streamIdSet, consumerGroup, sessionKey, configuration,
-                watermarkStrategy, proctimeAttribute, ignoreErrors, innerFormat);
+                watermarkStrategy, proctimeAttribute, ignoreErrors, innerFormat,
+                inlongMetric, auditHostAndPorts, auditKeys);
     }
 
     @Override
@@ -299,8 +315,15 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada
                                 .orElseThrow(IllegalStateException::new))
                         .map(m -> m.converter)
                         .toArray(MetadataConverter[]::new);
+
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
+                .build();
+
         final DeserializationSchema<RowData> tubeMQDeserializer = new DynamicTubeMQDeserializationSchema(
-                deserialization, metadataConverters, producedTypeInfo, ignoreErrors);
+                deserialization, metadataConverters, producedTypeInfo, ignoreErrors, metricOption);
 
         final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new FlinkTubeMQConsumer(masterAddress, topic, streamIdSet,
                 consumerGroup, tubeMQDeserializer, configuration, sessionKey, innerFormat);
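
As a closing note, a second hedged sketch of the metadata mapping added to TubeMQExtractNode in the first hunk above: AUDIT_DATA_TIME is the only supported meta field, it is virtual, and it maps to the metadata key "value.data-time". Construction of the node is intentionally omitted (the helper takes an already-configured instance), since only the new methods are being illustrated.

    import org.apache.inlong.common.enums.MetaField;
    import org.apache.inlong.sort.protocol.node.extract.TubeMQExtractNode;

    public class AuditMetadataSketch {

        // Given an already-configured extract node, the new Metadata implementation exposes
        // exactly one meta field as a virtual column backed by "value.data-time".
        static void describeAuditMetadata(TubeMQExtractNode node) {
            System.out.println(node.supportedMetaFields());                     // [AUDIT_DATA_TIME]
            System.out.println(node.getMetadataKey(MetaField.AUDIT_DATA_TIME)); // value.data-time
            System.out.println(node.isVirtual(MetaField.AUDIT_DATA_TIME));      // true
        }
    }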

