This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 1fe527d053 [INLONG-11729][SDK] Optimize TcpClientExample and HttpClientExample codes (#11730) 1fe527d053 is described below commit 1fe527d053121d7be9381784d3bb15f0c14ddb24 Author: Goson Zhang <4675...@qq.com> AuthorDate: Sat Feb 8 16:51:07 2025 +0800 [INLONG-11729][SDK] Optimize TcpClientExample and HttpClientExample codes (#11730) Co-authored-by: gosonzhang <gosonzh...@tencent.com> --- .../apache/inlong/sdk/dataproxy/example/Event.java | 97 -------------------- .../sdk/dataproxy/example/HttpClientExample.java | 102 +++++++++++++++------ .../sdk/dataproxy/example/MyMessageCallBack.java | 60 ------------ .../sdk/dataproxy/example/SendMsgThread.java | 74 --------------- .../sdk/dataproxy/example/TcpClientExample.java | 85 ++++++++--------- 5 files changed, 115 insertions(+), 303 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/Event.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/Event.java deleted file mode 100644 index bbb7fe9140..0000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/Event.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy.example; - -import java.util.ArrayList; - -public class Event { - - private byte[] body; - private String groupId; - private String streamId; - private long dt; - private int tryTimes = 0; - ArrayList<byte[]> bodylist = new ArrayList<byte[]>(); - - public Event(byte[] body, String groupId, String streamId, long dt) { - super(); - this.body = body; - this.groupId = groupId; - this.streamId = streamId; - this.dt = dt; - this.setTryTimes(0); - } - - public Event(ArrayList<byte[]> bodylist, String groupId, String streamId, long dt) { - super(); - this.bodylist = bodylist; - this.groupId = groupId; - this.streamId = streamId; - this.dt = dt; - this.setTryTimes(0); - } - - public ArrayList<byte[]> getBodylist() { - return bodylist; - } - - public void setBodylist(ArrayList<byte[]> bodylist) { - this.bodylist = bodylist; - } - - public byte[] getBody() { - return body; - } - - public void setBody(byte[] body) { - this.body = body; - } - - public String getGroupId() { - return groupId; - } - - public void setGroupId(String groupId) { - this.groupId = groupId; - } - - public String getStreamId() { - return streamId; - } - - public void setStreamId(String streamId) { - this.streamId = streamId; - } - - public long getDt() { - return dt; - } - - public void setDt(long dt) { - this.dt = dt; - } - - public int getTryTimes() { - return tryTimes; - } - - public void setTryTimes(int tryTimes) { - this.tryTimes = tryTimes; - } - -} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java index 502932d17d..730cb775c9 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java @@ -17,16 +17,21 @@ package org.apache.inlong.sdk.dataproxy.example; -import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; -import org.apache.inlong.sdk.dataproxy.network.HttpProxySender; +import org.apache.inlong.sdk.dataproxy.MsgSenderFactory; +import org.apache.inlong.sdk.dataproxy.MsgSenderSingleFactory; +import org.apache.inlong.sdk.dataproxy.common.ProcessResult; +import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback; +import org.apache.inlong.sdk.dataproxy.sender.http.HttpEventInfo; +import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSender; import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HttpClientExample { + private static final Logger logger = LoggerFactory.getLogger(HttpClientExample.class); + public static void main(String[] args) { String inlongGroupId = "test_group_id"; String inlongStreamId = "test_stream_id"; @@ -35,40 +40,77 @@ public class HttpClientExample { String inLongManagerPort = "8083"; String messageBody = "inlong message body!"; - HttpProxySender sender = getMessageSender(inLongManagerAddr, - inLongManagerPort, inlongGroupId, true, false, - configBasePath); + // build sender factory + MsgSenderSingleFactory senderFactory = new MsgSenderSingleFactory(); + // build sender object + HttpMsgSender sender = getMessageSender(senderFactory, false, + inLongManagerAddr, inLongManagerPort, inlongGroupId, false, configBasePath); + // send message sendHttpMessage(sender, inlongGroupId, inlongStreamId, messageBody); - sender.close(); // close the sender + // close all senders + sender.close(); } - public static HttpProxySender getMessageSender(String inLongManagerAddr, - String inLongManagerPort, String inlongGroupId, - boolean requestByHttp, boolean isReadProxyIPFromLocal, + public static HttpMsgSender getMessageSender(MsgSenderFactory senderFactory, boolean visitMsgByHttps, + String managerAddr, String managerPort, String inlongGroupId, boolean useLocalMetaConfig, String configBasePath) { - HttpMsgSenderConfig httpConfig = null; - HttpProxySender sender = null; + HttpMsgSender sender = null; try { - httpConfig = new HttpMsgSenderConfig(requestByHttp, inLongManagerAddr, - Integer.valueOf(inLongManagerPort), - inlongGroupId, "admin", "inlong");// user and password of manager - httpConfig.setMetaStoreBasePath(configBasePath); - httpConfig.setOnlyUseLocalProxyConfig(isReadProxyIPFromLocal); + HttpMsgSenderConfig httpConfig = new HttpMsgSenderConfig(visitMsgByHttps, managerAddr, + Integer.parseInt(managerPort), inlongGroupId, "admin", "inlong"); httpConfig.setDiscardHttpCacheWhenClosing(true); - sender = new HttpProxySender(httpConfig); - } catch (ProxySdkException e) { - e.printStackTrace(); - } catch (Exception e) { - e.printStackTrace(); + httpConfig.setMetaStoreBasePath(configBasePath); + httpConfig.setOnlyUseLocalProxyConfig(useLocalMetaConfig); + httpConfig.setHttpConTimeoutMs(20000); + sender = senderFactory.genHttpSenderByGroupId(httpConfig); + } catch (Throwable ex) { + System.out.println("Get MessageSender throw exception, " + ex); } return sender; } - public static void sendHttpMessage(HttpProxySender sender, String inlongGroupId, - String inlongStreamId, String messageBody) { - List<String> bodyList = new ArrayList<>(); - bodyList.add(messageBody); - sender.asyncSendMessage(bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(), - 20, TimeUnit.SECONDS, new MyMessageCallBack()); + public static void sendHttpMessage(HttpMsgSender sender, + String inlongGroupId, String inlongStreamId, String messageBody) { + try { + ProcessResult procResult = new ProcessResult(); + if (!sender.asyncSendMessage(new HttpEventInfo(inlongGroupId, + inlongStreamId, System.currentTimeMillis(), messageBody), new MyMessageCallBack(), procResult)) { + System.out.println("Send message failure, result = " + procResult); + return; + } + System.out.println("Send message success!"); + } catch (Throwable ex) { + System.out.println("Send message exception" + ex); + } + } + + // async callback class + public static class MyMessageCallBack implements MsgSendCallback { + + private HttpMsgSender messageSender = null; + private HttpEventInfo event = null; + + public MyMessageCallBack() { + + } + + public MyMessageCallBack(HttpMsgSender messageSender, HttpEventInfo event) { + this.messageSender = messageSender; + this.event = event; + } + + @Override + public void onMessageAck(ProcessResult result) { + if (result.isSuccess()) { + logger.info("onMessageAck return Ok"); + } else { + logger.info("onMessageAck return failure = {}", result); + } + } + + @Override + public void onException(Throwable ex) { + logger.error("Send message throw exception", ex); + } } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java deleted file mode 100644 index d9b5c08132..0000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy.example; - -import org.apache.inlong.sdk.dataproxy.DefaultMessageSender; -import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; -import org.apache.inlong.sdk.dataproxy.common.SendResult; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MyMessageCallBack implements SendMessageCallback { - - private static final Logger logger = LoggerFactory - .getLogger(MyMessageCallBack.class); - - private DefaultMessageSender messageSender = null; - private Event event = null; - - public MyMessageCallBack() { - - } - - public MyMessageCallBack(DefaultMessageSender messageSender, Event event) { - super(); - this.messageSender = messageSender; - this.event = event; - } - - @Override - public void onMessageAck(SendResult result) { - if (result == SendResult.OK) { - logger.info("onMessageAck return Ok"); - } else { - logger.info("onMessageAck return failure = {}", result); - } - } - - @Override - public void onException(Throwable e) { - logger.error("Send message failure, error {}", e.getMessage()); - e.printStackTrace(); - } - -} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java deleted file mode 100644 index 4658bb1a05..0000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy.example; - -import org.apache.inlong.sdk.dataproxy.DefaultMessageSender; -import org.apache.inlong.sdk.dataproxy.common.SendResult; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.FileReader; -import java.io.IOException; - -public class SendMsgThread extends Thread { - - private static final Logger logger = LoggerFactory.getLogger(SendMsgThread.class); - private DefaultMessageSender messageSender = null; - - public SendMsgThread(DefaultMessageSender messageSender) { - this.messageSender = messageSender; - } - - @Override - public void run() { - FileReader reader = null; - try { - reader = new FileReader("/data/work/jessey/d5.txt"); - - BufferedReader br = new BufferedReader(reader); - String line = null; - while ((line = br.readLine()) != null) { - - long startTime = System.currentTimeMillis(); - SendResult result = messageSender.sendMessage("hhhh".getBytes("utf8"), - "b_test", "n_test1", 0, String.valueOf(System.currentTimeMillis())); - long endTime = System.currentTimeMillis(); - if (result == result.OK) { - logger.info("this msg is ok time {}", endTime - startTime); - } else { - logger.info("this msg is error ,{}", result); - } - } - } catch (Exception e) { - logger.error("{}", e.getMessage()); - e.printStackTrace(); - - } finally { - if (reader != null) { - try { - reader.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - } -} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java index ab7e674f80..ae653a02de 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java @@ -17,15 +17,18 @@ package org.apache.inlong.sdk.dataproxy.example; -import org.apache.inlong.sdk.dataproxy.DefaultMessageSender; -import org.apache.inlong.sdk.dataproxy.common.SendResult; +import org.apache.inlong.common.msg.MsgType; +import org.apache.inlong.sdk.dataproxy.MsgSenderFactory; +import org.apache.inlong.sdk.dataproxy.MsgSenderSingleFactory; +import org.apache.inlong.sdk.dataproxy.common.ProcessResult; +import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo; +import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender; import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; public class TcpClientExample { @@ -38,59 +41,57 @@ public class TcpClientExample { String inlongGroupId = "test_group_id"; String inlongStreamId = "test_stream_id"; - String configBasePath = ""; String inLongManagerAddr = "127.0.0.1"; String inLongManagerPort = "8083"; - - /* - * It is recommended to use type 7. For others, please refer to the official related documents - */ - int msgType = 7; + int msgType = 7; // default report type String messageBody = "inglong-message-random-body!"; + // build sender factory + MsgSenderSingleFactory senderFactory = new MsgSenderSingleFactory(); + // build sender object TcpClientExample tcpClientExample = new TcpClientExample(); - DefaultMessageSender sender = tcpClientExample - .getMessageSender(inLongManagerAddr, inLongManagerPort, - inlongGroupId, true, false, configBasePath, msgType); - tcpClientExample.sendTcpMessage(sender, inlongGroupId, inlongStreamId, - messageBody, System.currentTimeMillis()); - sender.close(); // close the sender + TcpMsgSender sender = tcpClientExample.getMessageSender(senderFactory, false, + inLongManagerAddr, inLongManagerPort, inlongGroupId, msgType, false, configBasePath); + // send message + tcpClientExample.sendTcpMessage(sender, + inlongGroupId, inlongStreamId, System.currentTimeMillis(), messageBody); + // close all senders + senderFactory.shutdownAll(); } - public DefaultMessageSender getMessageSender(String inLongManagerAddr, String inLongManagerPort, - String inlongGroupId, boolean requestByHttp, boolean isReadProxyIPFromLocal, - String configBasePath, int msgType) { - TcpMsgSenderConfig tcpConfig = null; - DefaultMessageSender messageSender = null; + public TcpMsgSender getMessageSender(MsgSenderFactory senderFactory, boolean visitMgrByHttps, + String managerAddr, String managerPort, String inlongGroupId, int msgType, + boolean useLocalMetaConfig, String configBasePath) { + TcpMsgSender messageSender = null; try { - tcpConfig = new TcpMsgSenderConfig(requestByHttp, inLongManagerAddr, - Integer.valueOf(inLongManagerPort), inlongGroupId, "admin", "inlong"); - if (StringUtils.isNotEmpty(configBasePath)) { - tcpConfig.setMetaStoreBasePath(configBasePath); - } - tcpConfig.setOnlyUseLocalProxyConfig(isReadProxyIPFromLocal); + // build sender configure + TcpMsgSenderConfig tcpConfig = + new TcpMsgSenderConfig(visitMgrByHttps, managerAddr, + Integer.parseInt(managerPort), inlongGroupId, "admin", "inlong"); + tcpConfig.setMetaStoreBasePath(configBasePath); + tcpConfig.setOnlyUseLocalProxyConfig(useLocalMetaConfig); + tcpConfig.setSdkMsgType(MsgType.valueOf(msgType)); tcpConfig.setRequestTimeoutMs(20000L); - messageSender = DefaultMessageSender.generateSenderByClusterId(tcpConfig); - messageSender.setMsgtype(msgType); - } catch (Exception e) { - logger.error("getMessageSender has exception e = {}", e); + // build sender object + messageSender = senderFactory.genTcpSenderByClusterId(tcpConfig); + } catch (Throwable ex) { + System.out.println("Get MessageSender throw exception, " + ex); } return messageSender; } - public void sendTcpMessage(DefaultMessageSender sender, String inlongGroupId, - String inlongStreamId, String messageBody, long dt) { - SendResult result = null; + public void sendTcpMessage(TcpMsgSender sender, + String inlongGroupId, String inlongStreamId, long dt, String messageBody) { + ProcessResult procResult = new ProcessResult(); try { - result = sender.sendMessage(messageBody.getBytes("utf8"), inlongGroupId, inlongStreamId, - 0, String.valueOf(dt)); - - } catch (UnsupportedEncodingException e) { - e.printStackTrace(); + sender.sendMessage(new TcpEventInfo(inlongGroupId, inlongStreamId, + dt, null, messageBody.getBytes(StandardCharsets.UTF_8)), procResult); + } catch (Throwable ex) { + System.out.println("Message sent throw exception, " + ex); + return; } - System.out.println("messageSender" + result); - logger.info("messageSender {}", result); + System.out.println("Message sent result = " + procResult); + logger.info("Message sent result = {}", procResult); } - }