This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 12e1a46dc4 [INLONG-8922][Sort] Add TubeMQ source and sink connector on 
flink 1.15 (#9031)
12e1a46dc4 is described below

commit 12e1a46dc43d74acc489798cdd1c6b96b84a2044
Author: Zfancy <47296299+fancycode...@users.noreply.github.com>
AuthorDate: Mon Oct 16 15:58:08 2023 +0800

    [INLONG-8922][Sort] Add TubeMQ source and sink connector on flink 1.15 
(#9031)
---
 .../src/main/assemblies/sort-connectors-v1.15.xml  |   8 +
 .../sort-flink-v1.15/sort-connectors/pom.xml       |   1 +
 .../sort-connectors/tubemq/pom.xml                 | 107 +++++++
 .../inlong/sort/tubemq/FlinkTubeMQConsumer.java    | 351 +++++++++++++++++++++
 .../inlong/sort/tubemq/FlinkTubeMQProducer.java    | 175 ++++++++++
 .../table/DynamicTubeMQDeserializationSchema.java  | 134 ++++++++
 .../table/DynamicTubeMQSerializationSchema.java    |  35 ++
 .../tubemq/table/TubeMQDynamicTableFactory.java    | 233 ++++++++++++++
 .../inlong/sort/tubemq/table/TubeMQOptions.java    | 289 +++++++++++++++++
 .../inlong/sort/tubemq/table/TubeMQTableSink.java  | 131 ++++++++
 .../sort/tubemq/table/TubeMQTableSource.java       | 341 ++++++++++++++++++++
 .../org.apache.flink.table.factories.Factory       |  16 +
 12 files changed, 1821 insertions(+)

diff --git a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml 
b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
index 90dfa48893..d71d98dbfa 100644
--- a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
+++ b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
@@ -83,5 +83,13 @@
             </includes>
             <fileMode>0644</fileMode>
         </fileSet>
+        <fileSet>
+            
<directory>../inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/target</directory>
+            <outputDirectory>inlong-sort/connectors</outputDirectory>
+            <includes>
+                
<include>sort-connector-tubemq-v1.15-${project.version}.jar</include>
+            </includes>
+            <fileMode>0644</fileMode>
+        </fileSet>
     </fileSets>
 </assembly>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
index 82a4b2949e..151ed4757b 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
@@ -39,6 +39,7 @@
         <module>iceberg</module>
         <module>mongodb-cdc</module>
         <module>pulsar</module>
+        <module>tubemq</module>
     </modules>
 
     <properties>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
new file mode 100644
index 0000000000..ba5cce64a9
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>sort-connectors-v1.15</artifactId>
+        <version>1.10.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>sort-connector-tubemq-v1.15</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache InLong - Sort-connector-tubemq</name>
+
+    <properties>
+        
<inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>tubemq-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${plugin.shade.version}</version>
+
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+
+                        <configuration>
+
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.inlong:*</include>
+                                    <include>com.fasterxml.*:*</include>
+                                </includes>
+                            </artifactSet>
+
+                            <filters>
+                                <filter>
+                                    
<artifact>org.apache.inlong:sort-connector-*</artifact>
+                                    <includes>
+                                        <include>org/apache/inlong/**</include>
+                                        
<include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+                                    </includes>
+                                </filter>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    
<pattern>org.apache.inlong.sort.base</pattern>
+                                    
<shadedPattern>org.apache.inlong.sort.tubemq.shaded.org.apache.inlong.sort.base</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
new file mode 100644
index 0000000000..b890561681
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq;
+
+import org.apache.inlong.sort.tubemq.table.TubeMQOptions;
+import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
+import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
+import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
+
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.TimeUtils.parseDuration;
+
+/**
+ * The Flink TubeMQ Consumer.
+ *
+ * @param <T> The type of records produced by this data source
+ */
+public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
+        implements
+            CheckpointedFunction {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkTubeMQConsumer.class);
+    private static final String TUBE_OFFSET_STATE = "tube-offset-state";
+
+    /**
+     * The address of TubeMQ master, format eg: 127.0.0.1:8715,127.0.0.2:8715.
+     */
+    private final String masterAddress;
+
+    /**
+     * The topic name.
+     */
+    private final String topic;
+
+    /**
+     * The tubemq consumers use this tid set to filter records reading from 
server.
+     */
+    private final TreeSet<String> tidSet;
+
+    /**
+     * The consumer group name.
+     */
+    private final String consumerGroup;
+
+    /**
+     * The deserializer for records.
+     */
+    private final DeserializationSchema<T> deserializationSchema;
+
+    /**
+     * The random key for TubeMQ consumer group when startup.
+     */
+    private final String sessionKey;
+
+    /**
+     * True if consuming message from max offset.
+     */
+    private final boolean consumeFromMax;
+
+    /**
+     * The time to wait if tubemq broker returns message not found.
+     */
+    private final Duration messageNotFoundWaitPeriod;
+
+    /**
+     * The maximum time without a successful consume before the source is marked as idle.
+     */
+    private final Duration maxIdleTime;
+    /**
+     * The InLong inner format.
+     */
+    private final boolean innerFormat;
+    /**
+     * Flag indicating whether the consumer is still running.
+     **/
+    private volatile boolean running;
+    /**
+     * The state for the offsets of queues.
+     */
+    private transient ListState<Tuple2<String, Long>> offsetsState;
+    /**
+     * The current offsets of partitions which are stored in {@link 
#offsetsState}
+     * once a checkpoint is triggered.
+     * <p>
+     * NOTE: The offsets are populated in the main thread and saved in the
+     * checkpoint thread. Its usage must be guarded by the checkpoint lock.</p>
+     */
+    private transient Map<String, Long> currentOffsets;
+    /**
+     * The TubeMQ session factory.
+     */
+    private transient TubeSingleSessionFactory messageSessionFactory;
+    /**
+     * The TubeMQ pull consumer.
+     */
+    private transient PullMessageConsumer messagePullConsumer;
+
+    /**
+     * Build a TubeMQ source function.
+     *
+     * @param masterAddress         the master address of TubeMQ
+     * @param topic                 the topic name
+     * @param tidSet                the topic's filter condition items
+     * @param consumerGroup         the consumer group name
+     * @param deserializationSchema the deserialize schema
+     * @param configuration         the configuration; {@link TubeMQOptions#BOOTSTRAP_FROM_MAX},
+     *                              {@link TubeMQOptions#MESSAGE_NOT_FOUND_WAIT_PERIOD} and
+     *                              {@link TubeMQOptions#SOURCE_MAX_IDLE_TIME} are read from it
+     * @param sessionKey            the tube session key (not validated here)
+     * @param innerFormat           whether records are in the InLong inner format; must not be null
+     * @throws NullPointerException if any argument other than {@code sessionKey} is null
+     */
+    public FlinkTubeMQConsumer(
+            String masterAddress,
+            String topic,
+            TreeSet<String> tidSet,
+            String consumerGroup,
+            DeserializationSchema<T> deserializationSchema,
+            Configuration configuration,
+            String sessionKey,
+            Boolean innerFormat) {
+        checkNotNull(masterAddress, "The master address must not be null.");
+        checkNotNull(topic, "The topic must not be null.");
+        checkNotNull(tidSet, "The tid set must not be null.");
+        checkNotNull(consumerGroup, "The consumer group must not be null.");
+        checkNotNull(deserializationSchema, "The deserialization schema must not be null.");
+        checkNotNull(configuration, "The configuration must not be null.");
+        // The flag is a boxed Boolean assigned to a primitive field: without this check a null
+        // would fail during auto-unboxing with an NPE that carries no message.
+        checkNotNull(innerFormat, "The inner format flag must not be null.");
+
+        this.masterAddress = masterAddress;
+        this.topic = topic;
+        this.tidSet = tidSet;
+        this.consumerGroup = consumerGroup;
+        this.deserializationSchema = deserializationSchema;
+        this.sessionKey = sessionKey;
+
+        // The remaining settings come from the configuration (falling back to option defaults).
+        this.consumeFromMax = configuration.getBoolean(TubeMQOptions.BOOTSTRAP_FROM_MAX);
+        this.messageNotFoundWaitPeriod = parseDuration(configuration.getString(
+                TubeMQOptions.MESSAGE_NOT_FOUND_WAIT_PERIOD));
+        this.maxIdleTime = parseDuration(configuration.getString(
+                TubeMQOptions.SOURCE_MAX_IDLE_TIME));
+        this.innerFormat = innerFormat;
+    }
+
+    /**
+     * Registers the operator list state that holds (partition key, offset) pairs and, when the
+     * job is restored, copies the saved pairs into {@code currentOffsets}.
+     *
+     * @param context the initialization context supplying the operator state store
+     * @throws Exception if the state cannot be obtained or read
+     */
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        final TypeInformation<Tuple2<String, Long>> offsetType =
+                new TupleTypeInfo<>(STRING_TYPE_INFO, LONG_TYPE_INFO);
+        final ListStateDescriptor<Tuple2<String, Long>> offsetDescriptor =
+                new ListStateDescriptor<>(TUBE_OFFSET_STATE, offsetType);
+
+        offsetsState = context.getOperatorStateStore().getListState(offsetDescriptor);
+        currentOffsets = new HashMap<>();
+
+        if (!context.isRestored()) {
+            LOG.info("No restore offsets.");
+            return;
+        }
+
+        // Rebuild the in-memory offset map from the checkpointed entries.
+        for (Tuple2<String, Long> saved : offsetsState.get()) {
+            currentOffsets.put(saved.f0, saved.f1);
+        }
+        LOG.info("Successfully restore the offsets {}.", currentOffsets);
+    }
+
+    /**
+     * Creates the TubeMQ session factory and pull consumer, subscribes to the topic with the
+     * tid filter set, completes the subscription with the current offsets, and finally raises
+     * the {@code running} flag.
+     *
+     * @param parameters the Flink configuration passed to the function (not read here)
+     * @throws Exception if the TubeMQ client cannot be created or the subscription fails
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        final ConsumePosition startPosition;
+        if (consumeFromMax) {
+            startPosition = ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS;
+        } else {
+            startPosition = ConsumePosition.CONSUMER_FROM_FIRST_OFFSET;
+        }
+
+        final ConsumerConfig tubeConfig = new ConsumerConfig(masterAddress, consumerGroup);
+        tubeConfig.setConsumePosition(startPosition);
+        tubeConfig.setMsgNotFoundWaitPeriodMs(messageNotFoundWaitPeriod.toMillis());
+
+        // The subtask count is handed to completeSubscribe together with the session key.
+        final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+
+        messageSessionFactory = new TubeSingleSessionFactory(tubeConfig);
+        messagePullConsumer = messageSessionFactory.createPullConsumer(tubeConfig);
+        messagePullConsumer.subscribe(topic, tidSet);
+        messagePullConsumer.completeSubscribe(sessionKey, parallelism, true, currentOffsets);
+
+        running = true;
+    }
+
+    /**
+     * Main consume loop: pulls a batch, deserializes it, emits the records and records the
+     * partition offset under the checkpoint lock, then confirms the batch to TubeMQ. Runs until
+     * the {@code running} flag is cleared.
+     *
+     * @param ctx the source context used to emit records and to obtain the checkpoint lock
+     * @throws Exception if pulling, deserializing or confirming fails with an exception
+     */
+    @Override
+    public void run(SourceContext<T> ctx) throws Exception {
+
+        // Time of the last successful pull; used to decide when to mark the source idle.
+        Instant lastConsumeInstant = Instant.now();
+
+        while (running) {
+            final ConsumerResult pulled = messagePullConsumer.getMessage();
+            if (!pulled.isSuccess()) {
+                if (!isUnloggedErrCode(pulled.getErrCode())) {
+                    LOG.info("Could not consume messages from tubemq (errcode: {},errmsg: {}).",
+                            pulled.getErrCode(),
+                            pulled.getErrMsg());
+                }
+
+                final Duration idleFor = Duration.between(lastConsumeInstant, Instant.now());
+                if (idleFor.compareTo(maxIdleTime) > 0) {
+                    ctx.markAsTemporarilyIdle();
+                }
+
+                continue;
+            }
+
+            final List<Message> pulledMessages = pulled.getMessageList();
+            lastConsumeInstant = Instant.now();
+
+            final List<T> decoded = new ArrayList<>();
+            lastConsumeInstant = getRecords(lastConsumeInstant, pulledMessages, decoded);
+
+            // Emitting and updating the offset happen under the checkpoint lock so that a
+            // snapshot never observes one without the other.
+            synchronized (ctx.getCheckpointLock()) {
+                for (T element : decoded) {
+                    ctx.collect(element);
+                }
+                currentOffsets.put(pulled.getPartitionKey(), pulled.getCurrOffset());
+            }
+
+            final ConsumerResult confirmed =
+                    messagePullConsumer.confirmConsume(pulled.getConfirmContext(), true);
+            if (!confirmed.isSuccess() && !isUnloggedErrCode(confirmed.getErrCode())) {
+                LOG.warn("Could not confirm messages to tubemq (errcode: {},errmsg: {}).",
+                        confirmed.getErrCode(),
+                        confirmed.getErrMsg());
+            }
+        }
+    }
+
+    /**
+     * Tells whether an error code belongs to the set that the consume loop does not log.
+     *
+     * @param errCode the error code reported by a {@link ConsumerResult}
+     * @return true if the code is one of the six codes skipped by the logging in {@code run}
+     */
+    private static boolean isUnloggedErrCode(int errCode) {
+        return errCode == TErrCodeConstants.BAD_REQUEST
+                || errCode == TErrCodeConstants.NOT_FOUND
+                || errCode == TErrCodeConstants.ALL_PARTITION_FROZEN
+                || errCode == TErrCodeConstants.NO_PARTITION_ASSIGNED
+                || errCode == TErrCodeConstants.ALL_PARTITION_WAITING
+                || errCode == TErrCodeConstants.ALL_PARTITION_INUSE;
+    }
+
+    private Instant getRecords(Instant lastConsumeInstant, List<Message> 
messageList, List<T> records)
+            throws Exception {
+        if (messageList != null) {
+            lastConsumeInstant = Instant.now();
+            if (!innerFormat) {
+                for (Message message : messageList) {
+                    T record = 
deserializationSchema.deserialize(message.getData());
+                    records.add(record);
+                }
+            } else {
+                List<RowData> rowDataList = new ArrayList<>();
+                ListCollector<RowData> out = new ListCollector<>(rowDataList);
+                for (Message message : messageList) {
+                    deserializationSchema.deserialize(message.getData(), 
(Collector<T>) out);
+                }
+                rowDataList.forEach(data -> records.add((T) data));
+            }
+        }
+
+        return lastConsumeInstant;
+
+    }
+
+    /**
+     * Replaces the contents of the offset list state with the current (partition key, offset)
+     * pairs and logs them together with the checkpoint id.
+     *
+     * @param context the snapshot context providing the checkpoint id
+     * @throws Exception if the list state cannot be cleared or written
+     */
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+
+        // Start from an empty state so that it mirrors currentOffsets exactly.
+        offsetsState.clear();
+        for (Map.Entry<String, Long> partitionOffset : currentOffsets.entrySet()) {
+            final String partitionKey = partitionOffset.getKey();
+            final Long offset = partitionOffset.getValue();
+            offsetsState.add(Tuple2.of(partitionKey, offset));
+        }
+
+        LOG.info("Successfully save the offsets in checkpoint {}: {}.",
+                context.getCheckpointId(), currentOffsets);
+    }
+
+    /**
+     * Asks the consume loop to stop by clearing the {@code running} flag, which the loop in
+     * {@code run} checks at the start of every iteration. No client resources are released here.
+     */
+    @Override
+    public void cancel() {
+        running = false;
+    }
+
+    /**
+     * Stops the consume loop and shuts down the pull consumer and the session factory. A
+     * failure in either shutdown is logged and does not prevent the remaining steps.
+     *
+     * @throws Exception if {@code super.close()} fails
+     */
+    @Override
+    public void close() throws Exception {
+
+        cancel();
+
+        // Each client object is shut down in its own guard so one failure cannot skip the other.
+        try {
+            if (messagePullConsumer != null) {
+                messagePullConsumer.shutdown();
+            }
+        } catch (Throwable consumerFailure) {
+            LOG.warn("Could not properly shutdown the tubemq pull consumer.", consumerFailure);
+        }
+
+        try {
+            if (messageSessionFactory != null) {
+                messageSessionFactory.shutdown();
+            }
+        } catch (Throwable factoryFailure) {
+            LOG.warn("Could not properly shutdown the tubemq session factory.", factoryFailure);
+        }
+
+        super.close();
+
+        LOG.info("Closed the tubemq source.");
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java
new file mode 100644
index 0000000000..fb2f624961
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq;
+
+import org.apache.inlong.sort.tubemq.table.TubeMQOptions;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.client.producer.MessageProducer;
+import org.apache.inlong.tubemq.client.producer.MessageSentResult;
+import org.apache.inlong.tubemq.corebase.Message;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.TreeSet;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class FlinkTubeMQProducer<T> extends RichSinkFunction<T> implements 
CheckpointedFunction {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkTubeMQProducer.class);
+
+    private static final String SYSTEM_HEADER_TIME_FORMAT = "yyyyMMddHHmm";
+
+    /**
+     * The address of tubemq master, format eg: 127.0.0.1:8080,127.0.0.2:8081.
+     */
+    private final String masterAddress;
+
+    /**
+     * The topic name.
+     */
+    private final String topic;
+
+    /**
+     * The tid set supplied at construction. This producer only stores it;
+     * it is not used when sending messages.
+     */
+    private final TreeSet<String> tidSet;
+    /**
+     * The serializer for the records sent to tube.
+     */
+    private final SerializationSchema<T> serializationSchema;
+
+    /**
+     * The tubemq producer.
+     */
+    private transient MessageProducer producer;
+
+    /**
+     * The tubemq session factory.
+     */
+    private transient MessageSessionFactory sessionFactory;
+
+    /**
+     * The maximum number of retries.
+     */
+    private final int maxRetries;
+
+    public FlinkTubeMQProducer(String topic,
+            String masterAddress,
+            SerializationSchema<T> serializationSchema,
+            TreeSet<String> tidSet,
+            Configuration configuration) {
+        checkNotNull(topic, "The topic must not be null.");
+        checkNotNull(masterAddress, "The master address must not be null.");
+        checkNotNull(serializationSchema, "The serialization schema must not 
be null.");
+        checkNotNull(tidSet, "The tid set must not be null.");
+        checkNotNull(configuration, "The configuration must not be null.");
+
+        int max_retries = configuration.getInteger(TubeMQOptions.MAX_RETRIES);
+        checkArgument(max_retries > 0);
+
+        this.topic = topic;
+        this.masterAddress = masterAddress;
+        this.serializationSchema = serializationSchema;
+        this.tidSet = tidSet;
+        this.maxRetries = max_retries;
+    }
+
+    /**
+     * No-op: this sink registers no state, so there is nothing to snapshot.
+     *
+     * @param functionSnapshotContext the snapshot context (unused)
+     */
+    @Override
+    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
+        // Nothing to do.
+    }
+
+    /**
+     * No-op: this sink registers no state, so there is nothing to initialize or restore.
+     *
+     * @param functionInitializationContext the initialization context (unused)
+     */
+    @Override
+    public void initializeState(FunctionInitializationContext functionInitializationContext) {
+        // Nothing to do.
+    }
+
+    /**
+     * Creates the TubeMQ session factory and producer for the configured master address and
+     * publishes the single configured topic.
+     *
+     * @param parameters the Flink configuration, forwarded to {@code super.open}
+     * @throws Exception if the client cannot be created or the topic cannot be published
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+
+        final TubeClientConfig clientConfig = new TubeClientConfig(masterAddress);
+        this.sessionFactory = new TubeSingleSessionFactory(clientConfig);
+        this.producer = this.sessionFactory.createProducer();
+
+        // publish expects a set of topics; this sink writes to exactly one.
+        final HashSet<String> publishedTopics = new HashSet<>();
+        publishedTopics.add(topic);
+        this.producer.publish(publishedTopics);
+    }
+
+    @Override
+    public void invoke(T in, SinkFunction.Context context) throws Exception {
+
+        int retries = 0;
+        Exception exception = null;
+
+        while (maxRetries <= 0 || retries < maxRetries) {
+
+            try {
+                byte[] body = serializationSchema.serialize(in);
+                Message message = new Message(topic, body);
+                MessageSentResult sendResult = producer.sendMessage(message);
+                if (sendResult.isSuccess()) {
+                    return;
+                } else {
+                    LOG.warn("Send msg fail, error code: {}, error message: 
{}",
+                            sendResult.getErrCode(), sendResult.getErrMsg());
+                }
+            } catch (Exception e) {
+                LOG.warn("Could not properly send the message to tube "
+                        + "(retries: {}).", retries, e);
+
+                retries++;
+                exception = ExceptionUtils.firstOrSuppressed(e, exception);
+            }
+        }
+
+        throw new IOException("Could not properly send the message to tube.", 
exception);
+    }
+
+    @Override
+    public void close() throws Exception {
+
+        try {
+            if (producer != null) {
+                producer.shutdown();
+            }
+            if (sessionFactory != null) {
+                sessionFactory.shutdown();
+            }
+        } catch (Throwable e) {
+            LOG.error("Shutdown producer error", e);
+        } finally {
+            super.close();
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
new file mode 100644
index 0000000000..f5880f1a78
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq.table;
+
+import org.apache.inlong.tubemq.corebase.Message;
+
+import com.google.common.base.Objects;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+public class DynamicTubeMQDeserializationSchema implements 
DeserializationSchema<RowData> {
+
+    /**
+     * data buffer message
+     */
+    private final DeserializationSchema<RowData> deserializationSchema;
+
+    /**
+     * {@link MetadataConverter} of how to produce metadata from message.
+     */
+    private final MetadataConverter[] metadataConverters;
+
+    /**
+     * {@link TypeInformation} of the produced {@link RowData} (physical + 
meta data).
+     */
+    private final TypeInformation<RowData> producedTypeInfo;
+
+    /**
+     * status of error
+     */
+    private final boolean ignoreErrors;
+
+    public DynamicTubeMQDeserializationSchema(
+            DeserializationSchema<RowData> schema,
+            MetadataConverter[] metadataConverters,
+            TypeInformation<RowData> producedTypeInfo,
+            boolean ignoreErrors) {
+        this.deserializationSchema = schema;
+        this.metadataConverters = metadataConverters;
+        this.producedTypeInfo = producedTypeInfo;
+        this.ignoreErrors = ignoreErrors;
+    }
+
+    @Override
+    public RowData deserialize(byte[] bytes) throws IOException {
+        return deserializationSchema.deserialize(bytes);
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<RowData> out) throws 
IOException {
+        deserializationSchema.deserialize(message, out);
+    }
+
+    @Override
+    public boolean isEndOfStream(RowData rowData) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return producedTypeInfo;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof DynamicTubeMQDeserializationSchema)) {
+            return false;
+        }
+        DynamicTubeMQDeserializationSchema that = 
(DynamicTubeMQDeserializationSchema) o;
+        return ignoreErrors == that.ignoreErrors
+                && 
Objects.equal(Arrays.stream(metadataConverters).collect(Collectors.toList()),
+                        
Arrays.stream(that.metadataConverters).collect(Collectors.toList()))
+                && Objects.equal(deserializationSchema, 
that.deserializationSchema)
+                && Objects.equal(producedTypeInfo, that.producedTypeInfo);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(deserializationSchema, metadataConverters, 
producedTypeInfo, ignoreErrors);
+    }
+
+    /**
+     * add metadata column
+     */
+    private void emitRow(Message head, GenericRowData physicalRow, 
Collector<RowData> out) {
+        if (metadataConverters.length == 0) {
+            out.collect(physicalRow);
+            return;
+        }
+        final int physicalArity = physicalRow.getArity();
+        final int metadataArity = metadataConverters.length;
+        final GenericRowData producedRow =
+                new GenericRowData(physicalRow.getRowKind(), physicalArity + 
metadataArity);
+        for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
+            producedRow.setField(physicalPos, 
physicalRow.getField(physicalPos));
+        }
+        for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+            producedRow.setField(
+                    physicalArity + metadataPos, 
metadataConverters[metadataPos].read(head));
+        }
+        out.collect(producedRow);
+    }
+
+    interface MetadataConverter extends Serializable {
+
+        Object read(Message head);
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQSerializationSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQSerializationSchema.java
new file mode 100644
index 0000000000..a1f95fcdd6
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQSerializationSchema.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * A {@link SerializationSchema} used by the TubeMQ table sink. Encoding of each row is
+ * delegated to a wrapped format schema.
+ */
+public class DynamicTubeMQSerializationSchema implements SerializationSchema<RowData> {
+
+    /**
+     * The wrapped format schema that encodes the rows.
+     */
+    private final SerializationSchema<RowData> serializationSchema;
+
+    /**
+     * @param serializationSchema the format schema all encoding is delegated to
+     */
+    public DynamicTubeMQSerializationSchema(SerializationSchema<RowData> serializationSchema) {
+        this.serializationSchema = serializationSchema;
+    }
+
+    /**
+     * Forwards the initialization call to the wrapped schema so that formats which set up
+     * state in {@code open} are initialized before the first row is encoded.
+     */
+    @Override
+    public void open(SerializationSchema.InitializationContext context) throws Exception {
+        serializationSchema.open(context);
+    }
+
+    /**
+     * Encodes the row by delegating to the wrapped schema.
+     */
+    @Override
+    public byte[] serialize(RowData element) {
+        return serializationSchema.serialize(element);
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
new file mode 100644
index 0000000000..17275d8d11
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+import static 
org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.GROUP_NAME;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.MASTER_RPC;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.SESSION_KEY;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.STREAMID;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TOPIC;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TOPIC_PATTERN;
+import static 
org.apache.inlong.sort.tubemq.table.TubeMQOptions.getTubeMQProperties;
+
+/**
+ * A dynamic table factory implementation for TubeMQ.
+ */
+public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+    /**
+     * The connector identifier returned by {@link #factoryIdentifier()}.
+     */
+    public static final String IDENTIFIER = "tubemq";
+
+    /**
+     * Format identifiers that are treated as InLong inner formats.
+     */
+    public static final List<String> INNERFORMATTYPE = Arrays.asList("inlong-msg");
+
+    /**
+     * Whether the most recently created table source uses one of {@link #INNERFORMATTYPE}.
+     * It is refreshed on every {@link #createDynamicTableSource(Context)} call and handed to
+     * the source in {@link #createTubeMQTableSource}.
+     * NOTE(review): this is mutable static state shared by all factory instances; it is kept
+     * public and static only for backward compatibility.
+     */
+    public static boolean innerFormat = false;
+
+    /**
+     * Discovers the value decoding format configured under the 'format' option.
+     */
+    private static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
+            TableFactoryHelper helper) {
+        return helper.discoverOptionalDecodingFormat(DeserializationFormatFactory.class, FORMAT)
+                .orElseGet(() -> helper.discoverDecodingFormat(DeserializationFormatFactory.class, FORMAT));
+    }
+
+    /**
+     * Discovers the value encoding format configured under the 'format' option.
+     */
+    private static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
+            TableFactoryHelper helper) {
+        return helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, FORMAT)
+                .orElseGet(() -> helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT));
+    }
+
+    /**
+     * Rejects tables that declare a PRIMARY KEY while using an insert-only format.
+     *
+     * @throws ValidationException if the table has a primary key and the format's changelog
+     *         mode contains only {@link RowKind#INSERT}
+     */
+    private static void validatePKConstraints(
+            ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
+        if (catalogTable.getSchema().getPrimaryKey().isPresent()
+                && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+            Configuration options = Configuration.fromMap(catalogTable.getOptions());
+            // FORMAT has no default value, so get() yields the configured value or null.
+            String formatName = options.get(FORMAT);
+            throw new ValidationException(String.format(
+                    "The TubeMQ table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
+                            + " on the table, because it can't guarantee the semantic of primary key.",
+                    tableName.asSummaryString(), formatName));
+        }
+    }
+
+    /**
+     * Discovers the optional key decoding format and checks that it is insert-only.
+     * This helper is not called from within this class.
+     *
+     * @throws ValidationException if a key format is configured whose changelog mode is not
+     *         insert-only
+     */
+    private static Optional<DecodingFormat<DeserializationSchema<RowData>>> getKeyDecodingFormat(
+            TableFactoryHelper helper) {
+        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat = helper
+                .discoverOptionalDecodingFormat(DeserializationFormatFactory.class, KEY_FORMAT);
+        keyDecodingFormat.ifPresent(format -> {
+            if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+                throw new ValidationException(String.format(
+                        "A key format should only deal with INSERT-only records. "
+                                + "But %s has a changelog mode of %s.",
+                        helper.getOptions().get(KEY_FORMAT),
+                        format.getChangelogMode()));
+            }
+        });
+        return keyDecodingFormat;
+    }
+
+    /**
+     * Creates the TubeMQ table source from the table options in {@code context}.
+     */
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+        final ReadableConfig tableOptions = helper.getOptions();
+
+        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = getValueDecodingFormat(helper);
+
+        // validate all options
+        // NOTE(review): validate() presumably rejects keys starting with
+        // TubeMQOptions.PROPERTIES_PREFIX because they are not declared options - confirm
+        // whether validateExcept(PROPERTIES_PREFIX) is intended here.
+        helper.validate();
+
+        validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
+
+        // Decide for THIS table whether an inner format is used. Previously the flag was only
+        // assigned right before the primary-key validation threw, so it never reflected a
+        // successfully created table and could leak from a failed one.
+        innerFormat = INNERFORMATTYPE.contains(tableOptions.get(FORMAT));
+
+        final Configuration properties = getTubeMQProperties(context.getCatalogTable().getOptions());
+
+        final DataType physicalDataType = context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
+
+        return createTubeMQTableSource(
+                physicalDataType,
+                valueDecodingFormat,
+                TubeMQOptions.getSourceTopics(tableOptions),
+                TubeMQOptions.getMasterRpcAddress(tableOptions),
+                TubeMQOptions.getTiSet(tableOptions),
+                TubeMQOptions.getConsumerGroup(tableOptions),
+                TubeMQOptions.getSessionKey(tableOptions),
+                properties);
+    }
+
+    /**
+     * Creates the TubeMQ table sink from the table options in {@code context}.
+     */
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+        final ReadableConfig tableOptions = helper.getOptions();
+
+        final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = getValueEncodingFormat(helper);
+
+        // validate all options
+        helper.validate();
+
+        validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueEncodingFormat);
+
+        final Configuration properties = getTubeMQProperties(context.getCatalogTable().getOptions());
+
+        final DataType physicalDataType = context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
+
+        return createTubeMQTableSink(
+                physicalDataType,
+                valueEncodingFormat,
+                TubeMQOptions.getSinkTopics(tableOptions),
+                TubeMQOptions.getMasterRpcAddress(tableOptions),
+                TubeMQOptions.getTiSet(tableOptions),
+                properties);
+    }
+
+    /**
+     * Instantiates the table source. Note that {@code url} and {@code topic} are passed to the
+     * {@link TubeMQTableSource} constructor in the opposite order of this method's parameters.
+     */
+    protected TubeMQTableSource createTubeMQTableSource(
+            DataType physicalDataType,
+            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+            String topic,
+            String url,
+            TreeSet<String> streamId,
+            String consumerGroup,
+            String sessionKey,
+            Configuration properties) {
+        return new TubeMQTableSource(
+                physicalDataType,
+                valueDecodingFormat,
+                url,
+                topic,
+                streamId,
+                consumerGroup,
+                sessionKey,
+                properties,
+                null,
+                null,
+                false,
+                innerFormat);
+    }
+
+    /**
+     * Instantiates the table sink.
+     */
+    protected TubeMQTableSink createTubeMQTableSink(
+            DataType physicalDataType,
+            EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
+            String topic,
+            String masterAddress,
+            TreeSet<String> streamId,
+            Configuration configuration) {
+        return new TubeMQTableSink(
+                physicalDataType,
+                valueEncodingFormat,
+                topic,
+                masterAddress,
+                streamId,
+                configuration);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Only the master RPC address is mandatory.
+     */
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(MASTER_RPC);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(FORMAT);
+        options.add(TOPIC);
+        options.add(GROUP_NAME);
+        options.add(STREAMID);
+        options.add(SESSION_KEY);
+        options.add(BOOTSTRAP_FROM_MAX);
+        options.add(TOPIC_PATTERN);
+        return options;
+    }
+
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
new file mode 100644
index 0000000000..0085100c87
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.description.Description;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+
+/**
+ * Option utils for tubeMQ table source and sink.
+ */
+public class TubeMQOptions {
+
+    // --------------------------------------------------------------------------------------------
+    // Option enumerations
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Prefix of table options that are copied (without the prefix) into the configuration
+     * returned by {@link #getTubeMQProperties(Map)}.
+     */
+    public static final String PROPERTIES_PREFIX = "properties.";
+
+    // Start up offset.
+    // Always start from the max consume position.
+    public static final String CONSUMER_FROM_MAX_OFFSET_ALWAYS = "max";
+    // Start from the latest position for the first time. Otherwise start from last consume position.
+    public static final String CONSUMER_FROM_LATEST_OFFSET = "latest";
+    // Start from 0 for the first time. Otherwise start from last consume position.
+    public static final String CONSUMER_FROM_FIRST_OFFSET = "earliest";
+
+    // --------------------------------------------------------------------------------------------
+    // Format options
+    // --------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<String> KEY_FORMAT = ConfigOptions
+            .key("key." + FORMAT.key())
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Defines the format identifier for encoding key data. "
+                    + "The identifier is used to discover a suitable format factory.");
+
+    public static final ConfigOption<List<String>> KEY_FIELDS =
+            ConfigOptions.key("key.fields")
+                    .stringType()
+                    .asList()
+                    .defaultValues()
+                    .withDescription(
+                            "Defines an explicit list of physical columns from the table schema "
+                                    + "that configure the data type for the key format. By default, this list is "
+                                    + "empty and thus a key is undefined.");
+
+    // --------------------------------------------------------------------------------------------
+    // TubeMQ specific options
+    // --------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<String> TOPIC =
+            ConfigOptions.key("topic")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Topic names from which the table is read. Either 'topic' "
+                                    + "or 'topic-pattern' must be set for source.");
+
+    public static final ConfigOption<String> TOPIC_PATTERN =
+            ConfigOptions.key("topic-pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional topic pattern from which the table is read for source."
+                                    + " Either 'topic' or 'topic-pattern' must be set.");
+
+    public static final ConfigOption<String> MASTER_RPC =
+            ConfigOptions.key("master.rpc")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Required TubeMQ master connection string");
+
+    public static final ConfigOption<String> GROUP_NAME =
+            ConfigOptions.key("group.name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Required consumer group in TubeMQ consumer");
+
+    public static final ConfigOption<String> TUBE_MESSAGE_NOT_FOUND_WAIT_PERIOD =
+            ConfigOptions.key("tubemq.message.not.found.wait.period")
+                    .stringType()
+                    .defaultValue("350ms")
+                    .withDescription("The time of waiting period if "
+                            + "tubeMQ broker return message not found.");
+
+    public static final ConfigOption<Long> TUBE_SUBSCRIBE_RETRY_TIMEOUT =
+            ConfigOptions.key("tubemq.subscribe.retry.timeout")
+                    .longType()
+                    .defaultValue(300000L)
+                    .withDescription("The time of subscribing tubeMQ timeout, in millisecond");
+
+    // NOTE(review): this option carries no description - presumably the capacity of an
+    // internal event queue of the source; confirm and document.
+    public static final ConfigOption<Integer> SOURCE_EVENT_QUEUE_CAPACITY =
+            ConfigOptions.key("source.event.queue.capacity")
+                    .intType()
+                    .defaultValue(1024);
+
+    public static final ConfigOption<String> SESSION_KEY =
+            ConfigOptions.key("session.key")
+                    .stringType()
+                    .defaultValue("default_session_key")
+                    .withDescription("The session key for this consumer group at startup.");
+
+    public static final ConfigOption<List<String>> STREAMID =
+            ConfigOptions.key("topic.streamId")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription("The streamId owned this topic.");
+
+    public static final ConfigOption<Integer> MAX_RETRIES =
+            ConfigOptions.key("max.retries")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription("The maximum number of retries when an "
+                            + "exception is caught.");
+
+    public static final ConfigOption<Boolean> BOOTSTRAP_FROM_MAX =
+            ConfigOptions.key("bootstrap.from.max")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("True if consuming from the most recent "
+                            + "position when the tubemq source starts.. It only takes "
+                            + "effect when the tubemq source does not recover from "
+                            + "checkpoints.");
+
+    public static final ConfigOption<String> SOURCE_MAX_IDLE_TIME =
+            ConfigOptions.key("source.task.max.idle.time")
+                    .stringType()
+                    .defaultValue("5min")
+                    .withDescription("The max time of the source marked as temporarily idle.");
+
+    public static final ConfigOption<String> MESSAGE_NOT_FOUND_WAIT_PERIOD =
+            ConfigOptions.key("message.not.found.wait.period")
+                    .stringType()
+                    .defaultValue("500ms")
+                    .withDescription("The time of waiting period if tubemq broker return message not found.");
+
+    public static final ConfigOption<ValueFieldsStrategy> VALUE_FIELDS_INCLUDE =
+            ConfigOptions.key("value.fields-include")
+                    .enumType(ValueFieldsStrategy.class)
+                    .defaultValue(ValueFieldsStrategy.ALL)
+                    .withDescription(
+                            String.format(
+                                    "Defines a strategy how to deal with key columns in the data type "
+                                            + "of the value format. By default, '%s' physical columns "
+                                            + "of the table schema will be included in the value "
+                                            + "format which means that the key columns "
+                                            + "appear in the data type for both the key and value format.",
+                                    ValueFieldsStrategy.ALL));
+
+    public static final ConfigOption<String> KEY_FIELDS_PREFIX =
+            ConfigOptions.key("key.fields-prefix")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Defines a custom prefix for all fields of the key format to avoid "
+                                                    + "name clashes with fields of the value format. "
+                                                    + "By default, the prefix is empty.")
+                                    .linebreak()
+                                    .text(
+                                            String.format(
+                                                    "If a custom prefix is defined, both the table schema "
+                                                            + "and '%s' will work with prefixed names.",
+                                                    KEY_FIELDS.key()))
+                                    .linebreak()
+                                    .text(
+                                            "When constructing the data type of the key format, "
+                                                    + "the prefix will be removed and the "
+                                                    + "non-prefixed names will be used within the key format.")
+                                    .linebreak()
+                                    .text(
+                                            String.format(
+                                                    "Please note that this option requires that '%s' must be '%s'.",
+                                                    VALUE_FIELDS_INCLUDE.key(),
+                                                    ValueFieldsStrategy.EXCEPT_KEY))
+                                    .build());
+
+    // --------------------------------------------------------------------------------------------
+    // Validation
+    // --------------------------------------------------------------------------------------------
+
+    // The set of start-up mode names defined above. Not referenced elsewhere in this class.
+    private static final Set<String> CONSUMER_STARTUP_MODE_ENUMS = new HashSet<>(Arrays.asList(
+            CONSUMER_FROM_MAX_OFFSET_ALWAYS,
+            CONSUMER_FROM_LATEST_OFFSET,
+            CONSUMER_FROM_FIRST_OFFSET));
+
+    /**
+     * Collects all table options whose key starts with {@link #PROPERTIES_PREFIX} into a new
+     * {@link Configuration}, with the prefix stripped from the key.
+     *
+     * <p>The entries are written with {@link Configuration#setString(String, String)}.
+     * Writing into {@code Configuration#toMap()} has no effect because that method returns
+     * a detached copy, which previously left the returned configuration always empty.
+     *
+     * @param tableOptions the raw table options
+     * @return a configuration holding the prefixed options; empty if there are none
+     */
+    public static Configuration getTubeMQProperties(Map<String, String> tableOptions) {
+        final Configuration tubeMQProperties = new Configuration();
+
+        if (hasTubeMQClientProperties(tableOptions)) {
+            tableOptions.forEach((key, value) -> {
+                // Null values are skipped: Configuration does not accept them.
+                if (key.startsWith(PROPERTIES_PREFIX) && value != null) {
+                    final String subKey = key.substring(PROPERTIES_PREFIX.length());
+                    tubeMQProperties.setString(subKey, value);
+                }
+            });
+        }
+        return tubeMQProperties;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Scan specific options
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Decides if the table options contains TubeMQ client properties that start with prefix
+     * 'properties'.
+     */
+    private static boolean hasTubeMQClientProperties(Map<String, String> tableOptions) {
+        return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
+    }
+
+    /**
+     * Returns the configured {@link #TOPIC} for a source, or {@code null} if it is not set.
+     */
+    public static String getSourceTopics(ReadableConfig tableOptions) {
+        return tableOptions.getOptional(TOPIC).orElse(null);
+    }
+
+    /**
+     * Returns the configured {@link #TOPIC} for a sink, or {@code null} if it is not set.
+     */
+    public static String getSinkTopics(ReadableConfig tableOptions) {
+        return tableOptions.getOptional(TOPIC).orElse(null);
+    }
+
+    /**
+     * Returns the configured {@link #MASTER_RPC} address, or {@code null} if it is not set.
+     */
+    public static String getMasterRpcAddress(ReadableConfig tableOptions) {
+        return tableOptions.getOptional(MASTER_RPC).orElse(null);
+    }
+
+    /**
+     * Returns the configured {@link #STREAMID} values as a sorted set; the set is empty when
+     * the option is not set.
+     */
+    public static TreeSet<String> getTiSet(ReadableConfig tableOptions) {
+        TreeSet<String> set = new TreeSet<>();
+        tableOptions.getOptional(STREAMID).ifPresent(new Consumer<List<String>>() {
+
+            @Override
+            public void accept(List<String> strings) {
+                set.addAll(strings);
+            }
+        });
+        return set;
+    }
+
+    /**
+     * Returns the configured {@link #GROUP_NAME}, or {@code null} if it is not set.
+     */
+    public static String getConsumerGroup(ReadableConfig tableOptions) {
+        return tableOptions.getOptional(GROUP_NAME).orElse(null);
+    }
+
+    /**
+     * Returns the configured {@link #SESSION_KEY}, falling back to the option's default value.
+     */
+    public static String getSessionKey(ReadableConfig tableOptions) {
+        return tableOptions.getOptional(SESSION_KEY).orElse(SESSION_KEY.defaultValue());
+    }
+
+    /**
+     * Strategies to derive the data type of a value format by considering a key format.
+     */
+    public enum ValueFieldsStrategy {
+
+        ALL,
+
+        EXCEPT_KEY
+
+    }
+
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java
new file mode 100644
index 0000000000..5d2f8c2a4d
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq.table;
+
+import org.apache.inlong.sort.tubemq.FlinkTubeMQProducer;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.TreeSet;
+
+/**
+ * A {@link DynamicTableSink} that writes {@link RowData} records to a TubeMQ topic through a
+ * {@link FlinkTubeMQProducer}.
+ */
+public class TubeMQTableSink implements DynamicTableSink {
+
+    /**
+     * Format for encoding values written to TubeMQ.
+     */
+    private final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat;
+    /**
+     * Data type to configure the formats.
+     */
+    private final DataType physicalDataType;
+    /**
+     * The TubeMQ topic name.
+     */
+    private final String topic;
+    /**
+     * The address of TubeMQ master, format eg: 127.0.0.1:8715,127.0.0.2:8715.
+     */
+    private final String masterAddress;
+    /**
+     * The TubeMQ tid filter collection.
+     */
+    private final TreeSet<String> tidSet;
+    /**
+     * The parameters collection for tubemq producer.
+     */
+    private final Configuration configuration;
+
+    /**
+     * Creates the sink.
+     *
+     * @param physicalDataType data type handed to the encoding format
+     * @param valueEncodingFormat format used to create the runtime serializer
+     * @param topic TubeMQ topic name
+     * @param masterAddress TubeMQ master address
+     * @param tidSet tid collection handed to the producer
+     * @param configuration parameters handed to the producer
+     * @throws NullPointerException if any argument is {@code null}
+     */
+    public TubeMQTableSink(
+            DataType physicalDataType,
+            EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
+            String topic,
+            String masterAddress,
+            TreeSet<String> tidSet,
+            Configuration configuration) {
+        Preconditions.checkNotNull(valueEncodingFormat, "The serialization schema must not be null.");
+        Preconditions.checkNotNull(physicalDataType, "Physical data type must not be null.");
+        Preconditions.checkNotNull(topic, "Topic must not be null.");
+        Preconditions.checkNotNull(masterAddress, "Master address must not be null.");
+        Preconditions.checkNotNull(configuration, "The configuration must not be null.");
+        Preconditions.checkNotNull(tidSet, "The tid set must not be null.");
+
+        this.valueEncodingFormat = valueEncodingFormat;
+        this.physicalDataType = physicalDataType;
+        this.topic = topic;
+        this.masterAddress = masterAddress;
+        this.tidSet = tidSet;
+        this.configuration = configuration;
+    }
+
+    /**
+     * Returns the changelog mode of the encoding format; {@code requestedMode} is not consulted.
+     */
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        return valueEncodingFormat.getChangelogMode();
+    }
+
+    /**
+     * Builds the runtime serializer from the encoding format and wraps a
+     * {@link FlinkTubeMQProducer} into a {@link SinkFunctionProvider}.
+     */
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        final SerializationSchema<RowData> serialization =
+                createSerialization(context, valueEncodingFormat, physicalDataType);
+
+        final FlinkTubeMQProducer<RowData> tubeMQProducer =
+                createTubeMQProducer(topic, masterAddress, serialization, configuration);
+
+        // NOTE(review): the sink parallelism is hard-coded to 1 here - confirm this is intended.
+        return SinkFunctionProvider.of(tubeMQProducer, 1);
+    }
+
+    /**
+     * Instantiates the producer; the tid set is taken from this sink's own field.
+     */
+    private FlinkTubeMQProducer<RowData> createTubeMQProducer(
+            String topic,
+            String masterAddress,
+            SerializationSchema<RowData> serializationSchema,
+            Configuration configuration) {
+        // Diamond operator instead of a raw type, so the assignment is checked by the compiler.
+        return new FlinkTubeMQProducer<>(topic, masterAddress, serializationSchema, tidSet, configuration);
+    }
+
+    /**
+     * Creates the runtime encoder of the given format for the given physical data type.
+     */
+    private SerializationSchema<RowData> createSerialization(
+            Context context,
+            EncodingFormat<SerializationSchema<RowData>> format,
+            DataType physicalDataType) {
+        return format.createRuntimeEncoder(context, physicalDataType);
+    }
+
+    /**
+     * Returns a new sink built from the same constructor arguments as this one.
+     */
+    @Override
+    public DynamicTableSink copy() {
+        return new TubeMQTableSink(
+                physicalDataType,
+                valueEncodingFormat,
+                topic,
+                masterAddress,
+                tidSet,
+                configuration);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "TubeMQ table sink";
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
new file mode 100644
index 0000000000..2dc21ca2e8
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq.table;
+
+import org.apache.inlong.sort.tubemq.FlinkTubeMQConsumer;
+import 
org.apache.inlong.sort.tubemq.table.DynamicTubeMQDeserializationSchema.MetadataConverter;
+import org.apache.inlong.tubemq.corebase.Message;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * .
+ */
+public class TubeMQTableSource implements ScanTableSource, 
SupportsReadingMetadata, SupportsWatermarkPushDown {
+
+    private static final String VALUE_METADATA_PREFIX = "value.";
+
+    // 
--------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // 
--------------------------------------------------------------------------------------------
+    /**
+     * Data type to configure the formats.
+     */
+    private final DataType physicalDataType;
+    /**
+     * Format for decoding values from TubeMQ.
+     */
+    private final DecodingFormat<DeserializationSchema<RowData>> 
valueDecodingFormat;
+
+    // -------------------------------------------------------------------
+    /**
+     * The address of TubeMQ master, format eg: 127.0.0.1:8715,127.0.0.2:8715.
+     */
+    private final String masterAddress;
+    /**
+     * The TubeMQ topic name.
+     */
+    private final String topic;
+    /**
+     * The TubeMQ tid filter collection.
+     */
+    private final TreeSet<String> tidSet;
+    /**
+     * The TubeMQ consumer group name.
+     */
+    private final String consumerGroup;
+    /**
+     * The parameters collection for TubeMQ consumer.
+     */
+    private final Configuration configuration;
+    /**
+     * The TubeMQ session key.
+     */
+    private final String sessionKey;
+    /**
+     * Field name of the processing time attribute, null if no processing time
+     * field is defined.
+     */
+    private final Optional<String> proctimeAttribute;
+    /**
+     * status of error
+     */
+    private final boolean ignoreErrors;
+    /**
+     * The InLong inner format.
+     */
+    private final boolean innerFormat;
+    /**
+     * Data type that describes the final output of the source.
+     */
+    protected DataType producedDataType;
+    /**
+     * Metadata that is appended at the end of a physical source row.
+     */
+    protected List<String> metadataKeys;
+    /**
+     * Watermark strategy that is used to generate per-partition watermark.
+     */
+    @Nullable
+    private WatermarkStrategy<RowData> watermarkStrategy;
+
+    public TubeMQTableSource(DataType physicalDataType,
+            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+            String masterAddress, String topic,
+            TreeSet<String> tidSet, String consumerGroup, String sessionKey,
+            Configuration configuration, @Nullable WatermarkStrategy<RowData> 
watermarkStrategy,
+            Optional<String> proctimeAttribute, Boolean ignoreErrors, Boolean 
innerFormat) {
+
+        Preconditions.checkNotNull(physicalDataType, "Physical data type must 
not be null.");
+        Preconditions.checkNotNull(valueDecodingFormat, "The deserialization 
schema must not be null.");
+        Preconditions.checkNotNull(masterAddress, "The master address must not 
be null.");
+        Preconditions.checkNotNull(topic, "The topic must not be null.");
+        Preconditions.checkNotNull(tidSet, "The tid set must not be null.");
+        Preconditions.checkNotNull(consumerGroup, "The consumer group must not 
be null.");
+        Preconditions.checkNotNull(configuration, "The configuration must not 
be null.");
+
+        this.physicalDataType = physicalDataType;
+        this.producedDataType = physicalDataType;
+        this.metadataKeys = Collections.emptyList();
+        this.valueDecodingFormat = valueDecodingFormat;
+        this.masterAddress = masterAddress;
+        this.topic = topic;
+        this.tidSet = tidSet;
+        this.consumerGroup = consumerGroup;
+        this.sessionKey = sessionKey;
+        this.configuration = configuration;
+        this.watermarkStrategy = watermarkStrategy;
+        this.proctimeAttribute = proctimeAttribute;
+        this.ignoreErrors = ignoreErrors;
+        this.innerFormat = innerFormat;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return valueDecodingFormat.getChangelogMode();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
+        final LogicalType physicalType = physicalDataType.getLogicalType();
+        final int physicalFieldCount = 
LogicalTypeChecks.getFieldCount(physicalType);
+        final IntStream physicalFields = IntStream.range(0, 
physicalFieldCount);
+        final DeserializationSchema<RowData> deserialization = 
createDeserialization(context,
+                valueDecodingFormat, physicalFields.toArray(), null);
+
+        final TypeInformation<RowData> producedTypeInfo = 
context.createTypeInformation(physicalDataType);
+
+        final FlinkTubeMQConsumer<RowData> tubeMQConsumer = 
createTubeMQConsumer(deserialization, producedTypeInfo,
+                ignoreErrors);
+
+        return SourceFunctionProvider.of(tubeMQConsumer, false);
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        return new TubeMQTableSource(
+                physicalDataType, valueDecodingFormat, masterAddress,
+                topic, tidSet, consumerGroup, sessionKey, configuration,
+                watermarkStrategy, proctimeAttribute, ignoreErrors, 
innerFormat);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "TubeMQ table source";
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+        valueDecodingFormat
+                .listReadableMetadata()
+                .forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX 
+ key, value));
+
+        // add connector metadata
+        Stream.of(ReadableMetadata.values())
+                .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, 
m.dataType));
+
+        return metadataMap;
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType 
producedDataType) {
+        // separate connector and format metadata
+        final List<String> formatMetadataKeys =
+                metadataKeys.stream()
+                        .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
+                        .collect(Collectors.toList());
+        final List<String> connectorMetadataKeys = new 
ArrayList<>(metadataKeys);
+        connectorMetadataKeys.removeAll(formatMetadataKeys);
+
+        // push down format metadata
+        final Map<String, DataType> formatMetadata = 
valueDecodingFormat.listReadableMetadata();
+        if (formatMetadata.size() > 0) {
+            final List<String> requestedFormatMetadataKeys =
+                    formatMetadataKeys.stream()
+                            .map(k -> 
k.substring(VALUE_METADATA_PREFIX.length()))
+                            .collect(Collectors.toList());
+            
valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+        }
+        this.metadataKeys = connectorMetadataKeys;
+        this.producedDataType = producedDataType;
+
+    }
+
+    @Override
+    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+        this.watermarkStrategy = watermarkStrategy;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final TubeMQTableSource that = (TubeMQTableSource) o;
+        return Objects.equals(physicalDataType, that.physicalDataType)
+                && Objects.equals(valueDecodingFormat, 
that.valueDecodingFormat)
+                && Objects.equals(masterAddress, that.masterAddress)
+                && Objects.equals(topic, that.topic)
+                && Objects.equals(String.valueOf(tidSet), 
String.valueOf(that.tidSet))
+                && Objects.equals(consumerGroup, that.consumerGroup)
+                && Objects.equals(proctimeAttribute, that.proctimeAttribute)
+                && Objects.equals(watermarkStrategy, that.watermarkStrategy);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                physicalDataType,
+                valueDecodingFormat,
+                masterAddress,
+                topic,
+                tidSet,
+                consumerGroup,
+                configuration,
+                watermarkStrategy,
+                proctimeAttribute);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Metadata handling
+    // 
--------------------------------------------------------------------------------------------
+
+    @Nullable
+    private DeserializationSchema<RowData> createDeserialization(
+            Context context,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> format,
+            int[] projection,
+            @Nullable String prefix) {
+        if (format == null) {
+            return null;
+        }
+        DataType physicalFormatDataType = 
DataTypeUtils.projectRow(this.physicalDataType, projection);
+        if (prefix != null) {
+            physicalFormatDataType = 
DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
+        }
+        return format.createRuntimeDecoder(context, physicalFormatDataType);
+    }
+
+    protected FlinkTubeMQConsumer<RowData> createTubeMQConsumer(
+            DeserializationSchema<RowData> deserialization,
+            TypeInformation<RowData> producedTypeInfo,
+            boolean ignoreErrors) {
+        final MetadataConverter[] metadataConverters =
+                metadataKeys.stream()
+                        .map(k -> Stream.of(ReadableMetadata.values())
+                                .filter(rm -> rm.key.equals(k))
+                                .findFirst()
+                                .orElseThrow(IllegalStateException::new))
+                        .map(m -> m.converter)
+                        .toArray(MetadataConverter[]::new);
+        final DeserializationSchema<RowData> tubeMQDeserializer = new 
DynamicTubeMQDeserializationSchema(
+                deserialization, metadataConverters, producedTypeInfo, 
ignoreErrors);
+
+        final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new 
FlinkTubeMQConsumer(masterAddress, topic, tidSet,
+                consumerGroup, tubeMQDeserializer, configuration, sessionKey, 
innerFormat);
+        return tubeMQConsumer;
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Metadata handling
+    // 
--------------------------------------------------------------------------------------------
+
+    enum ReadableMetadata {
+
+        TOPIC(
+                "topic",
+                DataTypes.STRING().notNull(),
+                new MetadataConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(Message msg) {
+                        return StringData.fromString(msg.getTopic());
+                    }
+                });
+
+        final String key;
+
+        final DataType dataType;
+
+        final MetadataConverter converter;
+
+        ReadableMetadata(String key, DataType dataType, MetadataConverter 
converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.converter = converter;
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..ed092ea8e5
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.tubemq.table.TubeMQDynamicTableFactory

Reply via email to