This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-for-flink-before-1.13
in repository 
https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git


The following commit(s) were added to refs/heads/branch-for-flink-before-1.13 
by this push:
     new 3ebb83a  [improvement] Supports traversal of Doris FE nodes when 
searching for Doris BE (#11)
3ebb83a is described below

commit 3ebb83a9c65b275b538c1458be8cf4c0a77a68c1
Author: Jiangqiao Xu <96433131+bridgedr...@users.noreply.github.com>
AuthorDate: Sun Feb 27 11:08:39 2022 +0800

    [improvement] Supports traversal of Doris FE nodes when searching for Doris 
BE (#11)
---
 .../org/apache/doris/flink/rest/RestService.java   | 45 ++++++++++++++++++----
 1 file changed, 37 insertions(+), 8 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 0c4264f..c229cf0 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -244,6 +244,26 @@ public class RestService implements Serializable {
     }
 
     /**
+     * choice a Doris FE node to request.
+     *
+     * @param feNodes Doris FE node list, separate be comma
+     * @param logger  slf4j logger
+     * @return the array of Doris FE nodes
+     * @throws IllegalArgumentException fe nodes is illegal
+     */
+    @VisibleForTesting
+    static List<String> allEndpoints(String feNodes, Logger logger) throws IllegalArgumentException {
+        logger.trace("Parse fenodes '{}'.", feNodes);
+        if (StringUtils.isEmpty(feNodes)) {
+            logger.error(ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes);
+            throw new IllegalArgumentException("fenodes", feNodes);
+        }
+        List<String> nodes = Arrays.stream(feNodes.split(",")).map(String::trim).collect(Collectors.toList());
+        Collections.shuffle(nodes);
+        return nodes;
+    }
+
+    /**
      * choice a Doris BE node to request.
      *
      * @param options configuration of request
@@ -322,19 +342,28 @@ public class RestService implements Serializable {
      *
      * @param options configuration of request
      * @param logger  slf4j logger
-     * @return the chosen one Doris BE node
+     * @return all Doris BE node
      * @throws IllegalArgumentException BE nodes is illegal
      */
     @VisibleForTesting
     static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
         String feNodes = options.getFenodes();
-        String feNode = randomEndpoint(feNodes, logger);
-        String beUrl = "http://" + feNode + BACKENDS_V2;
-        HttpGet httpGet = new HttpGet(beUrl);
-        String response = send(options, readOptions, httpGet, logger);
-        logger.info("Backend Info:{}", response);
-        List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
-        return backends;
+        List<String> feNodeList = allEndpoints(feNodes, logger);
+        for (String feNode: feNodeList) {
+            try {
+                String beUrl = "http://" + feNode + BACKENDS_V2;
+                HttpGet httpGet = new HttpGet(beUrl);
+                String response = send(options, readOptions, httpGet, logger);
+                logger.info("Backend Info:{}", response);
+                List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
+                return backends;
+            } catch (ConnectedFailedException e) {
+                logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage());
+            }
+        }
+        String errMsg = "No Doris FE is available, please check configuration";
+        logger.error(errMsg);
+        throw new DorisException(errMsg);
     }
 
     static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) throws DorisException, IOException {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to