This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new f8bd82cd2 [INLONG-6944][SDK] Remove useless code of SortSDK (#6951) f8bd82cd2 is described below commit f8bd82cd23849de2f80bf4bb9f23aa9aadb2afda Author: vernedeng <deng...@pku.edu.cn> AuthorDate: Mon Dec 19 16:34:40 2022 +0800 [INLONG-6944][SDK] Remove useless code of SortSDK (#6951) --- .../inlong/sdk/sort/api/InLongTopicFetcher.java | 82 ---- .../inlong/sdk/sort/api/InlongTopicManager.java | 49 -- .../inlong/sdk/sort/api/SortClientFactory.java | 25 -- .../sdk/sort/impl/InlongTopicManagerImpl.java | 492 --------------------- .../inlong/sdk/sort/impl/ManagerReporter.java | 30 +- .../inlong/sdk/sort/impl/SortClientImpl.java | 33 +- .../inlong/sdk/sort/impl/SortClientImplV2.java | 159 ------- .../sort/impl/kafka/InLongKafkaFetcherImpl.java | 343 -------------- .../sort/impl/pulsar/InLongPulsarFetcherImpl.java | 332 -------------- .../sdk/sort/impl/tube/InLongTubeFetcherImpl.java | 320 -------------- .../sdk/sort/impl/InlongTopicManagerImplTest.java | 36 +- .../sort/impl/decode/MessageDeserializerTest.java | 17 - .../impl/kafka/InLongKafkaFetcherImplTest.java | 33 +- .../impl/pulsar/InLongPulsarFetcherImplTest.java | 59 +-- .../sort/impl/tube/InLongTubeFetcherImplTest.java | 19 +- .../standalone/source/sortsdk/SortSdkSource.java | 6 +- 16 files changed, 96 insertions(+), 1939 deletions(-) diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java deleted file mode 100644 index f65645129..000000000 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.sort.api; - -import org.apache.inlong.sdk.sort.entity.InLongTopic; -import org.apache.inlong.sdk.sort.impl.decode.MessageDeserializer; -import org.apache.inlong.sdk.sort.interceptor.MsgTimeInterceptor; - -import java.util.Objects; -import java.util.Optional; - -@Deprecated -public abstract class InLongTopicFetcher { - - protected InLongTopic inLongTopic; - protected ClientContext context; - protected Deserializer deserializer; - protected volatile Thread fetchThread; - protected volatile boolean closed = false; - protected volatile boolean isStopConsume = false; - // use for empty topic to sleep - protected long sleepTime = 0L; - protected int emptyFetchTimes = 0; - // for rollback - protected Interceptor interceptor; - protected Seeker seeker; - - public InLongTopicFetcher(InLongTopic inLongTopic, ClientContext context) { - this.inLongTopic = inLongTopic; - this.context = context; - this.deserializer = new MessageDeserializer(); - this.interceptor = new MsgTimeInterceptor(); - this.interceptor.configure(inLongTopic); - } - - public abstract boolean init(Object client); - - public abstract void ack(String msgOffset) throws Exception; - - public abstract void pause(); - - public abstract void resume(); - - public abstract boolean close(); - - public abstract boolean isClosed(); - - public abstract void stopConsume(boolean stopConsume); - - public abstract boolean isConsumeStop(); - - public abstract InLongTopic getInLongTopic(); - - public abstract long getConsumedDataSize(); - - public abstract long getAckedOffset(); - - public boolean updateTopic(InLongTopic topic) { - if (Objects.equals(inLongTopic, topic)) { - return false; - } - this.inLongTopic = topic; - Optional.ofNullable(seeker).ifPresent(seeker -> seeker.configure(inLongTopic)); - Optional.ofNullable(interceptor).ifPresent(interceptor -> interceptor.configure(inLongTopic)); - return true; - } -} diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManager.java deleted file mode 100644 index 4bcfee17a..000000000 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManager.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.sort.api; - -import java.util.Collection; -import java.util.Set; -import org.apache.inlong.sdk.sort.entity.InLongTopic; - -@Deprecated -public abstract class InlongTopicManager implements Cleanable { - - protected ClientContext context; - protected QueryConsumeConfig queryConsumeConfig; - - public InlongTopicManager(ClientContext context, QueryConsumeConfig queryConsumeConfig) { - this.context = context; - this.queryConsumeConfig = queryConsumeConfig; - } - - public abstract InLongTopicFetcher addFetcher(InLongTopic inLongTopic); - - public abstract InLongTopicFetcher removeFetcher(InLongTopic inLongTopic, boolean closeFetcher); - - public abstract InLongTopicFetcher getFetcher(String fetchKey); - - public abstract Collection<InLongTopicFetcher> getAllFetchers(); - - public abstract Set<String> getManagedInLongTopics(); - - public abstract void offlineAllTopicsAndPartitions(); - - public abstract void close(); - -} diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientFactory.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientFactory.java index 8feba4d06..b5b0a4113 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientFactory.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientFactory.java @@ -18,43 +18,18 @@ package org.apache.inlong.sdk.sort.api; import org.apache.inlong.sdk.sort.impl.SortClientImpl; -import org.apache.inlong.sdk.sort.impl.SortClientImplV2; /** * Factory of {@link SortClient} */ public class SortClientFactory { - /** - * create default SortClient - * - * @param config SortClientConfig - * @return SortClient - */ public static SortClient createSortClient(SortClientConfig config) { return new SortClientImpl(config); } - /** - * create SortClient with user defined QueryConsumeConfig ,MetricReporter and ManagerReportHandler - * - * @param config SortClientConfig - * @param queryConsumeConfig QueryConsumeConfig - * @param reporter MetricReporter - * @param reportHandler ManagerReportHandler - * @return SortClient - */ public static SortClient createSortClient(SortClientConfig config, QueryConsumeConfig queryConsumeConfig, MetricReporter reporter, ManagerReportHandler reportHandler) { return new SortClientImpl(config, queryConsumeConfig, reporter, reportHandler); } - - public static SortClient createSortClientV2(SortClientConfig config) { - return new SortClientImplV2(config); - } - - public static SortClient createSortClientV2(SortClientConfig config, QueryConsumeConfig queryConsumeConfig, - MetricReporter reporter, ManagerReportHandler reportHandler) { - return new SortClientImplV2(config, queryConsumeConfig, reporter, reportHandler); - } } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImpl.java deleted file mode 100644 index 41e41a735..000000000 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImpl.java +++ /dev/null @@ -1,492 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.sort.impl; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Date; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import org.apache.inlong.sdk.sort.api.ClientContext; -import org.apache.inlong.sdk.sort.api.InLongTopicFetcher; -import org.apache.inlong.sdk.sort.api.InlongTopicManager; -import org.apache.inlong.sdk.sort.api.InlongTopicTypeEnum; -import org.apache.inlong.sdk.sort.api.QueryConsumeConfig; -import org.apache.inlong.sdk.sort.entity.ConsumeConfig; -import org.apache.inlong.sdk.sort.entity.InLongTopic; -import org.apache.inlong.sdk.sort.impl.kafka.InLongKafkaFetcherImpl; -import org.apache.inlong.sdk.sort.impl.pulsar.InLongPulsarFetcherImpl; -import org.apache.inlong.sdk.sort.impl.tube.InLongTubeFetcherImpl; -import org.apache.inlong.sdk.sort.fetcher.tube.TubeConsumerCreator; -import org.apache.inlong.sdk.sort.util.PeriodicTask; -import org.apache.inlong.sdk.sort.util.StringUtil; -import org.apache.inlong.tubemq.client.config.TubeClientConfig; -import org.apache.inlong.tubemq.client.factory.MessageSessionFactory; -import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory; -import org.apache.pulsar.client.api.AuthenticationFactory; -import org.apache.pulsar.client.api.PulsarClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Deprecated -public class InlongTopicManagerImpl extends InlongTopicManager { - - private final Logger logger = LoggerFactory.getLogger(InlongTopicManagerImpl.class); - - private final ConcurrentHashMap<String, InLongTopicFetcher> fetchers = new ConcurrentHashMap<>(); - private final ConcurrentHashMap<String, PulsarClient> pulsarClients = new ConcurrentHashMap<>(); - private final ConcurrentHashMap<String, TubeConsumerCreator> tubeFactories = new ConcurrentHashMap<>(); - - private final PeriodicTask updateMetaDataWorker; - private volatile List<String> toBeSelectFetchers = new ArrayList<>(); - private boolean stopAssign = false; - - public InlongTopicManagerImpl(ClientContext context, QueryConsumeConfig queryConsumeConfig) { - super(context, queryConsumeConfig); - updateMetaDataWorker = new UpdateMetaDataThread(context.getConfig().getUpdateMetaDataIntervalSec(), - TimeUnit.SECONDS); - String threadName = "sortsdk_inlongtopic_manager_" + context.getConfig().getSortTaskId() - + "_" + StringUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss"); - updateMetaDataWorker.start(threadName); - } - - private void updateToBeSelectFetchers(Collection<String> c) { - toBeSelectFetchers = new ArrayList<>(c); - } - - private boolean initFetcher(InLongTopicFetcher fetcher, InLongTopic inLongTopic) { - if (InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(inLongTopic.getTopicType())) { - logger.info("create fetcher topic is pulsar {}", inLongTopic); - return fetcher.init(pulsarClients.get(inLongTopic.getInLongCluster().getClusterId())); - } else if (InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(inLongTopic.getTopicType())) { - logger.info("create fetcher topic is kafka {}", inLongTopic); - return fetcher.init(inLongTopic.getInLongCluster().getBootstraps()); - } else if (InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(inLongTopic.getTopicType())) { - logger.info("create fetcher topic is tube {}", inLongTopic); - return fetcher.init(tubeFactories.get(inLongTopic.getInLongCluster().getClusterId())); - } else { - logger.error("create fetcher topic type not support " + inLongTopic.getTopicType()); - return false; - } - } - - @Override - public InLongTopicFetcher addFetcher(InLongTopic inLongTopic) { - - try { - InLongTopicFetcher result = fetchers.get(inLongTopic.getTopicKey()); - if (result == null) { - // create fetcher (pulsar,tube,kafka) - InLongTopicFetcher inLongTopicFetcher = createInLongTopicFetcher(inLongTopic); - InLongTopicFetcher preValue = fetchers.putIfAbsent(inLongTopic.getTopicKey(), inLongTopicFetcher); - logger.info("addFetcher :{}", inLongTopic.getTopicKey()); - if (preValue != null) { - result = preValue; - if (inLongTopicFetcher != null) { - inLongTopicFetcher.close(); - } - logger.info("addFetcher create same fetcher {}", inLongTopic); - } else { - result = inLongTopicFetcher; - if (result != null - && !initFetcher(result, inLongTopic)) { - logger.info("addFetcher init fail {}", inLongTopic.getTopicKey()); - result.close(); - result = null; - } - } - } - return result; - } finally { - updateToBeSelectFetchers(fetchers.keySet()); - } - } - - /** - * create fetcher (pulsar,tube,kafka) - * - * @param inLongTopic {@link InLongTopic} - * @return {@link InLongTopicFetcher} - */ - private InLongTopicFetcher createInLongTopicFetcher(InLongTopic inLongTopic) { - if (InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(inLongTopic.getTopicType())) { - logger.info("the topic is pulsar {}", inLongTopic); - return new InLongPulsarFetcherImpl(inLongTopic, context); - } else if (InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(inLongTopic.getTopicType())) { - logger.info("the topic is kafka {}", inLongTopic); - return new InLongKafkaFetcherImpl(inLongTopic, context); - } else if (InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(inLongTopic.getTopicType())) { - logger.info("the topic is tube {}", inLongTopic); - return new InLongTubeFetcherImpl(inLongTopic, context); - } else { - logger.error("topic type not support " + inLongTopic.getTopicType()); - return null; - } - } - - @Override - public InLongTopicFetcher removeFetcher(InLongTopic inLongTopic, boolean closeFetcher) { - InLongTopicFetcher result = fetchers.remove(inLongTopic.getTopicKey()); - if (result != null && closeFetcher) { - result.close(); - } - return result; - } - - @Override - public InLongTopicFetcher getFetcher(String fetchKey) { - return fetchers.get(fetchKey); - } - - @Override - public Set<String> getManagedInLongTopics() { - return new HashSet<>(fetchers.keySet()); - } - - @Override - public Collection<InLongTopicFetcher> getAllFetchers() { - return fetchers.values(); - } - - /** - * offline all inlong topic - */ - @Override - public void offlineAllTopicsAndPartitions() { - String subscribeId = context.getConfig().getSortTaskId(); - try { - logger.info("start offline {}", subscribeId); - stopAssign = true; - closeAllFetcher(); - logger.info("close finished {}", subscribeId); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - } - - @Override - public void close() { - if (updateMetaDataWorker != null) { - updateMetaDataWorker.stop(); - } - } - - @Override - public boolean clean() { - String sortTaskId = context.getConfig().getSortTaskId(); - try { - logger.info("start close {}", sortTaskId); - - if (updateMetaDataWorker != null) { - updateMetaDataWorker.stop(); - } - - closeFetcher(); - closePulsarClient(); - closeTubeSessionFactory(); - logger.info("close finished {}", sortTaskId); - return true; - } catch (Throwable th) { - logger.error("close error " + sortTaskId, th); - } - return false; - } - - private void closeAllFetcher() { - closeFetcher(); - } - - private void closeFetcher() { - Set<Entry<String, InLongTopicFetcher>> entries = fetchers.entrySet(); - for (Entry<String, InLongTopicFetcher> entry : entries) { - String fetchKey = entry.getKey(); - InLongTopicFetcher inLongTopicFetcher = entry.getValue(); - boolean succ = false; - if (inLongTopicFetcher != null) { - try { - succ = inLongTopicFetcher.close(); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - } - logger.info(" close fetcher{} {}", fetchKey, succ); - } - } - - private void closePulsarClient() { - for (Map.Entry<String, PulsarClient> entry : pulsarClients.entrySet()) { - PulsarClient pulsarClient = entry.getValue(); - String key = entry.getKey(); - try { - if (pulsarClient != null) { - pulsarClient.close(); - } - } catch (Exception e) { - logger.error("close PulsarClient" + key + " error.", e); - } - } - pulsarClients.clear(); - } - - private void closeTubeSessionFactory() { - for (Map.Entry<String, TubeConsumerCreator> entry : tubeFactories.entrySet()) { - MessageSessionFactory tubeMessageSessionFactory = entry.getValue().getMessageSessionFactory(); - String key = entry.getKey(); - try { - if (tubeMessageSessionFactory != null) { - tubeMessageSessionFactory.shutdown(); - } - } catch (Exception e) { - logger.error("close MessageSessionFactory" + key + " error.", e); - } - } - tubeFactories.clear(); - } - - private List<String> getNewTopics(List<InLongTopic> newSubscribedInLongTopics) { - if (newSubscribedInLongTopics != null && newSubscribedInLongTopics.size() > 0) { - List<String> newTopics = new ArrayList<>(); - for (InLongTopic inLongTopic : newSubscribedInLongTopics) { - newTopics.add(inLongTopic.getTopicKey()); - } - return newTopics; - } - return null; - } - - private void handleCurrentConsumeConfig(List<InLongTopic> currentConsumeConfig) { - if (null == currentConsumeConfig) { - logger.warn("List<InLongTopic> currentConsumeConfig is null"); - return; - } - - List<InLongTopic> newConsumeConfig = new ArrayList<>(currentConsumeConfig); - logger.debug("newConsumeConfig List:{}", Arrays.toString(newConsumeConfig.toArray())); - List<String> newTopics = getNewTopics(newConsumeConfig); - logger.debug("newTopics :{}", Arrays.toString(newTopics.toArray())); - - List<String> oldInLongTopics = new ArrayList<>(fetchers.keySet()); - logger.debug("oldInLongTopics :{}", Arrays.toString(oldInLongTopics.toArray())); - // get need be offlined topics - oldInLongTopics.removeAll(newTopics); - logger.debug("removed oldInLongTopics :{}", Arrays.toString(oldInLongTopics.toArray())); - - // get new topics - newTopics.removeAll(new ArrayList<>(fetchers.keySet())); - logger.debug("really new topics :{}", Arrays.toString(newTopics.toArray())); - // offline need be offlined topics - offlineRmovedTopic(oldInLongTopics); - // online new topics - onlineNewTopic(newConsumeConfig, newTopics); - } - - /** - * offline inlong topic which not belong the sortTaskId - * - * @param oldInLongTopics {@link List} - */ - private void offlineRmovedTopic(List<String> oldInLongTopics) { - for (String fetchKey : oldInLongTopics) { - logger.info("offlineRmovedTopic {}", fetchKey); - InLongTopic inLongTopic = fetchers.get(fetchKey).getInLongTopic(); - InLongTopicFetcher inLongTopicFetcher = fetchers.getOrDefault(fetchKey, null); - if (inLongTopicFetcher != null) { - inLongTopicFetcher.close(); - } - fetchers.remove(fetchKey); - if (context != null && context.getStatManager() != null && inLongTopic != null) { - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), - inLongTopic.getTopic()) - .addTopicOfflineTimes(1); - } else { - logger.error("context == null or context.getStatManager() == null or inLongTopic == null :{}", - inLongTopic); - } - } - } - - /** - * online new inlong topic - * - * @param newSubscribedInLongTopics List - * @param reallyNewTopic List - */ - private void onlineNewTopic(List<InLongTopic> newSubscribedInLongTopics, List<String> reallyNewTopic) { - for (InLongTopic inLongTopic : newSubscribedInLongTopics) { - if (!reallyNewTopic.contains(inLongTopic.getTopicKey())) { - logger.debug("!reallyNewTopic.contains(inLongTopic.getTopicKey())"); - continue; - } - onlineTopic(inLongTopic); - } - } - - private void onlineTopic(InLongTopic inLongTopic) { - if (InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(inLongTopic.getTopicType())) { - logger.info("the topic is pulsar:{}", inLongTopic); - onlinePulsarTopic(inLongTopic); - } else if (InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(inLongTopic.getTopicType())) { - logger.info("the topic is kafka:{}", inLongTopic); - onlineKafkaTopic(inLongTopic); - } else if (InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(inLongTopic.getTopicType())) { - logger.info("the topic is tube:{}", inLongTopic); - onlineTubeTopic(inLongTopic); - } else { - logger.error("topic type:{} not support", inLongTopic.getTopicType()); - } - } - - private void onlinePulsarTopic(InLongTopic inLongTopic) { - if (!checkAndCreateNewPulsarClient(inLongTopic)) { - logger.error("checkAndCreateNewPulsarClient error:{}", inLongTopic); - return; - } - createNewFetcher(inLongTopic); - } - - private boolean checkAndCreateNewPulsarClient(InLongTopic inLongTopic) { - if (!pulsarClients.containsKey(inLongTopic.getInLongCluster().getClusterId())) { - if (inLongTopic.getInLongCluster().getBootstraps() != null) { - try { - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(inLongTopic.getInLongCluster().getBootstraps()) - .authentication(AuthenticationFactory.token(inLongTopic.getInLongCluster().getToken())) - .statsInterval(context.getConfig().getStatsIntervalSeconds(), TimeUnit.SECONDS) - .build(); - pulsarClients.put(inLongTopic.getInLongCluster().getClusterId(), pulsarClient); - logger.debug("create pulsar client succ {}", - new String[]{inLongTopic.getInLongCluster().getClusterId(), - inLongTopic.getInLongCluster().getBootstraps(), - inLongTopic.getInLongCluster().getToken()}); - } catch (Exception e) { - logger.error("create pulsar client error {}", inLongTopic); - logger.error(e.getMessage(), e); - return false; - } - } else { - logger.error("bootstrap is null {}", inLongTopic.getInLongCluster()); - return false; - } - } - logger.info("create pulsar client true {}", inLongTopic); - return true; - } - - private boolean checkAndCreateNewTubeSessionFactory(InLongTopic inLongTopic) { - if (!tubeFactories.containsKey(inLongTopic.getInLongCluster().getClusterId())) { - if (inLongTopic.getInLongCluster().getBootstraps() != null) { - try { - // create MessageSessionFactory - TubeClientConfig tubeConfig = new TubeClientConfig(inLongTopic.getInLongCluster().getBootstraps()); - MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(tubeConfig); - TubeConsumerCreator tubeConsumerCreator = new TubeConsumerCreator(messageSessionFactory, - tubeConfig); - tubeFactories.put(inLongTopic.getInLongCluster().getClusterId(), tubeConsumerCreator); - logger.debug("create tube client succ {} {} {}", - new String[]{inLongTopic.getInLongCluster().getClusterId(), - inLongTopic.getInLongCluster().getBootstraps(), - inLongTopic.getInLongCluster().getToken()}); - } catch (Exception e) { - logger.error("create tube client error {}", inLongTopic); - logger.error(e.getMessage(), e); - return false; - } - } else { - logger.info("bootstrap is null {}", inLongTopic.getInLongCluster()); - return false; - } - } - logger.info("create pulsar client true {}", inLongTopic); - return true; - } - - private void onlineKafkaTopic(InLongTopic inLongTopic) { - createNewFetcher(inLongTopic); - } - - private void onlineTubeTopic(InLongTopic inLongTopic) { - if (!checkAndCreateNewTubeSessionFactory(inLongTopic)) { - logger.error("checkAndCreateNewPulsarClient error:{}", inLongTopic); - return; - } - createNewFetcher(inLongTopic); - } - - private void createNewFetcher(InLongTopic inLongTopic) { - if (!fetchers.containsKey(inLongTopic.getTopicKey())) { - logger.info("begin add Fetcher:{}", inLongTopic.getTopicKey()); - if (context != null && context.getStatManager() != null) { - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addTopicOnlineTimes(1); - InLongTopicFetcher fetcher = addFetcher(inLongTopic); - if (fetcher == null) { - fetchers.remove(inLongTopic.getTopicKey()); - logger.error("add fetcher error:{}", inLongTopic.getTopicKey()); - } - } else { - logger.error("context == null or context.getStatManager() == null"); - } - } - } - - private class UpdateMetaDataThread extends PeriodicTask { - - public UpdateMetaDataThread(long runInterval, TimeUnit timeUnit) { - super(runInterval, timeUnit, context.getConfig()); - } - - @Override - protected void doWork() { - logger.debug("InLongTopicManagerImpl doWork"); - if (stopAssign) { - logger.warn("assign is stoped"); - return; - } - // get sortTask conf from manager - if (queryConsumeConfig != null) { - long start = System.currentTimeMillis(); - context.getStatManager().getStatistics(context.getConfig().getSortTaskId()) - .addRequestManagerTimes(1); - ConsumeConfig consumeConfig = queryConsumeConfig - .queryCurrentConsumeConfig(context.getConfig().getSortTaskId()); - context.getStatManager().getStatistics(context.getConfig().getSortTaskId()) - .addRequestManagerTimeCost(System.currentTimeMillis() - start); - - if (consumeConfig != null) { - handleCurrentConsumeConfig(consumeConfig.getTopics()); - } else { - logger.warn("subscribedInfo is null"); - context.getStatManager().getStatistics(context.getConfig().getSortTaskId()) - .addRequestManagerFailTimes(1); - } - } else { - logger.error("subscribedMetaDataInfo is null"); - } - } - } -} diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/ManagerReporter.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/ManagerReporter.java index 8c7a3b2a8..f2fdd5668 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/ManagerReporter.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/ManagerReporter.java @@ -25,22 +25,23 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.apache.inlong.sdk.sort.api.ClientContext; -import org.apache.inlong.sdk.sort.api.InLongTopicFetcher; -import org.apache.inlong.sdk.sort.api.InlongTopicManager; import org.apache.inlong.sdk.sort.api.ManagerReportHandler; import org.apache.inlong.sdk.sort.api.ReportApi; +import org.apache.inlong.sdk.sort.api.TopicFetcher; +import org.apache.inlong.sdk.sort.api.TopicManager; import org.apache.inlong.sdk.sort.entity.ConsumeState; import org.apache.inlong.sdk.sort.entity.ConsumeStatusParams; import org.apache.inlong.sdk.sort.entity.ConsumeStatusResult; import org.apache.inlong.sdk.sort.entity.HeartBeatParams; import org.apache.inlong.sdk.sort.entity.HeartBeatResult; +import org.apache.inlong.sdk.sort.entity.InLongTopic; import org.apache.inlong.sdk.sort.util.PeriodicTask; public class ManagerReporter extends PeriodicTask { private final ConcurrentHashMap<Integer, Long> reportApiRunTimeMs = new ConcurrentHashMap<>(); private final ClientContext context; - private final InlongTopicManager inLongTopicManager; + private final TopicManager inLongTopicManager; private final ManagerReportHandler reportHandler; private Map<Integer, Long> reportApiInterval = new HashMap<>(); @@ -53,8 +54,7 @@ public class ManagerReporter extends PeriodicTask { * @param runInterval long * @param timeUnit TimeUnit */ - public ManagerReporter(ClientContext context, ManagerReportHandler reportHandler, - InlongTopicManager inLongTopicManager, + public ManagerReporter(ClientContext context, ManagerReportHandler reportHandler, TopicManager inLongTopicManager, long runInterval, TimeUnit timeUnit) { super(runInterval, timeUnit, context.getConfig()); this.context = context; @@ -158,17 +158,17 @@ public class ManagerReporter extends PeriodicTask { consumeStatusParams.setSubscribedId(context.getConfig().getSortTaskId()); consumeStatusParams.setIp(context.getConfig().getLocalIp()); List<ConsumeState> consumeStates = new ArrayList<>(); - Collection<InLongTopicFetcher> allFetchers = + Collection<TopicFetcher> allFetchers = inLongTopicManager.getAllFetchers(); - for (InLongTopicFetcher fetcher : allFetchers) { - ConsumeState consumeState = new ConsumeState(); - consumeState.setTopic(fetcher.getInLongTopic().getTopic()); - consumeState.setTopicType(fetcher.getInLongTopic().getTopicType()); - consumeState.setClusterId(fetcher.getInLongTopic().getInLongCluster().getClusterId()); - consumeState.setConsumedDataSize(fetcher.getConsumedDataSize()); - consumeState.setAckOffset(fetcher.getAckedOffset()); - consumeState.setPartition(fetcher.getInLongTopic().getPartitionId()); - consumeStates.add(consumeState); + for (TopicFetcher fetcher : allFetchers) { + for (InLongTopic topic : fetcher.getTopics()) { + ConsumeState consumeState = new ConsumeState(); + consumeState.setTopic(topic.getTopic()); + consumeState.setTopicType(topic.getTopicType()); + consumeState.setClusterId(topic.getInLongCluster().getClusterId()); + consumeState.setPartition(topic.getPartitionId()); + consumeStates.add(consumeState); + } } consumeStatusParams.setConsumeStates(consumeStates); diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java index 753c39f60..a381b7910 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java @@ -19,18 +19,21 @@ package org.apache.inlong.sdk.sort.impl; import org.apache.inlong.sdk.sort.api.Cleanable; import org.apache.inlong.sdk.sort.api.ClientContext; -import org.apache.inlong.sdk.sort.api.InLongTopicFetcher; -import org.apache.inlong.sdk.sort.api.InlongTopicManager; import org.apache.inlong.sdk.sort.api.ManagerReportHandler; import org.apache.inlong.sdk.sort.api.MetricReporter; import org.apache.inlong.sdk.sort.api.QueryConsumeConfig; import org.apache.inlong.sdk.sort.api.SortClient; import org.apache.inlong.sdk.sort.api.SortClientConfig; +import org.apache.inlong.sdk.sort.api.TopicFetcher; +import org.apache.inlong.sdk.sort.api.TopicManager; import org.apache.inlong.sdk.sort.exception.NotExistException; +import org.apache.inlong.sdk.sort.api.InlongTopicManagerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Deprecated +/** + * New version of sort client. + */ public class SortClientImpl extends SortClient { private final String logPrefix = "[" + SortClientImpl.class.getSimpleName() + "] "; @@ -40,7 +43,7 @@ public class SortClientImpl extends SortClient { private final ClientContext context; - private final InlongTopicManager inLongTopicManager; + private final TopicManager inLongTopicManager; /** * SortClient Constructor @@ -51,7 +54,10 @@ public class SortClientImpl extends SortClient { try { this.sortClientConfig = sortClientConfig; this.context = new ClientContextImpl(this.sortClientConfig, new MetricReporterImpl(sortClientConfig)); - this.inLongTopicManager = new InlongTopicManagerImpl(context, new QueryConsumeConfigImpl(context)); + + this.inLongTopicManager = InlongTopicManagerFactory + .createInLongTopicManager(sortClientConfig.getTopicType(), + context, new QueryConsumeConfigImpl(context)); } catch (Exception e) { this.close(); throw e; @@ -71,7 +77,10 @@ public class SortClientImpl extends SortClient { try { this.sortClientConfig = sortClientConfig; this.context = new ClientContextImpl(this.sortClientConfig, metricReporter); - this.inLongTopicManager = new InlongTopicManagerImpl(context, new QueryConsumeConfigImpl(context)); + queryConsumeConfig.configure(context); + this.inLongTopicManager = InlongTopicManagerFactory + .createInLongTopicManager(sortClientConfig.getTopicType(), + context, queryConsumeConfig); } catch (Exception e) { e.printStackTrace(); this.close(); @@ -102,8 +111,8 @@ public class SortClientImpl extends SortClient { public void ack(String msgKey, String msgOffset) throws Exception { logger.debug("ack:{} offset:{}", msgKey, msgOffset); - InLongTopicFetcher inLongTopicFetcher = getFetcher(msgKey); - inLongTopicFetcher.ack(msgOffset); + TopicFetcher topicFetcher = getFetcher(msgKey); + topicFetcher.ack(msgOffset); } /** @@ -128,12 +137,12 @@ public class SortClientImpl extends SortClient { return this.sortClientConfig; } - private InLongTopicFetcher getFetcher(String msgKey) throws NotExistException { - InLongTopicFetcher inLongTopicFetcher = inLongTopicManager.getFetcher(msgKey); - if (inLongTopicFetcher == null) { + private TopicFetcher getFetcher(String msgKey) throws NotExistException { + TopicFetcher topicFetcher = inLongTopicManager.getFetcher(msgKey); + if (topicFetcher == null) { throw new NotExistException(msgKey + " not exist."); } - return inLongTopicFetcher; + return topicFetcher; } private boolean doClose(Cleanable c) { diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java deleted file mode 100644 index 3a730d082..000000000 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.sort.impl; - -import org.apache.inlong.sdk.sort.api.Cleanable; -import org.apache.inlong.sdk.sort.api.ClientContext; -import org.apache.inlong.sdk.sort.api.ManagerReportHandler; -import org.apache.inlong.sdk.sort.api.MetricReporter; -import org.apache.inlong.sdk.sort.api.QueryConsumeConfig; -import org.apache.inlong.sdk.sort.api.SortClient; -import org.apache.inlong.sdk.sort.api.SortClientConfig; -import org.apache.inlong.sdk.sort.api.TopicFetcher; -import org.apache.inlong.sdk.sort.api.TopicManager; -import org.apache.inlong.sdk.sort.exception.NotExistException; -import org.apache.inlong.sdk.sort.api.InlongTopicManagerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * New version of sort client. - */ -public class SortClientImplV2 extends SortClient { - - private final String logPrefix = "[" + SortClientImpl.class.getSimpleName() + "] "; - private final Logger logger = LoggerFactory.getLogger(SortClientImpl.class); - - private final SortClientConfig sortClientConfig; - - private final ClientContext context; - - private final TopicManager inLongTopicManager; - - /** - * SortClient Constructor - * - * @param sortClientConfig SortClientConfig - */ - public SortClientImplV2(SortClientConfig sortClientConfig) { - try { - this.sortClientConfig = sortClientConfig; - this.context = new ClientContextImpl(this.sortClientConfig, new MetricReporterImpl(sortClientConfig)); - - this.inLongTopicManager = InlongTopicManagerFactory - .createInLongTopicManager(sortClientConfig.getTopicType(), - context, new QueryConsumeConfigImpl(context)); - } catch (Exception e) { - this.close(); - throw e; - } - } - - /** - * SortClient Constructor with user defined QueryConsumeConfig,MetricReporter and ManagerReportHandler - * - * @param sortClientConfig SortClientConfig - * @param queryConsumeConfig QueryConsumeConfig - * @param metricReporter MetricReporter - * @param managerReportHandler ManagerReportHandler - */ - public SortClientImplV2(SortClientConfig sortClientConfig, QueryConsumeConfig queryConsumeConfig, - MetricReporter metricReporter, ManagerReportHandler managerReportHandler) { - try { - this.sortClientConfig = sortClientConfig; - this.context = new ClientContextImpl(this.sortClientConfig, metricReporter); - queryConsumeConfig.configure(context); - this.inLongTopicManager = InlongTopicManagerFactory - .createInLongTopicManager(sortClientConfig.getTopicType(), - context, queryConsumeConfig); - } catch (Exception e) { - e.printStackTrace(); - this.close(); - throw e; - } - } - - /** - * init SortClient - * - * @return true/false - * @throws Throwable - */ - @Override - public boolean init() throws Throwable { - logger.info(logPrefix + "init|" + sortClientConfig); - return true; - } - - /** - * ack offset to msgKey - * - * @param msgKey String - * @param msgOffset String - * @throws Exception - */ - @Override - public void ack(String msgKey, String msgOffset) - throws Exception { - logger.debug("ack:{} offset:{}", msgKey, msgOffset); - TopicFetcher topicFetcher = getFetcher(msgKey); - topicFetcher.ack(msgOffset); - } - - /** - * close SortClient - * - * @return true/false - */ - @Override - public boolean close() { - boolean cleanInLongTopicManager = doClose(inLongTopicManager); - boolean cleanContext = doClose(context); - - logger.info(logPrefix - - + "|cleanInLongTopicManager=" + cleanInLongTopicManager - + "|cleanContext=" + cleanContext); - return (cleanInLongTopicManager && cleanContext); - } - - @Override - public SortClientConfig getConfig() { - return this.sortClientConfig; - } - - private TopicFetcher getFetcher(String msgKey) throws NotExistException { - TopicFetcher topicFetcher = inLongTopicManager.getFetcher(msgKey); - if (topicFetcher == null) { - throw new NotExistException(msgKey + " not exist."); - } - return topicFetcher; - } - - private boolean doClose(Cleanable c) { - try { - if (c != null) { - return c.clean(); - } - return true; - } catch (Throwable th) { - logger.error(logPrefix + "clean error.", th); - return false; - } - } -} diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java deleted file mode 100644 index deffa3272..000000000 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java +++ /dev/null @@ -1,343 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.sort.impl.kafka; - -import com.google.gson.Gson; - -import org.apache.inlong.sdk.sort.api.ClientContext; -import org.apache.inlong.sdk.sort.api.InLongTopicFetcher; -import org.apache.inlong.sdk.sort.api.SeekerFactory; -import org.apache.inlong.sdk.sort.api.SortClientConfig.ConsumeStrategy; -import org.apache.inlong.sdk.sort.entity.InLongMessage; -import org.apache.inlong.sdk.sort.entity.InLongTopic; -import org.apache.inlong.sdk.sort.entity.MessageRecord; -import org.apache.inlong.sdk.sort.fetcher.kafka.AckOffsetOnRebalance; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.RangeAssignor; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -@Deprecated -public class InLongKafkaFetcherImpl extends InLongTopicFetcher { - - private final Logger logger = LoggerFactory.getLogger(InLongKafkaFetcherImpl.class); - private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap = new ConcurrentHashMap<>(); - private final AtomicLong ackOffsets = new AtomicLong(0); - private volatile boolean stopConsume = false; - private String bootstrapServers; - private KafkaConsumer<byte[], byte[]> consumer; - - public InLongKafkaFetcherImpl(InLongTopic inLongTopic, ClientContext context) { - super(inLongTopic, context); - } - - @Override - public boolean init(Object object) { - String bootstrapServers = (String) object; - try { - createKafkaConsumer(bootstrapServers); - if (consumer != null) { - logger.info("start to subscribe topic:{}", new Gson().toJson(inLongTopic)); - this.seeker = SeekerFactory.createKafkaSeeker(consumer, inLongTopic); - consumer.subscribe(Collections.singletonList(inLongTopic.getTopic()), - new AckOffsetOnRebalance(this.inLongTopic.getInLongCluster().getClusterId(), seeker, - commitOffsetMap, consumer)); - } else { - logger.info("consumer is null"); - return false; - } - this.bootstrapServers = bootstrapServers; - String threadName = String.format("sort_sdk_fetch_thread_%s_%s_%d", - this.inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic(), this.hashCode()); - this.fetchThread = new Thread(new Fetcher(), threadName); - logger.info("start to start thread:{}", threadName); - this.fetchThread.start(); - } catch (Exception e) { - logger.error(e.getMessage(), e); - return false; - } - return true; - } - - @Override - public void ack(String msgOffset) throws Exception { - String[] offset = msgOffset.split(":"); - if (offset.length == 2) { - TopicPartition topicPartition = new TopicPartition(inLongTopic.getTopic(), Integer.parseInt(offset[0])); - OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(Long.parseLong(offset[1])); - commitOffsetMap.put(topicPartition, offsetAndMetadata); - } else { - throw new Exception("offset is illegal, the correct format is int:long ,the error offset is:" + msgOffset); - } - } - - @Override - public void pause() { - this.stopConsume = true; - } - - @Override - public void resume() { - this.stopConsume = false; - } - - @Override - public boolean close() { - this.closed = true; - try { - if (fetchThread != null) { - fetchThread.interrupt(); - } - if (consumer != null) { - consumer.close(); - } - } catch (Throwable throwable) { - throwable.printStackTrace(); - } - logger.info("closed {}", inLongTopic); - return true; - } - - @Override - public boolean isClosed() { - return closed; - } - - @Override - public void stopConsume(boolean stopConsume) { - this.stopConsume = stopConsume; - } - - @Override - public boolean isConsumeStop() { - return this.stopConsume; - } - - @Override - public InLongTopic getInLongTopic() { - return inLongTopic; - } - - @Override - public long getConsumedDataSize() { - return 0; - } - - @Override - public long getAckedOffset() { - return 0; - } - - private void createKafkaConsumer(String bootstrapServers) { - Properties properties = new Properties(); - properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - properties.put(ConsumerConfig.GROUP_ID_CONFIG, context.getConfig().getSortTaskId()); - properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - ByteArrayDeserializer.class.getName()); - properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - ByteArrayDeserializer.class.getName()); - properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, - context.getConfig().getKafkaSocketRecvBufferSize()); - properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - ConsumeStrategy offsetResetStrategy = context.getConfig().getOffsetResetStrategy(); - if (offsetResetStrategy == ConsumeStrategy.lastest - || offsetResetStrategy == ConsumeStrategy.lastest_absolutely) { - properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); - } else if (offsetResetStrategy == ConsumeStrategy.earliest - || offsetResetStrategy == ConsumeStrategy.earliest_absolutely) { - properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - } else { - properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); - } - properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, - context.getConfig().getKafkaFetchSizeBytes()); - properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, - context.getConfig().getKafkaFetchWaitMs()); - properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, - RangeAssignor.class.getName()); - properties.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 120000L); - this.bootstrapServers = bootstrapServers; - logger.info("start to create kafka consumer:{}", properties); - this.consumer = new KafkaConsumer<>(properties); - logger.info("end to create kafka consumer:{}", consumer); - } - - public class Fetcher implements Runnable { - - private void commitKafkaOffset() { - if (consumer != null && commitOffsetMap.size() > 0) { - try { - consumer.commitSync(commitOffsetMap); - commitOffsetMap.clear(); - // TODO monitor commit succ - - } catch (Exception e) { - // TODO monitor commit fail - logger.error(e.getMessage(), e); - } - } - } - - /** - * put the received msg to onFinished method - * - * @param messageRecords {@link List < MessageRecord >} - */ - private void handleAndCallbackMsg(List<MessageRecord> messageRecords) { - long start = System.currentTimeMillis(); - try { - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addCallbackTimes(1); - context.getConfig().getCallback().onFinishedBatch(messageRecords); - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addCallbackTimeCost(System.currentTimeMillis() - start).addCallbackDoneTimes(1); - } catch (Exception e) { - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addCallbackErrorTimes(1); - logger.error(e.getMessage(), e); - } - } - - private String getOffset(int partitionId, long offset) { - return partitionId + ":" + offset; - } - - private Map<String, String> getMsgHeaders(Headers headers) { - Map<String, String> headerMap = new HashMap<>(); - for (Header header : headers) { - headerMap.put(header.key(), new String(header.value())); - } - return headerMap; - } - - @Override - public void run() { - boolean hasPermit; - while (true) { - hasPermit = false; - try { - if (context.getConfig().isStopConsume() || stopConsume) { - TimeUnit.MILLISECONDS.sleep(50); - continue; - } - - if (sleepTime > 0) { - TimeUnit.MILLISECONDS.sleep(sleepTime); - } - - context.acquireRequestPermit(); - hasPermit = true; - // fetch from kafka - fetchFromKafka(); - // commit - commitKafkaOffset(); - } catch (Exception e) { - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addFetchErrorTimes(1); - logger.error(e.getMessage(), e); - } finally { - if (hasPermit) { - context.releaseRequestPermit(); - } - } - } - } - - private void fetchFromKafka() throws Exception { - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addMsgCount(1).addFetchTimes(1); - - long startFetchTime = System.currentTimeMillis(); - ConsumerRecords<byte[], byte[]> records = consumer - .poll(Duration.ofMillis(context.getConfig().getKafkaFetchWaitMs())); - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addFetchTimeCost(System.currentTimeMillis() - startFetchTime); - if (null != records && !records.isEmpty()) { - - for (ConsumerRecord<byte[], byte[]> msg : records) { - List<MessageRecord> msgs = new ArrayList<>(); - String offsetKey = getOffset(msg.partition(), msg.offset()); - List<InLongMessage> inLongMessages = deserializer - .deserialize(context, inLongTopic, getMsgHeaders(msg.headers()), msg.value()); - inLongMessages = interceptor.intercept(inLongMessages); - if (inLongMessages.isEmpty()) { - ack(offsetKey); - continue; - } - - msgs.add(new MessageRecord(inLongTopic.getTopicKey(), - inLongMessages, - offsetKey, System.currentTimeMillis())); - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addConsumeSize(msg.value().length); - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addMsgCount(msgs.size()); - handleAndCallbackMsg(msgs); - } - sleepTime = 0L; - } else { - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addEmptyFetchTimes(1); - emptyFetchTimes++; - if (emptyFetchTimes >= context.getConfig().getEmptyPollTimes()) { - sleepTime = Math.min((sleepTime += context.getConfig().getEmptyPollSleepStepMs()), - context.getConfig().getMaxEmptyPollSleepMs()); - emptyFetchTimes = 0; - } - } - } - } -} diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java deleted file mode 100644 index fddd9ec11..000000000 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java +++ /dev/null @@ -1,332 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.sort.impl.pulsar; - -import org.apache.commons.lang3.StringUtils; -import org.apache.inlong.sdk.sort.api.ClientContext; -import org.apache.inlong.sdk.sort.api.InLongTopicFetcher; -import org.apache.inlong.sdk.sort.entity.InLongMessage; -import org.apache.inlong.sdk.sort.entity.InLongTopic; -import org.apache.inlong.sdk.sort.entity.MessageRecord; -import org.apache.inlong.sdk.sort.util.StringUtil; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.Messages; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Base64; -import java.util.Date; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -public class InLongPulsarFetcherImpl extends InLongTopicFetcher { - - private final Logger logger = LoggerFactory.getLogger(InLongPulsarFetcherImpl.class); - private final ReentrantReadWriteLock mainLock = new ReentrantReadWriteLock(true); - private final ConcurrentHashMap<String, MessageId> offsetCache = new ConcurrentHashMap<>(); - private Consumer<byte[]> consumer; - - public InLongPulsarFetcherImpl(InLongTopic inLongTopic, - ClientContext context) { - super(inLongTopic, context); - } - - @Override - public void stopConsume(boolean stopConsume) { - this.isStopConsume = stopConsume; - } - - @Override - public boolean isConsumeStop() { - return isStopConsume; - } - - @Override - public InLongTopic getInLongTopic() { - return inLongTopic; - } - - @Override - public long getConsumedDataSize() { - return 0L; - } - - @Override - public long getAckedOffset() { - return 0L; - } - - private void ackSucc(String offset) { - offsetCache.remove(offset); - context.getStatManager().getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()).addAckSuccTimes(1L); - } - - /** - * ack Offset - * - * @param msgOffset String - */ - @Override - public void ack(String msgOffset) throws Exception { - if (!StringUtils.isEmpty(msgOffset)) { - try { - if (consumer == null) { - context.getStatManager().getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addAckFailTimes(1L); - logger.error("consumer == null {}", inLongTopic); - return; - } - MessageId messageId = offsetCache.get(msgOffset); - if (messageId == null) { - context.getStatManager().getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addAckFailTimes(1L); - logger.error("messageId == null {}", inLongTopic); - return; - } - consumer.acknowledgeAsync(messageId) - .thenAccept(consumer -> ackSucc(msgOffset)) - .exceptionally(exception -> { - logger.error("ack fail:{} {},error:{}", - inLongTopic, msgOffset, exception.getMessage(), exception); - context.getStatManager().getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addAckFailTimes(1L); - return null; - }); - } catch (Exception e) { - context.getStatManager().getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()).addAckFailTimes(1L); - logger.error(e.getMessage(), e); - throw e; - } - } - } - - /** - * create Consumer and fetch thread - * - * @return boolean - */ - @Override - public boolean init(Object object) { - PulsarClient pulsarClient = (PulsarClient) object; - return createConsumer(pulsarClient); - } - - private boolean createConsumer(PulsarClient client) { - if (null == client) { - return false; - } - try { - consumer = client.newConsumer(Schema.BYTES) - .topic(inLongTopic.getTopic()) - .subscriptionName(context.getConfig().getSortTaskId()) - .subscriptionType(SubscriptionType.Shared) - .startMessageIdInclusive() - .ackTimeout(context.getConfig().getAckTimeoutSec(), TimeUnit.SECONDS) - .receiverQueueSize(context.getConfig().getPulsarReceiveQueueSize()) - .subscribe(); - - String threadName = "sort_sdk_fetch_thread_" + StringUtil.formatDate(new Date()); - this.fetchThread = new Thread(new Fetcher(), threadName); - this.fetchThread.start(); - } catch (Exception e) { - logger.error(e.getMessage(), e); - return false; - } - return true; - } - - /** - * pause - */ - @Override - public void pause() { - if (consumer != null) { - consumer.pause(); - } - } - - /** - * resume - */ - @Override - public void resume() { - if (consumer != null) { - consumer.resume(); - } - } - - /** - * close - * - * @return true/false - */ - @Override - public boolean close() { - mainLock.writeLock().lock(); - try { - try { - if (consumer != null) { - consumer.close(); - } - if (fetchThread != null) { - fetchThread.interrupt(); - } - } catch (PulsarClientException e) { - e.printStackTrace(); - } - logger.info("closed {}", inLongTopic); - return true; - } finally { - this.closed = true; - mainLock.writeLock().unlock(); - } - } - - @Override - public boolean isClosed() { - return closed; - } - - public class Fetcher implements Runnable { - - /** - * put the received msg to onFinished method - * - * @param messageRecords {@link List} - */ - private void handleAndCallbackMsg(List<MessageRecord> messageRecords) { - long start = System.currentTimeMillis(); - try { - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addCallbackTimes(1L); - context.getConfig().getCallback().onFinishedBatch(messageRecords); - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addCallbackTimeCost(System.currentTimeMillis() - start).addCallbackDoneTimes(1L); - } catch (Exception e) { - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addCallbackErrorTimes(1L); - e.printStackTrace(); - } - } - - private String getOffset(MessageId msgId) { - return Base64.getEncoder().encodeToString(msgId.toByteArray()); - } - - @Override - public void run() { - boolean hasPermit; - while (true) { - hasPermit = false; - try { - if (context.getConfig().isStopConsume() || isStopConsume) { - TimeUnit.MILLISECONDS.sleep(50); - continue; - } - - if (sleepTime > 0) { - TimeUnit.MILLISECONDS.sleep(sleepTime); - } - - context.acquireRequestPermit(); - hasPermit = true; - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addMsgCount(1L).addFetchTimes(1L); - - long startFetchTime = System.currentTimeMillis(); - Messages<byte[]> messages = consumer.batchReceive(); - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addFetchTimeCost(System.currentTimeMillis() - startFetchTime); - if (null != messages && messages.size() != 0) { - for (Message<byte[]> msg : messages) { - List<MessageRecord> msgs = new ArrayList<>(); - String offsetKey = getOffset(msg.getMessageId()); - offsetCache.put(offsetKey, msg.getMessageId()); - - List<InLongMessage> inLongMessages = deserializer - .deserialize(context, inLongTopic, msg.getProperties(), msg.getData()); - - msgs.add(new MessageRecord(inLongTopic.getTopicKey(), - inLongMessages, - offsetKey, System.currentTimeMillis())); - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addConsumeSize(msg.getData().length); - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addMsgCount(msgs.size()); - handleAndCallbackMsg(msgs); - } - sleepTime = 0L; - } else { - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addEmptyFetchTimes(1L); - emptyFetchTimes++; - if (emptyFetchTimes >= context.getConfig().getEmptyPollTimes()) { - sleepTime = Math.min((sleepTime += context.getConfig().getEmptyPollSleepStepMs()), - context.getConfig().getMaxEmptyPollSleepMs()); - emptyFetchTimes = 0; - } - } - } catch (Exception e) { - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addFetchErrorTimes(1L); - logger.error(e.getMessage(), e); - } finally { - if (hasPermit) { - context.releaseRequestPermit(); - } - } - - if (closed) { - break; - } - } - } - } -} \ No newline at end of file diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java deleted file mode 100644 index f4d2f3624..000000000 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java +++ /dev/null @@ -1,320 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.sort.impl.tube; - -import com.google.common.base.Splitter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.lang3.StringUtils; -import org.apache.inlong.sdk.sort.api.ClientContext; -import org.apache.inlong.sdk.sort.api.InLongTopicFetcher; -import org.apache.inlong.sdk.sort.api.SysConstants; -import org.apache.inlong.sdk.sort.entity.InLongMessage; -import org.apache.inlong.sdk.sort.entity.InLongTopic; -import org.apache.inlong.sdk.sort.entity.MessageRecord; -import org.apache.inlong.sdk.sort.fetcher.tube.TubeConsumerCreator; -import org.apache.inlong.sdk.sort.util.StringUtil; -import org.apache.inlong.tubemq.client.config.ConsumerConfig; -import org.apache.inlong.tubemq.client.config.TubeClientConfig; -import org.apache.inlong.tubemq.client.consumer.ConsumerResult; -import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer; -import org.apache.inlong.tubemq.corebase.Message; -import org.apache.inlong.tubemq.corebase.TErrCodeConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Deprecated -public class InLongTubeFetcherImpl extends InLongTopicFetcher { - - private static final Logger LOG = LoggerFactory.getLogger(InLongTubeFetcherImpl.class); - private PullMessageConsumer messageConsumer; - private volatile Thread fetchThread; - - public InLongTubeFetcherImpl(InLongTopic inLongTopic, ClientContext context) { - super(inLongTopic, context); - } - - @Override - public boolean init(Object object) { - TubeConsumerCreator tubeConsumerCreator = (TubeConsumerCreator) object; - TubeClientConfig tubeClientConfig = tubeConsumerCreator.getTubeClientConfig(); - try { - ConsumerConfig consumerConfig = new ConsumerConfig(tubeClientConfig.getMasterInfo(), - context.getConfig().getSortTaskId()); - - messageConsumer = tubeConsumerCreator.getMessageSessionFactory().createPullConsumer(consumerConfig); - if (messageConsumer != null) { - TreeSet<String> filters = null; - if (inLongTopic.getProperties() != null && inLongTopic.getProperties().containsKey( - SysConstants.TUBE_TOPIC_FILTER_KEY)) { - String filterStr = inLongTopic.getProperties().get(SysConstants.TUBE_TOPIC_FILTER_KEY); - String[] filterArray = filterStr.split(" "); - filters = new TreeSet<>(Arrays.asList(filterArray)); - } - messageConsumer.subscribe(inLongTopic.getTopic(), filters); - messageConsumer.completeSubscribe(); - - String threadName = "sort_sdk_fetch_thread_" + StringUtil.formatDate(new Date()); - this.fetchThread = new Thread(new Fetcher(), threadName); - this.fetchThread.start(); - } else { - return false; - } - } catch (Exception e) { - e.printStackTrace(); - return false; - } - return true; - } - - @Override - public void ack(String msgOffset) throws Exception { - if (!StringUtils.isEmpty(msgOffset)) { - if (messageConsumer == null) { - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addAckFailTimes(1L); - LOG.warn("consumer == null"); - return; - } - - try { - ConsumerResult consumerResult = messageConsumer.confirmConsume(msgOffset, true); - int errCode = consumerResult.getErrCode(); - if (TErrCodeConstants.SUCCESS != errCode) { - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addAckFailTimes(1L); - } else { - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addAckSuccTimes(1L); - } - } catch (Exception e) { - context.getStatManager().getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()).addAckFailTimes(1L); - LOG.error(e.getMessage(), e); - throw e; - } - } - } - - @Override - public void pause() { - this.closed = true; - } - - @Override - public void resume() { - this.closed = false; - } - - @Override - public boolean close() { - try { - if (fetchThread != null) { - fetchThread.interrupt(); - } - if (messageConsumer != null) { - messageConsumer.shutdown(); - } - } catch (Throwable throwable) { - throwable.printStackTrace(); - } finally { - this.closed = true; - } - LOG.info("closed {}", inLongTopic); - return true; - } - - @Override - public boolean isClosed() { - return closed; - } - - @Override - public void stopConsume(boolean stopConsume) { - this.isStopConsume = stopConsume; - } - - @Override - public boolean isConsumeStop() { - return isStopConsume; - } - - @Override - public InLongTopic getInLongTopic() { - return inLongTopic; - } - - @Override - public long getConsumedDataSize() { - return 0L; - } - - @Override - public long getAckedOffset() { - return 0L; - } - - public class Fetcher implements Runnable { - - /** - * put the received msg to onFinished method - * - * @param messageRecord {@link MessageRecord} - */ - private void handleAndCallbackMsg(MessageRecord messageRecord) { - long start = System.currentTimeMillis(); - try { - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addCallbackTimes(1L); - context.getConfig().getCallback().onFinishedBatch(Collections.singletonList(messageRecord)); - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addCallbackTimeCost(System.currentTimeMillis() - start).addCallbackDoneTimes(1L); - } catch (Exception e) { - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addCallbackErrorTimes(1L); - e.printStackTrace(); - } - } - - /** - * parseAttr from k1=v1&k2=v2 to kv map - * - * @param splitter {@link Splitter} - * @param attr String - * @param entrySplitterStr String - * @return {@link Map} - */ - private Map<String, String> parseAttr(Splitter splitter, String attr, String entrySplitterStr) { - Map<String, String> map = new HashMap<>(); - for (String s : splitter.split(attr)) { - int idx = s.indexOf(entrySplitterStr); - String k = s; - String v = null; - if (idx > 0) { - k = s.substring(0, idx); - v = s.substring(idx + 1); - } - map.put(k, v); - } - return map; - } - - private Map<String, String> getAttributeMap(String attribute) { - final Splitter splitter = Splitter.on("&"); - return parseAttr(splitter, attribute, "="); - } - - @Override - public void run() { - boolean hasPermit; - while (true) { - hasPermit = false; - try { - if (context.getConfig().isStopConsume() || isStopConsume) { - TimeUnit.MILLISECONDS.sleep(50L); - continue; - } - - if (sleepTime > 0) { - TimeUnit.MILLISECONDS.sleep(sleepTime); - } - - context.acquireRequestPermit(); - hasPermit = true; - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addMsgCount(1L).addFetchTimes(1L); - - long startFetchTime = System.currentTimeMillis(); - ConsumerResult message = messageConsumer.getMessage(); - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addFetchTimeCost(System.currentTimeMillis() - startFetchTime); - if (null != message && TErrCodeConstants.SUCCESS == message.getErrCode()) { - for (Message msg : message.getMessageList()) { - List<InLongMessage> msgs = new ArrayList<>(); - List<InLongMessage> deserialize = deserializer - .deserialize(context, inLongTopic, getAttributeMap(msg.getAttribute()), - msg.getData()); - deserialize = interceptor.intercept(deserialize); - if (deserialize.isEmpty()) { - continue; - } - msgs.addAll(deserialize); - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addMsgCount(deserialize.size()).addConsumeSize(msg.getData().length); - handleAndCallbackMsg(new MessageRecord(inLongTopic.getTopicKey(), msgs, - message.getConfirmContext(), System.currentTimeMillis())); - } - - sleepTime = 0L; - } else { - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addEmptyFetchTimes(1L); - emptyFetchTimes++; - if (emptyFetchTimes >= context.getConfig().getEmptyPollTimes()) { - sleepTime = Math.min((sleepTime += context.getConfig().getEmptyPollSleepStepMs()), - context.getConfig().getMaxEmptyPollSleepMs()); - emptyFetchTimes = 0; - } - } - } catch (Exception e) { - context.getStatManager() - .getStatistics(context.getConfig().getSortTaskId(), - inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()) - .addFetchErrorTimes(1L); - LOG.error(e.getMessage(), e); - } finally { - if (hasPermit) { - context.releaseRequestPermit(); - } - } - - if (closed) { - break; - } - } - } - } -} diff --git a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImplTest.java b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImplTest.java index 363a8244f..52011b3c6 100644 --- a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImplTest.java +++ b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InlongTopicManagerImplTest.java @@ -24,12 +24,12 @@ import java.util.HashMap; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.inlong.sdk.sort.api.ClientContext; -import org.apache.inlong.sdk.sort.api.InLongTopicFetcher; -import org.apache.inlong.sdk.sort.api.InlongTopicManager; import org.apache.inlong.sdk.sort.api.QueryConsumeConfig; import org.apache.inlong.sdk.sort.api.SortClientConfig; +import org.apache.inlong.sdk.sort.api.TopicFetcher; import org.apache.inlong.sdk.sort.entity.CacheZoneCluster; import org.apache.inlong.sdk.sort.entity.InLongTopic; +import org.apache.inlong.sdk.sort.manager.InlongSingleTopicManager; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -45,7 +45,7 @@ public class InlongTopicManagerImplTest { private InLongTopic inLongTopic; private ClientContext clientContext; private QueryConsumeConfig queryConsumeConfig; - private InlongTopicManager inLongTopicManager; + private InlongSingleTopicManager inLongTopicManager; { System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString()); @@ -66,40 +66,40 @@ public class InlongTopicManagerImplTest { when(sortClientConfig.getSortTaskId()).thenReturn("test"); when(sortClientConfig.getUpdateMetaDataIntervalSec()).thenReturn(60); queryConsumeConfig = PowerMockito.mock(QueryConsumeConfigImpl.class); - inLongTopicManager = new InlongTopicManagerImpl(clientContext, queryConsumeConfig); + inLongTopicManager = new InlongSingleTopicManager(clientContext, queryConsumeConfig); } @Test public void testAddFetcher() { - InlongTopicManager inLongTopicManager = new InlongTopicManagerImpl(clientContext, queryConsumeConfig); + InlongSingleTopicManager inLongTopicManager = new InlongSingleTopicManager(clientContext, queryConsumeConfig); - InLongTopicFetcher inLongTopicFetcher = inLongTopicManager.addFetcher(inLongTopic); + TopicFetcher inLongTopicFetcher = inLongTopicManager.addTopic(inLongTopic); Assert.assertNull(inLongTopicFetcher); } @Test public void testRemoveFetcher() { - InLongTopicFetcher inLongTopicFetcher = inLongTopicManager.removeFetcher(inLongTopic, true); + TopicFetcher inLongTopicFetcher = inLongTopicManager.removeTopic(inLongTopic, true); Assert.assertNull(inLongTopicFetcher); - ConcurrentHashMap<String, InLongTopicFetcher> fetchers = new ConcurrentHashMap<>(); - InLongTopicFetcher inLongTopicFetcherRmMock = PowerMockito.mock(InLongTopicFetcher.class); + ConcurrentHashMap<String, TopicFetcher> fetchers = new ConcurrentHashMap<>(); + TopicFetcher inLongTopicFetcherRmMock = PowerMockito.mock(TopicFetcher.class); fetchers.put(inLongTopic.getTopicKey(), inLongTopicFetcherRmMock); Whitebox.setInternalState(inLongTopicManager, "fetchers", fetchers); - inLongTopicFetcher = inLongTopicManager.removeFetcher(inLongTopic, true); + inLongTopicFetcher = inLongTopicManager.removeTopic(inLongTopic, true); Assert.assertNotNull(inLongTopicFetcher); } @Test public void testGetFetcher() { - InLongTopicFetcher fetcher = inLongTopicManager.getFetcher(inLongTopic.getTopicKey()); + TopicFetcher fetcher = inLongTopicManager.getFetcher(inLongTopic.getTopicKey()); Assert.assertNull(fetcher); - ConcurrentHashMap<String, InLongTopicFetcher> fetchers = new ConcurrentHashMap<>(); - InLongTopicFetcher inLongTopicFetcherRmMock = PowerMockito.mock(InLongTopicFetcher.class); + ConcurrentHashMap<String, TopicFetcher> fetchers = new ConcurrentHashMap<>(); + TopicFetcher inLongTopicFetcherRmMock = PowerMockito.mock(TopicFetcher.class); fetchers.put(inLongTopic.getTopicKey(), inLongTopicFetcherRmMock); Whitebox.setInternalState(inLongTopicManager, "fetchers", fetchers); @@ -114,8 +114,8 @@ public class InlongTopicManagerImplTest { Set<String> managedInLongTopics = inLongTopicManager.getManagedInLongTopics(); Assert.assertEquals(0, managedInLongTopics.size()); - ConcurrentHashMap<String, InLongTopicFetcher> fetchers = new ConcurrentHashMap<>(); - InLongTopicFetcher inLongTopicFetcherRmMock = PowerMockito.mock(InLongTopicFetcher.class); + ConcurrentHashMap<String, TopicFetcher> fetchers = new ConcurrentHashMap<>(); + TopicFetcher inLongTopicFetcherRmMock = PowerMockito.mock(TopicFetcher.class); fetchers.put(inLongTopic.getTopicKey(), inLongTopicFetcherRmMock); Whitebox.setInternalState(inLongTopicManager, "fetchers", fetchers); managedInLongTopics = inLongTopicManager.getManagedInLongTopics(); @@ -125,11 +125,11 @@ public class InlongTopicManagerImplTest { @Test public void testGetAllFetchers() { - Collection<InLongTopicFetcher> allFetchers = inLongTopicManager.getAllFetchers(); + Collection<TopicFetcher> allFetchers = inLongTopicManager.getAllFetchers(); Assert.assertEquals(0, allFetchers.size()); - ConcurrentHashMap<String, InLongTopicFetcher> fetchers = new ConcurrentHashMap<>(); - InLongTopicFetcher inLongTopicFetcherRmMock = PowerMockito.mock(InLongTopicFetcher.class); + ConcurrentHashMap<String, TopicFetcher> fetchers = new ConcurrentHashMap<>(); + TopicFetcher inLongTopicFetcherRmMock = PowerMockito.mock(TopicFetcher.class); fetchers.put(inLongTopic.getTopicKey(), inLongTopicFetcherRmMock); Whitebox.setInternalState(inLongTopicManager, "fetchers", fetchers); allFetchers = inLongTopicManager.getAllFetchers(); diff --git a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java index 815f1047c..4fc965625 100644 --- a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java +++ b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializerTest.java @@ -17,9 +17,6 @@ package org.apache.inlong.sdk.sort.impl.decode; -import static org.mockito.ArgumentMatchers.anyString; -import static org.powermock.api.mockito.PowerMockito.when; - import com.google.protobuf.ByteString; import java.util.HashMap; import java.util.List; @@ -29,13 +26,10 @@ import org.apache.inlong.sdk.commons.protocol.ProxySdk.MapFieldEntry; import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj; import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs; import org.apache.inlong.sdk.sort.api.ClientContext; -import org.apache.inlong.sdk.sort.api.SortClientConfig; import org.apache.inlong.sdk.sort.entity.CacheZoneCluster; import org.apache.inlong.sdk.sort.entity.InLongMessage; import org.apache.inlong.sdk.sort.entity.InLongTopic; import org.apache.inlong.sdk.sort.impl.ClientContextImpl; -import org.apache.inlong.sdk.sort.stat.SortClientStateCounter; -import org.apache.inlong.sdk.sort.stat.StatManager; import org.apache.inlong.sdk.sort.util.Utils; import org.junit.Assert; import org.junit.Test; @@ -52,16 +46,12 @@ public class MessageDeserializerTest { private InLongTopic inLongTopic; private String testData; private MessageObjs messageObjs; - private SortClientConfig sortClientConfig; - private StatManager statManager; private void setUp() throws Exception { System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString()); messageDeserializer = new MessageDeserializer(); headers = new HashMap<>(); context = PowerMockito.mock(ClientContextImpl.class); - sortClientConfig = PowerMockito.mock(SortClientConfig.class); - statManager = PowerMockito.mock(StatManager.class); inLongTopic = new InLongTopic(); inLongTopic.setTopic("testTopic"); @@ -69,13 +59,6 @@ public class MessageDeserializerTest { inLongTopic.setInLongCluster(cacheZoneCluster); inLongTopic.setProperties(new HashMap<>()); - when(context.getConfig()).thenReturn(sortClientConfig); - when(context.getStatManager()).thenReturn(statManager); - SortClientStateCounter sortClientStateCounter = new SortClientStateCounter("sortTaskId", - cacheZoneCluster.getClusterId(), - inLongTopic.getTopic(), 0); - when(statManager.getStatistics(anyString(), anyString(), anyString())).thenReturn(sortClientStateCounter); - when(sortClientConfig.getSortTaskId()).thenReturn("sortTaskId"); } @Test diff --git a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImplTest.java b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImplTest.java index 8d3a4b5e3..88b1ed7b2 100644 --- a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImplTest.java +++ b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImplTest.java @@ -17,16 +17,14 @@ package org.apache.inlong.sdk.sort.impl.kafka; -import static org.mockito.ArgumentMatchers.anyString; -import static org.powermock.api.mockito.PowerMockito.when; - import org.apache.inlong.sdk.sort.api.ClientContext; -import org.apache.inlong.sdk.sort.api.SortClientConfig; +import org.apache.inlong.sdk.sort.api.TopicFetcher; import org.apache.inlong.sdk.sort.entity.CacheZoneCluster; import org.apache.inlong.sdk.sort.entity.InLongTopic; +import org.apache.inlong.sdk.sort.fetcher.kafka.KafkaSingleTopicFetcher; import org.apache.inlong.sdk.sort.impl.ClientContextImpl; -import org.apache.inlong.sdk.sort.stat.SortClientStateCounter; -import org.apache.inlong.sdk.sort.stat.StatManager; +import org.apache.inlong.sdk.sort.impl.decode.MessageDeserializer; +import org.apache.inlong.sdk.sort.interceptor.MsgTimeInterceptor; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -45,8 +43,7 @@ public class InLongKafkaFetcherImplTest { private ClientContext clientContext; private InLongTopic inLongTopic; - private SortClientConfig sortClientConfig; - private StatManager statManager; + private static final String TEST_BOOTSTRAP = "testBootstrap"; /** * setUp @@ -65,34 +62,26 @@ public class InLongKafkaFetcherImplTest { inLongTopic.setInLongCluster(cacheZoneCluster); clientContext = PowerMockito.mock(ClientContextImpl.class); - sortClientConfig = PowerMockito.mock(SortClientConfig.class); - statManager = PowerMockito.mock(StatManager.class); - - when(clientContext.getConfig()).thenReturn(sortClientConfig); - when(clientContext.getStatManager()).thenReturn(statManager); - SortClientStateCounter sortClientStateCounter = new SortClientStateCounter("sortTaskId", - cacheZoneCluster.getClusterId(), - inLongTopic.getTopic(), 0); - when(statManager.getStatistics(anyString(), anyString(), anyString())).thenReturn(sortClientStateCounter); - when(sortClientConfig.getSortTaskId()).thenReturn("sortTaskId"); - } @Test public void pause() { - InLongKafkaFetcherImpl inLongTopicFetcher = new InLongKafkaFetcherImpl(inLongTopic, clientContext); + TopicFetcher inLongTopicFetcher = new KafkaSingleTopicFetcher(inLongTopic, clientContext, + new MsgTimeInterceptor(), new MessageDeserializer(), TEST_BOOTSTRAP); inLongTopicFetcher.pause(); } @Test public void resume() { - InLongKafkaFetcherImpl inLongTopicFetcher = new InLongKafkaFetcherImpl(inLongTopic, clientContext); + TopicFetcher inLongTopicFetcher = new KafkaSingleTopicFetcher(inLongTopic, clientContext, + new MsgTimeInterceptor(), new MessageDeserializer(), TEST_BOOTSTRAP); inLongTopicFetcher.resume(); } @Test public void close() { - InLongKafkaFetcherImpl inLongTopicFetcher = new InLongKafkaFetcherImpl(inLongTopic, clientContext); + TopicFetcher inLongTopicFetcher = new KafkaSingleTopicFetcher(inLongTopic, clientContext, + new MsgTimeInterceptor(), new MessageDeserializer(), TEST_BOOTSTRAP); boolean close = inLongTopicFetcher.close(); Assert.assertTrue(close); } diff --git a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java index 935387f9d..202e8b468 100644 --- a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java +++ b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java @@ -27,13 +27,14 @@ import static org.powermock.api.mockito.PowerMockito.when; import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; import org.apache.inlong.sdk.sort.api.ClientContext; -import org.apache.inlong.sdk.sort.api.InLongTopicFetcher; import org.apache.inlong.sdk.sort.api.SortClientConfig; +import org.apache.inlong.sdk.sort.api.TopicFetcher; import org.apache.inlong.sdk.sort.entity.CacheZoneCluster; import org.apache.inlong.sdk.sort.entity.InLongTopic; +import org.apache.inlong.sdk.sort.fetcher.pulsar.PulsarSingleTopicFetcher; import org.apache.inlong.sdk.sort.impl.ClientContextImpl; -import org.apache.inlong.sdk.sort.stat.SortClientStateCounter; -import org.apache.inlong.sdk.sort.stat.StatManager; +import org.apache.inlong.sdk.sort.impl.decode.MessageDeserializer; +import org.apache.inlong.sdk.sort.interceptor.MsgTimeInterceptor; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.MessageId; @@ -56,7 +57,6 @@ public class InLongPulsarFetcherImplTest { private ClientContext clientContext; private InLongTopic inLongTopic; private SortClientConfig sortClientConfig; - private StatManager statManager; /** * setUp @@ -76,52 +76,35 @@ public class InLongPulsarFetcherImplTest { clientContext = PowerMockito.mock(ClientContextImpl.class); sortClientConfig = PowerMockito.mock(SortClientConfig.class); - statManager = PowerMockito.mock(StatManager.class); when(clientContext.getConfig()).thenReturn(sortClientConfig); - when(clientContext.getStatManager()).thenReturn(statManager); - SortClientStateCounter sortClientStateCounter = new SortClientStateCounter("sortTaskId", - cacheZoneCluster.getClusterId(), - inLongTopic.getTopic(), 0); - when(statManager.getStatistics(anyString(), anyString(), anyString())).thenReturn(sortClientStateCounter); when(sortClientConfig.getSortTaskId()).thenReturn("sortTaskId"); } @Test public void stopConsume() { - InLongTopicFetcher inLongTopicFetcher = new InLongPulsarFetcherImpl(inLongTopic, clientContext); - boolean consumeStop = inLongTopicFetcher.isConsumeStop(); + TopicFetcher inLongTopicFetcher = new PulsarSingleTopicFetcher(inLongTopic, clientContext, + new MsgTimeInterceptor(), new MessageDeserializer(), null); + boolean consumeStop = inLongTopicFetcher.isStopConsume(); Assert.assertFalse(consumeStop); - inLongTopicFetcher.stopConsume(true); - consumeStop = inLongTopicFetcher.isConsumeStop(); + inLongTopicFetcher.setStopConsume(true); + consumeStop = inLongTopicFetcher.isStopConsume(); Assert.assertTrue(consumeStop); } @Test public void getInLongTopic() { - InLongTopicFetcher inLongTopicFetcher = new InLongPulsarFetcherImpl(inLongTopic, clientContext); - InLongTopic inLongTopic = inLongTopicFetcher.getInLongTopic(); + TopicFetcher inLongTopicFetcher = new PulsarSingleTopicFetcher(inLongTopic, clientContext, + new MsgTimeInterceptor(), new MessageDeserializer(), null); + InLongTopic inLongTopic = inLongTopicFetcher.getTopics().get(0); Assert.assertEquals(inLongTopic.getInLongCluster(), inLongTopic.getInLongCluster()); } - @Test - public void getConsumedDataSize() { - InLongTopicFetcher inLongTopicFetcher = new InLongPulsarFetcherImpl(inLongTopic, clientContext); - long consumedDataSize = inLongTopicFetcher.getConsumedDataSize(); - Assert.assertEquals(0L, consumedDataSize); - } - - @Test - public void getAckedOffset() { - InLongTopicFetcher inLongTopicFetcher = new InLongPulsarFetcherImpl(inLongTopic, clientContext); - long ackedOffset = inLongTopicFetcher.getAckedOffset(); - Assert.assertEquals(0L, ackedOffset); - } - @Test public void ack() { - InLongTopicFetcher inLongTopicFetcher = new InLongPulsarFetcherImpl(inLongTopic, clientContext); + TopicFetcher inLongTopicFetcher = new PulsarSingleTopicFetcher(inLongTopic, clientContext, + new MsgTimeInterceptor(), new MessageDeserializer(), null); MessageId messageId = PowerMockito.mock(MessageId.class); ConcurrentHashMap<String, MessageId> offsetCache = new ConcurrentHashMap<>(); offsetCache.put("test", messageId); @@ -137,10 +120,11 @@ public class InLongPulsarFetcherImplTest { @Test public void init() { - InLongTopicFetcher inLongTopicFetcher = new InLongPulsarFetcherImpl(inLongTopic, clientContext); PulsarClient pulsarClient = PowerMockito.mock(PulsarClient.class); ConsumerBuilder consumerBuilder = PowerMockito.mock(ConsumerBuilder.class); + TopicFetcher inLongTopicFetcher = new PulsarSingleTopicFetcher(inLongTopic, clientContext, + new MsgTimeInterceptor(), new MessageDeserializer(), pulsarClient); try { when(pulsarClient.newConsumer(any())).thenReturn(consumerBuilder); when(consumerBuilder.topic(anyString())).thenReturn(consumerBuilder); @@ -156,7 +140,7 @@ public class InLongPulsarFetcherImplTest { Consumer consumer = PowerMockito.mock(Consumer.class); when(consumerBuilder.subscribe()).thenReturn(consumer); doNothing().when(consumer).close(); - boolean init = inLongTopicFetcher.init(pulsarClient); + boolean init = inLongTopicFetcher.init(); inLongTopicFetcher.close(); Assert.assertTrue(init); } catch (Exception e) { @@ -166,19 +150,22 @@ public class InLongPulsarFetcherImplTest { @Test public void pause() { - InLongTopicFetcher inLongTopicFetcher = new InLongPulsarFetcherImpl(inLongTopic, clientContext); + TopicFetcher inLongTopicFetcher = new PulsarSingleTopicFetcher(inLongTopic, clientContext, + new MsgTimeInterceptor(), new MessageDeserializer(), null); inLongTopicFetcher.pause(); } @Test public void resume() { - InLongTopicFetcher inLongTopicFetcher = new InLongPulsarFetcherImpl(inLongTopic, clientContext); + TopicFetcher inLongTopicFetcher = new PulsarSingleTopicFetcher(inLongTopic, clientContext, + new MsgTimeInterceptor(), new MessageDeserializer(), null); inLongTopicFetcher.resume(); } @Test public void close() { - InLongTopicFetcher inLongTopicFetcher = new InLongPulsarFetcherImpl(inLongTopic, clientContext); + TopicFetcher inLongTopicFetcher = new PulsarSingleTopicFetcher(inLongTopic, clientContext, + new MsgTimeInterceptor(), new MessageDeserializer(), null); boolean close = inLongTopicFetcher.close(); Assert.assertTrue(close); diff --git a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java index 3e35f43f6..dde55f53b 100644 --- a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java +++ b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java @@ -17,17 +17,15 @@ package org.apache.inlong.sdk.sort.impl.tube; -import static org.mockito.ArgumentMatchers.anyString; import static org.powermock.api.mockito.PowerMockito.when; import org.apache.inlong.sdk.sort.api.ClientContext; -import org.apache.inlong.sdk.sort.api.InLongTopicFetcher; import org.apache.inlong.sdk.sort.api.SortClientConfig; +import org.apache.inlong.sdk.sort.api.TopicFetcher; import org.apache.inlong.sdk.sort.entity.CacheZoneCluster; import org.apache.inlong.sdk.sort.entity.InLongTopic; +import org.apache.inlong.sdk.sort.fetcher.tube.TubeSingleTopicFetcher; import org.apache.inlong.sdk.sort.impl.ClientContextImpl; -import org.apache.inlong.sdk.sort.stat.SortClientStateCounter; -import org.apache.inlong.sdk.sort.stat.StatManager; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -40,7 +38,6 @@ public class InLongTubeFetcherImplTest { private ClientContext clientContext; private InLongTopic inLongTopic; private SortClientConfig sortClientConfig; - private StatManager statManager; /** * setUp @@ -60,33 +57,27 @@ public class InLongTubeFetcherImplTest { clientContext = PowerMockito.mock(ClientContextImpl.class); sortClientConfig = PowerMockito.mock(SortClientConfig.class); - statManager = PowerMockito.mock(StatManager.class); when(clientContext.getConfig()).thenReturn(sortClientConfig); - when(clientContext.getStatManager()).thenReturn(statManager); - SortClientStateCounter sortClientStateCounter = new SortClientStateCounter("sortTaskId", - cacheZoneCluster.getClusterId(), - inLongTopic.getTopic(), 0); - when(statManager.getStatistics(anyString(), anyString(), anyString())).thenReturn(sortClientStateCounter); when(sortClientConfig.getSortTaskId()).thenReturn("sortTaskId"); } @Test public void pause() { - InLongTubeFetcherImpl inLongTopicFetcher = new InLongTubeFetcherImpl(inLongTopic, clientContext); + TopicFetcher inLongTopicFetcher = new TubeSingleTopicFetcher(inLongTopic, clientContext, null, null, null); inLongTopicFetcher.pause(); } @Test public void resume() { - InLongTubeFetcherImpl inLongTopicFetcher = new InLongTubeFetcherImpl(inLongTopic, clientContext); + TopicFetcher inLongTopicFetcher = new TubeSingleTopicFetcher(inLongTopic, clientContext, null, null, null); inLongTopicFetcher.resume(); } @Test public void close() { - InLongTopicFetcher inLongTopicFetcher = new InLongTubeFetcherImpl(inLongTopic, clientContext); + TopicFetcher inLongTopicFetcher = new TubeSingleTopicFetcher(inLongTopic, clientContext, null, null, null); boolean close = inLongTopicFetcher.close(); Assert.assertTrue(close); diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java index f00a1a03b..b74ad9c6b 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java @@ -196,14 +196,14 @@ public final class SortSdkSource extends AbstractSource if (SortClusterConfigType.FILE.name().equalsIgnoreCase(configType)) { LOG.info("Create sort sdk client in file way:{}", configType); ClassResourceQueryConsumeConfig queryConfig = new ClassResourceQueryConsumeConfig(); - client = SortClientFactory.createSortClientV2(clientConfig, + client = SortClientFactory.createSortClient(clientConfig, queryConfig, new MetricReporterImpl(clientConfig), new ManagerReportHandlerImpl()); } else if (SortClusterConfigType.MANAGER.name().equalsIgnoreCase(configType)) { LOG.info("Create sort sdk client in manager way:{}", configType); clientConfig.setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl()); - client = SortClientFactory.createSortClientV2(clientConfig); + client = SortClientFactory.createSortClient(clientConfig); } else { LOG.info("Create sort sdk client in custom way:{}", configType); // user-defined @@ -218,7 +218,7 @@ public final class SortSdkSource extends AbstractSource return null; } // if it specifies the type of QueryConsumeConfig. - client = SortClientFactory.createSortClientV2(clientConfig, + client = SortClientFactory.createSortClient(clientConfig, (QueryConsumeConfig) loaderObject, new MetricReporterImpl(clientConfig), new ManagerReportHandlerImpl());