This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f8bd82cd2 [INLONG-6944][SDK] Remove useless code of SortSDK (#6951)
f8bd82cd2 is described below

commit f8bd82cd23849de2f80bf4bb9f23aa9aadb2afda
Author: vernedeng <deng...@pku.edu.cn>
AuthorDate: Mon Dec 19 16:34:40 2022 +0800

    [INLONG-6944][SDK] Remove useless code of SortSDK (#6951)
---
 .../inlong/sdk/sort/api/InLongTopicFetcher.java    |  82 ----
 .../inlong/sdk/sort/api/InlongTopicManager.java    |  49 --
 .../inlong/sdk/sort/api/SortClientFactory.java     |  25 --
 .../sdk/sort/impl/InlongTopicManagerImpl.java      | 492 ---------------------
 .../inlong/sdk/sort/impl/ManagerReporter.java      |  30 +-
 .../inlong/sdk/sort/impl/SortClientImpl.java       |  33 +-
 .../inlong/sdk/sort/impl/SortClientImplV2.java     | 159 -------
 .../sort/impl/kafka/InLongKafkaFetcherImpl.java    | 343 --------------
 .../sort/impl/pulsar/InLongPulsarFetcherImpl.java  | 332 --------------
 .../sdk/sort/impl/tube/InLongTubeFetcherImpl.java  | 320 --------------
 .../sdk/sort/impl/InlongTopicManagerImplTest.java  |  36 +-
 .../sort/impl/decode/MessageDeserializerTest.java  |  17 -
 .../impl/kafka/InLongKafkaFetcherImplTest.java     |  33 +-
 .../impl/pulsar/InLongPulsarFetcherImplTest.java   |  59 +--
 .../sort/impl/tube/InLongTubeFetcherImplTest.java  |  19 +-
 .../standalone/source/sortsdk/SortSdkSource.java   |   6 +-
 16 files changed, 96 insertions(+), 1939 deletions(-)

diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java
deleted file mode 100644
index f65645129..000000000
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.sort.api;
-
-import org.apache.inlong.sdk.sort.entity.InLongTopic;
-import org.apache.inlong.sdk.sort.impl.decode.MessageDeserializer;
-import org.apache.inlong.sdk.sort.interceptor.MsgTimeInterceptor;
-
-import java.util.Objects;
-import java.util.Optional;
-
-@Deprecated
-public abstract class InLongTopicFetcher {
-
-    protected InLongTopic inLongTopic;
-    protected ClientContext context;
-    protected Deserializer deserializer;
-    protected volatile Thread fetchThread;
-    protected volatile boolean closed = false;
-    protected volatile boolean isStopConsume = false;
-    // use for empty topic to sleep
-    protected long sleepTime = 0L;
-    protected int emptyFetchTimes = 0;
-    // for rollback
-    protected Interceptor interceptor;
-    protected Seeker seeker;
-
-    public InLongTopicFetcher(InLongTopic inLongTopic, ClientContext context) {
-        this.inLongTopic = inLongTopic;
-        this.context = context;
-        this.deserializer = new MessageDeserializer();
-        this.interceptor = new MsgTimeInterceptor();
-        this.interceptor.configure(inLongTopic);
-    }
-
-    public abstract boolean init(Object client);
-
-    public abstract void ack(String msgOffset) throws Exception;
-
-    public abstract void pause();
-
-    public abstract void resume();
-
-    public abstract boolean close();
-
-    public abstract boolean isClosed();
-
-    public abstract void stopConsume(boolean stopConsume);
-
-    public abstract boolean isConsumeStop();
-
-    public abstract InLongTopic getInLongTopic();
-
-    public abstract long getConsumedDataSize();
-
-    public abstract long getAckedOffset();
-
-    public boolean updateTopic(InLongTopic topic) {
-        if (Objects.equals(inLongTopic, topic)) {
-            return false;
-        }
-        this.inLongTopic = topic;
-        Optional.ofNullable(seeker).ifPresent(seeker -> 
seeker.configure(inLongTopic));
-        Optional.ofNullable(interceptor).ifPresent(interceptor -> 
interceptor.configure(inLongTopic));
-        return true;
-    }
-}
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManager.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManager.java
deleted file mode 100644
index 4bcfee17a..000000000
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManager.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.sort.api;
-
-import java.util.Collection;
-import java.util.Set;
-import org.apache.inlong.sdk.sort.entity.InLongTopic;
-
-@Deprecated
-public abstract class InlongTopicManager implements Cleanable {
-
-    protected ClientContext context;
-    protected QueryConsumeConfig queryConsumeConfig;
-
-    public InlongTopicManager(ClientContext context, QueryConsumeConfig 
queryConsumeConfig) {
-        this.context = context;
-        this.queryConsumeConfig = queryConsumeConfig;
-    }
-
-    public abstract InLongTopicFetcher addFetcher(InLongTopic inLongTopic);
-
-    public abstract InLongTopicFetcher removeFetcher(InLongTopic inLongTopic, 
boolean closeFetcher);
-
-    public abstract InLongTopicFetcher getFetcher(String fetchKey);
-
-    public abstract Collection<InLongTopicFetcher> getAllFetchers();
-
-    public abstract Set<String> getManagedInLongTopics();
-
-    public abstract void offlineAllTopicsAndPartitions();
-
-    public abstract void close();
-
-}
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientFactory.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientFactory.java
index 8feba4d06..b5b0a4113 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientFactory.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientFactory.java
@@ -18,43 +18,18 @@
 package org.apache.inlong.sdk.sort.api;
 
 import org.apache.inlong.sdk.sort.impl.SortClientImpl;
-import org.apache.inlong.sdk.sort.impl.SortClientImplV2;
 
 /**
  * Factory of {@link SortClient}
  */
 public class SortClientFactory {
 
-    /**
-     * create default SortClient
-     *
-     * @param config SortClientConfig
-     * @return SortClient
-     */
     public static SortClient createSortClient(SortClientConfig config) {
         return new SortClientImpl(config);
     }
 
-    /**
-     * create SortClient with user defined QueryConsumeConfig ,MetricReporter 
and ManagerReportHandler
-     *
-     * @param config SortClientConfig
-     * @param queryConsumeConfig QueryConsumeConfig
-     * @param reporter MetricReporter
-     * @param reportHandler ManagerReportHandler
-     * @return SortClient
-     */
     public static SortClient createSortClient(SortClientConfig config, 
QueryConsumeConfig queryConsumeConfig,
             MetricReporter reporter, ManagerReportHandler reportHandler) {
         return new SortClientImpl(config, queryConsumeConfig, reporter, 
reportHandler);
     }
-
-    public static SortClient createSortClientV2(SortClientConfig config) {
-        return new SortClientImplV2(config);
-    }
-
-    public static SortClient createSortClientV2(SortClientConfig config, 
QueryConsumeConfig queryConsumeConfig,
-            MetricReporter reporter, ManagerReportHandler reportHandler) {
-        return new SortClientImplV2(config, queryConsumeConfig, reporter, 
reportHandler);
-    }
 }
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImpl.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImpl.java
deleted file mode 100644
index 41e41a735..000000000
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImpl.java
+++ /dev/null
@@ -1,492 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.sort.impl;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
-import org.apache.inlong.sdk.sort.api.InlongTopicManager;
-import org.apache.inlong.sdk.sort.api.InlongTopicTypeEnum;
-import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
-import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
-import org.apache.inlong.sdk.sort.entity.InLongTopic;
-import org.apache.inlong.sdk.sort.impl.kafka.InLongKafkaFetcherImpl;
-import org.apache.inlong.sdk.sort.impl.pulsar.InLongPulsarFetcherImpl;
-import org.apache.inlong.sdk.sort.impl.tube.InLongTubeFetcherImpl;
-import org.apache.inlong.sdk.sort.fetcher.tube.TubeConsumerCreator;
-import org.apache.inlong.sdk.sort.util.PeriodicTask;
-import org.apache.inlong.sdk.sort.util.StringUtil;
-import org.apache.inlong.tubemq.client.config.TubeClientConfig;
-import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
-import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated
-public class InlongTopicManagerImpl extends InlongTopicManager {
-
-    private final Logger logger = 
LoggerFactory.getLogger(InlongTopicManagerImpl.class);
-
-    private final ConcurrentHashMap<String, InLongTopicFetcher> fetchers = new 
ConcurrentHashMap<>();
-    private final ConcurrentHashMap<String, PulsarClient> pulsarClients = new 
ConcurrentHashMap<>();
-    private final ConcurrentHashMap<String, TubeConsumerCreator> tubeFactories 
= new ConcurrentHashMap<>();
-
-    private final PeriodicTask updateMetaDataWorker;
-    private volatile List<String> toBeSelectFetchers = new ArrayList<>();
-    private boolean stopAssign = false;
-
-    public InlongTopicManagerImpl(ClientContext context, QueryConsumeConfig 
queryConsumeConfig) {
-        super(context, queryConsumeConfig);
-        updateMetaDataWorker = new 
UpdateMetaDataThread(context.getConfig().getUpdateMetaDataIntervalSec(),
-                TimeUnit.SECONDS);
-        String threadName = "sortsdk_inlongtopic_manager_" + 
context.getConfig().getSortTaskId()
-                + "_" + StringUtil.formatDate(new Date(), "yyyy-MM-dd 
HH:mm:ss");
-        updateMetaDataWorker.start(threadName);
-    }
-
-    private void updateToBeSelectFetchers(Collection<String> c) {
-        toBeSelectFetchers = new ArrayList<>(c);
-    }
-
-    private boolean initFetcher(InLongTopicFetcher fetcher, InLongTopic 
inLongTopic) {
-        if 
(InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(inLongTopic.getTopicType()))
 {
-            logger.info("create fetcher topic is pulsar {}", inLongTopic);
-            return 
fetcher.init(pulsarClients.get(inLongTopic.getInLongCluster().getClusterId()));
-        } else if 
(InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(inLongTopic.getTopicType()))
 {
-            logger.info("create fetcher topic is kafka {}", inLongTopic);
-            return 
fetcher.init(inLongTopic.getInLongCluster().getBootstraps());
-        } else if 
(InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(inLongTopic.getTopicType()))
 {
-            logger.info("create fetcher topic is tube {}", inLongTopic);
-            return 
fetcher.init(tubeFactories.get(inLongTopic.getInLongCluster().getClusterId()));
-        } else {
-            logger.error("create fetcher topic type not support " + 
inLongTopic.getTopicType());
-            return false;
-        }
-    }
-
-    @Override
-    public InLongTopicFetcher addFetcher(InLongTopic inLongTopic) {
-
-        try {
-            InLongTopicFetcher result = 
fetchers.get(inLongTopic.getTopicKey());
-            if (result == null) {
-                // create fetcher (pulsar,tube,kafka)
-                InLongTopicFetcher inLongTopicFetcher = 
createInLongTopicFetcher(inLongTopic);
-                InLongTopicFetcher preValue = 
fetchers.putIfAbsent(inLongTopic.getTopicKey(), inLongTopicFetcher);
-                logger.info("addFetcher :{}", inLongTopic.getTopicKey());
-                if (preValue != null) {
-                    result = preValue;
-                    if (inLongTopicFetcher != null) {
-                        inLongTopicFetcher.close();
-                    }
-                    logger.info("addFetcher create same fetcher {}", 
inLongTopic);
-                } else {
-                    result = inLongTopicFetcher;
-                    if (result != null
-                            && !initFetcher(result, inLongTopic)) {
-                        logger.info("addFetcher init fail {}", 
inLongTopic.getTopicKey());
-                        result.close();
-                        result = null;
-                    }
-                }
-            }
-            return result;
-        } finally {
-            updateToBeSelectFetchers(fetchers.keySet());
-        }
-    }
-
-    /**
-     * create fetcher (pulsar,tube,kafka)
-     *
-     * @param inLongTopic {@link InLongTopic}
-     * @return {@link InLongTopicFetcher}
-     */
-    private InLongTopicFetcher createInLongTopicFetcher(InLongTopic 
inLongTopic) {
-        if 
(InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(inLongTopic.getTopicType()))
 {
-            logger.info("the topic is pulsar {}", inLongTopic);
-            return new InLongPulsarFetcherImpl(inLongTopic, context);
-        } else if 
(InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(inLongTopic.getTopicType()))
 {
-            logger.info("the topic is kafka {}", inLongTopic);
-            return new InLongKafkaFetcherImpl(inLongTopic, context);
-        } else if 
(InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(inLongTopic.getTopicType()))
 {
-            logger.info("the topic is tube {}", inLongTopic);
-            return new InLongTubeFetcherImpl(inLongTopic, context);
-        } else {
-            logger.error("topic type not support " + 
inLongTopic.getTopicType());
-            return null;
-        }
-    }
-
-    @Override
-    public InLongTopicFetcher removeFetcher(InLongTopic inLongTopic, boolean 
closeFetcher) {
-        InLongTopicFetcher result = fetchers.remove(inLongTopic.getTopicKey());
-        if (result != null && closeFetcher) {
-            result.close();
-        }
-        return result;
-    }
-
-    @Override
-    public InLongTopicFetcher getFetcher(String fetchKey) {
-        return fetchers.get(fetchKey);
-    }
-
-    @Override
-    public Set<String> getManagedInLongTopics() {
-        return new HashSet<>(fetchers.keySet());
-    }
-
-    @Override
-    public Collection<InLongTopicFetcher> getAllFetchers() {
-        return fetchers.values();
-    }
-
-    /**
-     * offline all inlong topic
-     */
-    @Override
-    public void offlineAllTopicsAndPartitions() {
-        String subscribeId = context.getConfig().getSortTaskId();
-        try {
-            logger.info("start offline {}", subscribeId);
-            stopAssign = true;
-            closeAllFetcher();
-            logger.info("close finished {}", subscribeId);
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public void close() {
-        if (updateMetaDataWorker != null) {
-            updateMetaDataWorker.stop();
-        }
-    }
-
-    @Override
-    public boolean clean() {
-        String sortTaskId = context.getConfig().getSortTaskId();
-        try {
-            logger.info("start close {}", sortTaskId);
-
-            if (updateMetaDataWorker != null) {
-                updateMetaDataWorker.stop();
-            }
-
-            closeFetcher();
-            closePulsarClient();
-            closeTubeSessionFactory();
-            logger.info("close finished {}", sortTaskId);
-            return true;
-        } catch (Throwable th) {
-            logger.error("close error " + sortTaskId, th);
-        }
-        return false;
-    }
-
-    private void closeAllFetcher() {
-        closeFetcher();
-    }
-
-    private void closeFetcher() {
-        Set<Entry<String, InLongTopicFetcher>> entries = fetchers.entrySet();
-        for (Entry<String, InLongTopicFetcher> entry : entries) {
-            String fetchKey = entry.getKey();
-            InLongTopicFetcher inLongTopicFetcher = entry.getValue();
-            boolean succ = false;
-            if (inLongTopicFetcher != null) {
-                try {
-                    succ = inLongTopicFetcher.close();
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
-                }
-            }
-            logger.info(" close fetcher{} {}", fetchKey, succ);
-        }
-    }
-
-    private void closePulsarClient() {
-        for (Map.Entry<String, PulsarClient> entry : pulsarClients.entrySet()) 
{
-            PulsarClient pulsarClient = entry.getValue();
-            String key = entry.getKey();
-            try {
-                if (pulsarClient != null) {
-                    pulsarClient.close();
-                }
-            } catch (Exception e) {
-                logger.error("close PulsarClient" + key + " error.", e);
-            }
-        }
-        pulsarClients.clear();
-    }
-
-    private void closeTubeSessionFactory() {
-        for (Map.Entry<String, TubeConsumerCreator> entry : 
tubeFactories.entrySet()) {
-            MessageSessionFactory tubeMessageSessionFactory = 
entry.getValue().getMessageSessionFactory();
-            String key = entry.getKey();
-            try {
-                if (tubeMessageSessionFactory != null) {
-                    tubeMessageSessionFactory.shutdown();
-                }
-            } catch (Exception e) {
-                logger.error("close MessageSessionFactory" + key + " error.", 
e);
-            }
-        }
-        tubeFactories.clear();
-    }
-
-    private List<String> getNewTopics(List<InLongTopic> 
newSubscribedInLongTopics) {
-        if (newSubscribedInLongTopics != null && 
newSubscribedInLongTopics.size() > 0) {
-            List<String> newTopics = new ArrayList<>();
-            for (InLongTopic inLongTopic : newSubscribedInLongTopics) {
-                newTopics.add(inLongTopic.getTopicKey());
-            }
-            return newTopics;
-        }
-        return null;
-    }
-
-    private void handleCurrentConsumeConfig(List<InLongTopic> 
currentConsumeConfig) {
-        if (null == currentConsumeConfig) {
-            logger.warn("List<InLongTopic> currentConsumeConfig is null");
-            return;
-        }
-
-        List<InLongTopic> newConsumeConfig = new 
ArrayList<>(currentConsumeConfig);
-        logger.debug("newConsumeConfig List:{}", 
Arrays.toString(newConsumeConfig.toArray()));
-        List<String> newTopics = getNewTopics(newConsumeConfig);
-        logger.debug("newTopics :{}", Arrays.toString(newTopics.toArray()));
-
-        List<String> oldInLongTopics = new ArrayList<>(fetchers.keySet());
-        logger.debug("oldInLongTopics :{}", 
Arrays.toString(oldInLongTopics.toArray()));
-        // get need be offlined topics
-        oldInLongTopics.removeAll(newTopics);
-        logger.debug("removed oldInLongTopics :{}", 
Arrays.toString(oldInLongTopics.toArray()));
-
-        // get new topics
-        newTopics.removeAll(new ArrayList<>(fetchers.keySet()));
-        logger.debug("really new topics :{}", 
Arrays.toString(newTopics.toArray()));
-        // offline need be offlined topics
-        offlineRmovedTopic(oldInLongTopics);
-        // online new topics
-        onlineNewTopic(newConsumeConfig, newTopics);
-    }
-
-    /**
-     * offline inlong topic which not belong the sortTaskId
-     *
-     * @param oldInLongTopics {@link List}
-     */
-    private void offlineRmovedTopic(List<String> oldInLongTopics) {
-        for (String fetchKey : oldInLongTopics) {
-            logger.info("offlineRmovedTopic {}", fetchKey);
-            InLongTopic inLongTopic = fetchers.get(fetchKey).getInLongTopic();
-            InLongTopicFetcher inLongTopicFetcher = 
fetchers.getOrDefault(fetchKey, null);
-            if (inLongTopicFetcher != null) {
-                inLongTopicFetcher.close();
-            }
-            fetchers.remove(fetchKey);
-            if (context != null && context.getStatManager() != null && 
inLongTopic != null) {
-                context.getStatManager()
-                        .getStatistics(context.getConfig().getSortTaskId(),
-                                inLongTopic.getInLongCluster().getClusterId(),
-                                inLongTopic.getTopic())
-                        .addTopicOfflineTimes(1);
-            } else {
-                logger.error("context == null or context.getStatManager() == 
null or inLongTopic == null :{}",
-                        inLongTopic);
-            }
-        }
-    }
-
-    /**
-     * online new inlong topic
-     *
-     * @param newSubscribedInLongTopics List
-     * @param reallyNewTopic List
-     */
-    private void onlineNewTopic(List<InLongTopic> newSubscribedInLongTopics, 
List<String> reallyNewTopic) {
-        for (InLongTopic inLongTopic : newSubscribedInLongTopics) {
-            if (!reallyNewTopic.contains(inLongTopic.getTopicKey())) {
-                
logger.debug("!reallyNewTopic.contains(inLongTopic.getTopicKey())");
-                continue;
-            }
-            onlineTopic(inLongTopic);
-        }
-    }
-
-    private void onlineTopic(InLongTopic inLongTopic) {
-        if 
(InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(inLongTopic.getTopicType()))
 {
-            logger.info("the topic is pulsar:{}", inLongTopic);
-            onlinePulsarTopic(inLongTopic);
-        } else if 
(InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(inLongTopic.getTopicType()))
 {
-            logger.info("the topic is kafka:{}", inLongTopic);
-            onlineKafkaTopic(inLongTopic);
-        } else if 
(InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(inLongTopic.getTopicType()))
 {
-            logger.info("the topic is tube:{}", inLongTopic);
-            onlineTubeTopic(inLongTopic);
-        } else {
-            logger.error("topic type:{} not support", 
inLongTopic.getTopicType());
-        }
-    }
-
-    private void onlinePulsarTopic(InLongTopic inLongTopic) {
-        if (!checkAndCreateNewPulsarClient(inLongTopic)) {
-            logger.error("checkAndCreateNewPulsarClient error:{}", 
inLongTopic);
-            return;
-        }
-        createNewFetcher(inLongTopic);
-    }
-
-    private boolean checkAndCreateNewPulsarClient(InLongTopic inLongTopic) {
-        if 
(!pulsarClients.containsKey(inLongTopic.getInLongCluster().getClusterId())) {
-            if (inLongTopic.getInLongCluster().getBootstraps() != null) {
-                try {
-                    PulsarClient pulsarClient = PulsarClient.builder()
-                            
.serviceUrl(inLongTopic.getInLongCluster().getBootstraps())
-                            
.authentication(AuthenticationFactory.token(inLongTopic.getInLongCluster().getToken()))
-                            
.statsInterval(context.getConfig().getStatsIntervalSeconds(), TimeUnit.SECONDS)
-                            .build();
-                    
pulsarClients.put(inLongTopic.getInLongCluster().getClusterId(), pulsarClient);
-                    logger.debug("create pulsar client succ {}",
-                            new 
String[]{inLongTopic.getInLongCluster().getClusterId(),
-                                    
inLongTopic.getInLongCluster().getBootstraps(),
-                                    
inLongTopic.getInLongCluster().getToken()});
-                } catch (Exception e) {
-                    logger.error("create pulsar client error {}", inLongTopic);
-                    logger.error(e.getMessage(), e);
-                    return false;
-                }
-            } else {
-                logger.error("bootstrap is null {}", 
inLongTopic.getInLongCluster());
-                return false;
-            }
-        }
-        logger.info("create pulsar client true {}", inLongTopic);
-        return true;
-    }
-
-    private boolean checkAndCreateNewTubeSessionFactory(InLongTopic 
inLongTopic) {
-        if 
(!tubeFactories.containsKey(inLongTopic.getInLongCluster().getClusterId())) {
-            if (inLongTopic.getInLongCluster().getBootstraps() != null) {
-                try {
-                    // create MessageSessionFactory
-                    TubeClientConfig tubeConfig = new 
TubeClientConfig(inLongTopic.getInLongCluster().getBootstraps());
-                    MessageSessionFactory messageSessionFactory = new 
TubeSingleSessionFactory(tubeConfig);
-                    TubeConsumerCreator tubeConsumerCreator = new 
TubeConsumerCreator(messageSessionFactory,
-                            tubeConfig);
-                    
tubeFactories.put(inLongTopic.getInLongCluster().getClusterId(), 
tubeConsumerCreator);
-                    logger.debug("create tube client succ {} {} {}",
-                            new 
String[]{inLongTopic.getInLongCluster().getClusterId(),
-                                    
inLongTopic.getInLongCluster().getBootstraps(),
-                                    
inLongTopic.getInLongCluster().getToken()});
-                } catch (Exception e) {
-                    logger.error("create tube client error {}", inLongTopic);
-                    logger.error(e.getMessage(), e);
-                    return false;
-                }
-            } else {
-                logger.info("bootstrap is null {}", 
inLongTopic.getInLongCluster());
-                return false;
-            }
-        }
-        logger.info("create pulsar client true {}", inLongTopic);
-        return true;
-    }
-
-    private void onlineKafkaTopic(InLongTopic inLongTopic) {
-        createNewFetcher(inLongTopic);
-    }
-
-    private void onlineTubeTopic(InLongTopic inLongTopic) {
-        if (!checkAndCreateNewTubeSessionFactory(inLongTopic)) {
-            logger.error("checkAndCreateNewPulsarClient error:{}", 
inLongTopic);
-            return;
-        }
-        createNewFetcher(inLongTopic);
-    }
-
-    private void createNewFetcher(InLongTopic inLongTopic) {
-        if (!fetchers.containsKey(inLongTopic.getTopicKey())) {
-            logger.info("begin add Fetcher:{}", inLongTopic.getTopicKey());
-            if (context != null && context.getStatManager() != null) {
-                context.getStatManager()
-                        .getStatistics(context.getConfig().getSortTaskId(),
-                                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                        .addTopicOnlineTimes(1);
-                InLongTopicFetcher fetcher = addFetcher(inLongTopic);
-                if (fetcher == null) {
-                    fetchers.remove(inLongTopic.getTopicKey());
-                    logger.error("add fetcher error:{}", 
inLongTopic.getTopicKey());
-                }
-            } else {
-                logger.error("context == null or context.getStatManager() == 
null");
-            }
-        }
-    }
-
-    private class UpdateMetaDataThread extends PeriodicTask {
-
-        public UpdateMetaDataThread(long runInterval, TimeUnit timeUnit) {
-            super(runInterval, timeUnit, context.getConfig());
-        }
-
-        @Override
-        protected void doWork() {
-            logger.debug("InLongTopicManagerImpl doWork");
-            if (stopAssign) {
-                logger.warn("assign is stoped");
-                return;
-            }
-            // get sortTask conf from manager
-            if (queryConsumeConfig != null) {
-                long start = System.currentTimeMillis();
-                
context.getStatManager().getStatistics(context.getConfig().getSortTaskId())
-                        .addRequestManagerTimes(1);
-                ConsumeConfig consumeConfig = queryConsumeConfig
-                        
.queryCurrentConsumeConfig(context.getConfig().getSortTaskId());
-                
context.getStatManager().getStatistics(context.getConfig().getSortTaskId())
-                        .addRequestManagerTimeCost(System.currentTimeMillis() 
- start);
-
-                if (consumeConfig != null) {
-                    handleCurrentConsumeConfig(consumeConfig.getTopics());
-                } else {
-                    logger.warn("subscribedInfo is null");
-                    
context.getStatManager().getStatistics(context.getConfig().getSortTaskId())
-                            .addRequestManagerFailTimes(1);
-                }
-            } else {
-                logger.error("subscribedMetaDataInfo is null");
-            }
-        }
-    }
-}
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/ManagerReporter.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/ManagerReporter.java
index 8c7a3b2a8..f2fdd5668 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/ManagerReporter.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/ManagerReporter.java
@@ -25,22 +25,23 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
-import org.apache.inlong.sdk.sort.api.InlongTopicManager;
 import org.apache.inlong.sdk.sort.api.ManagerReportHandler;
 import org.apache.inlong.sdk.sort.api.ReportApi;
+import org.apache.inlong.sdk.sort.api.TopicFetcher;
+import org.apache.inlong.sdk.sort.api.TopicManager;
 import org.apache.inlong.sdk.sort.entity.ConsumeState;
 import org.apache.inlong.sdk.sort.entity.ConsumeStatusParams;
 import org.apache.inlong.sdk.sort.entity.ConsumeStatusResult;
 import org.apache.inlong.sdk.sort.entity.HeartBeatParams;
 import org.apache.inlong.sdk.sort.entity.HeartBeatResult;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
 import org.apache.inlong.sdk.sort.util.PeriodicTask;
 
 public class ManagerReporter extends PeriodicTask {
 
     private final ConcurrentHashMap<Integer, Long> reportApiRunTimeMs = new 
ConcurrentHashMap<>();
     private final ClientContext context;
-    private final InlongTopicManager inLongTopicManager;
+    private final TopicManager inLongTopicManager;
     private final ManagerReportHandler reportHandler;
     private Map<Integer, Long> reportApiInterval = new HashMap<>();
 
@@ -53,8 +54,7 @@ public class ManagerReporter extends PeriodicTask {
      * @param runInterval long
      * @param timeUnit TimeUnit
      */
-    public ManagerReporter(ClientContext context, ManagerReportHandler 
reportHandler,
-            InlongTopicManager inLongTopicManager,
+    public ManagerReporter(ClientContext context, ManagerReportHandler 
reportHandler, TopicManager inLongTopicManager,
             long runInterval, TimeUnit timeUnit) {
         super(runInterval, timeUnit, context.getConfig());
         this.context = context;
@@ -158,17 +158,17 @@ public class ManagerReporter extends PeriodicTask {
             
consumeStatusParams.setSubscribedId(context.getConfig().getSortTaskId());
             consumeStatusParams.setIp(context.getConfig().getLocalIp());
             List<ConsumeState> consumeStates = new ArrayList<>();
-            Collection<InLongTopicFetcher> allFetchers =
+            Collection<TopicFetcher> allFetchers =
                     inLongTopicManager.getAllFetchers();
-            for (InLongTopicFetcher fetcher : allFetchers) {
-                ConsumeState consumeState = new ConsumeState();
-                consumeState.setTopic(fetcher.getInLongTopic().getTopic());
-                
consumeState.setTopicType(fetcher.getInLongTopic().getTopicType());
-                
consumeState.setClusterId(fetcher.getInLongTopic().getInLongCluster().getClusterId());
-                
consumeState.setConsumedDataSize(fetcher.getConsumedDataSize());
-                consumeState.setAckOffset(fetcher.getAckedOffset());
-                
consumeState.setPartition(fetcher.getInLongTopic().getPartitionId());
-                consumeStates.add(consumeState);
+            for (TopicFetcher fetcher : allFetchers) {
+                for (InLongTopic topic : fetcher.getTopics()) {
+                    ConsumeState consumeState = new ConsumeState();
+                    consumeState.setTopic(topic.getTopic());
+                    consumeState.setTopicType(topic.getTopicType());
+                    
consumeState.setClusterId(topic.getInLongCluster().getClusterId());
+                    consumeState.setPartition(topic.getPartitionId());
+                    consumeStates.add(consumeState);
+                }
             }
             consumeStatusParams.setConsumeStates(consumeStates);
 
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java
index 753c39f60..a381b7910 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java
@@ -19,18 +19,21 @@ package org.apache.inlong.sdk.sort.impl;
 
 import org.apache.inlong.sdk.sort.api.Cleanable;
 import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
-import org.apache.inlong.sdk.sort.api.InlongTopicManager;
 import org.apache.inlong.sdk.sort.api.ManagerReportHandler;
 import org.apache.inlong.sdk.sort.api.MetricReporter;
 import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
 import org.apache.inlong.sdk.sort.api.SortClient;
 import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.api.TopicFetcher;
+import org.apache.inlong.sdk.sort.api.TopicManager;
 import org.apache.inlong.sdk.sort.exception.NotExistException;
+import org.apache.inlong.sdk.sort.api.InlongTopicManagerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Deprecated
+/**
+ * New version of sort client.
+ */
 public class SortClientImpl extends SortClient {
 
     private final String logPrefix = "[" + 
SortClientImpl.class.getSimpleName() + "] ";
@@ -40,7 +43,7 @@ public class SortClientImpl extends SortClient {
 
     private final ClientContext context;
 
-    private final InlongTopicManager inLongTopicManager;
+    private final TopicManager inLongTopicManager;
 
     /**
      * SortClient Constructor
@@ -51,7 +54,10 @@ public class SortClientImpl extends SortClient {
         try {
             this.sortClientConfig = sortClientConfig;
             this.context = new ClientContextImpl(this.sortClientConfig, new 
MetricReporterImpl(sortClientConfig));
-            this.inLongTopicManager = new InlongTopicManagerImpl(context, new 
QueryConsumeConfigImpl(context));
+
+            this.inLongTopicManager = InlongTopicManagerFactory
+                    .createInLongTopicManager(sortClientConfig.getTopicType(),
+                            context, new QueryConsumeConfigImpl(context));
         } catch (Exception e) {
             this.close();
             throw e;
@@ -71,7 +77,10 @@ public class SortClientImpl extends SortClient {
         try {
             this.sortClientConfig = sortClientConfig;
             this.context = new ClientContextImpl(this.sortClientConfig, 
metricReporter);
-            this.inLongTopicManager = new InlongTopicManagerImpl(context, new 
QueryConsumeConfigImpl(context));
+            queryConsumeConfig.configure(context);
+            this.inLongTopicManager = InlongTopicManagerFactory
+                    .createInLongTopicManager(sortClientConfig.getTopicType(),
+                            context, queryConsumeConfig);
         } catch (Exception e) {
             e.printStackTrace();
             this.close();
@@ -102,8 +111,8 @@ public class SortClientImpl extends SortClient {
     public void ack(String msgKey, String msgOffset)
             throws Exception {
         logger.debug("ack:{} offset:{}", msgKey, msgOffset);
-        InLongTopicFetcher inLongTopicFetcher = getFetcher(msgKey);
-        inLongTopicFetcher.ack(msgOffset);
+        TopicFetcher topicFetcher = getFetcher(msgKey);
+        topicFetcher.ack(msgOffset);
     }
 
     /**
@@ -128,12 +137,12 @@ public class SortClientImpl extends SortClient {
         return this.sortClientConfig;
     }
 
-    private InLongTopicFetcher getFetcher(String msgKey) throws 
NotExistException {
-        InLongTopicFetcher inLongTopicFetcher = 
inLongTopicManager.getFetcher(msgKey);
-        if (inLongTopicFetcher == null) {
+    private TopicFetcher getFetcher(String msgKey) throws NotExistException {
+        TopicFetcher topicFetcher = inLongTopicManager.getFetcher(msgKey);
+        if (topicFetcher == null) {
             throw new NotExistException(msgKey + " not exist.");
         }
-        return inLongTopicFetcher;
+        return topicFetcher;
     }
 
     private boolean doClose(Cleanable c) {
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
deleted file mode 100644
index 3a730d082..000000000
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.sort.impl;
-
-import org.apache.inlong.sdk.sort.api.Cleanable;
-import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.ManagerReportHandler;
-import org.apache.inlong.sdk.sort.api.MetricReporter;
-import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
-import org.apache.inlong.sdk.sort.api.SortClient;
-import org.apache.inlong.sdk.sort.api.SortClientConfig;
-import org.apache.inlong.sdk.sort.api.TopicFetcher;
-import org.apache.inlong.sdk.sort.api.TopicManager;
-import org.apache.inlong.sdk.sort.exception.NotExistException;
-import org.apache.inlong.sdk.sort.api.InlongTopicManagerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * New version of sort client.
- */
-public class SortClientImplV2 extends SortClient {
-
-    private final String logPrefix = "[" + 
SortClientImpl.class.getSimpleName() + "] ";
-    private final Logger logger = 
LoggerFactory.getLogger(SortClientImpl.class);
-
-    private final SortClientConfig sortClientConfig;
-
-    private final ClientContext context;
-
-    private final TopicManager inLongTopicManager;
-
-    /**
-     * SortClient Constructor
-     *
-     * @param sortClientConfig SortClientConfig
-     */
-    public SortClientImplV2(SortClientConfig sortClientConfig) {
-        try {
-            this.sortClientConfig = sortClientConfig;
-            this.context = new ClientContextImpl(this.sortClientConfig, new 
MetricReporterImpl(sortClientConfig));
-
-            this.inLongTopicManager = InlongTopicManagerFactory
-                    .createInLongTopicManager(sortClientConfig.getTopicType(),
-                            context, new QueryConsumeConfigImpl(context));
-        } catch (Exception e) {
-            this.close();
-            throw e;
-        }
-    }
-
-    /**
-     * SortClient Constructor with user defined 
QueryConsumeConfig,MetricReporter and ManagerReportHandler
-     *
-     * @param sortClientConfig SortClientConfig
-     * @param queryConsumeConfig QueryConsumeConfig
-     * @param metricReporter MetricReporter
-     * @param managerReportHandler ManagerReportHandler
-     */
-    public SortClientImplV2(SortClientConfig sortClientConfig, 
QueryConsumeConfig queryConsumeConfig,
-            MetricReporter metricReporter, ManagerReportHandler 
managerReportHandler) {
-        try {
-            this.sortClientConfig = sortClientConfig;
-            this.context = new ClientContextImpl(this.sortClientConfig, 
metricReporter);
-            queryConsumeConfig.configure(context);
-            this.inLongTopicManager = InlongTopicManagerFactory
-                    .createInLongTopicManager(sortClientConfig.getTopicType(),
-                            context, queryConsumeConfig);
-        } catch (Exception e) {
-            e.printStackTrace();
-            this.close();
-            throw e;
-        }
-    }
-
-    /**
-     * init SortClient
-     *
-     * @return true/false
-     * @throws Throwable
-     */
-    @Override
-    public boolean init() throws Throwable {
-        logger.info(logPrefix + "init|" + sortClientConfig);
-        return true;
-    }
-
-    /**
-     * ack offset to msgKey
-     *
-     * @param msgKey String
-     * @param msgOffset String
-     * @throws Exception
-     */
-    @Override
-    public void ack(String msgKey, String msgOffset)
-            throws Exception {
-        logger.debug("ack:{} offset:{}", msgKey, msgOffset);
-        TopicFetcher topicFetcher = getFetcher(msgKey);
-        topicFetcher.ack(msgOffset);
-    }
-
-    /**
-     * close SortClient
-     *
-     * @return true/false
-     */
-    @Override
-    public boolean close() {
-        boolean cleanInLongTopicManager = doClose(inLongTopicManager);
-        boolean cleanContext = doClose(context);
-
-        logger.info(logPrefix
-
-                + "|cleanInLongTopicManager=" + cleanInLongTopicManager
-                + "|cleanContext=" + cleanContext);
-        return (cleanInLongTopicManager && cleanContext);
-    }
-
-    @Override
-    public SortClientConfig getConfig() {
-        return this.sortClientConfig;
-    }
-
-    private TopicFetcher getFetcher(String msgKey) throws NotExistException {
-        TopicFetcher topicFetcher = inLongTopicManager.getFetcher(msgKey);
-        if (topicFetcher == null) {
-            throw new NotExistException(msgKey + " not exist.");
-        }
-        return topicFetcher;
-    }
-
-    private boolean doClose(Cleanable c) {
-        try {
-            if (c != null) {
-                return c.clean();
-            }
-            return true;
-        } catch (Throwable th) {
-            logger.error(logPrefix + "clean error.", th);
-            return false;
-        }
-    }
-}
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
deleted file mode 100644
index deffa3272..000000000
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.sort.impl.kafka;
-
-import com.google.gson.Gson;
-
-import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
-import org.apache.inlong.sdk.sort.api.SeekerFactory;
-import org.apache.inlong.sdk.sort.api.SortClientConfig.ConsumeStrategy;
-import org.apache.inlong.sdk.sort.entity.InLongMessage;
-import org.apache.inlong.sdk.sort.entity.InLongTopic;
-import org.apache.inlong.sdk.sort.entity.MessageRecord;
-import org.apache.inlong.sdk.sort.fetcher.kafka.AckOffsetOnRebalance;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.RangeAssignor;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-@Deprecated
-public class InLongKafkaFetcherImpl extends InLongTopicFetcher {
-
-    private final Logger logger = 
LoggerFactory.getLogger(InLongKafkaFetcherImpl.class);
-    private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> 
commitOffsetMap = new ConcurrentHashMap<>();
-    private final AtomicLong ackOffsets = new AtomicLong(0);
-    private volatile boolean stopConsume = false;
-    private String bootstrapServers;
-    private KafkaConsumer<byte[], byte[]> consumer;
-
-    public InLongKafkaFetcherImpl(InLongTopic inLongTopic, ClientContext 
context) {
-        super(inLongTopic, context);
-    }
-
-    @Override
-    public boolean init(Object object) {
-        String bootstrapServers = (String) object;
-        try {
-            createKafkaConsumer(bootstrapServers);
-            if (consumer != null) {
-                logger.info("start to subscribe topic:{}", new 
Gson().toJson(inLongTopic));
-                this.seeker = SeekerFactory.createKafkaSeeker(consumer, 
inLongTopic);
-                
consumer.subscribe(Collections.singletonList(inLongTopic.getTopic()),
-                        new 
AckOffsetOnRebalance(this.inLongTopic.getInLongCluster().getClusterId(), seeker,
-                                commitOffsetMap, consumer));
-            } else {
-                logger.info("consumer is null");
-                return false;
-            }
-            this.bootstrapServers = bootstrapServers;
-            String threadName = String.format("sort_sdk_fetch_thread_%s_%s_%d",
-                    this.inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic(), this.hashCode());
-            this.fetchThread = new Thread(new Fetcher(), threadName);
-            logger.info("start to start thread:{}", threadName);
-            this.fetchThread.start();
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-            return false;
-        }
-        return true;
-    }
-
-    @Override
-    public void ack(String msgOffset) throws Exception {
-        String[] offset = msgOffset.split(":");
-        if (offset.length == 2) {
-            TopicPartition topicPartition = new 
TopicPartition(inLongTopic.getTopic(), Integer.parseInt(offset[0]));
-            OffsetAndMetadata offsetAndMetadata = new 
OffsetAndMetadata(Long.parseLong(offset[1]));
-            commitOffsetMap.put(topicPartition, offsetAndMetadata);
-        } else {
-            throw new Exception("offset is illegal, the correct format is 
int:long ,the error offset is:" + msgOffset);
-        }
-    }
-
-    @Override
-    public void pause() {
-        this.stopConsume = true;
-    }
-
-    @Override
-    public void resume() {
-        this.stopConsume = false;
-    }
-
-    @Override
-    public boolean close() {
-        this.closed = true;
-        try {
-            if (fetchThread != null) {
-                fetchThread.interrupt();
-            }
-            if (consumer != null) {
-                consumer.close();
-            }
-        } catch (Throwable throwable) {
-            throwable.printStackTrace();
-        }
-        logger.info("closed {}", inLongTopic);
-        return true;
-    }
-
-    @Override
-    public boolean isClosed() {
-        return closed;
-    }
-
-    @Override
-    public void stopConsume(boolean stopConsume) {
-        this.stopConsume = stopConsume;
-    }
-
-    @Override
-    public boolean isConsumeStop() {
-        return this.stopConsume;
-    }
-
-    @Override
-    public InLongTopic getInLongTopic() {
-        return inLongTopic;
-    }
-
-    @Override
-    public long getConsumedDataSize() {
-        return 0;
-    }
-
-    @Override
-    public long getAckedOffset() {
-        return 0;
-    }
-
-    private void createKafkaConsumer(String bootstrapServers) {
-        Properties properties = new Properties();
-        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
-        properties.put(ConsumerConfig.GROUP_ID_CONFIG, 
context.getConfig().getSortTaskId());
-        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                ByteArrayDeserializer.class.getName());
-        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                ByteArrayDeserializer.class.getName());
-        properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG,
-                context.getConfig().getKafkaSocketRecvBufferSize());
-        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-        ConsumeStrategy offsetResetStrategy = 
context.getConfig().getOffsetResetStrategy();
-        if (offsetResetStrategy == ConsumeStrategy.lastest
-                || offsetResetStrategy == ConsumeStrategy.lastest_absolutely) {
-            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
-        } else if (offsetResetStrategy == ConsumeStrategy.earliest
-                || offsetResetStrategy == ConsumeStrategy.earliest_absolutely) 
{
-            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
-        } else {
-            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
-        }
-        properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,
-                context.getConfig().getKafkaFetchSizeBytes());
-        properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,
-                context.getConfig().getKafkaFetchWaitMs());
-        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
-                RangeAssignor.class.getName());
-        properties.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 120000L);
-        this.bootstrapServers = bootstrapServers;
-        logger.info("start to create kafka consumer:{}", properties);
-        this.consumer = new KafkaConsumer<>(properties);
-        logger.info("end to create kafka consumer:{}", consumer);
-    }
-
-    public class Fetcher implements Runnable {
-
-        private void commitKafkaOffset() {
-            if (consumer != null && commitOffsetMap.size() > 0) {
-                try {
-                    consumer.commitSync(commitOffsetMap);
-                    commitOffsetMap.clear();
-                    // TODO monitor commit succ
-
-                } catch (Exception e) {
-                    // TODO monitor commit fail
-                    logger.error(e.getMessage(), e);
-                }
-            }
-        }
-
-        /**
-         * put the received msg to onFinished method
-         *
-         * @param messageRecords {@link List < MessageRecord >}
-         */
-        private void handleAndCallbackMsg(List<MessageRecord> messageRecords) {
-            long start = System.currentTimeMillis();
-            try {
-                context.getStatManager()
-                        .getStatistics(context.getConfig().getSortTaskId(),
-                                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                        .addCallbackTimes(1);
-                
context.getConfig().getCallback().onFinishedBatch(messageRecords);
-                context.getStatManager()
-                        .getStatistics(context.getConfig().getSortTaskId(),
-                                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                        .addCallbackTimeCost(System.currentTimeMillis() - 
start).addCallbackDoneTimes(1);
-            } catch (Exception e) {
-                context.getStatManager()
-                        .getStatistics(context.getConfig().getSortTaskId(),
-                                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                        .addCallbackErrorTimes(1);
-                logger.error(e.getMessage(), e);
-            }
-        }
-
-        private String getOffset(int partitionId, long offset) {
-            return partitionId + ":" + offset;
-        }
-
-        private Map<String, String> getMsgHeaders(Headers headers) {
-            Map<String, String> headerMap = new HashMap<>();
-            for (Header header : headers) {
-                headerMap.put(header.key(), new String(header.value()));
-            }
-            return headerMap;
-        }
-
-        @Override
-        public void run() {
-            boolean hasPermit;
-            while (true) {
-                hasPermit = false;
-                try {
-                    if (context.getConfig().isStopConsume() || stopConsume) {
-                        TimeUnit.MILLISECONDS.sleep(50);
-                        continue;
-                    }
-
-                    if (sleepTime > 0) {
-                        TimeUnit.MILLISECONDS.sleep(sleepTime);
-                    }
-
-                    context.acquireRequestPermit();
-                    hasPermit = true;
-                    // fetch from kafka
-                    fetchFromKafka();
-                    // commit
-                    commitKafkaOffset();
-                } catch (Exception e) {
-                    context.getStatManager()
-                            .getStatistics(context.getConfig().getSortTaskId(),
-                                    
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                            .addFetchErrorTimes(1);
-                    logger.error(e.getMessage(), e);
-                } finally {
-                    if (hasPermit) {
-                        context.releaseRequestPermit();
-                    }
-                }
-            }
-        }
-
-        private void fetchFromKafka() throws Exception {
-            context.getStatManager()
-                    .getStatistics(context.getConfig().getSortTaskId(),
-                            inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                    .addMsgCount(1).addFetchTimes(1);
-
-            long startFetchTime = System.currentTimeMillis();
-            ConsumerRecords<byte[], byte[]> records = consumer
-                    
.poll(Duration.ofMillis(context.getConfig().getKafkaFetchWaitMs()));
-            context.getStatManager()
-                    .getStatistics(context.getConfig().getSortTaskId(),
-                            inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                    .addFetchTimeCost(System.currentTimeMillis() - 
startFetchTime);
-            if (null != records && !records.isEmpty()) {
-
-                for (ConsumerRecord<byte[], byte[]> msg : records) {
-                    List<MessageRecord> msgs = new ArrayList<>();
-                    String offsetKey = getOffset(msg.partition(), 
msg.offset());
-                    List<InLongMessage> inLongMessages = deserializer
-                            .deserialize(context, inLongTopic, 
getMsgHeaders(msg.headers()), msg.value());
-                    inLongMessages = interceptor.intercept(inLongMessages);
-                    if (inLongMessages.isEmpty()) {
-                        ack(offsetKey);
-                        continue;
-                    }
-
-                    msgs.add(new MessageRecord(inLongTopic.getTopicKey(),
-                            inLongMessages,
-                            offsetKey, System.currentTimeMillis()));
-                    context.getStatManager()
-                            .getStatistics(context.getConfig().getSortTaskId(),
-                                    
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                            .addConsumeSize(msg.value().length);
-                    context.getStatManager()
-                            .getStatistics(context.getConfig().getSortTaskId(),
-                                    
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                            .addMsgCount(msgs.size());
-                    handleAndCallbackMsg(msgs);
-                }
-                sleepTime = 0L;
-            } else {
-                context.getStatManager()
-                        .getStatistics(context.getConfig().getSortTaskId(),
-                                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                        .addEmptyFetchTimes(1);
-                emptyFetchTimes++;
-                if (emptyFetchTimes >= 
context.getConfig().getEmptyPollTimes()) {
-                    sleepTime = Math.min((sleepTime += 
context.getConfig().getEmptyPollSleepStepMs()),
-                            context.getConfig().getMaxEmptyPollSleepMs());
-                    emptyFetchTimes = 0;
-                }
-            }
-        }
-    }
-}
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
deleted file mode 100644
index fddd9ec11..000000000
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.sort.impl.pulsar;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
-import org.apache.inlong.sdk.sort.entity.InLongMessage;
-import org.apache.inlong.sdk.sort.entity.InLongTopic;
-import org.apache.inlong.sdk.sort.entity.MessageRecord;
-import org.apache.inlong.sdk.sort.util.StringUtil;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Messages;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Base64;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
-
-    private final Logger logger = 
LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
-    private final ReentrantReadWriteLock mainLock = new 
ReentrantReadWriteLock(true);
-    private final ConcurrentHashMap<String, MessageId> offsetCache = new 
ConcurrentHashMap<>();
-    private Consumer<byte[]> consumer;
-
-    public InLongPulsarFetcherImpl(InLongTopic inLongTopic,
-            ClientContext context) {
-        super(inLongTopic, context);
-    }
-
-    @Override
-    public void stopConsume(boolean stopConsume) {
-        this.isStopConsume = stopConsume;
-    }
-
-    @Override
-    public boolean isConsumeStop() {
-        return isStopConsume;
-    }
-
-    @Override
-    public InLongTopic getInLongTopic() {
-        return inLongTopic;
-    }
-
-    @Override
-    public long getConsumedDataSize() {
-        return 0L;
-    }
-
-    @Override
-    public long getAckedOffset() {
-        return 0L;
-    }
-
-    private void ackSucc(String offset) {
-        offsetCache.remove(offset);
-        
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
-                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic()).addAckSuccTimes(1L);
-    }
-
-    /**
-     * ack Offset
-     *
-     * @param msgOffset String
-     */
-    @Override
-    public void ack(String msgOffset) throws Exception {
-        if (!StringUtils.isEmpty(msgOffset)) {
-            try {
-                if (consumer == null) {
-                    
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
-                            inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                            .addAckFailTimes(1L);
-                    logger.error("consumer == null {}", inLongTopic);
-                    return;
-                }
-                MessageId messageId = offsetCache.get(msgOffset);
-                if (messageId == null) {
-                    
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
-                            inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                            .addAckFailTimes(1L);
-                    logger.error("messageId == null {}", inLongTopic);
-                    return;
-                }
-                consumer.acknowledgeAsync(messageId)
-                        .thenAccept(consumer -> ackSucc(msgOffset))
-                        .exceptionally(exception -> {
-                            logger.error("ack fail:{} {},error:{}",
-                                    inLongTopic, msgOffset, 
exception.getMessage(), exception);
-                            
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
-                                    
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                                    .addAckFailTimes(1L);
-                            return null;
-                        });
-            } catch (Exception e) {
-                
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
-                        inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic()).addAckFailTimes(1L);
-                logger.error(e.getMessage(), e);
-                throw e;
-            }
-        }
-    }
-
-    /**
-     * create Consumer and fetch thread
-     *
-     * @return boolean
-     */
-    @Override
-    public boolean init(Object object) {
-        PulsarClient pulsarClient = (PulsarClient) object;
-        return createConsumer(pulsarClient);
-    }
-
-    private boolean createConsumer(PulsarClient client) {
-        if (null == client) {
-            return false;
-        }
-        try {
-            consumer = client.newConsumer(Schema.BYTES)
-                    .topic(inLongTopic.getTopic())
-                    .subscriptionName(context.getConfig().getSortTaskId())
-                    .subscriptionType(SubscriptionType.Shared)
-                    .startMessageIdInclusive()
-                    .ackTimeout(context.getConfig().getAckTimeoutSec(), 
TimeUnit.SECONDS)
-                    
.receiverQueueSize(context.getConfig().getPulsarReceiveQueueSize())
-                    .subscribe();
-
-            String threadName = "sort_sdk_fetch_thread_" + 
StringUtil.formatDate(new Date());
-            this.fetchThread = new Thread(new Fetcher(), threadName);
-            this.fetchThread.start();
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-            return false;
-        }
-        return true;
-    }
-
-    /**
-     * pause
-     */
-    @Override
-    public void pause() {
-        if (consumer != null) {
-            consumer.pause();
-        }
-    }
-
-    /**
-     * resume
-     */
-    @Override
-    public void resume() {
-        if (consumer != null) {
-            consumer.resume();
-        }
-    }
-
-    /**
-     * close
-     *
-     * @return true/false
-     */
-    @Override
-    public boolean close() {
-        mainLock.writeLock().lock();
-        try {
-            try {
-                if (consumer != null) {
-                    consumer.close();
-                }
-                if (fetchThread != null) {
-                    fetchThread.interrupt();
-                }
-            } catch (PulsarClientException e) {
-                e.printStackTrace();
-            }
-            logger.info("closed {}", inLongTopic);
-            return true;
-        } finally {
-            this.closed = true;
-            mainLock.writeLock().unlock();
-        }
-    }
-
-    @Override
-    public boolean isClosed() {
-        return closed;
-    }
-
-    public class Fetcher implements Runnable {
-
-        /**
-         * put the received msg to onFinished method
-         *
-         * @param messageRecords {@link List}
-         */
-        private void handleAndCallbackMsg(List<MessageRecord> messageRecords) {
-            long start = System.currentTimeMillis();
-            try {
-                context.getStatManager()
-                        .getStatistics(context.getConfig().getSortTaskId(),
-                                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                        .addCallbackTimes(1L);
-                
context.getConfig().getCallback().onFinishedBatch(messageRecords);
-                context.getStatManager()
-                        .getStatistics(context.getConfig().getSortTaskId(),
-                                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                        .addCallbackTimeCost(System.currentTimeMillis() - 
start).addCallbackDoneTimes(1L);
-            } catch (Exception e) {
-                context.getStatManager()
-                        .getStatistics(context.getConfig().getSortTaskId(),
-                                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                        .addCallbackErrorTimes(1L);
-                e.printStackTrace();
-            }
-        }
-
-        private String getOffset(MessageId msgId) {
-            return Base64.getEncoder().encodeToString(msgId.toByteArray());
-        }
-
-        @Override
-        public void run() {
-            boolean hasPermit;
-            while (true) {
-                hasPermit = false;
-                try {
-                    if (context.getConfig().isStopConsume() || isStopConsume) {
-                        TimeUnit.MILLISECONDS.sleep(50);
-                        continue;
-                    }
-
-                    if (sleepTime > 0) {
-                        TimeUnit.MILLISECONDS.sleep(sleepTime);
-                    }
-
-                    context.acquireRequestPermit();
-                    hasPermit = true;
-                    context.getStatManager()
-                            .getStatistics(context.getConfig().getSortTaskId(),
-                                    
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                            .addMsgCount(1L).addFetchTimes(1L);
-
-                    long startFetchTime = System.currentTimeMillis();
-                    Messages<byte[]> messages = consumer.batchReceive();
-                    context.getStatManager()
-                            .getStatistics(context.getConfig().getSortTaskId(),
-                                    
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                            .addFetchTimeCost(System.currentTimeMillis() - 
startFetchTime);
-                    if (null != messages && messages.size() != 0) {
-                        for (Message<byte[]> msg : messages) {
-                            List<MessageRecord> msgs = new ArrayList<>();
-                            String offsetKey = getOffset(msg.getMessageId());
-                            offsetCache.put(offsetKey, msg.getMessageId());
-
-                            List<InLongMessage> inLongMessages = deserializer
-                                    .deserialize(context, inLongTopic, 
msg.getProperties(), msg.getData());
-
-                            msgs.add(new 
MessageRecord(inLongTopic.getTopicKey(),
-                                    inLongMessages,
-                                    offsetKey, System.currentTimeMillis()));
-                            context.getStatManager()
-                                    
.getStatistics(context.getConfig().getSortTaskId(),
-                                            
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                                    .addConsumeSize(msg.getData().length);
-                            context.getStatManager()
-                                    
.getStatistics(context.getConfig().getSortTaskId(),
-                                            
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                                    .addMsgCount(msgs.size());
-                            handleAndCallbackMsg(msgs);
-                        }
-                        sleepTime = 0L;
-                    } else {
-                        context.getStatManager()
-                                
.getStatistics(context.getConfig().getSortTaskId(),
-                                        
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                                .addEmptyFetchTimes(1L);
-                        emptyFetchTimes++;
-                        if (emptyFetchTimes >= 
context.getConfig().getEmptyPollTimes()) {
-                            sleepTime = Math.min((sleepTime += 
context.getConfig().getEmptyPollSleepStepMs()),
-                                    
context.getConfig().getMaxEmptyPollSleepMs());
-                            emptyFetchTimes = 0;
-                        }
-                    }
-                } catch (Exception e) {
-                    context.getStatManager()
-                            .getStatistics(context.getConfig().getSortTaskId(),
-                                    
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                            .addFetchErrorTimes(1L);
-                    logger.error(e.getMessage(), e);
-                } finally {
-                    if (hasPermit) {
-                        context.releaseRequestPermit();
-                    }
-                }
-
-                if (closed) {
-                    break;
-                }
-            }
-        }
-    }
-}
\ No newline at end of file
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
deleted file mode 100644
index f4d2f3624..000000000
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.sort.impl.tube;
-
-import com.google.common.base.Splitter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
-import org.apache.inlong.sdk.sort.api.SysConstants;
-import org.apache.inlong.sdk.sort.entity.InLongMessage;
-import org.apache.inlong.sdk.sort.entity.InLongTopic;
-import org.apache.inlong.sdk.sort.entity.MessageRecord;
-import org.apache.inlong.sdk.sort.fetcher.tube.TubeConsumerCreator;
-import org.apache.inlong.sdk.sort.util.StringUtil;
-import org.apache.inlong.tubemq.client.config.ConsumerConfig;
-import org.apache.inlong.tubemq.client.config.TubeClientConfig;
-import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
-import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
-import org.apache.inlong.tubemq.corebase.Message;
-import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated
-public class InLongTubeFetcherImpl extends InLongTopicFetcher {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(InLongTubeFetcherImpl.class);
-    private PullMessageConsumer messageConsumer;
-    private volatile Thread fetchThread;
-
-    public InLongTubeFetcherImpl(InLongTopic inLongTopic, ClientContext 
context) {
-        super(inLongTopic, context);
-    }
-
-    @Override
-    public boolean init(Object object) {
-        TubeConsumerCreator tubeConsumerCreator = (TubeConsumerCreator) object;
-        TubeClientConfig tubeClientConfig = 
tubeConsumerCreator.getTubeClientConfig();
-        try {
-            ConsumerConfig consumerConfig = new 
ConsumerConfig(tubeClientConfig.getMasterInfo(),
-                    context.getConfig().getSortTaskId());
-
-            messageConsumer = 
tubeConsumerCreator.getMessageSessionFactory().createPullConsumer(consumerConfig);
-            if (messageConsumer != null) {
-                TreeSet<String> filters = null;
-                if (inLongTopic.getProperties() != null && 
inLongTopic.getProperties().containsKey(
-                        SysConstants.TUBE_TOPIC_FILTER_KEY)) {
-                    String filterStr = 
inLongTopic.getProperties().get(SysConstants.TUBE_TOPIC_FILTER_KEY);
-                    String[] filterArray = filterStr.split(" ");
-                    filters = new TreeSet<>(Arrays.asList(filterArray));
-                }
-                messageConsumer.subscribe(inLongTopic.getTopic(), filters);
-                messageConsumer.completeSubscribe();
-
-                String threadName = "sort_sdk_fetch_thread_" + 
StringUtil.formatDate(new Date());
-                this.fetchThread = new Thread(new Fetcher(), threadName);
-                this.fetchThread.start();
-            } else {
-                return false;
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            return false;
-        }
-        return true;
-    }
-
-    @Override
-    public void ack(String msgOffset) throws Exception {
-        if (!StringUtils.isEmpty(msgOffset)) {
-            if (messageConsumer == null) {
-                context.getStatManager()
-                        .getStatistics(context.getConfig().getSortTaskId(),
-                                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                        .addAckFailTimes(1L);
-                LOG.warn("consumer == null");
-                return;
-            }
-
-            try {
-                ConsumerResult consumerResult = 
messageConsumer.confirmConsume(msgOffset, true);
-                int errCode = consumerResult.getErrCode();
-                if (TErrCodeConstants.SUCCESS != errCode) {
-                    context.getStatManager()
-                            .getStatistics(context.getConfig().getSortTaskId(),
-                                    
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                            .addAckFailTimes(1L);
-                } else {
-                    context.getStatManager()
-                            .getStatistics(context.getConfig().getSortTaskId(),
-                                    
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                            .addAckSuccTimes(1L);
-                }
-            } catch (Exception e) {
-                
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
-                        inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic()).addAckFailTimes(1L);
-                LOG.error(e.getMessage(), e);
-                throw e;
-            }
-        }
-    }
-
-    @Override
-    public void pause() {
-        this.closed = true;
-    }
-
-    @Override
-    public void resume() {
-        this.closed = false;
-    }
-
-    @Override
-    public boolean close() {
-        try {
-            if (fetchThread != null) {
-                fetchThread.interrupt();
-            }
-            if (messageConsumer != null) {
-                messageConsumer.shutdown();
-            }
-        } catch (Throwable throwable) {
-            throwable.printStackTrace();
-        } finally {
-            this.closed = true;
-        }
-        LOG.info("closed {}", inLongTopic);
-        return true;
-    }
-
-    @Override
-    public boolean isClosed() {
-        return closed;
-    }
-
-    @Override
-    public void stopConsume(boolean stopConsume) {
-        this.isStopConsume = stopConsume;
-    }
-
-    @Override
-    public boolean isConsumeStop() {
-        return isStopConsume;
-    }
-
-    @Override
-    public InLongTopic getInLongTopic() {
-        return inLongTopic;
-    }
-
-    @Override
-    public long getConsumedDataSize() {
-        return 0L;
-    }
-
-    @Override
-    public long getAckedOffset() {
-        return 0L;
-    }
-
-    public class Fetcher implements Runnable {
-
-        /**
-         * put the received msg to onFinished method
-         *
-         * @param messageRecord {@link MessageRecord}
-         */
-        private void handleAndCallbackMsg(MessageRecord messageRecord) {
-            long start = System.currentTimeMillis();
-            try {
-                context.getStatManager()
-                        .getStatistics(context.getConfig().getSortTaskId(),
-                                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                        .addCallbackTimes(1L);
-                
context.getConfig().getCallback().onFinishedBatch(Collections.singletonList(messageRecord));
-                context.getStatManager()
-                        .getStatistics(context.getConfig().getSortTaskId(),
-                                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                        .addCallbackTimeCost(System.currentTimeMillis() - 
start).addCallbackDoneTimes(1L);
-            } catch (Exception e) {
-                context.getStatManager()
-                        .getStatistics(context.getConfig().getSortTaskId(),
-                                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
-                        .addCallbackErrorTimes(1L);
-                e.printStackTrace();
-            }
-        }
-
-        /**
-         * parseAttr from k1=v1&k2=v2 to kv map
-         *
-         * @param splitter {@link Splitter}
-         * @param attr String
-         * @param entrySplitterStr String
-         * @return {@link Map}
-         */
-        private Map<String, String> parseAttr(Splitter splitter, String attr, 
String entrySplitterStr) {
-            Map<String, String> map = new HashMap<>();
-            for (String s : splitter.split(attr)) {
-                int idx = s.indexOf(entrySplitterStr);
-                String k = s;
-                String v = null;
-                if (idx > 0) {
-                    k = s.substring(0, idx);
-                    v = s.substring(idx + 1);
-                }
-                map.put(k, v);
-            }
-            return map;
-        }
-
-        private Map<String, String> getAttributeMap(String attribute) {
-            final Splitter splitter = Splitter.on("&");
-            return parseAttr(splitter, attribute, "=");
-        }
-
-        @Override
-        public void run() {
-            boolean hasPermit;
-            while (true) {
-                hasPermit = false;
-                try {
-                    if (context.getConfig().isStopConsume() || isStopConsume) {
-                        TimeUnit.MILLISECONDS.sleep(50L);
-                        continue;
-                    }
-
-                    if (sleepTime > 0) {
-                        TimeUnit.MILLISECONDS.sleep(sleepTime);
-                    }
-
-                    context.acquireRequestPermit();
-                    hasPermit = true;
-                    context.getStatManager()
-                            .getStatistics(context.getConfig().getSortTaskId(),
-                                    
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                            .addMsgCount(1L).addFetchTimes(1L);
-
-                    long startFetchTime = System.currentTimeMillis();
-                    ConsumerResult message = messageConsumer.getMessage();
-                    context.getStatManager()
-                            .getStatistics(context.getConfig().getSortTaskId(),
-                                    
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                            .addFetchTimeCost(System.currentTimeMillis() - 
startFetchTime);
-                    if (null != message && TErrCodeConstants.SUCCESS == 
message.getErrCode()) {
-                        for (Message msg : message.getMessageList()) {
-                            List<InLongMessage> msgs = new ArrayList<>();
-                            List<InLongMessage> deserialize = deserializer
-                                    .deserialize(context, inLongTopic, 
getAttributeMap(msg.getAttribute()),
-                                            msg.getData());
-                            deserialize = interceptor.intercept(deserialize);
-                            if (deserialize.isEmpty()) {
-                                continue;
-                            }
-                            msgs.addAll(deserialize);
-                            context.getStatManager()
-                                    
.getStatistics(context.getConfig().getSortTaskId(),
-                                            
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                                    
.addMsgCount(deserialize.size()).addConsumeSize(msg.getData().length);
-                            handleAndCallbackMsg(new 
MessageRecord(inLongTopic.getTopicKey(), msgs,
-                                    message.getConfirmContext(), 
System.currentTimeMillis()));
-                        }
-
-                        sleepTime = 0L;
-                    } else {
-                        context.getStatManager()
-                                
.getStatistics(context.getConfig().getSortTaskId(),
-                                        
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                                .addEmptyFetchTimes(1L);
-                        emptyFetchTimes++;
-                        if (emptyFetchTimes >= 
context.getConfig().getEmptyPollTimes()) {
-                            sleepTime = Math.min((sleepTime += 
context.getConfig().getEmptyPollSleepStepMs()),
-                                    
context.getConfig().getMaxEmptyPollSleepMs());
-                            emptyFetchTimes = 0;
-                        }
-                    }
-                } catch (Exception e) {
-                    context.getStatManager()
-                            .getStatistics(context.getConfig().getSortTaskId(),
-                                    
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
-                            .addFetchErrorTimes(1L);
-                    LOG.error(e.getMessage(), e);
-                } finally {
-                    if (hasPermit) {
-                        context.releaseRequestPermit();
-                    }
-                }
-
-                if (closed) {
-                    break;
-                }
-            }
-        }
-    }
-}
diff --git 
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImplTest.java
 
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImplTest.java
index 363a8244f..52011b3c6 100644
--- 
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImplTest.java
+++ 
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImplTest.java
@@ -24,12 +24,12 @@ import java.util.HashMap;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
-import org.apache.inlong.sdk.sort.api.InlongTopicManager;
 import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
 import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.api.TopicFetcher;
 import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
 import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.manager.InlongSingleTopicManager;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -45,7 +45,7 @@ public class InlongTopicManagerImplTest {
     private InLongTopic inLongTopic;
     private ClientContext clientContext;
     private QueryConsumeConfig queryConsumeConfig;
-    private InlongTopicManager inLongTopicManager;
+    private InlongSingleTopicManager inLongTopicManager;
 
     {
         System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
@@ -66,40 +66,40 @@ public class InlongTopicManagerImplTest {
         when(sortClientConfig.getSortTaskId()).thenReturn("test");
         when(sortClientConfig.getUpdateMetaDataIntervalSec()).thenReturn(60);
         queryConsumeConfig = PowerMockito.mock(QueryConsumeConfigImpl.class);
-        inLongTopicManager = new InlongTopicManagerImpl(clientContext, 
queryConsumeConfig);
+        inLongTopicManager = new InlongSingleTopicManager(clientContext, 
queryConsumeConfig);
     }
 
     @Test
     public void testAddFetcher() {
-        InlongTopicManager inLongTopicManager = new 
InlongTopicManagerImpl(clientContext, queryConsumeConfig);
+        InlongSingleTopicManager inLongTopicManager = new 
InlongSingleTopicManager(clientContext, queryConsumeConfig);
 
-        InLongTopicFetcher inLongTopicFetcher = 
inLongTopicManager.addFetcher(inLongTopic);
+        TopicFetcher inLongTopicFetcher = 
inLongTopicManager.addTopic(inLongTopic);
         Assert.assertNull(inLongTopicFetcher);
     }
 
     @Test
     public void testRemoveFetcher() {
 
-        InLongTopicFetcher inLongTopicFetcher = 
inLongTopicManager.removeFetcher(inLongTopic, true);
+        TopicFetcher inLongTopicFetcher = 
inLongTopicManager.removeTopic(inLongTopic, true);
         Assert.assertNull(inLongTopicFetcher);
 
-        ConcurrentHashMap<String, InLongTopicFetcher> fetchers = new 
ConcurrentHashMap<>();
-        InLongTopicFetcher inLongTopicFetcherRmMock = 
PowerMockito.mock(InLongTopicFetcher.class);
+        ConcurrentHashMap<String, TopicFetcher> fetchers = new 
ConcurrentHashMap<>();
+        TopicFetcher inLongTopicFetcherRmMock = 
PowerMockito.mock(TopicFetcher.class);
         fetchers.put(inLongTopic.getTopicKey(), inLongTopicFetcherRmMock);
 
         Whitebox.setInternalState(inLongTopicManager, "fetchers", fetchers);
 
-        inLongTopicFetcher = inLongTopicManager.removeFetcher(inLongTopic, 
true);
+        inLongTopicFetcher = inLongTopicManager.removeTopic(inLongTopic, true);
         Assert.assertNotNull(inLongTopicFetcher);
 
     }
 
     @Test
     public void testGetFetcher() {
-        InLongTopicFetcher fetcher = 
inLongTopicManager.getFetcher(inLongTopic.getTopicKey());
+        TopicFetcher fetcher = 
inLongTopicManager.getFetcher(inLongTopic.getTopicKey());
         Assert.assertNull(fetcher);
-        ConcurrentHashMap<String, InLongTopicFetcher> fetchers = new 
ConcurrentHashMap<>();
-        InLongTopicFetcher inLongTopicFetcherRmMock = 
PowerMockito.mock(InLongTopicFetcher.class);
+        ConcurrentHashMap<String, TopicFetcher> fetchers = new 
ConcurrentHashMap<>();
+        TopicFetcher inLongTopicFetcherRmMock = 
PowerMockito.mock(TopicFetcher.class);
         fetchers.put(inLongTopic.getTopicKey(), inLongTopicFetcherRmMock);
 
         Whitebox.setInternalState(inLongTopicManager, "fetchers", fetchers);
@@ -114,8 +114,8 @@ public class InlongTopicManagerImplTest {
         Set<String> managedInLongTopics = 
inLongTopicManager.getManagedInLongTopics();
         Assert.assertEquals(0, managedInLongTopics.size());
 
-        ConcurrentHashMap<String, InLongTopicFetcher> fetchers = new 
ConcurrentHashMap<>();
-        InLongTopicFetcher inLongTopicFetcherRmMock = 
PowerMockito.mock(InLongTopicFetcher.class);
+        ConcurrentHashMap<String, TopicFetcher> fetchers = new 
ConcurrentHashMap<>();
+        TopicFetcher inLongTopicFetcherRmMock = 
PowerMockito.mock(TopicFetcher.class);
         fetchers.put(inLongTopic.getTopicKey(), inLongTopicFetcherRmMock);
         Whitebox.setInternalState(inLongTopicManager, "fetchers", fetchers);
         managedInLongTopics = inLongTopicManager.getManagedInLongTopics();
@@ -125,11 +125,11 @@ public class InlongTopicManagerImplTest {
 
     @Test
     public void testGetAllFetchers() {
-        Collection<InLongTopicFetcher> allFetchers = 
inLongTopicManager.getAllFetchers();
+        Collection<TopicFetcher> allFetchers = 
inLongTopicManager.getAllFetchers();
         Assert.assertEquals(0, allFetchers.size());
 
-        ConcurrentHashMap<String, InLongTopicFetcher> fetchers = new 
ConcurrentHashMap<>();
-        InLongTopicFetcher inLongTopicFetcherRmMock = 
PowerMockito.mock(InLongTopicFetcher.class);
+        ConcurrentHashMap<String, TopicFetcher> fetchers = new 
ConcurrentHashMap<>();
+        TopicFetcher inLongTopicFetcherRmMock = 
PowerMockito.mock(TopicFetcher.class);
         fetchers.put(inLongTopic.getTopicKey(), inLongTopicFetcherRmMock);
         Whitebox.setInternalState(inLongTopicManager, "fetchers", fetchers);
         allFetchers = inLongTopicManager.getAllFetchers();
diff --git 
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java
 
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java
index 815f1047c..4fc965625 100644
--- 
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java
+++ 
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java
@@ -17,9 +17,6 @@
 
 package org.apache.inlong.sdk.sort.impl.decode;
 
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.powermock.api.mockito.PowerMockito.when;
-
 import com.google.protobuf.ByteString;
 import java.util.HashMap;
 import java.util.List;
@@ -29,13 +26,10 @@ import 
org.apache.inlong.sdk.commons.protocol.ProxySdk.MapFieldEntry;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs;
 import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.SortClientConfig;
 import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
 import org.apache.inlong.sdk.sort.entity.InLongMessage;
 import org.apache.inlong.sdk.sort.entity.InLongTopic;
 import org.apache.inlong.sdk.sort.impl.ClientContextImpl;
-import org.apache.inlong.sdk.sort.stat.SortClientStateCounter;
-import org.apache.inlong.sdk.sort.stat.StatManager;
 import org.apache.inlong.sdk.sort.util.Utils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -52,16 +46,12 @@ public class MessageDeserializerTest {
     private InLongTopic inLongTopic;
     private String testData;
     private MessageObjs messageObjs;
-    private SortClientConfig sortClientConfig;
-    private StatManager statManager;
 
     private void setUp() throws Exception {
         System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
         messageDeserializer = new MessageDeserializer();
         headers = new HashMap<>();
         context = PowerMockito.mock(ClientContextImpl.class);
-        sortClientConfig = PowerMockito.mock(SortClientConfig.class);
-        statManager = PowerMockito.mock(StatManager.class);
 
         inLongTopic = new InLongTopic();
         inLongTopic.setTopic("testTopic");
@@ -69,13 +59,6 @@ public class MessageDeserializerTest {
         inLongTopic.setInLongCluster(cacheZoneCluster);
         inLongTopic.setProperties(new HashMap<>());
 
-        when(context.getConfig()).thenReturn(sortClientConfig);
-        when(context.getStatManager()).thenReturn(statManager);
-        SortClientStateCounter sortClientStateCounter = new 
SortClientStateCounter("sortTaskId",
-                cacheZoneCluster.getClusterId(),
-                inLongTopic.getTopic(), 0);
-        when(statManager.getStatistics(anyString(), anyString(), 
anyString())).thenReturn(sortClientStateCounter);
-        when(sortClientConfig.getSortTaskId()).thenReturn("sortTaskId");
     }
 
     @Test
diff --git 
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImplTest.java
 
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImplTest.java
index 8d3a4b5e3..88b1ed7b2 100644
--- 
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImplTest.java
+++ 
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImplTest.java
@@ -17,16 +17,14 @@
 
 package org.apache.inlong.sdk.sort.impl.kafka;
 
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.powermock.api.mockito.PowerMockito.when;
-
 import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.api.TopicFetcher;
 import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
 import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.fetcher.kafka.KafkaSingleTopicFetcher;
 import org.apache.inlong.sdk.sort.impl.ClientContextImpl;
-import org.apache.inlong.sdk.sort.stat.SortClientStateCounter;
-import org.apache.inlong.sdk.sort.stat.StatManager;
+import org.apache.inlong.sdk.sort.impl.decode.MessageDeserializer;
+import org.apache.inlong.sdk.sort.interceptor.MsgTimeInterceptor;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -45,8 +43,7 @@ public class InLongKafkaFetcherImplTest {
 
     private ClientContext clientContext;
     private InLongTopic inLongTopic;
-    private SortClientConfig sortClientConfig;
-    private StatManager statManager;
+    private static final String TEST_BOOTSTRAP = "testBootstrap";
 
     /**
      * setUp
@@ -65,34 +62,26 @@ public class InLongKafkaFetcherImplTest {
         inLongTopic.setInLongCluster(cacheZoneCluster);
         clientContext = PowerMockito.mock(ClientContextImpl.class);
 
-        sortClientConfig = PowerMockito.mock(SortClientConfig.class);
-        statManager = PowerMockito.mock(StatManager.class);
-
-        when(clientContext.getConfig()).thenReturn(sortClientConfig);
-        when(clientContext.getStatManager()).thenReturn(statManager);
-        SortClientStateCounter sortClientStateCounter = new 
SortClientStateCounter("sortTaskId",
-                cacheZoneCluster.getClusterId(),
-                inLongTopic.getTopic(), 0);
-        when(statManager.getStatistics(anyString(), anyString(), 
anyString())).thenReturn(sortClientStateCounter);
-        when(sortClientConfig.getSortTaskId()).thenReturn("sortTaskId");
-
     }
 
     @Test
     public void pause() {
-        InLongKafkaFetcherImpl inLongTopicFetcher = new 
InLongKafkaFetcherImpl(inLongTopic, clientContext);
+        TopicFetcher inLongTopicFetcher = new 
KafkaSingleTopicFetcher(inLongTopic, clientContext,
+                new MsgTimeInterceptor(), new MessageDeserializer(), 
TEST_BOOTSTRAP);
         inLongTopicFetcher.pause();
     }
 
     @Test
     public void resume() {
-        InLongKafkaFetcherImpl inLongTopicFetcher = new 
InLongKafkaFetcherImpl(inLongTopic, clientContext);
+        TopicFetcher inLongTopicFetcher = new 
KafkaSingleTopicFetcher(inLongTopic, clientContext,
+                new MsgTimeInterceptor(), new MessageDeserializer(), 
TEST_BOOTSTRAP);
         inLongTopicFetcher.resume();
     }
 
     @Test
     public void close() {
-        InLongKafkaFetcherImpl inLongTopicFetcher = new 
InLongKafkaFetcherImpl(inLongTopic, clientContext);
+        TopicFetcher inLongTopicFetcher = new 
KafkaSingleTopicFetcher(inLongTopic, clientContext,
+                new MsgTimeInterceptor(), new MessageDeserializer(), 
TEST_BOOTSTRAP);
         boolean close = inLongTopicFetcher.close();
         Assert.assertTrue(close);
     }
diff --git 
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
 
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
index 935387f9d..202e8b468 100644
--- 
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
+++ 
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
@@ -27,13 +27,14 @@ import static org.powermock.api.mockito.PowerMockito.when;
 import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
 import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.api.TopicFetcher;
 import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
 import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.fetcher.pulsar.PulsarSingleTopicFetcher;
 import org.apache.inlong.sdk.sort.impl.ClientContextImpl;
-import org.apache.inlong.sdk.sort.stat.SortClientStateCounter;
-import org.apache.inlong.sdk.sort.stat.StatManager;
+import org.apache.inlong.sdk.sort.impl.decode.MessageDeserializer;
+import org.apache.inlong.sdk.sort.interceptor.MsgTimeInterceptor;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.MessageId;
@@ -56,7 +57,6 @@ public class InLongPulsarFetcherImplTest {
     private ClientContext clientContext;
     private InLongTopic inLongTopic;
     private SortClientConfig sortClientConfig;
-    private StatManager statManager;
 
     /**
      * setUp
@@ -76,52 +76,35 @@ public class InLongPulsarFetcherImplTest {
         clientContext = PowerMockito.mock(ClientContextImpl.class);
 
         sortClientConfig = PowerMockito.mock(SortClientConfig.class);
-        statManager = PowerMockito.mock(StatManager.class);
 
         when(clientContext.getConfig()).thenReturn(sortClientConfig);
-        when(clientContext.getStatManager()).thenReturn(statManager);
-        SortClientStateCounter sortClientStateCounter = new 
SortClientStateCounter("sortTaskId",
-                cacheZoneCluster.getClusterId(),
-                inLongTopic.getTopic(), 0);
-        when(statManager.getStatistics(anyString(), anyString(), 
anyString())).thenReturn(sortClientStateCounter);
         when(sortClientConfig.getSortTaskId()).thenReturn("sortTaskId");
 
     }
 
     @Test
     public void stopConsume() {
-        InLongTopicFetcher inLongTopicFetcher = new 
InLongPulsarFetcherImpl(inLongTopic, clientContext);
-        boolean consumeStop = inLongTopicFetcher.isConsumeStop();
+        TopicFetcher inLongTopicFetcher = new 
PulsarSingleTopicFetcher(inLongTopic, clientContext,
+                new MsgTimeInterceptor(), new MessageDeserializer(), null);
+        boolean consumeStop = inLongTopicFetcher.isStopConsume();
         Assert.assertFalse(consumeStop);
-        inLongTopicFetcher.stopConsume(true);
-        consumeStop = inLongTopicFetcher.isConsumeStop();
+        inLongTopicFetcher.setStopConsume(true);
+        consumeStop = inLongTopicFetcher.isStopConsume();
         Assert.assertTrue(consumeStop);
     }
 
     @Test
     public void getInLongTopic() {
-        InLongTopicFetcher inLongTopicFetcher = new 
InLongPulsarFetcherImpl(inLongTopic, clientContext);
-        InLongTopic inLongTopic = inLongTopicFetcher.getInLongTopic();
+        TopicFetcher inLongTopicFetcher = new 
PulsarSingleTopicFetcher(inLongTopic, clientContext,
+                new MsgTimeInterceptor(), new MessageDeserializer(), null);
+        InLongTopic inLongTopic = inLongTopicFetcher.getTopics().get(0);
         Assert.assertEquals(inLongTopic.getInLongCluster(), 
inLongTopic.getInLongCluster());
     }
 
-    @Test
-    public void getConsumedDataSize() {
-        InLongTopicFetcher inLongTopicFetcher = new 
InLongPulsarFetcherImpl(inLongTopic, clientContext);
-        long consumedDataSize = inLongTopicFetcher.getConsumedDataSize();
-        Assert.assertEquals(0L, consumedDataSize);
-    }
-
-    @Test
-    public void getAckedOffset() {
-        InLongTopicFetcher inLongTopicFetcher = new 
InLongPulsarFetcherImpl(inLongTopic, clientContext);
-        long ackedOffset = inLongTopicFetcher.getAckedOffset();
-        Assert.assertEquals(0L, ackedOffset);
-    }
-
     @Test
     public void ack() {
-        InLongTopicFetcher inLongTopicFetcher = new 
InLongPulsarFetcherImpl(inLongTopic, clientContext);
+        TopicFetcher inLongTopicFetcher = new 
PulsarSingleTopicFetcher(inLongTopic, clientContext,
+                new MsgTimeInterceptor(), new MessageDeserializer(), null);
         MessageId messageId = PowerMockito.mock(MessageId.class);
         ConcurrentHashMap<String, MessageId> offsetCache = new 
ConcurrentHashMap<>();
         offsetCache.put("test", messageId);
@@ -137,10 +120,11 @@ public class InLongPulsarFetcherImplTest {
 
     @Test
     public void init() {
-        InLongTopicFetcher inLongTopicFetcher = new 
InLongPulsarFetcherImpl(inLongTopic, clientContext);
         PulsarClient pulsarClient = PowerMockito.mock(PulsarClient.class);
         ConsumerBuilder consumerBuilder = 
PowerMockito.mock(ConsumerBuilder.class);
 
+        TopicFetcher inLongTopicFetcher = new 
PulsarSingleTopicFetcher(inLongTopic, clientContext,
+                new MsgTimeInterceptor(), new MessageDeserializer(), 
pulsarClient);
         try {
             when(pulsarClient.newConsumer(any())).thenReturn(consumerBuilder);
             
when(consumerBuilder.topic(anyString())).thenReturn(consumerBuilder);
@@ -156,7 +140,7 @@ public class InLongPulsarFetcherImplTest {
             Consumer consumer = PowerMockito.mock(Consumer.class);
             when(consumerBuilder.subscribe()).thenReturn(consumer);
             doNothing().when(consumer).close();
-            boolean init = inLongTopicFetcher.init(pulsarClient);
+            boolean init = inLongTopicFetcher.init();
             inLongTopicFetcher.close();
             Assert.assertTrue(init);
         } catch (Exception e) {
@@ -166,19 +150,22 @@ public class InLongPulsarFetcherImplTest {
 
     @Test
     public void pause() {
-        InLongTopicFetcher inLongTopicFetcher = new 
InLongPulsarFetcherImpl(inLongTopic, clientContext);
+        TopicFetcher inLongTopicFetcher = new 
PulsarSingleTopicFetcher(inLongTopic, clientContext,
+                new MsgTimeInterceptor(), new MessageDeserializer(), null);
         inLongTopicFetcher.pause();
     }
 
     @Test
     public void resume() {
-        InLongTopicFetcher inLongTopicFetcher = new 
InLongPulsarFetcherImpl(inLongTopic, clientContext);
+        TopicFetcher inLongTopicFetcher = new 
PulsarSingleTopicFetcher(inLongTopic, clientContext,
+                new MsgTimeInterceptor(), new MessageDeserializer(), null);
         inLongTopicFetcher.resume();
     }
 
     @Test
     public void close() {
-        InLongTopicFetcher inLongTopicFetcher = new 
InLongPulsarFetcherImpl(inLongTopic, clientContext);
+        TopicFetcher inLongTopicFetcher = new 
PulsarSingleTopicFetcher(inLongTopic, clientContext,
+                new MsgTimeInterceptor(), new MessageDeserializer(), null);
         boolean close = inLongTopicFetcher.close();
         Assert.assertTrue(close);
 
diff --git 
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java
 
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java
index 3e35f43f6..dde55f53b 100644
--- 
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java
+++ 
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java
@@ -17,17 +17,15 @@
 
 package org.apache.inlong.sdk.sort.impl.tube;
 
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.powermock.api.mockito.PowerMockito.when;
 
 import org.apache.inlong.sdk.sort.api.ClientContext;
-import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
 import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.api.TopicFetcher;
 import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
 import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.fetcher.tube.TubeSingleTopicFetcher;
 import org.apache.inlong.sdk.sort.impl.ClientContextImpl;
-import org.apache.inlong.sdk.sort.stat.SortClientStateCounter;
-import org.apache.inlong.sdk.sort.stat.StatManager;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,7 +38,6 @@ public class InLongTubeFetcherImplTest {
     private ClientContext clientContext;
     private InLongTopic inLongTopic;
     private SortClientConfig sortClientConfig;
-    private StatManager statManager;
 
     /**
      * setUp
@@ -60,33 +57,27 @@ public class InLongTubeFetcherImplTest {
         clientContext = PowerMockito.mock(ClientContextImpl.class);
 
         sortClientConfig = PowerMockito.mock(SortClientConfig.class);
-        statManager = PowerMockito.mock(StatManager.class);
 
         when(clientContext.getConfig()).thenReturn(sortClientConfig);
-        when(clientContext.getStatManager()).thenReturn(statManager);
-        SortClientStateCounter sortClientStateCounter = new 
SortClientStateCounter("sortTaskId",
-                cacheZoneCluster.getClusterId(),
-                inLongTopic.getTopic(), 0);
-        when(statManager.getStatistics(anyString(), anyString(), 
anyString())).thenReturn(sortClientStateCounter);
         when(sortClientConfig.getSortTaskId()).thenReturn("sortTaskId");
 
     }
 
     @Test
     public void pause() {
-        InLongTubeFetcherImpl inLongTopicFetcher = new 
InLongTubeFetcherImpl(inLongTopic, clientContext);
+        TopicFetcher inLongTopicFetcher = new 
TubeSingleTopicFetcher(inLongTopic, clientContext, null, null, null);
         inLongTopicFetcher.pause();
     }
 
     @Test
     public void resume() {
-        InLongTubeFetcherImpl inLongTopicFetcher = new 
InLongTubeFetcherImpl(inLongTopic, clientContext);
+        TopicFetcher inLongTopicFetcher = new 
TubeSingleTopicFetcher(inLongTopic, clientContext, null, null, null);
         inLongTopicFetcher.resume();
     }
 
     @Test
     public void close() {
-        InLongTopicFetcher inLongTopicFetcher = new 
InLongTubeFetcherImpl(inLongTopic, clientContext);
+        TopicFetcher inLongTopicFetcher = new 
TubeSingleTopicFetcher(inLongTopic, clientContext, null, null, null);
         boolean close = inLongTopicFetcher.close();
         Assert.assertTrue(close);
 
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index f00a1a03b..b74ad9c6b 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -196,14 +196,14 @@ public final class SortSdkSource extends AbstractSource
             if 
(SortClusterConfigType.FILE.name().equalsIgnoreCase(configType)) {
                 LOG.info("Create sort sdk client in file way:{}", configType);
                 ClassResourceQueryConsumeConfig queryConfig = new 
ClassResourceQueryConsumeConfig();
-                client = SortClientFactory.createSortClientV2(clientConfig,
+                client = SortClientFactory.createSortClient(clientConfig,
                         queryConfig,
                         new MetricReporterImpl(clientConfig),
                         new ManagerReportHandlerImpl());
             } else if 
(SortClusterConfigType.MANAGER.name().equalsIgnoreCase(configType)) {
                 LOG.info("Create sort sdk client in manager way:{}", 
configType);
                 
clientConfig.setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl());
-                client = SortClientFactory.createSortClientV2(clientConfig);
+                client = SortClientFactory.createSortClient(clientConfig);
             } else {
                 LOG.info("Create sort sdk client in custom way:{}", 
configType);
                 // user-defined
@@ -218,7 +218,7 @@ public final class SortSdkSource extends AbstractSource
                     return null;
                 }
                 // if it specifies the type of QueryConsumeConfig.
-                client = SortClientFactory.createSortClientV2(clientConfig,
+                client = SortClientFactory.createSortClient(clientConfig,
                         (QueryConsumeConfig) loaderObject,
                         new MetricReporterImpl(clientConfig),
                         new ManagerReportHandlerImpl());

Reply via email to