+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_FENODES;
+import static 
+import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD;
+import static 
+import static 
+import static 
+import static 
+import static 
+import static 
+import static org.apache.doris.spark.util.ErrorMessages.CONNECT_FAILED_MESSAGE;
+import static 
+import static 
+import static 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.spark.cfg.ConfigurationOptions;
+import org.apache.doris.spark.cfg.Settings;
+import org.apache.doris.spark.exception.ConnectedFailedException;
+import org.apache.doris.spark.exception.DorisException;
+import org.apache.doris.spark.exception.IllegalArgumentException;
+import org.apache.doris.spark.exception.ShouldNeverHappenException;
+import org.apache.http.HttpStatus;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.codehaus.jackson.JsonParseException;
+import org.slf4j.Logger;
+ * Service for communicate with Doris FE.
+ */
+public class RestService implements Serializable {
+    public final static int REST_RESPONSE_STATUS_OK = 200;
+    private static final String API_PREFIX = "/api";
+    private static final String SCHEMA = "_schema";
+    private static final String QUERY_PLAN = "_query_plan";
+    /**
+     * send request to Doris FE and get response json string.
+     * @param cfg configuration of request
+     * @param request {@link HttpRequestBase} real request
+     * @param logger {@link Logger}
+     * @return Doris FE response in json string
+     * @throws ConnectedFailedException throw when cannot connect to Doris FE
+     */
+    private static String send(Settings cfg, HttpRequestBase request, Logger 
logger) throws
+            ConnectedFailedException {
+        int connectTimeout = 
+                ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_DEFAULT);
+        int socketTimeout = 
+                ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_DEFAULT);
+        int retries = 
+                ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT);
+        logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. 
retries set to '{}'.",
+                connectTimeout, socketTimeout, retries);
+        RequestConfig requestConfig = RequestConfig.custom()
+                .setConnectTimeout(connectTimeout)
+                .setSocketTimeout(socketTimeout)
+                .build();
+        request.setConfig(requestConfig);
+        String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, "");
+        String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "");
+        CredentialsProvider credentialsProvider = new 
+        credentialsProvider.setCredentials(
+                AuthScope.ANY,
+                new UsernamePasswordCredentials(user, password));
+        HttpClientContext context = HttpClientContext.create();
+        context.setCredentialsProvider(credentialsProvider);
+"Send request to Doris FE '{}' with user '{}'.", 
request.getURI(), user);
+        IOException ex = null;
+        int statusCode = -1;
+        for (int attempt = 0; attempt < retries; attempt++) {
+            CloseableHttpClient httpClient = HttpClients.createDefault();
+            logger.debug("Attempt {} to request {}.", attempt, 
+            try {
+                CloseableHttpResponse response = httpClient.execute(request, 
+                statusCode = response.getStatusLine().getStatusCode();
+                if (statusCode != HttpStatus.SC_OK) {
+                    logger.warn("Failed to get response from Doris FE {}, http 
code is {}",
+                            request.getURI(), statusCode);
+                    continue;
+                }
+                String res = EntityUtils.toString(response.getEntity(), 
+                logger.trace("Success get response from Doris FE: {}, response 
is: {}.",
+                        request.getURI(), res);
+                return res;
+            } catch (IOException e) {
+                ex = e;
+                logger.warn(CONNECT_FAILED_MESSAGE, request.getURI(), e);
+            }
+        }
+        logger.error(CONNECT_FAILED_MESSAGE, request.getURI(), ex);
+        throw new ConnectedFailedException(request.getURI().toString(), 
statusCode, ex);
+    }
+    /**
+     * parse table identifier to array.
+     * @param tableIdentifier table identifier string
+     * @param logger {@link Logger}
+     * @return first element is db name, second element is table name
+     * @throws IllegalArgumentException table identifier is illegal
+     */
+    @VisibleForTesting
+    static String[] parseIdentifier(String tableIdentifier, Logger logger) 
throws IllegalArgumentException {
+        logger.trace("Parse identifier '{}'.", tableIdentifier);
+        if (StringUtils.isEmpty(tableIdentifier)) {
+            logger.error(ILLEGAL_ARGUMENT_MESSAGE, "table.identifier", 
+            throw new IllegalArgumentException("table.identifier", 
+        }
+        String[] identifier = tableIdentifier.split("\\.");
+        if (identifier.length != 2) {
+            logger.error(ILLEGAL_ARGUMENT_MESSAGE, "table.identifier", 
+            throw new IllegalArgumentException("table.identifier", 
+        }
+        return identifier;
+    }
+    /**
+     * choice a Doris FE node to request.
+     * @param feNodes Doris FE node list, separate be comma
+     * @param logger slf4j logger
+     * @return the chosen one Doris FE node
+     * @throws IllegalArgumentException fe nodes is illegal
+     */
+    @VisibleForTesting
+    static String randomEndpoint(String feNodes, Logger logger) throws 
IllegalArgumentException {
+        logger.trace("Parse fenodes '{}'.", feNodes);
+        if (StringUtils.isEmpty(feNodes)) {
+            logger.error(ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes);
+            throw new IllegalArgumentException("fenodes", feNodes);
+        }
+        List<String> nodes = Arrays.asList(feNodes.split(","));
+        Collections.shuffle(nodes);
+        return nodes.get(0).trim();
+    }
+    /**
+     * get a valid URI to connect Doris FE.
+     * @param cfg configuration of request
+     * @param logger {@link Logger}
+     * @return uri string
+     * @throws IllegalArgumentException throw when configuration is illegal
+     */
+    @VisibleForTesting
+    static String getUriStr(Settings cfg, Logger logger) throws 
IllegalArgumentException {
+        String[] identifier = 
parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger);
+        return "http://"; +
+                randomEndpoint(cfg.getProperty(DORIS_FENODES), logger) + 
+                "/" + identifier[0] +
+                "/" + identifier[1] +
+                "/";
+    }
+    /**
+     * discover Doris table schema from Doris FE.
+     * @param cfg configuration of request
+     * @param logger slf4j logger
+     * @return Doris table schema
+     * @throws DorisException throw when discover failed
+     */
+    public static Schema getSchema(Settings cfg, Logger logger)
+            throws DorisException {
+        logger.trace("Finding schema.");
+        HttpGet httpGet = new HttpGet(getUriStr(cfg, logger) + SCHEMA);
+        String response = send(cfg, httpGet, logger);
+        logger.debug("Find schema response is '{}'.", response);
+        return parseSchema(response, logger);
+    }
+    /**
+     * translate Doris FE response to inner {@link Schema} struct.
+     * @param response Doris FE response
+     * @param logger {@link Logger}
+     * @return inner {@link Schema} struct
+     * @throws DorisException throw when translate failed
+     */
+    @VisibleForTesting
+    public static Schema parseSchema(String response, Logger logger) throws 
DorisException {
+        logger.trace("Parse response '{}' to schema.", response);
+        ObjectMapper mapper = new ObjectMapper();
+        Schema schema;
+        try {
+            schema = mapper.readValue(response, Schema.class);
+        } catch (JsonParseException e) {
+            String errMsg = "Doris FE's response is not a json. res: " + 
+            logger.error(errMsg, e);
+            throw new DorisException(errMsg, e);
+        } catch (JsonMappingException e) {
+            String errMsg = "Doris FE's response cannot map to schema. res: " 
+ response;
+            logger.error(errMsg, e);
+            throw new DorisException(errMsg, e);
+        } catch (IOException e) {
+            String errMsg = "Parse Doris FE's response to json failed. res: " 
+ response;
+            logger.error(errMsg, e);
+            throw new DorisException(errMsg, e);
+        }
+        if (schema == null) {
+            logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
+            throw new ShouldNeverHappenException();
+        }
+        if (schema.getStatus() != REST_RESPONSE_STATUS_OK) {
+            String errMsg = "Doris FE's response is not OK, status is " + 
+            logger.error(errMsg);
+            throw new DorisException(errMsg);
+        }
+        logger.debug("Parsing schema result is '{}'.", schema);
+        return schema;
+    }
+    /**
+     * find Doris RDD partitions from Doris FE.
+     * @param cfg configuration of request
+     * @param logger {@link Logger}
+     * @return an list of Doris RDD partitions
+     * @throws DorisException throw when find partition failed
+     */
+    public static List<PartitionDefinition> findPartitions(Settings cfg, 
Logger logger) throws DorisException {
+        logger.trace("Finding partitions.");
+        String[] tableIdentifiers = 
parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger);
+        String sql = "select " + cfg.getProperty(DORIS_READ_FIELD, "*") +
+                " from `" + tableIdentifiers[0] + "`.`" + tableIdentifiers[1] 
+ "`";
+        if (!StringUtils.isEmpty(cfg.getProperty(DORIS_FILTER_QUERY))) {
+            sql += " where " + cfg.getProperty(DORIS_FILTER_QUERY);
+        }
+        logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
+        HttpPost httpPost = new HttpPost(getUriStr(cfg, logger) + QUERY_PLAN);
+        String entity = "{\"sql\": \""+ sql +"\"}";
+        logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
+        StringEntity stringEntity = new StringEntity(entity, 
+        stringEntity.setContentEncoding("UTF-8");
+        stringEntity.setContentType("application/json");
+        httpPost.setEntity(stringEntity);
+        String resStr = send(cfg, httpPost, logger);
+        logger.debug("Find partition response is '{}'.", resStr);
+        QueryPlan queryPlan = getQueryPlan(resStr, logger);
+        Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, 
+        return tabletsMapToPartition(
+                cfg,
+                be2Tablets,
+                queryPlan.getOpaqued_query_plan(),
+                tableIdentifiers[0],
+                tableIdentifiers[1],
+                logger);
+    }
+    /**
+     * translate Doris FE response string to inner {@link QueryPlan} struct.
+     * @param response Doris FE response string
+     * @param logger {@link Logger}
+     * @return inner {@link QueryPlan} struct
+     * @throws DorisException throw when translate failed.
+     */
+    @VisibleForTesting
+    static QueryPlan getQueryPlan(String response, Logger logger) throws 
DorisException {
+        logger.trace("Parsing fe response to query plan.");
+        ObjectMapper mapper = new ObjectMapper();
+        QueryPlan queryPlan;
+        try {
+            queryPlan = mapper.readValue(response, QueryPlan.class);
+        } catch (JsonParseException e) {
+            String errMsg = "Doris FE's response is not a json. res: " + 
+            logger.error(errMsg, e);
+            throw new DorisException(errMsg, e);
+        } catch (JsonMappingException e) {
+            String errMsg = "Doris FE's response cannot map to schema. res: " 
+ response;
+            logger.error(errMsg, e);
+            throw new DorisException(errMsg, e);
+        } catch (IOException e) {
+            String errMsg = "Parse Doris FE's response to json failed. res: " 
+ response;
+            logger.error(errMsg, e);
+            throw new DorisException(errMsg, e);
+        }
+        if (queryPlan == null) {
+            logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
+            throw new ShouldNeverHappenException();
+        }
+        if (queryPlan.getStatus() != REST_RESPONSE_STATUS_OK) {
+            String errMsg = "Doris FE's response is not OK, status is " + 
+            logger.error(errMsg);
+            throw new DorisException(errMsg);
+        }
+        logger.debug("Parsing partition result is '{}'.", queryPlan);
+        return queryPlan;
+    }
+    /**
+     * select which Doris BE to get tablet data.
+     * @param queryPlan {@link QueryPlan} translated from Doris FE response
+     * @param logger {@link Logger}
+     * @return BE to tablets {@link Map}
+     * @throws DorisException throw when select failed.
+     */
+    @VisibleForTesting
+    static  Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan, 
Logger logger) throws DorisException {
+        logger.trace("Choice tablet targeting Doris BE.");
+        Map<String, List<Long>> be2Tablets = new HashMap<>();
+        for (Map.Entry<String, Tablet> part : 
queryPlan.getPartitions().entrySet()) {
+            logger.debug("Parse tablet info: '{}'.", part);
+            long tabletId;
+            try {
+                tabletId = Long.parseLong(part.getKey());
