This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d36b2338c [INLONG-11745][SDK] Clean up HttpProxySender and related implementations (#11748)
9d36b2338c is described below

commit 9d36b2338c1b7b3f151582eb447d9b6af2abbfe7
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Tue Feb 11 16:19:19 2025 +0800

    [INLONG-11745][SDK] Clean up HttpProxySender and related implementations (#11748)
    
    * [INLONG-11745][SDK] Clean up HttpProxySender and related implementations
    
    * [INLONG-11745][SDK] Clean up HttpProxySender and related implementations
    
    ---------
    
    Co-authored-by: gosonzhang <gosonzh...@tencent.com>
---
 .../sdk/dataproxy/common/SendMessageCallback.java  |  32 ---
 .../sdk/dataproxy/http/InternalHttpSender.java     | 255 ---------------------
 .../inlong/sdk/dataproxy/network/HttpMessage.java  |  76 ------
 .../sdk/dataproxy/network/HttpProxySender.java     | 229 ------------------
 .../sdk/dataproxy/utils/ConcurrentHashSet.java     |  45 ----
 .../sdk/dataproxy/utils/ConsistencyHashUtil.java   |  37 ---
 .../inlong/sdk/dataproxy/utils/MapBackedSet.java   |  74 ------
 7 files changed, 748 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
deleted file mode 100644
index 48ce607037..0000000000
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.common;
-
-@Deprecated
-/**
- * Replace by MsgSendCallback
- *
- */
-public interface SendMessageCallback {
-
-    /* Invoked when a message is confirmed by TDBus. */
-    void onMessageAck(SendResult result);
-
-    /* Invoked when a message transportation interrupted by an exception. */
-    void onException(Throwable e);
-}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
deleted file mode 100644
index 382ba58ad8..0000000000
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.http;
-
-import org.apache.inlong.common.enums.DataProxyErrCode;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
-import org.apache.inlong.sdk.dataproxy.config.HostInfo;
-import org.apache.inlong.sdk.dataproxy.network.HttpMessage;
-import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
-import org.apache.inlong.sdk.dataproxy.utils.ConcurrentHashSet;
-import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpHeaders;
-import org.apache.http.HttpStatus;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.utils.URLEncodedUtils;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.message.BasicNameValuePair;
-import org.apache.http.util.EntityUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-@Deprecated
-/**
- * Replace by InLongHttpMsgSender
- */
-public class InternalHttpSender {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(InternalHttpSender.class);
-
-    private final HttpMsgSenderConfig httpConfig;
-    private final ConcurrentHashSet<HostInfo> hostList;
-
-    private final LinkedBlockingQueue<HttpMessage> messageCache;
-    private final ExecutorService workerServices = 
Executors.newCachedThreadPool();
-    private CloseableHttpClient httpClient;
-    private boolean bShutDown = false;
-
-    public InternalHttpSender(HttpMsgSenderConfig httpConfig,
-            ConcurrentHashSet<HostInfo> hostList, 
LinkedBlockingQueue<HttpMessage> messageCache) {
-        this.httpConfig = httpConfig;
-        this.hostList = hostList;
-        this.messageCache = messageCache;
-        submitWorkThread();
-    }
-
-    private void submitWorkThread() {
-        for (int i = 0; i < httpConfig.getHttpAsyncRptWorkerNum(); i++) {
-            workerServices.execute(new WorkerRunner());
-        }
-    }
-
-    /**
-     * construct header
-     */
-    private ArrayList<BasicNameValuePair> getHeaders(List<String> bodies,
-            String groupId, String streamId, long dt) {
-        ArrayList<BasicNameValuePair> params = new ArrayList<>();
-        params.add(new BasicNameValuePair("groupId", groupId));
-        params.add(new BasicNameValuePair("streamId", streamId));
-        params.add(new BasicNameValuePair("dt", String.valueOf(dt)));
-        params.add(new BasicNameValuePair("body", StringUtils.join(bodies, 
"\n")));
-        params.add(new BasicNameValuePair("cnt", 
String.valueOf(bodies.size())));
-
-        return params;
-    }
-
-    /**
-     * http client
-     */
-    private synchronized CloseableHttpClient constructHttpClient(long timeout, 
TimeUnit timeUnit) {
-        if (httpClient != null) {
-            return httpClient;
-        }
-        long timeoutInMs = timeUnit.toMillis(timeout);
-        RequestConfig requestConfig = RequestConfig.custom()
-                .setConnectTimeout((int) timeoutInMs)
-                .setSocketTimeout((int) timeoutInMs).build();
-        HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
-        httpClientBuilder.setDefaultRequestConfig(requestConfig);
-        return httpClientBuilder.build();
-    }
-
-    /**
-     * check cache runner
-     */
-    private class WorkerRunner implements Runnable {
-
-        @Override
-        public void run() {
-            // if not shutdown or queue is not empty
-            while (!bShutDown || !messageCache.isEmpty()) {
-                try {
-                    while (!messageCache.isEmpty()) {
-                        HttpMessage httpMessage = messageCache.poll();
-                        if (httpMessage != null) {
-                            SendResult result = sendMessageWithHostInfo(
-                                    httpMessage.getBodies(), 
httpMessage.getGroupId(),
-                                    httpMessage.getStreamId(), 
httpMessage.getDt(),
-                                    httpMessage.getTimeout(), 
httpMessage.getTimeUnit());
-                            httpMessage.getCallback().onMessageAck(result);
-                        }
-                    }
-                    
TimeUnit.MILLISECONDS.sleep(httpConfig.getHttpAsyncWorkerIdleWaitMs());
-                } catch (Exception exception) {
-                    logger.error("exception caught", exception);
-                }
-            }
-        }
-    }
-
-    /**
-     * get random ip
-     *
-     * @return list of host info
-     */
-    public List<HostInfo> getRandomHostInfo() {
-        List<HostInfo> tmpHostList = new ArrayList<>(hostList);
-        Collections.shuffle(tmpHostList);
-        // respect alive connection
-        int maxIndex = Math.min(httpConfig.getAliveConnections(), 
tmpHostList.size());
-        return tmpHostList.subList(0, maxIndex);
-    }
-
-    /**
-     * send request by http
-     */
-    private SendResult sendByHttp(List<String> bodies, String groupId, String 
streamId, long dt,
-            long timeout, TimeUnit timeUnit, HostInfo hostInfo) throws 
Exception {
-        HttpPost httpPost = null;
-        CloseableHttpResponse response = null;
-        try {
-            if (httpClient == null) {
-                httpClient = constructHttpClient(timeout, timeUnit);
-            }
-
-            String url = "http://" + hostInfo.getHostName() + ":" + hostInfo.getPortNumber() + "/dataproxy/message";
-            httpPost = new HttpPost(url);
-            httpPost.setHeader(HttpHeaders.CONNECTION, "close");
-            httpPost.setHeader(HttpHeaders.CONTENT_TYPE, 
"application/x-www-form-urlencoded");
-            ArrayList<BasicNameValuePair> contents = getHeaders(bodies, 
groupId, streamId, dt);
-            String encodedContents = URLEncodedUtils.format(contents, 
StandardCharsets.UTF_8);
-            httpPost.setEntity(new StringEntity(encodedContents));
-
-            logger.info("begin to post request to {}, encoded content is: {}", 
url, encodedContents);
-            response = httpClient.execute(httpPost);
-
-            String returnStr = EntityUtils.toString(response.getEntity());
-            int returnCode = response.getStatusLine().getStatusCode();
-            if (StringUtils.isBlank(returnStr) || HttpStatus.SC_OK != 
returnCode) {
-                throw new Exception("get config from manager failed, result: " 
+ returnStr + ", code: " + returnCode);
-            }
-
-            logger.debug("success to get config from manager, result str: " + 
returnStr);
-            JsonObject jsonResponse = 
JsonParser.parseString(returnStr).getAsJsonObject();
-            JsonElement codeElement = jsonResponse.get("code");
-            if (codeElement != null) {
-                if (DataProxyErrCode.SUCCESS.getErrCode() == 
codeElement.getAsInt()) {
-                    return SendResult.OK;
-                } else {
-                    return SendResult.INVALID_DATA;
-                }
-            }
-        } finally {
-            if (httpPost != null) {
-                httpPost.releaseConnection();
-            }
-            if (response != null) {
-                response.close();
-            }
-        }
-
-        return SendResult.UNKOWN_ERROR;
-    }
-
-    /**
-     * send message with host info
-     */
-    public SendResult sendMessageWithHostInfo(List<String> bodies, String 
groupId, String streamId, long dt,
-            long timeout, TimeUnit timeUnit) {
-
-        List<HostInfo> randomHostList = getRandomHostInfo();
-        Exception tmpException = null;
-        for (HostInfo hostInfo : randomHostList) {
-            try {
-                return sendByHttp(bodies, groupId, streamId, dt, timeout, 
timeUnit, hostInfo);
-            } catch (Exception exception) {
-                tmpException = exception;
-                logger.debug("error while sending data, resending it", 
exception);
-            }
-        }
-        if (tmpException != null) {
-            logger.error("error while sending data", tmpException);
-        }
-        return SendResult.UNKOWN_ERROR;
-    }
-
-    /**
-     * close resources
-     */
-    public void close() throws Exception {
-        bShutDown = true;
-        if (!messageCache.isEmpty()) {
-            if (httpConfig.isDiscardHttpCacheWhenClosing()) {
-                messageCache.clear();
-            } else {
-                long curTime = System.currentTimeMillis();
-                while (!messageCache.isEmpty()
-                        || (System.currentTimeMillis() - curTime) < 
httpConfig.getHttpCloseWaitPeriodMs()) {
-                    ProxyUtils.sleepSomeTime(100);
-                }
-                if (!messageCache.isEmpty()) {
-                    logger.warn("Close httpClient, remain {} messages", 
messageCache.size());
-                    messageCache.clear();
-                }
-            }
-        }
-        if (httpClient != null) {
-            httpClient.close();
-        }
-        workerServices.shutdown();
-    }
-}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpMessage.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpMessage.java
deleted file mode 100644
index da3bbf45e2..0000000000
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpMessage.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.network;
-
-import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * http message for cache.
- */
-public class HttpMessage {
-
-    private final String groupId;
-    private final String streamId;
-    private final List<String> bodies;
-    private final SendMessageCallback callback;
-    private final long dt;
-    private final long timeout;
-    private final TimeUnit timeUnit;
-
-    public HttpMessage(List<String> bodies, String groupId, String streamId, 
long dt,
-            long timeout, TimeUnit timeUnit, SendMessageCallback callback) {
-        this.groupId = groupId;
-        this.streamId = streamId;
-        this.bodies = bodies;
-        this.callback = callback;
-        this.dt = dt;
-        this.timeout = timeout;
-        this.timeUnit = timeUnit;
-    }
-
-    public String getGroupId() {
-        return groupId;
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-
-    public List<String> getBodies() {
-        return bodies;
-    }
-
-    public SendMessageCallback getCallback() {
-        return callback;
-    }
-
-    public long getDt() {
-        return dt;
-    }
-
-    public long getTimeout() {
-        return timeout;
-    }
-
-    public TimeUnit getTimeUnit() {
-        return timeUnit;
-    }
-}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
deleted file mode 100644
index f4bcde915f..0000000000
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.network;
-
-import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
-import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
-import org.apache.inlong.sdk.dataproxy.config.HostInfo;
-import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
-import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
-import org.apache.inlong.sdk.dataproxy.http.InternalHttpSender;
-import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
-import org.apache.inlong.sdk.dataproxy.utils.ConcurrentHashSet;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-
-@Deprecated
-/**
- * http sender
- * Replace by InLongHttpMsgSender
- */
-public class HttpProxySender extends Thread {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(HttpProxySender.class);
-
-    private final ConcurrentHashSet<HostInfo> hostList = new 
ConcurrentHashSet<>();
-
-    private final HttpMsgSenderConfig proxyClientConfig;
-    private ProxyConfigManager proxyConfigManager;
-
-    private boolean bShutDown = false;
-
-    private final InternalHttpSender internalHttpSender;
-    private final LinkedBlockingQueue<HttpMessage> messageCache;
-
-    public HttpProxySender(HttpMsgSenderConfig httpConfig) throws Exception {
-        logger.info("Initial http sender, configure is {}", httpConfig);
-        this.proxyClientConfig = httpConfig;
-        initTDMClientAndRequest(httpConfig);
-        this.messageCache = new 
LinkedBlockingQueue<>(httpConfig.getHttpAsyncRptCacheSize());
-        internalHttpSender = new InternalHttpSender(httpConfig, hostList, 
messageCache);
-    }
-
-    /**
-     * get proxy list
-     *
-     * @param httpConfig
-     * @throws Exception
-     */
-    private void initTDMClientAndRequest(HttpMsgSenderConfig httpConfig) 
throws Exception {
-
-        try {
-            proxyConfigManager = new ProxyConfigManager(httpConfig);
-            ProxyConfigEntry proxyConfigEntry = retryGettingProxyConfig();
-            hostList.addAll(proxyConfigEntry.getHostMap().values());
-
-            this.setDaemon(true);
-            this.start();
-        } catch (Throwable e) {
-            if (httpConfig.isOnlyUseLocalProxyConfig()) {
-                throw new Exception("Get local proxy configure failure! e = 
{}", e);
-            } else {
-                throw new Exception("Visit TDManager error! e = {}", e);
-            }
-        }
-        logger.info("http proxy sender starts");
-    }
-
-    /**
-     * retry fetching proxy config in case of network issue.
-     *
-     * @return proxy config entry.
-     */
-    private ProxyConfigEntry retryGettingProxyConfig() {
-        ProcessResult procResult = new ProcessResult();
-        if (proxyConfigManager.getGroupIdConfigure(true, procResult)) {
-            return (ProxyConfigEntry) procResult.getRetData();
-        }
-        return null;
-    }
-
-    /**
-     * get proxy list
-     */
-    @Override
-    public void run() {
-        ProcessResult procResult = new ProcessResult();
-        while (!bShutDown) {
-            try {
-                int rand = ThreadLocalRandom.current().nextInt(0, 600);
-                long randSleepTime = proxyClientConfig.getMgrMetaSyncInrMs() + 
rand;
-                TimeUnit.MILLISECONDS.sleep(randSleepTime);
-                if (proxyConfigManager != null) {
-                    if (!proxyConfigManager.getGroupIdConfigure(false, 
procResult)) {
-                        throw new Exception(procResult.toString());
-                    }
-                    ProxyConfigEntry configEntry = (ProxyConfigEntry) 
procResult.getRetData();
-                    hostList.addAll(configEntry.getHostMap().values());
-                    hostList.retainAll(configEntry.getHostMap().values());
-                } else {
-                    logger.error("manager is null, please check it!");
-                }
-                logger.info("get new proxy list " + hostList.toString());
-            } catch (InterruptedException ignored) {
-                // ignore it.
-            } catch (Exception ex) {
-                logger.error("managerFetcher get or save managerIpList occur 
error,", ex);
-            }
-        }
-    }
-
-    /**
-     * send by http
-     *
-     * @param body
-     * @param groupId
-     * @param streamId
-     * @param dt
-     * @param timeout
-     * @param timeUnit
-     * @return
-     */
-    public SendResult sendMessage(String body, String groupId, String 
streamId, long dt,
-            long timeout, TimeUnit timeUnit) {
-        return sendMessage(Collections.singletonList(body), groupId, streamId, 
dt, timeout, timeUnit);
-    }
-
-    /**
-     * send multiple messages.
-     *
-     * @param bodies   list of bodies
-     * @param groupId
-     * @param streamId
-     * @param dt
-     * @param timeout
-     * @param timeUnit
-     * @return
-     */
-    public SendResult sendMessage(List<String> bodies, String groupId, String 
streamId, long dt,
-            long timeout, TimeUnit timeUnit) {
-        if (hostList.isEmpty()) {
-            logger.error("proxy list is empty, maybe client has been "
-                    + "closed or groupId is not assigned with proxy list");
-            return SendResult.NO_CONNECTION;
-        }
-        return internalHttpSender.sendMessageWithHostInfo(
-                bodies, groupId, streamId, dt, timeout, timeUnit);
-
-    }
-
-    /**
-     * async sender
-     *
-     * @param bodies
-     * @param groupId
-     * @param streamId
-     * @param dt
-     * @param timeout
-     * @param timeUnit
-     * @param callback
-     */
-    public void asyncSendMessage(List<String> bodies, String groupId, String 
streamId, long dt,
-            long timeout, TimeUnit timeUnit, SendMessageCallback callback) {
-        List<String> bodyList = new ArrayList<>(bodies);
-        HttpMessage httpMessage = new HttpMessage(bodyList, groupId, streamId, 
dt,
-                timeout, timeUnit, callback);
-        try {
-            if (!messageCache.offer(httpMessage)) {
-                callback.onMessageAck(SendResult.ASYNC_CALLBACK_BUFFER_FULL);
-            }
-        } catch (Exception exception) {
-            logger.error("error async sending data", exception);
-        }
-    }
-
-    /**
-     * async send single message.
-     *
-     * @param body
-     * @param groupId
-     * @param streamId
-     * @param dt
-     * @param timeout
-     * @param timeUnit
-     * @param callback
-     */
-    public void asyncSendMessage(String body, String groupId, String streamId, 
long dt,
-            long timeout, TimeUnit timeUnit, SendMessageCallback callback) {
-        asyncSendMessage(Collections.singletonList(body), groupId, streamId,
-                dt, timeout, timeUnit, callback);
-    }
-
-    /**
-     * close
-     */
-    public void close() {
-        hostList.clear();
-        bShutDown = true;
-        try {
-            this.interrupt();
-            internalHttpSender.close();
-        } catch (Exception exception) {
-            logger.error("error while closing http client", exception);
-        }
-    }
-}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConcurrentHashSet.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConcurrentHashSet.java
deleted file mode 100644
index 574450263a..0000000000
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConcurrentHashSet.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.utils;
-
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * A {@link ConcurrentHashMap}-backed {@link Set}.
- */
-public class ConcurrentHashSet<E> extends MapBackedSet<E> {
-
-    private static final long serialVersionUID = 8518578988740277828L;
-
-    public ConcurrentHashSet() {
-        super(new ConcurrentHashMap<E, Boolean>());
-    }
-
-    public ConcurrentHashSet(Collection<E> c) {
-        super(new ConcurrentHashMap<E, Boolean>(), c);
-    }
-
-    @Override
-    public boolean add(E o) {
-        Boolean answer = ((ConcurrentMap<E, Boolean>) map).putIfAbsent(o, 
Boolean.TRUE);
-        return answer == null;
-    }
-}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConsistencyHashUtil.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConsistencyHashUtil.java
deleted file mode 100644
index d352aa6d89..0000000000
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConsistencyHashUtil.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.utils;
-
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Random;
-
-public class ConsistencyHashUtil {
-
-    public static String hashMurMurHash(String key, int seed) {
-        HashFunction hashFunction = Hashing.murmur3_128(seed);
-        return hashFunction.hashString(key, StandardCharsets.UTF_8).toString();
-    }
-
-    public static String hashMurMurHash(String key) {
-        Random random = new Random(System.currentTimeMillis());
-        return hashMurMurHash(key, random.nextInt());
-    }
-}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/MapBackedSet.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/MapBackedSet.java
deleted file mode 100644
index 2b1d7a31ef..0000000000
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/MapBackedSet.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.utils;
-
-import java.io.Serializable;
-import java.util.AbstractSet;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A {@link Map}-backed {@link Set}.
- */
-public class MapBackedSet<E> extends AbstractSet<E> implements Serializable {
-
-    private static final long serialVersionUID = -8347878570391674042L;
-
-    protected final Map<E, Boolean> map;
-
-    public MapBackedSet(Map<E, Boolean> map) {
-        this.map = map;
-    }
-
-    public MapBackedSet(Map<E, Boolean> map, Collection<E> c) {
-        this.map = map;
-        addAll(c);
-    }
-
-    @Override
-    public int size() {
-        return map.size();
-    }
-
-    @Override
-    public boolean contains(Object o) {
-        return map.containsKey(o);
-    }
-
-    @Override
-    public Iterator<E> iterator() {
-        return map.keySet().iterator();
-    }
-
-    @Override
-    public boolean add(E o) {
-        return map.put(o, Boolean.TRUE) == null;
-    }
-
-    @Override
-    public boolean remove(Object o) {
-        return map.remove(o) != null;
-    }
-
-    @Override
-    public void clear() {
-        map.clear();
-    }
-}

Reply via email to