This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b3f42f216 [INLONG-11446][TubeMQ] Remove legacy codes for flink-tube connector (#11448)
7b3f42f216 is described below

commit 7b3f42f21685a8053dd92f4e9dd472a621b1e2f9
Author: AloysZhang <aloyszh...@apache.org>
AuthorDate: Fri Nov 1 16:43:42 2024 +0800

    [INLONG-11446][TubeMQ] Remove legacy codes for flink-tube connector (#11448)
---
 inlong-tubemq/tubemq-connectors/pom.xml            |   1 -
 .../tubemq-connector-flink/pom.xml                 | 106 ------
 .../org/apache/flink/connectors/tubemq/Tubemq.java | 162 ----------
 .../flink/connectors/tubemq/TubemqOptions.java     |  61 ----
 .../connectors/tubemq/TubemqSinkFunction.java      | 182 -----------
 .../connectors/tubemq/TubemqSourceFunction.java    | 358 ---------------------
 .../flink/connectors/tubemq/TubemqTableSink.java   | 130 --------
 .../flink/connectors/tubemq/TubemqTableSource.java | 235 --------------
 .../tubemq/TubemqTableSourceSinkFactory.java       | 244 --------------
 .../flink/connectors/tubemq/TubemqValidator.java   |  77 -----
 .../org.apache.flink.table.factories.TableFactory  |  16 -
 .../apache/flink/connectors/tubemq/TubemqTest.java | 101 ------
 .../src/test/resources/log4j2.properties           |  21 --
 13 files changed, 1694 deletions(-)

diff --git a/inlong-tubemq/tubemq-connectors/pom.xml b/inlong-tubemq/tubemq-connectors/pom.xml
index e9c4b1b147..1fc71477c3 100644
--- a/inlong-tubemq/tubemq-connectors/pom.xml
+++ b/inlong-tubemq/tubemq-connectors/pom.xml
@@ -29,7 +29,6 @@
     <name>Apache InLong - TubeMQ Connectors</name>
 
     <modules>
-        <module>tubemq-connector-flink</module>
         <module>tubemq-connector-flume</module>
         <module>tubemq-connector-spark</module>
     </modules>
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/pom.xml b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/pom.xml
deleted file mode 100644
index 47bb6ccce7..0000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/pom.xml
+++ /dev/null
@@ -1,106 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.inlong</groupId>
-        <artifactId>tubemq-connectors</artifactId>
-        <version>2.1.0-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>tubemq-connector-flink</artifactId>
-    <name>Apache InLong - TubeMQ Connectors-flink</name>
-
-    <properties>
-        <inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir>
-        <flink.version>1.13.5</flink.version>
-        <scala.binary.version>2.11</scala.binary.version>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>tubemq-client</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>tubemq-core</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-core</artifactId>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-common</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-            <optional>true</optional>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-            <optional>true</optional>
-        </dependency>
-
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-common</artifactId>
-            <version>${flink.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>${junit.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-</project>
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java
deleted file mode 100644
index 2e34c13330..0000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.tubemq;
-
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_GROUP;
-import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_MASTER;
-import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_PROPERTIES;
-import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_STREAMIDS;
-import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_TOPIC;
-import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_TYPE_VALUE_TUBEMQ;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The {@link ConnectorDescriptor} for tubemq sources and sinks.
- */
-public class Tubemq extends ConnectorDescriptor {
-
-    @Nullable
-    private boolean consumerRole = true;
-
-    @Nullable
-    private String topic;
-
-    @Nullable
-    private String master;
-
-    @Nullable
-    private String group;
-
-    @Nullable
-    private String streamIds;
-
-    @Nonnull
-    private Map<String, String> properties;
-
-    public Tubemq() {
-        super(CONNECTOR_TYPE_VALUE_TUBEMQ, 1, true);
-
-        this.properties = new HashMap<>();
-    }
-
-    /**
-     * Sets the tubemq topic to be used.
-     *
-     * @param topic The topic name.
-     */
-    public Tubemq topic(String topic) {
-        checkNotNull(topic);
-
-        this.topic = topic;
-        return this;
-    }
-
-    /**
-     * Sets the client role to be used.
-     *
-     * @param isConsumer The client role if consumer.
-     */
-    public Tubemq asConsumer(boolean isConsumer) {
-        this.consumerRole = isConsumer;
-        return this;
-    }
-
-    /**
-     * Sets the address of tubemq master to connect.
-     *
-     * @param master The address of tubemq master.
-     */
-    public Tubemq master(String master) {
-        checkNotNull(master);
-
-        this.master = master;
-        return this;
-    }
-
-    /**
-     * Sets the tubemq (consumer or producer) group to be used.
-     *
-     * @param group The group name.
-     */
-    public Tubemq group(String group) {
-        checkNotNull(group);
-
-        this.group = group;
-        return this;
-    }
-
-    /**
-     * The tubemq consumers use these streamIds to filter records reading from server.
-     *
-     * @param streamIds The filter for consume record from server.
-     */
-    public Tubemq streamIds(String streamIds) {
-
-        this.streamIds = streamIds;
-        return this;
-    }
-
-    /**
-     * Sets the tubemq property.
-     *
-     * @param key   The key of the property.
-     * @param value The value of the property.
-     */
-    public Tubemq property(String key, String value) {
-        checkNotNull(key);
-        checkNotNull(value);
-
-        properties.put(key, value);
-        return this;
-    }
-
-    @Override
-    protected Map<String, String> toConnectorProperties() {
-        DescriptorProperties descriptorProperties = new DescriptorProperties();
-
-        if (topic != null) {
-            descriptorProperties.putString(CONNECTOR_TOPIC, topic);
-        }
-
-        if (master != null) {
-            descriptorProperties.putString(CONNECTOR_MASTER, master);
-        }
-        if (consumerRole) {
-            if (group != null) {
-                descriptorProperties.putString(CONNECTOR_GROUP, group);
-            }
-
-            if (streamIds != null) {
-                descriptorProperties.putString(CONNECTOR_STREAMIDS, streamIds);
-            }
-        }
-
-        descriptorProperties.putPropertiesWithPrefix(CONNECTOR_PROPERTIES, properties);
-
-        return descriptorProperties.asMap();
-    }
-}
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java
deleted file mode 100644
index e94fbf86c5..0000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.tubemq;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-
-/**
- * The configuration options for tubemq sources and sink.
- */
-public class TubemqOptions {
-
-    public static final ConfigOption<String> SESSION_KEY =
-            ConfigOptions.key("session.key")
-                    .noDefaultValue()
-                    .withDescription("The session key for this consumer group at startup.");
-
-    public static final ConfigOption<String> STREAM_ID =
-            ConfigOptions.key("topic.streamId")
-                    .noDefaultValue()
-                    .withDescription("The streamId owned this topic.");
-
-    public static final ConfigOption<Integer> MAX_RETRIES =
-            ConfigOptions.key("max.retries")
-                    .defaultValue(5)
-                    .withDescription("The maximum number of retries when an "
-                            + "exception is caught.");
-
-    public static final ConfigOption<Boolean> BOOTSTRAP_FROM_MAX =
-            ConfigOptions.key("bootstrap.from.max")
-                    .defaultValue(false)
-                    .withDescription("True if consuming from the most recent "
-                            + "position when the tubemq source starts.. It only takes "
-                            + "effect when the tubemq source does not recover from "
-                            + "checkpoints.");
-
-    public static final ConfigOption<String> SOURCE_MAX_IDLE_TIME =
-            ConfigOptions.key("source.task.max.idle.time")
-                    .defaultValue("5min")
-                    .withDescription("The max time of the source marked as temporarily idle.");
-
-    public static final ConfigOption<String> MESSAGE_NOT_FOUND_WAIT_PERIOD =
-            ConfigOptions.key("message.not.found.wait.period")
-                    .defaultValue("350ms")
-                    .withDescription("The time of waiting period if tubemq broker return message not found.");
-}
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java
deleted file mode 100644
index fabd43af6e..0000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.tubemq;
-
-import org.apache.inlong.tubemq.client.config.TubeClientConfig;
-import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
-import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
-import org.apache.inlong.tubemq.client.producer.MessageProducer;
-import org.apache.inlong.tubemq.client.producer.MessageSentResult;
-import org.apache.inlong.tubemq.corebase.Message;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashSet;
-
-import static org.apache.flink.connectors.tubemq.TubemqOptions.MAX_RETRIES;
-
-public class TubemqSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction {
-
-    private static final Logger LOG = LoggerFactory.getLogger(TubemqSinkFunction.class);
-
-    private static final String SYSTEM_HEADER_TIME_FORMAT = "yyyyMMddHHmm";
-
-    /**
-     * The address of tubemq master, format eg: 127.0.0.1:8080,127.0.0.2:8081.
-     */
-    private final String masterAddress;
-
-    /**
-     * The topic name.
-     */
-    private final String topic;
-
-    /**
-     * The streamId of this topic
-     */
-    private final String streamId;
-    /**
-     * The serializer for the records sent to pulsar.
-     */
-    private final SerializationSchema<T> serializationSchema;
-
-    /**
-     * The tubemq producer.
-     */
-    private transient MessageProducer producer;
-
-    /**
-     * The tubemq session factory.
-     */
-    private transient MessageSessionFactory sessionFactory;
-
-    /**
-     * The maximum number of retries.
-     */
-    private final int maxRetries;
-
-    public TubemqSinkFunction(String topic,
-            String masterAddress,
-            SerializationSchema<T> serializationSchema,
-            Configuration configuration) {
-        Preconditions.checkNotNull(topic,
-                "The topic must not be null.");
-        Preconditions.checkNotNull(masterAddress,
-                "The master address must not be null.");
-        Preconditions.checkNotNull(serializationSchema,
-                "The serialization schema must not be null.");
-        Preconditions.checkNotNull(configuration,
-                "The configuration must not be null.");
-
-        this.topic = topic;
-        this.masterAddress = masterAddress;
-        this.serializationSchema = serializationSchema;
-        this.streamId = configuration.getString(TubemqOptions.STREAM_ID);
-        this.maxRetries = configuration.getInteger(MAX_RETRIES);
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
-        // Nothing to do.
-    }
-
-    @Override
-    public void initializeState(FunctionInitializationContext functionInitializationContext) {
-        // Nothing to do.
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-
-        TubeClientConfig tubeClientConfig = new TubeClientConfig(masterAddress);
-        this.sessionFactory = new TubeSingleSessionFactory(tubeClientConfig);
-        this.producer = sessionFactory.createProducer();
-        HashSet<String> hashSet = new HashSet<>();
-        hashSet.add(topic);
-        producer.publish(hashSet);
-    }
-
-    @Override
-    public void invoke(T in, Context context) throws Exception {
-
-        int retries = 0;
-        Exception exception = null;
-
-        while (maxRetries <= 0 || retries < maxRetries) {
-
-            try {
-                byte[] body = serializationSchema.serialize(in);
-                Message message = new Message(topic, body);
-                if (StringUtils.isNotBlank(streamId)) {
-                    SimpleDateFormat sdf = new SimpleDateFormat(SYSTEM_HEADER_TIME_FORMAT);
-                    long currTimeMillis = System.currentTimeMillis();
-                    message.putSystemHeader(streamId, sdf.format(new Date(currTimeMillis)));
-                }
-
-                MessageSentResult sendResult = producer.sendMessage(message);
-                if (sendResult.isSuccess()) {
-                    return;
-                } else {
-                    LOG.warn("Send msg fail, error code: {}, error message: {}",
-                            sendResult.getErrCode(), sendResult.getErrMsg());
-                }
-            } catch (Exception e) {
-                LOG.warn("Could not properly send the message to hippo "
-                        + "(retries: {}).", retries, e);
-
-                retries++;
-                exception = ExceptionUtils.firstOrSuppressed(e, exception);
-            }
-        }
-
-        throw new IOException("Could not properly send the message to hippo.", exception);
-    }
-
-    @Override
-    public void close() throws Exception {
-
-        try {
-            if (producer != null) {
-                producer.shutdown();
-                producer = null;
-            }
-            if (sessionFactory != null) {
-                sessionFactory.shutdown();
-                sessionFactory = null;
-            }
-        } catch (Throwable e) {
-            LOG.error("Shutdown producer error", e);
-        } finally {
-            super.close();
-        }
-    }
-}
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
deleted file mode 100644
index 4f9fc050a9..0000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.tubemq;
-
-import org.apache.inlong.tubemq.client.config.ConsumerConfig;
-import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
-import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
-import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
-import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
-import org.apache.inlong.tubemq.corebase.Message;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-
-import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
-import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.TimeUtils.parseDuration;
-
-/**
- * The Flink TubeMQ Consumer.
- *
- * @param <T> The type of records produced by this data source
- */
-public class TubemqSourceFunction<T>
-        extends
-            RichParallelSourceFunction<T>
-        implements
-            CheckpointedFunction {
-
-    private static final Logger LOG =
-            LoggerFactory.getLogger(TubemqSourceFunction.class);
-
-    private static final String TUBE_OFFSET_STATE = "tube-offset-state";
-
-    private static final String SPLIT_COMMA = ",";
-    private static final String SPLIT_COLON = ":";
-
-    /**
-     * The address of TubeMQ master, format eg: 127.0.0.1:8080,127.0.0.2:8081.
-     */
-    private final String masterAddress;
-
-    /**
-     * The topic name.
-     */
-    private final String topic;
-
-    /**
-     * The tubemq consumers use this streamId set to filter records reading from server.
-     */
-    private final TreeSet<String> streamIdSet;
-
-    /**
-     * The consumer group name.
-     */
-    private final String consumerGroup;
-
-    /**
-     * The deserializer for records.
-     */
-    private final DeserializationSchema<T> deserializationSchema;
-
-    /**
-     * The random key for TubeMQ consumer group when startup.
-     */
-    private final String sessionKey;
-
-    /**
-     * True if consuming message from max offset.
-     */
-    private final boolean consumeFromMax;
-
-    /**
-     * The time to wait if tubemq broker returns message not found.
-     */
-    private final Duration messageNotFoundWaitPeriod;
-
-    /**
-     * The max time to marked source idle.
-     */
-    private final Duration maxIdleTime;
-
-    /**
-     * Flag indicating whether the consumer is still running.
-     **/
-    private volatile boolean running;
-
-    /**
-     * The state for the offsets of queues.
-     */
-    private transient ListState<Tuple2<String, Long>> offsetsState;
-
-    /**
-     * The current offsets of partitions which are stored in {@link #offsetsState}
-     * once a checkpoint is triggered.
-     *
-     *NOTE: The offsets are populated in the main thread and saved in the
-     * checkpoint thread. Its usage must be guarded by the checkpoint lock.</p>
-     */
-    private transient Map<String, Long> currentOffsets;
-
-    /**
-     * The TubeMQ session factory.
-     */
-    private transient TubeSingleSessionFactory messageSessionFactory;
-
-    /**
-     * The TubeMQ pull consumer.
-     */
-    private transient PullMessageConsumer messagePullConsumer;
-
-    /**
-     * Build a TubeMQ source function
-     *
-     * @param masterAddress            the master address of TubeMQ
-     * @param topic                    the topic name
-     * @param streamIdSet                   the  topic's filter condition items
-     * @param consumerGroup            the consumer group name
-     * @param deserializationSchema    the deserialize schema
-     * @param configuration            the configure
-     */
-    public TubemqSourceFunction(
-            String masterAddress,
-            String topic,
-            TreeSet<String> streamIdSet,
-            String consumerGroup,
-            DeserializationSchema<T> deserializationSchema,
-            Configuration configuration) {
-        checkNotNull(masterAddress,
-                "The master address must not be null.");
-        checkNotNull(topic,
-                "The topic must not be null.");
-        checkNotNull(streamIdSet,
-                "The streamId set must not be null.");
-        checkNotNull(consumerGroup,
-                "The consumer group must not be null.");
-        checkNotNull(deserializationSchema,
-                "The deserialization schema must not be null.");
-        checkNotNull(configuration,
-                "The configuration must not be null.");
-
-        this.masterAddress = masterAddress;
-        this.topic = topic;
-        this.streamIdSet = streamIdSet;
-        this.consumerGroup = consumerGroup;
-        this.deserializationSchema = deserializationSchema;
-
-        this.sessionKey =
-                configuration.getString(TubemqOptions.SESSION_KEY);
-        this.consumeFromMax =
-                configuration.getBoolean(TubemqOptions.BOOTSTRAP_FROM_MAX);
-        this.messageNotFoundWaitPeriod =
-                parseDuration(
-                        configuration.getString(
-                                TubemqOptions.MESSAGE_NOT_FOUND_WAIT_PERIOD));
-        this.maxIdleTime =
-                parseDuration(
-                        configuration.getString(
-                                TubemqOptions.SOURCE_MAX_IDLE_TIME));
-    }
-
-    @Override
-    public void initializeState(
-            FunctionInitializationContext context) throws Exception {
-
-        TypeInformation<Tuple2<String, Long>> typeInformation =
-                new TupleTypeInfo<>(STRING_TYPE_INFO, LONG_TYPE_INFO);
-        ListStateDescriptor<Tuple2<String, Long>> stateDescriptor =
-                new ListStateDescriptor<>(TUBE_OFFSET_STATE, typeInformation);
-
-        OperatorStateStore stateStore = context.getOperatorStateStore();
-        offsetsState = stateStore.getListState(stateDescriptor);
-
-        currentOffsets = new HashMap<>();
-        if (context.isRestored()) {
-            for (Tuple2<String, Long> tubeOffset : offsetsState.get()) {
-                currentOffsets.put(tubeOffset.f0, tubeOffset.f1);
-            }
-
-            LOG.info("Successfully restore the offsets {}.", currentOffsets);
-        } else {
-            LOG.info("No restore offsets.");
-        }
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        ConsumerConfig consumerConfig =
-                new ConsumerConfig(masterAddress, consumerGroup);
-        consumerConfig.setConsumePosition(consumeFromMax
-                ? ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS
-                : ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
-        consumerConfig.setMsgNotFoundWaitPeriodMs(messageNotFoundWaitPeriod.toMillis());
-
-        final int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
-
-        messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
-
-        messagePullConsumer =
-                messageSessionFactory.createPullConsumer(consumerConfig);
-        messagePullConsumer
-                .subscribe(topic, streamIdSet);
-        messagePullConsumer
-                .completeSubscribe(sessionKey, numTasks, true, currentOffsets);
-
-        running = true;
-    }
-
-    @Override
-    public void run(SourceContext<T> ctx) throws Exception {
-
-        Instant lastConsumeInstant = Instant.now();
-
-        while (running) {
-
-            ConsumerResult consumeResult = messagePullConsumer.getMessage();
-            if (!consumeResult.isSuccess()) {
-                if (!(consumeResult.getErrCode() == 400
-                        || consumeResult.getErrCode() == 404
-                        || consumeResult.getErrCode() == 405
-                        || consumeResult.getErrCode() == 406
-                        || consumeResult.getErrCode() == 407
-                        || consumeResult.getErrCode() == 408)) {
-                    LOG.info("Could not consume messages from tubemq (errcode: {}, "
-                            + "errmsg: {}).", consumeResult.getErrCode(),
-                            consumeResult.getErrMsg());
-                }
-
-                Duration idleTime =
-                        Duration.between(lastConsumeInstant, Instant.now());
-                if (idleTime.compareTo(maxIdleTime) > 0) {
-                    ctx.markAsTemporarilyIdle();
-                }
-
-                continue;
-            }
-
-            List<Message> messageList = consumeResult.getMessageList();
-
-            List<T> records = new ArrayList<>();
-            if (messageList != null) {
-                lastConsumeInstant = Instant.now();
-
-                for (Message message : messageList) {
-                    T record =
-                            deserializationSchema.deserialize(message.getData());
-                    records.add(record);
-                }
-            }
-
-            synchronized (ctx.getCheckpointLock()) {
-
-                for (T record : records) {
-                    ctx.collect(record);
-                }
-
-                currentOffsets.put(
-                        consumeResult.getPartitionKey(),
-                        consumeResult.getCurrOffset());
-            }
-
-            ConsumerResult confirmResult =
-                    messagePullConsumer
-                            .confirmConsume(consumeResult.getConfirmContext(), true);
-            if (!confirmResult.isSuccess()) {
-                if (!(confirmResult.getErrCode() == 400
-                        || confirmResult.getErrCode() == 404
-                        || confirmResult.getErrCode() == 405
-                        || confirmResult.getErrCode() == 406
-                        || confirmResult.getErrCode() == 407
-                        || confirmResult.getErrCode() == 408)) {
-                    LOG.warn("Could not confirm messages to tubemq (errcode: {}, "
-                            + "errmsg: {}).", confirmResult.getErrCode(),
-                            confirmResult.getErrMsg());
-                }
-            }
-        }
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext context) throws Exception {
-
-        offsetsState.clear();
-        for (Map.Entry<String, Long> entry : currentOffsets.entrySet()) {
-            offsetsState.add(new Tuple2<>(entry.getKey(), entry.getValue()));
-        }
-
-        LOG.info("Successfully save the offsets in checkpoint {}: {}.",
-                context.getCheckpointId(), currentOffsets);
-    }
-
-    @Override
-    public void cancel() {
-        running = false;
-    }
-
-    @Override
-    public void close() throws Exception {
-
-        cancel();
-
-        if (messagePullConsumer != null) {
-            try {
-                messagePullConsumer.shutdown();
-            } catch (Throwable t) {
-                LOG.warn("Could not properly shutdown the tubemq pull consumer.", t);
-            }
-        }
-
-        if (messageSessionFactory != null) {
-            try {
-                messageSessionFactory.shutdown();
-            } catch (Throwable t) {
-                LOG.warn("Could not properly shutdown the tubemq session factory.", t);
-            }
-        }
-
-        super.close();
-
-        LOG.info("Closed the tubemq source.");
-    }
-}
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSink.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSink.java
deleted file mode 100644
index b9f7e5f86d..0000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSink.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.tubemq;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.utils.TableConnectorUtils;
-import org.apache.flink.types.Row;
-
-import java.util.Arrays;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Tubemq {@link org.apache.flink.table.sinks.StreamTableSink}.
- */
-public class TubemqTableSink implements AppendStreamTableSink<Row> {
-
-    /**
-     * Serialization schema for records to tubemq.
-     */
-    private final SerializationSchema<Row> serializationSchema;
-
-    /**
-     * The schema of the table.
-     */
-    private final TableSchema schema;
-
-    /**
-     * The tubemq topic name.
-     */
-    private final String topic;
-
-    /**
-     * The address of tubemq master, format eg: 127.0.0.1:8080,127.0.0.2:8081 .
-     */
-    private final String masterAddress;
-
-    /**
-     * The parameters collection for tubemq producer.
-     */
-    private final Configuration configuration;
-
-    public TubemqTableSink(
-            SerializationSchema<Row> serializationSchema,
-            TableSchema schema,
-            String topic,
-            String masterAddress,
-            Configuration configuration) {
-        this.serializationSchema = checkNotNull(serializationSchema,
-                "The deserialization schema must not be null.");
-        this.schema = checkNotNull(schema,
-                "The schema must not be null.");
-        this.topic = checkNotNull(topic,
-                "Topic must not be null.");
-        this.masterAddress = checkNotNull(masterAddress,
-                "Master address must not be null.");
-        this.configuration = checkNotNull(configuration,
-                "The configuration must not be null.");
-    }
-
-    @Override
-    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
-
-        final SinkFunction<Row> tubemqSinkFunction =
-                new TubemqSinkFunction<>(
-                        topic,
-                        masterAddress,
-                        serializationSchema,
-                        configuration);
-
-        return dataStream
-                .addSink(tubemqSinkFunction)
-                .name(
-                        TableConnectorUtils.generateRuntimeName(
-                                getClass(),
-                                getFieldNames()));
-    }
-
-    @Override
-    public TypeInformation<Row> getOutputType() {
-        return schema.toRowType();
-    }
-
-    @Override
-    public String[] getFieldNames() {
-        return schema.getFieldNames();
-    }
-
-    @Override
-    public TypeInformation<?>[] getFieldTypes() {
-        return schema.getFieldTypes();
-    }
-
-    @Override
-    public TubemqTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-        if (!Arrays.equals(getFieldNames(), fieldNames)
-                || !Arrays.equals(getFieldTypes(), fieldTypes)) {
-            throw new ValidationException("Reconfiguration with different fields is not allowed. "
-                    + "Expected: " + Arrays.toString(getFieldNames())
-                    + " / " + Arrays.toString(getFieldTypes()) + ". "
-                    + "But was: " + Arrays.toString(fieldNames) + " / "
-                    + Arrays.toString(fieldTypes));
-        }
-
-        return this;
-    }
-}
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java
deleted file mode 100644
index 934a441687..0000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.tubemq;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.Types;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.sources.DefinedFieldMapping;
-import org.apache.flink.table.sources.DefinedProctimeAttribute;
-import org.apache.flink.table.sources.DefinedRowtimeAttributes;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.types.Row;
-
-import javax.annotation.Nullable;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.TreeSet;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * TubeMQ {@link StreamTableSource}.
- */
-public class TubemqTableSource
-        implements
-            StreamTableSource<Row>,
-            DefinedProctimeAttribute,
-            DefinedRowtimeAttributes,
-            DefinedFieldMapping {
-
-    /**
-     * Deserialization schema for records from TubeMQ.
-     */
-    private final DeserializationSchema<Row> deserializationSchema;
-
-    /**
-     * The schema of the table.
-     */
-    private final TableSchema schema;
-
-    /**
-     * Field name of the processing time attribute, null if no processing time
-     * field is defined.
-     */
-    private final Optional<String> proctimeAttribute;
-
-    /**
-     * Descriptors for rowtime attributes.
-     */
-    private final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
-
-    /**
-     * Mapping for the fields of the table schema to fields of the physical
-     * returned type.
-     */
-    private final Map<String, String> fieldMapping;
-
-    /**
-     * The address of TubeMQ master, format eg: 127.0.0.1:8080,127.0.0.2:8081 .
-     */
-    private final String masterAddress;
-
-    /**
-     * The TubeMQ topic name.
-     */
-    private final String topic;
-
-    /**
-     * The TubeMQ streamId filter collection.
-     */
-    private final TreeSet<String> streamIdSet;
-
-    /**
-     * The TubeMQ consumer group name.
-     */
-    private final String consumerGroup;
-
-    /**
-     * The parameters collection for TubeMQ consumer.
-     */
-    private final Configuration configuration;
-
-    /**
-     * Build TubeMQ table source
-     *
-     * @param deserializationSchema   the deserialize schema
-     * @param schema             the data schema
-     * @param proctimeAttribute              the proc time
-     * @param rowtimeAttributeDescriptors    the row time attribute descriptor
-     * @param fieldMapping        the field map information
-     * @param masterAddress       the master address
-     * @param topic               the topic name
-     * @param streamIdSet              the topic's filter condition items
-     * @param consumerGroup       the consumer group
-     * @param configuration       the configure
-     */
-    public TubemqTableSource(
-            DeserializationSchema<Row> deserializationSchema,
-            TableSchema schema,
-            Optional<String> proctimeAttribute,
-            List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
-            Map<String, String> fieldMapping,
-            String masterAddress,
-            String topic,
-            TreeSet<String> streamIdSet,
-            String consumerGroup,
-            Configuration configuration) {
-        checkNotNull(deserializationSchema,
-                "The deserialization schema must not be null.");
-        checkNotNull(schema,
-                "The schema must not be null.");
-        checkNotNull(fieldMapping,
-                "The field mapping must not be null.");
-        checkNotNull(masterAddress,
-                "The master address must not be null.");
-        checkNotNull(topic,
-                "The topic must not be null.");
-        checkNotNull(streamIdSet,
-                "The streamId set must not be null.");
-        checkNotNull(consumerGroup,
-                "The consumer group must not be null.");
-        checkNotNull(configuration,
-                "The configuration must not be null.");
-
-        this.deserializationSchema = deserializationSchema;
-        this.schema = schema;
-        this.fieldMapping = fieldMapping;
-        this.masterAddress = masterAddress;
-        this.topic = topic;
-        this.streamIdSet = streamIdSet;
-        this.consumerGroup = consumerGroup;
-        this.configuration = configuration;
-
-        this.proctimeAttribute =
-                validateProcTimeAttribute(proctimeAttribute);
-        this.rowtimeAttributeDescriptors =
-                validateRowTimeAttributeDescriptors(rowtimeAttributeDescriptors);
-    }
-
-    @Override
-    public TableSchema getTableSchema() {
-        return schema;
-    }
-
-    @Nullable
-    @Override
-    public String getProctimeAttribute() {
-        return proctimeAttribute.orElse(null);
-    }
-
-    @Override
-    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
-        return rowtimeAttributeDescriptors;
-    }
-
-    @Override
-    public Map<String, String> getFieldMapping() {
-        return fieldMapping;
-    }
-
-    @Override
-    public DataStream<Row> getDataStream(
-            StreamExecutionEnvironment streamExecutionEnvironment) {
-        SourceFunction<Row> sourceFunction =
-                new TubemqSourceFunction<>(
-                        masterAddress,
-                        topic,
-                        streamIdSet,
-                        consumerGroup,
-                        deserializationSchema,
-                        configuration);
-
-        return streamExecutionEnvironment
-                .addSource(sourceFunction)
-                .name(explainSource());
-    }
-
-    private Optional<String> validateProcTimeAttribute(
-            Optional<String> proctimeAttribute) {
-        return proctimeAttribute.map((attribute) -> {
-            Optional<TypeInformation<?>> tpe = schema.getFieldType(attribute);
-            if (!tpe.isPresent()) {
-                throw new ValidationException("Proc time attribute '"
-                        + attribute + "' isn't present in TableSchema.");
-            } else if (tpe.get() != Types.SQL_TIMESTAMP()) {
-                throw new ValidationException("Proc time attribute '"
-                        + attribute + "' isn't of type SQL_TIMESTAMP.");
-            }
-            return attribute;
-        });
-    }
-
-    private List<RowtimeAttributeDescriptor> validateRowTimeAttributeDescriptors(
-            List<RowtimeAttributeDescriptor> attributeDescriptors) {
-        checkNotNull(attributeDescriptors);
-
-        for (RowtimeAttributeDescriptor desc : attributeDescriptors) {
-            String name = desc.getAttributeName();
-            Optional<TypeInformation<?>> tpe = schema.getFieldType(name);
-            if (!tpe.isPresent()) {
-                throw new ValidationException("Row time attribute '"
-                        + name + "' is not present.");
-            } else if (tpe.get() != Types.SQL_TIMESTAMP()) {
-                throw new ValidationException("Row time attribute '"
-                        + name + "' is not of type SQL_TIMESTAMP.");
-            }
-        }
-
-        return attributeDescriptors;
-    }
-}
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java
deleted file mode 100644
index e28e85c1b7..0000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.tubemq;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.SchemaValidator;
-import org.apache.flink.table.factories.DeserializationSchemaFactory;
-import org.apache.flink.table.factories.SerializationSchemaFactory;
-import org.apache.flink.table.factories.StreamTableSinkFactory;
-import org.apache.flink.table.factories.StreamTableSourceFactory;
-import org.apache.flink.table.factories.TableFactoryService;
-import org.apache.flink.table.sinks.StreamTableSink;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.types.Row;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.TreeSet;
-
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
-import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
-import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
-
-/**
- * Factory for creating configured instances of {@link TubemqTableSource}.
- */
-public class TubemqTableSourceSinkFactory
-        implements
-            StreamTableSourceFactory<Row>,
-            StreamTableSinkFactory<Row> {
-
-    private static final String SPLIT_COMMA = ",";
-
-    public TubemqTableSourceSinkFactory() {
-    }
-
-    @Override
-    public Map<String, String> requiredContext() {
-
-        Map<String, String> context = new HashMap<>();
-        context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND);
-        context.put(CONNECTOR_TYPE, TubemqValidator.CONNECTOR_TYPE_VALUE_TUBEMQ);
-        context.put(CONNECTOR_PROPERTY_VERSION, "1");
-
-        return context;
-    }
-
-    @Override
-    public List<String> supportedProperties() {
-        List<String> properties = new ArrayList<>();
-
-        // tubemq
-        properties.add(TubemqValidator.CONNECTOR_TOPIC);
-        properties.add(TubemqValidator.CONNECTOR_MASTER);
-        properties.add(TubemqValidator.CONNECTOR_GROUP);
-        properties.add(TubemqValidator.CONNECTOR_STREAMIDS);
-        properties.add(TubemqValidator.CONNECTOR_PROPERTIES + ".*");
-
-        // schema
-        properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
-        properties.add(SCHEMA + ".#." + SCHEMA_NAME);
-        properties.add(SCHEMA + ".#." + SCHEMA_FROM);
-
-        // time attributes
-        properties.add(SCHEMA + ".#." + SCHEMA_PROCTIME);
-        properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE);
-        properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM);
-        properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS);
-        properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED);
-        properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE);
-        properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS);
-        properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED);
-        properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY);
-
-        // format wildcard
-        properties.add(FORMAT + ".*");
-
-        return properties;
-    }
-
-    @Override
-    public StreamTableSource<Row> createStreamTableSource(
-            Map<String, String> properties) {
-        final DeserializationSchema<Row> deserializationSchema =
-                getDeserializationSchema(properties);
-
-        final DescriptorProperties descriptorProperties =
-                new DescriptorProperties(true);
-        descriptorProperties.putProperties(properties);
-
-        validateProperties(descriptorProperties);
-
-        final TableSchema schema =
-                descriptorProperties.getTableSchema(SCHEMA);
-        final Optional<String> proctimeAttribute =
-                SchemaValidator.deriveProctimeAttribute(descriptorProperties);
-        final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors =
-                SchemaValidator.deriveRowtimeAttributes(descriptorProperties);
-        final Map<String, String> fieldMapping =
-                SchemaValidator.deriveFieldMapping(
-                        descriptorProperties,
-                        Optional.of(deserializationSchema.getProducedType()));
-        final String topic =
-                descriptorProperties.getString(TubemqValidator.CONNECTOR_TOPIC);
-        final String masterAddress =
-                descriptorProperties.getString(TubemqValidator.CONNECTOR_MASTER);
-        final String consumerGroup =
-                descriptorProperties.getString(TubemqValidator.CONNECTOR_GROUP);
-        final String streamIds =
-                descriptorProperties
-                        .getOptionalString(TubemqValidator.CONNECTOR_STREAMIDS)
-                        .orElse(null);
-        final Configuration configuration =
-                getConfiguration(descriptorProperties);
-
-        TreeSet<String> streamIdSet = new TreeSet<>();
-        if (streamIds != null) {
-            streamIdSet.addAll(Arrays.asList(streamIds.split(SPLIT_COMMA)));
-        }
-
-        return new TubemqTableSource(
-                deserializationSchema,
-                schema,
-                proctimeAttribute,
-                rowtimeAttributeDescriptors,
-                fieldMapping,
-                masterAddress,
-                topic,
-                streamIdSet,
-                consumerGroup,
-                configuration);
-    }
-
-    @Override
-    public StreamTableSink<Row> createStreamTableSink(
-            Map<String, String> properties) {
-        final SerializationSchema<Row> serializationSchema =
-                getSerializationSchema(properties);
-
-        final DescriptorProperties descriptorProperties =
-                new DescriptorProperties(true);
-        descriptorProperties.putProperties(properties);
-
-        validateProperties(descriptorProperties);
-
-        final TableSchema tableSchema =
-                descriptorProperties.getTableSchema(SCHEMA);
-        final String topic =
-                descriptorProperties.getString(TubemqValidator.CONNECTOR_TOPIC);
-        final String masterAddress =
-                descriptorProperties.getString(TubemqValidator.CONNECTOR_MASTER);
-
-        final Configuration configuration =
-                getConfiguration(descriptorProperties);
-
-        return new TubemqTableSink(
-                serializationSchema,
-                tableSchema,
-                topic,
-                masterAddress,
-                configuration);
-    }
-
-    private SerializationSchema<Row> getSerializationSchema(
-            Map<String, String> properties) {
-        @SuppressWarnings("unchecked")
-        final SerializationSchemaFactory<Row> formatFactory =
-                TableFactoryService.find(
-                        SerializationSchemaFactory.class,
-                        properties,
-                        this.getClass().getClassLoader());
-
-        return formatFactory.createSerializationSchema(properties);
-    }
-
-    private void validateProperties(DescriptorProperties descriptorProperties) {
-        new SchemaValidator(true, false, false).validate(descriptorProperties);
-        new TubemqValidator().validate(descriptorProperties);
-    }
-
-    private DeserializationSchema<Row> getDeserializationSchema(
-            Map<String, String> properties) {
-        @SuppressWarnings("unchecked")
-        final DeserializationSchemaFactory<Row> formatFactory =
-                TableFactoryService.find(
-                        DeserializationSchemaFactory.class,
-                        properties,
-                        this.getClass().getClassLoader());
-
-        return formatFactory.createDeserializationSchema(properties);
-    }
-
-    private Configuration getConfiguration(
-            DescriptorProperties descriptorProperties) {
-        Map<String, String> properties =
-                descriptorProperties.getPropertiesWithPrefix(TubemqValidator.CONNECTOR_PROPERTIES);
-
-        Configuration configuration = new Configuration();
-        for (Map.Entry<String, String> property : properties.entrySet()) {
-            configuration.setString(property.getKey(), property.getValue());
-        }
-
-        return configuration;
-    }
-}
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java
deleted file mode 100644
index bddaec4eea..0000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.tubemq;
-
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-
-/**
- * The validator for {@link Tubemq}.
- */
-public class TubemqValidator extends ConnectorDescriptorValidator {
-
-    /**
-     * The type of connector.
-     */
-    public static final String CONNECTOR_TYPE_VALUE_TUBEMQ = "tubemq";
-
-    /**
-     * The address of tubemq master.
-     */
-    public static final String CONNECTOR_MASTER = "connector.master";
-
-    /**
-     * The tubemq topic name.
-     */
-    public static final String CONNECTOR_TOPIC = "connector.topic";
-
-    /**
-     * The tubemq (consumer or producer) group name.
-     */
-    public static final String CONNECTOR_GROUP = "connector.group";
-
-    /**
-     * The tubemq consumers use these streamIds to filter records reading from server.
-     */
-    public static final String CONNECTOR_STREAMIDS = "connector.stream-ids";
-
-    /**
-     * The prefix of tubemq properties (optional).
-     */
-    public static final String CONNECTOR_PROPERTIES = "connector.properties";
-
-    @Override
-    public void validate(DescriptorProperties properties) {
-        super.validate(properties);
-
-        // Validates that the connector type is tubemq.
-        properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_TUBEMQ, false);
-
-        // Validate that the topic name is set.
-        properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE);
-
-        // Validate that the master address is set.
-        properties.validateString(CONNECTOR_MASTER, false, 1, Integer.MAX_VALUE);
-
-        // Validate that the group name is set.
-        properties.validateString(CONNECTOR_GROUP, false, 1, Integer.MAX_VALUE);
-
-        // Validate that the streamIds is set.
-        properties.validateString(CONNECTOR_STREAMIDS, true, 1, Integer.MAX_VALUE);
-    }
-}
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
deleted file mode 100644
index 30831743b7..0000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.connectors.tubemq.TubemqTableSourceSinkFactory
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java
deleted file mode 100644
index 7f0e68def3..0000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.tubemq;
-
-import org.apache.flink.table.descriptors.Descriptor;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.DescriptorTestBase;
-import org.apache.flink.table.descriptors.DescriptorValidator;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Unit tests for {@link Tubemq}.
- */
-public class TubemqTest extends DescriptorTestBase {
-
-    @Override
-    protected List<Descriptor> descriptors() {
-        final Descriptor descriptor1 =
-                new Tubemq()
-                        .topic("test-topic-1")
-                        .master("localhost:9001")
-                        .group("test-group-1");
-
-        final Descriptor descriptor2 =
-                new Tubemq()
-                        .topic("test-topic-2")
-                        .master("localhost:9001")
-                        .group("test-group-2")
-                        .property("bootstrap.from.max", "true");
-
-        final Descriptor descriptor3 =
-                new Tubemq()
-                        .topic("test-topic-3")
-                        .master("localhost:9001")
-                        .group("test-group-3")
-                        .streamIds("test-streamId-1,test-streamId-2");
-
-        return Arrays.asList(descriptor1, descriptor2, descriptor3);
-    }
-
-    @Override
-    protected List<Map<String, String>> properties() {
-        final Map<String, String> props1 = new HashMap<>();
-        props1.put("connector.property-version", "1");
-        props1.put("connector.type", "tubemq");
-        props1.put("connector.master", "localhost:9001");
-        props1.put("connector.topic", "test-topic-1");
-        props1.put("connector.group", "test-group-1");
-
-        final Map<String, String> props2 = new HashMap<>();
-        props2.put("connector.property-version", "1");
-        props2.put("connector.type", "tubemq");
-        props2.put("connector.master", "localhost:9001");
-        props2.put("connector.topic", "test-topic-2");
-        props2.put("connector.group", "test-group-2");
-        props2.put("connector.properties.bootstrap.from.max", "true");
-
-        final Map<String, String> props3 = new HashMap<>();
-        props3.put("connector.property-version", "1");
-        props3.put("connector.type", "tubemq");
-        props3.put("connector.master", "localhost:9001");
-        props3.put("connector.topic", "test-topic-3");
-        props3.put("connector.stream-ids", "test-streamId-1,test-streamId-2");
-        props3.put("connector.group", "test-group-3");
-
-        return Arrays.asList(props1, props2, props3);
-    }
-
-    @Override
-    protected DescriptorValidator validator() {
-        return new TubemqValidator();
-    }
-
-    @Test
-    public void testTubePropertiesValidator() {
-        DescriptorValidator validator = this.validator();
-        DescriptorProperties descriptorProperties = new DescriptorProperties();
-        descriptorProperties.putProperties(properties().get(0));
-        validator.validate(descriptorProperties);
-    }
-}
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/resources/log4j2.properties b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/resources/log4j2.properties
deleted file mode 100644
index 1da9b515e3..0000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/resources/log4j2.properties
+++ /dev/null
@@ -1,21 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-rootLogger=info, A1
-# A1 is set to be a ConsoleAppender.
-appender.A1.name=A1
-appender.A1.type=Console

Reply via email to