This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 7effefa  [improvement] Supports traversal of Doris FE nodes when searching for Doris BE (#11)
7effefa is described below

commit 7effefa160f7b6a8e9753ad07c516f0ee160906e
Author: Jiangqiao Xu <96433131+bridgedr...@users.noreply.github.com>
AuthorDate: Sun Feb 27 11:08:53 2022 +0800

    [improvement] Supports traversal of Doris FE nodes when searching for Doris BE (#11)
---
 .../org/apache/doris/flink/rest/RestService.java | 43 ++++++++++++++++++----
 1 file changed, 36 insertions(+), 7 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 9812840..f1d0512 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -244,6 +244,26 @@ public class RestService implements Serializable {
     }
 
     /**
+     * choice a Doris FE node to request.
+     *
+     * @param feNodes Doris FE node list, separate be comma
+     * @param logger slf4j logger
+     * @return the array of Doris FE nodes
+     * @throws IllegalArgumentException fe nodes is illegal
+     */
+    @VisibleForTesting
+    static List<String> allEndpoints(String feNodes, Logger logger) throws IllegalArgumentException {
+        logger.trace("Parse fenodes '{}'.", feNodes);
+        if (StringUtils.isEmpty(feNodes)) {
+            logger.error(ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes);
+            throw new IllegalArgumentException("fenodes", feNodes);
+        }
+        List<String> nodes = Arrays.stream(feNodes.split(",")).map(String::trim).collect(Collectors.toList());
+        Collections.shuffle(nodes);
+        return nodes;
+    }
+
+    /**
      * choice a Doris BE node to request.
      *
      * @param options configuration of request
@@ -328,13 +348,22 @@ public class RestService implements Serializable {
     @VisibleForTesting
     static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
         String feNodes = options.getFenodes();
-        String feNode = randomEndpoint(feNodes, logger);
-        String beUrl = "http://" + feNode + BACKENDS_V2;
-        HttpGet httpGet = new HttpGet(beUrl);
-        String response = send(options, readOptions, httpGet, logger);
-        logger.info("Backend Info:{}", response);
-        List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
-        return backends;
+        List<String> feNodeList = allEndpoints(feNodes, logger);
+        for (String feNode: feNodeList) {
+            try {
+                String beUrl = "http://" + feNode + BACKENDS_V2;
+                HttpGet httpGet = new HttpGet(beUrl);
+                String response = send(options, readOptions, httpGet, logger);
+                logger.info("Backend Info:{}", response);
+                List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
+                return backends;
+            } catch (ConnectedFailedException e) {
+                logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage());
+            }
+        }
+        String errMsg = "No Doris FE is available, please check configuration";
+        logger.error(errMsg);
+        throw new DorisException(errMsg);
     }
 
     static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) throws DorisException, IOException {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org