This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 9d36b2338c [INLONG-11745][SDK] Clean up HttpProxySender and related implementations (#11748) 9d36b2338c is described below commit 9d36b2338c1b7b3f151582eb447d9b6af2abbfe7 Author: Goson Zhang <4675...@qq.com> AuthorDate: Tue Feb 11 16:19:19 2025 +0800 [INLONG-11745][SDK] Clean up HttpProxySender and related implementations (#11748) * [INLONG-11745][SDK] Clean up HttpProxySender and related implementations * [INLONG-11745][SDK] Clean up HttpProxySender and related implementations --------- Co-authored-by: gosonzhang <gosonzh...@tencent.com> --- .../sdk/dataproxy/common/SendMessageCallback.java | 32 --- .../sdk/dataproxy/http/InternalHttpSender.java | 255 --------------------- .../inlong/sdk/dataproxy/network/HttpMessage.java | 76 ------ .../sdk/dataproxy/network/HttpProxySender.java | 229 ------------------ .../sdk/dataproxy/utils/ConcurrentHashSet.java | 45 ---- .../sdk/dataproxy/utils/ConsistencyHashUtil.java | 37 --- .../inlong/sdk/dataproxy/utils/MapBackedSet.java | 74 ------ 7 files changed, 748 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java deleted file mode 100644 index 48ce607037..0000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy.common; - -@Deprecated -/** - * Replace by MsgSendCallback - * - */ -public interface SendMessageCallback { - - /* Invoked when a message is confirmed by TDBus. */ - void onMessageAck(SendResult result); - - /* Invoked when a message transportation interrupted by an exception. */ - void onException(Throwable e); -} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java deleted file mode 100644 index 382ba58ad8..0000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy.http; - -import org.apache.inlong.common.enums.DataProxyErrCode; -import org.apache.inlong.sdk.dataproxy.common.SendResult; -import org.apache.inlong.sdk.dataproxy.config.HostInfo; -import org.apache.inlong.sdk.dataproxy.network.HttpMessage; -import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig; -import org.apache.inlong.sdk.dataproxy.utils.ConcurrentHashSet; -import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; - -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import org.apache.commons.lang3.StringUtils; -import org.apache.http.HttpHeaders; -import org.apache.http.HttpStatus; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.utils.URLEncodedUtils; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.message.BasicNameValuePair; -import org.apache.http.util.EntityUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -@Deprecated -/** - * Replace by InLongHttpMsgSender - */ -public class InternalHttpSender { - - private static final Logger logger = LoggerFactory.getLogger(InternalHttpSender.class); - - private final HttpMsgSenderConfig httpConfig; - private final ConcurrentHashSet<HostInfo> hostList; - - private final LinkedBlockingQueue<HttpMessage> messageCache; - private final ExecutorService workerServices = Executors.newCachedThreadPool(); - private CloseableHttpClient httpClient; - private boolean bShutDown = false; - - public InternalHttpSender(HttpMsgSenderConfig httpConfig, - ConcurrentHashSet<HostInfo> hostList, LinkedBlockingQueue<HttpMessage> messageCache) { - this.httpConfig = httpConfig; - this.hostList = hostList; - this.messageCache = messageCache; - submitWorkThread(); - } - - private void submitWorkThread() { - for (int i = 0; i < httpConfig.getHttpAsyncRptWorkerNum(); i++) { - workerServices.execute(new WorkerRunner()); - } - } - - /** - * construct header - */ - private ArrayList<BasicNameValuePair> getHeaders(List<String> bodies, - String groupId, String streamId, long dt) { - ArrayList<BasicNameValuePair> params = new ArrayList<>(); - params.add(new BasicNameValuePair("groupId", groupId)); - params.add(new BasicNameValuePair("streamId", streamId)); - params.add(new BasicNameValuePair("dt", String.valueOf(dt))); - params.add(new BasicNameValuePair("body", StringUtils.join(bodies, "\n"))); - params.add(new BasicNameValuePair("cnt", String.valueOf(bodies.size()))); - - return params; - } - - /** - * http client - */ - private synchronized CloseableHttpClient constructHttpClient(long timeout, TimeUnit timeUnit) { - if (httpClient != null) { - return httpClient; - } - long timeoutInMs = timeUnit.toMillis(timeout); - RequestConfig requestConfig = RequestConfig.custom() - .setConnectTimeout((int) timeoutInMs) - .setSocketTimeout((int) timeoutInMs).build(); - HttpClientBuilder httpClientBuilder = HttpClientBuilder.create(); - httpClientBuilder.setDefaultRequestConfig(requestConfig); - return httpClientBuilder.build(); - } - - /** - * check cache runner - */ - private class WorkerRunner implements Runnable { - - @Override - public void run() { - // if not shutdown or queue is not empty - while (!bShutDown || !messageCache.isEmpty()) { - try { - while (!messageCache.isEmpty()) { - HttpMessage httpMessage = messageCache.poll(); - if (httpMessage != null) { - SendResult result = sendMessageWithHostInfo( - httpMessage.getBodies(), httpMessage.getGroupId(), - httpMessage.getStreamId(), httpMessage.getDt(), - httpMessage.getTimeout(), httpMessage.getTimeUnit()); - httpMessage.getCallback().onMessageAck(result); - } - } - TimeUnit.MILLISECONDS.sleep(httpConfig.getHttpAsyncWorkerIdleWaitMs()); - } catch (Exception exception) { - logger.error("exception caught", exception); - } - } - } - } - - /** - * get random ip - * - * @return list of host info - */ - public List<HostInfo> getRandomHostInfo() { - List<HostInfo> tmpHostList = new ArrayList<>(hostList); - Collections.shuffle(tmpHostList); - // respect alive connection - int maxIndex = Math.min(httpConfig.getAliveConnections(), tmpHostList.size()); - return tmpHostList.subList(0, maxIndex); - } - - /** - * send request by http - */ - private SendResult sendByHttp(List<String> bodies, String groupId, String streamId, long dt, - long timeout, TimeUnit timeUnit, HostInfo hostInfo) throws Exception { - HttpPost httpPost = null; - CloseableHttpResponse response = null; - try { - if (httpClient == null) { - httpClient = constructHttpClient(timeout, timeUnit); - } - - String url = "http://" + hostInfo.getHostName() + ":" + hostInfo.getPortNumber() + "/dataproxy/message"; - httpPost = new HttpPost(url); - httpPost.setHeader(HttpHeaders.CONNECTION, "close"); - httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded"); - ArrayList<BasicNameValuePair> contents = getHeaders(bodies, groupId, streamId, dt); - String encodedContents = URLEncodedUtils.format(contents, StandardCharsets.UTF_8); - httpPost.setEntity(new StringEntity(encodedContents)); - - logger.info("begin to post request to {}, encoded content is: {}", url, encodedContents); - response = httpClient.execute(httpPost); - - String returnStr = EntityUtils.toString(response.getEntity()); - int returnCode = response.getStatusLine().getStatusCode(); - if (StringUtils.isBlank(returnStr) || HttpStatus.SC_OK != returnCode) { - throw new Exception("get config from manager failed, result: " + returnStr + ", code: " + returnCode); - } - - logger.debug("success to get config from manager, result str: " + returnStr); - JsonObject jsonResponse = JsonParser.parseString(returnStr).getAsJsonObject(); - JsonElement codeElement = jsonResponse.get("code"); - if (codeElement != null) { - if (DataProxyErrCode.SUCCESS.getErrCode() == codeElement.getAsInt()) { - return SendResult.OK; - } else { - return SendResult.INVALID_DATA; - } - } - } finally { - if (httpPost != null) { - httpPost.releaseConnection(); - } - if (response != null) { - response.close(); - } - } - - return SendResult.UNKOWN_ERROR; - } - - /** - * send message with host info - */ - public SendResult sendMessageWithHostInfo(List<String> bodies, String groupId, String streamId, long dt, - long timeout, TimeUnit timeUnit) { - - List<HostInfo> randomHostList = getRandomHostInfo(); - Exception tmpException = null; - for (HostInfo hostInfo : randomHostList) { - try { - return sendByHttp(bodies, groupId, streamId, dt, timeout, timeUnit, hostInfo); - } catch (Exception exception) { - tmpException = exception; - logger.debug("error while sending data, resending it", exception); - } - } - if (tmpException != null) { - logger.error("error while sending data", tmpException); - } - return SendResult.UNKOWN_ERROR; - } - - /** - * close resources - */ - public void close() throws Exception { - bShutDown = true; - if (!messageCache.isEmpty()) { - if (httpConfig.isDiscardHttpCacheWhenClosing()) { - messageCache.clear(); - } else { - long curTime = System.currentTimeMillis(); - while (!messageCache.isEmpty() - || (System.currentTimeMillis() - curTime) < httpConfig.getHttpCloseWaitPeriodMs()) { - ProxyUtils.sleepSomeTime(100); - } - if (!messageCache.isEmpty()) { - logger.warn("Close httpClient, remain {} messages", messageCache.size()); - messageCache.clear(); - } - } - } - if (httpClient != null) { - httpClient.close(); - } - workerServices.shutdown(); - } -} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpMessage.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpMessage.java deleted file mode 100644 index da3bbf45e2..0000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpMessage.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy.network; - -import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -/** - * http message for cache. - */ -public class HttpMessage { - - private final String groupId; - private final String streamId; - private final List<String> bodies; - private final SendMessageCallback callback; - private final long dt; - private final long timeout; - private final TimeUnit timeUnit; - - public HttpMessage(List<String> bodies, String groupId, String streamId, long dt, - long timeout, TimeUnit timeUnit, SendMessageCallback callback) { - this.groupId = groupId; - this.streamId = streamId; - this.bodies = bodies; - this.callback = callback; - this.dt = dt; - this.timeout = timeout; - this.timeUnit = timeUnit; - } - - public String getGroupId() { - return groupId; - } - - public String getStreamId() { - return streamId; - } - - public List<String> getBodies() { - return bodies; - } - - public SendMessageCallback getCallback() { - return callback; - } - - public long getDt() { - return dt; - } - - public long getTimeout() { - return timeout; - } - - public TimeUnit getTimeUnit() { - return timeUnit; - } -} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java deleted file mode 100644 index f4bcde915f..0000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy.network; - -import org.apache.inlong.sdk.dataproxy.common.ProcessResult; -import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; -import org.apache.inlong.sdk.dataproxy.common.SendResult; -import org.apache.inlong.sdk.dataproxy.config.HostInfo; -import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry; -import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager; -import org.apache.inlong.sdk.dataproxy.http.InternalHttpSender; -import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig; -import org.apache.inlong.sdk.dataproxy.utils.ConcurrentHashSet; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; - -@Deprecated -/** - * http sender - * Replace by InLongHttpMsgSender - */ -public class HttpProxySender extends Thread { - - private static final Logger logger = LoggerFactory.getLogger(HttpProxySender.class); - - private final ConcurrentHashSet<HostInfo> hostList = new ConcurrentHashSet<>(); - - private final HttpMsgSenderConfig proxyClientConfig; - private ProxyConfigManager proxyConfigManager; - - private boolean bShutDown = false; - - private final InternalHttpSender internalHttpSender; - private final LinkedBlockingQueue<HttpMessage> messageCache; - - public HttpProxySender(HttpMsgSenderConfig httpConfig) throws Exception { - logger.info("Initial http sender, configure is {}", httpConfig); - this.proxyClientConfig = httpConfig; - initTDMClientAndRequest(httpConfig); - this.messageCache = new LinkedBlockingQueue<>(httpConfig.getHttpAsyncRptCacheSize()); - internalHttpSender = new InternalHttpSender(httpConfig, hostList, messageCache); - } - - /** - * get proxy list - * - * @param httpConfig - * @throws Exception - */ - private void initTDMClientAndRequest(HttpMsgSenderConfig httpConfig) throws Exception { - - try { - proxyConfigManager = new ProxyConfigManager(httpConfig); - ProxyConfigEntry proxyConfigEntry = retryGettingProxyConfig(); - hostList.addAll(proxyConfigEntry.getHostMap().values()); - - this.setDaemon(true); - this.start(); - } catch (Throwable e) { - if (httpConfig.isOnlyUseLocalProxyConfig()) { - throw new Exception("Get local proxy configure failure! e = {}", e); - } else { - throw new Exception("Visit TDManager error! e = {}", e); - } - } - logger.info("http proxy sender starts"); - } - - /** - * retry fetching proxy config in case of network issue. - * - * @return proxy config entry. - */ - private ProxyConfigEntry retryGettingProxyConfig() { - ProcessResult procResult = new ProcessResult(); - if (proxyConfigManager.getGroupIdConfigure(true, procResult)) { - return (ProxyConfigEntry) procResult.getRetData(); - } - return null; - } - - /** - * get proxy list - */ - @Override - public void run() { - ProcessResult procResult = new ProcessResult(); - while (!bShutDown) { - try { - int rand = ThreadLocalRandom.current().nextInt(0, 600); - long randSleepTime = proxyClientConfig.getMgrMetaSyncInrMs() + rand; - TimeUnit.MILLISECONDS.sleep(randSleepTime); - if (proxyConfigManager != null) { - if (!proxyConfigManager.getGroupIdConfigure(false, procResult)) { - throw new Exception(procResult.toString()); - } - ProxyConfigEntry configEntry = (ProxyConfigEntry) procResult.getRetData(); - hostList.addAll(configEntry.getHostMap().values()); - hostList.retainAll(configEntry.getHostMap().values()); - } else { - logger.error("manager is null, please check it!"); - } - logger.info("get new proxy list " + hostList.toString()); - } catch (InterruptedException ignored) { - // ignore it. - } catch (Exception ex) { - logger.error("managerFetcher get or save managerIpList occur error,", ex); - } - } - } - - /** - * send by http - * - * @param body - * @param groupId - * @param streamId - * @param dt - * @param timeout - * @param timeUnit - * @return - */ - public SendResult sendMessage(String body, String groupId, String streamId, long dt, - long timeout, TimeUnit timeUnit) { - return sendMessage(Collections.singletonList(body), groupId, streamId, dt, timeout, timeUnit); - } - - /** - * send multiple messages. - * - * @param bodies list of bodies - * @param groupId - * @param streamId - * @param dt - * @param timeout - * @param timeUnit - * @return - */ - public SendResult sendMessage(List<String> bodies, String groupId, String streamId, long dt, - long timeout, TimeUnit timeUnit) { - if (hostList.isEmpty()) { - logger.error("proxy list is empty, maybe client has been " - + "closed or groupId is not assigned with proxy list"); - return SendResult.NO_CONNECTION; - } - return internalHttpSender.sendMessageWithHostInfo( - bodies, groupId, streamId, dt, timeout, timeUnit); - - } - - /** - * async sender - * - * @param bodies - * @param groupId - * @param streamId - * @param dt - * @param timeout - * @param timeUnit - * @param callback - */ - public void asyncSendMessage(List<String> bodies, String groupId, String streamId, long dt, - long timeout, TimeUnit timeUnit, SendMessageCallback callback) { - List<String> bodyList = new ArrayList<>(bodies); - HttpMessage httpMessage = new HttpMessage(bodyList, groupId, streamId, dt, - timeout, timeUnit, callback); - try { - if (!messageCache.offer(httpMessage)) { - callback.onMessageAck(SendResult.ASYNC_CALLBACK_BUFFER_FULL); - } - } catch (Exception exception) { - logger.error("error async sending data", exception); - } - } - - /** - * async send single message. - * - * @param body - * @param groupId - * @param streamId - * @param dt - * @param timeout - * @param timeUnit - * @param callback - */ - public void asyncSendMessage(String body, String groupId, String streamId, long dt, - long timeout, TimeUnit timeUnit, SendMessageCallback callback) { - asyncSendMessage(Collections.singletonList(body), groupId, streamId, - dt, timeout, timeUnit, callback); - } - - /** - * close - */ - public void close() { - hostList.clear(); - bShutDown = true; - try { - this.interrupt(); - internalHttpSender.close(); - } catch (Exception exception) { - logger.error("error while closing http client", exception); - } - } -} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConcurrentHashSet.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConcurrentHashSet.java deleted file mode 100644 index 574450263a..0000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConcurrentHashSet.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy.utils; - -import java.util.Collection; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * A {@link ConcurrentHashMap}-backed {@link Set}. - */ -public class ConcurrentHashSet<E> extends MapBackedSet<E> { - - private static final long serialVersionUID = 8518578988740277828L; - - public ConcurrentHashSet() { - super(new ConcurrentHashMap<E, Boolean>()); - } - - public ConcurrentHashSet(Collection<E> c) { - super(new ConcurrentHashMap<E, Boolean>(), c); - } - - @Override - public boolean add(E o) { - Boolean answer = ((ConcurrentMap<E, Boolean>) map).putIfAbsent(o, Boolean.TRUE); - return answer == null; - } -} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConsistencyHashUtil.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConsistencyHashUtil.java deleted file mode 100644 index d352aa6d89..0000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConsistencyHashUtil.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy.utils; - -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hashing; - -import java.nio.charset.StandardCharsets; -import java.util.Random; - -public class ConsistencyHashUtil { - - public static String hashMurMurHash(String key, int seed) { - HashFunction hashFunction = Hashing.murmur3_128(seed); - return hashFunction.hashString(key, StandardCharsets.UTF_8).toString(); - } - - public static String hashMurMurHash(String key) { - Random random = new Random(System.currentTimeMillis()); - return hashMurMurHash(key, random.nextInt()); - } -} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/MapBackedSet.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/MapBackedSet.java deleted file mode 100644 index 2b1d7a31ef..0000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/MapBackedSet.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy.utils; - -import java.io.Serializable; -import java.util.AbstractSet; -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; - -/** - * A {@link Map}-backed {@link Set}. - */ -public class MapBackedSet<E> extends AbstractSet<E> implements Serializable { - - private static final long serialVersionUID = -8347878570391674042L; - - protected final Map<E, Boolean> map; - - public MapBackedSet(Map<E, Boolean> map) { - this.map = map; - } - - public MapBackedSet(Map<E, Boolean> map, Collection<E> c) { - this.map = map; - addAll(c); - } - - @Override - public int size() { - return map.size(); - } - - @Override - public boolean contains(Object o) { - return map.containsKey(o); - } - - @Override - public Iterator<E> iterator() { - return map.keySet().iterator(); - } - - @Override - public boolean add(E o) { - return map.put(o, Boolean.TRUE) == null; - } - - @Override - public boolean remove(Object o) { - return map.remove(o) != null; - } - - @Override - public void clear() { - map.clear(); - } -}