This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fe527d053 [INLONG-11729][SDK] Optimize TcpClientExample and HttpClientExample codes (#11730)
1fe527d053 is described below

commit 1fe527d053121d7be9381784d3bb15f0c14ddb24
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Sat Feb 8 16:51:07 2025 +0800

    [INLONG-11729][SDK] Optimize TcpClientExample and HttpClientExample codes (#11730)
    
    Co-authored-by: gosonzhang <gosonzh...@tencent.com>
---
 .../apache/inlong/sdk/dataproxy/example/Event.java |  97 --------------------
 .../sdk/dataproxy/example/HttpClientExample.java   | 102 +++++++++++++++------
 .../sdk/dataproxy/example/MyMessageCallBack.java   |  60 ------------
 .../sdk/dataproxy/example/SendMsgThread.java       |  74 ---------------
 .../sdk/dataproxy/example/TcpClientExample.java    |  85 ++++++++---------
 5 files changed, 115 insertions(+), 303 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/Event.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/Event.java
deleted file mode 100644
index bbb7fe9140..0000000000
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/Event.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.example;
-
-import java.util.ArrayList;
-
-public class Event {
-
-    private byte[] body;
-    private String groupId;
-    private String streamId;
-    private long dt;
-    private int tryTimes = 0;
-    ArrayList<byte[]> bodylist = new ArrayList<byte[]>();
-
-    public Event(byte[] body, String groupId, String streamId, long dt) {
-        super();
-        this.body = body;
-        this.groupId = groupId;
-        this.streamId = streamId;
-        this.dt = dt;
-        this.setTryTimes(0);
-    }
-
-    public Event(ArrayList<byte[]> bodylist, String groupId, String streamId, long dt) {
-        super();
-        this.bodylist = bodylist;
-        this.groupId = groupId;
-        this.streamId = streamId;
-        this.dt = dt;
-        this.setTryTimes(0);
-    }
-
-    public ArrayList<byte[]> getBodylist() {
-        return bodylist;
-    }
-
-    public void setBodylist(ArrayList<byte[]> bodylist) {
-        this.bodylist = bodylist;
-    }
-
-    public byte[] getBody() {
-        return body;
-    }
-
-    public void setBody(byte[] body) {
-        this.body = body;
-    }
-
-    public String getGroupId() {
-        return groupId;
-    }
-
-    public void setGroupId(String groupId) {
-        this.groupId = groupId;
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-
-    public void setStreamId(String streamId) {
-        this.streamId = streamId;
-    }
-
-    public long getDt() {
-        return dt;
-    }
-
-    public void setDt(long dt) {
-        this.dt = dt;
-    }
-
-    public int getTryTimes() {
-        return tryTimes;
-    }
-
-    public void setTryTimes(int tryTimes) {
-        this.tryTimes = tryTimes;
-    }
-
-}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
index 502932d17d..730cb775c9 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
@@ -17,16 +17,21 @@
 
 package org.apache.inlong.sdk.dataproxy.example;
 
-import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
-import org.apache.inlong.sdk.dataproxy.network.HttpProxySender;
+import org.apache.inlong.sdk.dataproxy.MsgSenderFactory;
+import org.apache.inlong.sdk.dataproxy.MsgSenderSingleFactory;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpEventInfo;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSender;
 import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HttpClientExample {
 
+    private static final Logger logger = LoggerFactory.getLogger(HttpClientExample.class);
+
     public static void main(String[] args) {
         String inlongGroupId = "test_group_id";
         String inlongStreamId = "test_stream_id";
@@ -35,40 +40,77 @@ public class HttpClientExample {
         String inLongManagerPort = "8083";
         String messageBody = "inlong message body!";
 
-        HttpProxySender sender = getMessageSender(inLongManagerAddr,
-                inLongManagerPort, inlongGroupId, true, false,
-                configBasePath);
+        // build sender factory
+        MsgSenderSingleFactory senderFactory = new MsgSenderSingleFactory();
+        // build sender object
+        HttpMsgSender sender = getMessageSender(senderFactory, false,
+                inLongManagerAddr, inLongManagerPort, inlongGroupId, false, configBasePath);
+        // send message
         sendHttpMessage(sender, inlongGroupId, inlongStreamId, messageBody);
-        sender.close(); // close the sender
+        // close all senders
+        sender.close();
     }
 
-    public static HttpProxySender getMessageSender(String inLongManagerAddr,
-            String inLongManagerPort, String inlongGroupId,
-            boolean requestByHttp, boolean isReadProxyIPFromLocal,
+    public static HttpMsgSender getMessageSender(MsgSenderFactory senderFactory, boolean visitMsgByHttps,
+            String managerAddr, String managerPort, String inlongGroupId, boolean useLocalMetaConfig,
             String configBasePath) {
-        HttpMsgSenderConfig httpConfig = null;
-        HttpProxySender sender = null;
+        HttpMsgSender sender = null;
         try {
-            httpConfig = new HttpMsgSenderConfig(requestByHttp, inLongManagerAddr,
-                    Integer.valueOf(inLongManagerPort),
-                    inlongGroupId, "admin", "inlong");// user and password of manager
-            httpConfig.setMetaStoreBasePath(configBasePath);
-            httpConfig.setOnlyUseLocalProxyConfig(isReadProxyIPFromLocal);
+            HttpMsgSenderConfig httpConfig = new HttpMsgSenderConfig(visitMsgByHttps, managerAddr,
+                    Integer.parseInt(managerPort), inlongGroupId, "admin", "inlong");
             httpConfig.setDiscardHttpCacheWhenClosing(true);
-            sender = new HttpProxySender(httpConfig);
-        } catch (ProxySdkException e) {
-            e.printStackTrace();
-        } catch (Exception e) {
-            e.printStackTrace();
+            httpConfig.setMetaStoreBasePath(configBasePath);
+            httpConfig.setOnlyUseLocalProxyConfig(useLocalMetaConfig);
+            httpConfig.setHttpConTimeoutMs(20000);
+            sender = senderFactory.genHttpSenderByGroupId(httpConfig);
+        } catch (Throwable ex) {
+            System.out.println("Get MessageSender throw exception, " + ex);
         }
         return sender;
     }
 
-    public static void sendHttpMessage(HttpProxySender sender, String inlongGroupId,
-            String inlongStreamId, String messageBody) {
-        List<String> bodyList = new ArrayList<>();
-        bodyList.add(messageBody);
-        sender.asyncSendMessage(bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(),
-                20, TimeUnit.SECONDS, new MyMessageCallBack());
+    public static void sendHttpMessage(HttpMsgSender sender,
+            String inlongGroupId, String inlongStreamId, String messageBody) {
+        try {
+            ProcessResult procResult = new ProcessResult();
+            if (!sender.asyncSendMessage(new HttpEventInfo(inlongGroupId,
+                    inlongStreamId, System.currentTimeMillis(), messageBody), new MyMessageCallBack(), procResult)) {
+                System.out.println("Send message failure, result = " + procResult);
+                return;
+            }
+            System.out.println("Send message success!");
+        } catch (Throwable ex) {
+            System.out.println("Send message exception" + ex);
+        }
+    }
+
+    // async callback class
+    public static class MyMessageCallBack implements MsgSendCallback {
+
+        private HttpMsgSender messageSender = null;
+        private HttpEventInfo event = null;
+
+        public MyMessageCallBack() {
+
+        }
+
+        public MyMessageCallBack(HttpMsgSender messageSender, HttpEventInfo event) {
+            this.messageSender = messageSender;
+            this.event = event;
+        }
+
+        @Override
+        public void onMessageAck(ProcessResult result) {
+            if (result.isSuccess()) {
+                logger.info("onMessageAck return Ok");
+            } else {
+                logger.info("onMessageAck return failure = {}", result);
+            }
+        }
+
+        @Override
+        public void onException(Throwable ex) {
+            logger.error("Send message throw exception", ex);
+        }
     }
 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java
deleted file mode 100644
index d9b5c08132..0000000000
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.example;
-
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MyMessageCallBack implements SendMessageCallback {
-
-    private static final Logger logger = LoggerFactory
-            .getLogger(MyMessageCallBack.class);
-
-    private DefaultMessageSender messageSender = null;
-    private Event event = null;
-
-    public MyMessageCallBack() {
-
-    }
-
-    public MyMessageCallBack(DefaultMessageSender messageSender, Event event) {
-        super();
-        this.messageSender = messageSender;
-        this.event = event;
-    }
-
-    @Override
-    public void onMessageAck(SendResult result) {
-        if (result == SendResult.OK) {
-            logger.info("onMessageAck return Ok");
-        } else {
-            logger.info("onMessageAck return failure = {}", result);
-        }
-    }
-
-    @Override
-    public void onException(Throwable e) {
-        logger.error("Send message failure, error {}", e.getMessage());
-        e.printStackTrace();
-    }
-
-}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java
deleted file mode 100644
index 4658bb1a05..0000000000
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.example;
-
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.IOException;
-
-public class SendMsgThread extends Thread {
-
-    private static final Logger logger = LoggerFactory.getLogger(SendMsgThread.class);
-    private DefaultMessageSender messageSender = null;
-
-    public SendMsgThread(DefaultMessageSender messageSender) {
-        this.messageSender = messageSender;
-    }
-
-    @Override
-    public void run() {
-        FileReader reader = null;
-        try {
-            reader = new FileReader("/data/work/jessey/d5.txt");
-
-            BufferedReader br = new BufferedReader(reader);
-            String line = null;
-            while ((line = br.readLine()) != null) {
-
-                long startTime = System.currentTimeMillis();
-                SendResult result = messageSender.sendMessage("hhhh".getBytes("utf8"),
-                        "b_test", "n_test1", 0, String.valueOf(System.currentTimeMillis()));
-                long endTime = System.currentTimeMillis();
-                if (result == result.OK) {
-                    logger.info("this msg is ok time {}", endTime - startTime);
-                } else {
-                    logger.info("this msg is error ,{}", result);
-                }
-            }
-        } catch (Exception e) {
-            logger.error("{}", e.getMessage());
-            e.printStackTrace();
-
-        } finally {
-            if (reader != null) {
-                try {
-                    reader.close();
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-
-    }
-}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
index ab7e674f80..ae653a02de 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
@@ -17,15 +17,18 @@
 
 package org.apache.inlong.sdk.dataproxy.example;
 
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.MsgSenderFactory;
+import org.apache.inlong.sdk.dataproxy.MsgSenderSingleFactory;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
 
 public class TcpClientExample {
 
@@ -38,59 +41,57 @@ public class TcpClientExample {
 
         String inlongGroupId = "test_group_id";
         String inlongStreamId = "test_stream_id";
-
         String configBasePath = "";
         String inLongManagerAddr = "127.0.0.1";
         String inLongManagerPort = "8083";
-
-        /*
-         * It is recommended to use type 7. For others, please refer to the official related documents
-         */
-        int msgType = 7;
+        int msgType = 7; // default report type
         String messageBody = "inglong-message-random-body!";
 
+        // build sender factory
+        MsgSenderSingleFactory senderFactory = new MsgSenderSingleFactory();
+        // build sender object
         TcpClientExample tcpClientExample = new TcpClientExample();
-        DefaultMessageSender sender = tcpClientExample
-                .getMessageSender(inLongManagerAddr, inLongManagerPort,
-                        inlongGroupId, true, false, configBasePath, msgType);
-        tcpClientExample.sendTcpMessage(sender, inlongGroupId, inlongStreamId,
-                messageBody, System.currentTimeMillis());
-        sender.close(); // close the sender
+        TcpMsgSender sender = tcpClientExample.getMessageSender(senderFactory, false,
+                inLongManagerAddr, inLongManagerPort, inlongGroupId, msgType, false, configBasePath);
+        // send message
+        tcpClientExample.sendTcpMessage(sender,
+                inlongGroupId, inlongStreamId, System.currentTimeMillis(), messageBody);
+        // close all senders
+        senderFactory.shutdownAll();
     }
 
-    public DefaultMessageSender getMessageSender(String inLongManagerAddr, String inLongManagerPort,
-            String inlongGroupId, boolean requestByHttp, boolean isReadProxyIPFromLocal,
-            String configBasePath, int msgType) {
-        TcpMsgSenderConfig tcpConfig = null;
-        DefaultMessageSender messageSender = null;
+    public TcpMsgSender getMessageSender(MsgSenderFactory senderFactory, boolean visitMgrByHttps,
+            String managerAddr, String managerPort, String inlongGroupId, int msgType,
+            boolean useLocalMetaConfig, String configBasePath) {
+        TcpMsgSender messageSender = null;
         try {
-            tcpConfig = new TcpMsgSenderConfig(requestByHttp, inLongManagerAddr,
-                    Integer.valueOf(inLongManagerPort), inlongGroupId, "admin", "inlong");
-            if (StringUtils.isNotEmpty(configBasePath)) {
-                tcpConfig.setMetaStoreBasePath(configBasePath);
-            }
-            tcpConfig.setOnlyUseLocalProxyConfig(isReadProxyIPFromLocal);
+            // build sender configure
+            TcpMsgSenderConfig tcpConfig =
+                    new TcpMsgSenderConfig(visitMgrByHttps, managerAddr,
+                            Integer.parseInt(managerPort), inlongGroupId, "admin", "inlong");
+            tcpConfig.setMetaStoreBasePath(configBasePath);
+            tcpConfig.setOnlyUseLocalProxyConfig(useLocalMetaConfig);
+            tcpConfig.setSdkMsgType(MsgType.valueOf(msgType));
             tcpConfig.setRequestTimeoutMs(20000L);
-            messageSender = DefaultMessageSender.generateSenderByClusterId(tcpConfig);
-            messageSender.setMsgtype(msgType);
-        } catch (Exception e) {
-            logger.error("getMessageSender has exception e = {}", e);
+            // build sender object
+            messageSender = senderFactory.genTcpSenderByClusterId(tcpConfig);
+        } catch (Throwable ex) {
+            System.out.println("Get MessageSender throw exception, " + ex);
         }
         return messageSender;
     }
 
-    public void sendTcpMessage(DefaultMessageSender sender, String inlongGroupId,
-            String inlongStreamId, String messageBody, long dt) {
-        SendResult result = null;
+    public void sendTcpMessage(TcpMsgSender sender,
+            String inlongGroupId, String inlongStreamId, long dt, String messageBody) {
+        ProcessResult procResult = new ProcessResult();
         try {
-            result = sender.sendMessage(messageBody.getBytes("utf8"), inlongGroupId, inlongStreamId,
-                    0, String.valueOf(dt));
-
-        } catch (UnsupportedEncodingException e) {
-            e.printStackTrace();
+            sender.sendMessage(new TcpEventInfo(inlongGroupId, inlongStreamId,
+                    dt, null, messageBody.getBytes(StandardCharsets.UTF_8)), procResult);
+        } catch (Throwable ex) {
+            System.out.println("Message sent throw exception, " + ex);
+            return;
         }
-        System.out.println("messageSender" + result);
-        logger.info("messageSender {}", result);
+        System.out.println("Message sent result = " + procResult);
+        logger.info("Message sent result = {}", procResult);
     }
-
 }

Reply via email to