Copilot commented on code in PR #55274:
URL: https://github.com/apache/doris/pull/55274#discussion_r2361552779


##########
be/src/vec/exec/format/table/remote_doris_reader.cpp:
##########
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "remote_doris_reader.h"
+
+#include <iostream>
+#include <map>
+#include <memory>
+#include <string>
+
+#include "arrow/flight/client.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "common/status.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
+#include "runtime/types.h"
+#include "util/arrow/utils.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/core/types.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+} // namespace doris
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+RemoteDorisReader::RemoteDorisReader(const std::vector<SlotDescriptor*>& 
file_slot_descs,
+                                     RuntimeState* state, RuntimeProfile* 
profile,
+                                     const TFileRangeDesc& range)
+        : _range(range), _file_slot_descs(file_slot_descs) {
+    TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, 
_ctzz);
+}
+
+Status RemoteDorisReader::init_reader() {
+    RETURN_DORIS_STATUS_IF_ERROR(init_stream());
+    DCHECK(_stream != nullptr);
+    return Status::OK();
+}
+
+Status RemoteDorisReader::get_next_block(Block* block, size_t* read_rows, 
bool* eof) {
+    arrow::flight::FlightStreamChunk chunk;
+    RETURN_DORIS_STATUS_IF_ERROR(_stream->Next().Value(&chunk));
+
+    if (!chunk.data) {
+        *read_rows = 0;
+        *eof = true;
+        return Status::OK();
+    }
+
+    // convert arrow batch to block
+    auto batch = chunk.data;
+    auto num_rows = batch->num_rows();
+    auto num_columns = batch->num_columns();
+    for (int c = 0; c < num_columns; ++c) {
+        arrow::Array* column = batch->column(c).get();
+
+        std::string column_name = batch->schema()->field(c)->name();
+
+        try {
+            const vectorized::ColumnWithTypeAndName& column_with_name =
+                    block->get_by_name(column_name);
+            
RETURN_IF_ERROR(column_with_name.type->get_serde()->read_column_from_arrow(
+                    column_with_name.column->assume_mutable_ref(), column, 0, 
num_rows, _ctzz));
+        } catch (Exception& e) {
+            return Status::InternalError("Failed to convert from arrow to 
block: {}", e.what());

Review Comment:
   The error message should include more context such as the column name and 
column index to help with debugging data conversion issues.
   ```suggestion
               return Status::InternalError(
                   "Failed to convert from arrow to block for column '{}' 
(index {}): {}",
                   column_name, c, e.what());
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/doris/source/RemoteDorisScanNode.java:
##########
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.doris.source;
+
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.BoolLiteral;
+import org.apache.doris.analysis.DateLiteral;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ExprSubstitutionMap;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.FileQueryScanNode;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.spi.Split;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TRemoteDorisFileDesc;
+import org.apache.doris.thrift.TTableFormatFileDesc;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.grpc.CredentialCallOption;
+import org.apache.arrow.flight.sql.FlightSqlClient;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class RemoteDorisScanNode extends FileQueryScanNode {
+    private static final Logger LOG = 
LogManager.getLogger(RemoteDorisScanNode.class);
+
+    private static final int ADBC_EXEC_RETRY = 3;
+
+    private final List<String> columns = new ArrayList<String>();
+    private final List<String> filters = new ArrayList<String>();
+
+    private RemoteDorisSource source;
+
+    public RemoteDorisScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv,
+                               SessionVariable sv) {
+        super(id, desc, "REMOTE_DORIS_SCAN_NODE", 
StatisticalType.REMOTE_DORIS_SCAN_NODE, needCheckColumnPriv, sv);
+    }
+
+    @Override
+    protected void doInitialize() throws UserException {
+        super.doInitialize();
+        source = new RemoteDorisSource(desc);
+    }
+
+    @Override
+    public List<Split> getSplits(int numBackends) throws UserException {
+        List<Pair<String, ByteBuffer>> locationAndTicketList = 
executeAdbcQuery();
+
+        List<Split> splits = Lists.newArrayList();
+
+        for (Pair<String, ByteBuffer> locationAndTicket : 
locationAndTicketList) {
+            splits.add(new RemoteDorisSplit(locationAndTicket.first, 
locationAndTicket.second));
+        }
+
+        return splits;
+    }
+
+    @Override
+    protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
+        if (split instanceof RemoteDorisSplit) {
+            RemoteDorisSplit dorisArrowSplit = (RemoteDorisSplit) split;
+            TRemoteDorisFileDesc fileDesc = new TRemoteDorisFileDesc();
+            fileDesc.setIp(source.getHostAndArrowPort().key());
+            
fileDesc.setArrowPort(source.getHostAndArrowPort().value().toString());
+            fileDesc.setTicket(dorisArrowSplit.getTicket());
+            fileDesc.setLocationUri(dorisArrowSplit.getLocation());
+            fileDesc.setUser(source.getCatalog().getUsername());
+            fileDesc.setPassword(source.getCatalog().getPassword());
+
+            // set TTableFormatFileDesc
+            TTableFormatFileDesc tableFormatFileDesc = new 
TTableFormatFileDesc();
+            tableFormatFileDesc.setRemoteDorisParams(fileDesc);
+            tableFormatFileDesc.setTableFormatType(((RemoteDorisSplit) 
split).getTableFormatType().value());
+
+            // set TFileRangeDesc
+            rangeDesc.setTableFormatParams(tableFormatFileDesc);
+        }
+    }
+
+    @Override
+    public String getNodeExplainString(String prefix, TExplainLevel 
detailLevel) {
+        StringBuilder output = new StringBuilder();
+
+        output.append(prefix).append("TABLE: 
").append(source.getTargetTable().getExternalTableName()).append("\n");
+        if (detailLevel == TExplainLevel.BRIEF) {
+            return output.toString();
+        }
+        output.append(prefix).append("QUERY: 
").append(getAdbcQueryStr()).append("\n");
+        if (!conjuncts.isEmpty()) {
+            Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts);
+            output.append(prefix).append("PREDICATES: 
").append(expr.toSql()).append("\n");
+        }
+
+        return output.toString();
+    }
+
+    @Override
+    protected TFileFormatType getFileFormatType() throws UserException {
+        return TFileFormatType.FORMAT_ARROW;
+    }
+
+    @Override
+    protected List<String> getPathPartitionKeys() throws UserException {
+        return new ArrayList<>();
+    }
+
+    @Override
+    protected TableIf getTargetTable() throws UserException {
+        return desc.getTable();
+    }
+
+    @Override
+    protected Map<String, String> getLocationProperties() throws UserException 
{
+        return source.getCatalog().getProperties();
+    }
+
+    // Send an adbc request and receive the response result from adbc
+    private List<Pair<String, ByteBuffer>> executeAdbcQuery() {
+        createAdbcColumns();
+        createAdbcFilters();
+
+        String queryStr = getAdbcQueryStr();
+        RuntimeException scratchExceptionForThrow = null;
+
+        for (int i = 0; i < ADBC_EXEC_RETRY; i++) {
+            try {
+                return getUriAndTickets(
+                    source.nextHostAndArrowPort(),
+                    source.getCatalog().getUsername(),
+                    source.getCatalog().getPassword(),
+                    queryStr
+                );
+            } catch (Exception e) {
+                LOG.warn("arrow request node [{}] failures {}, try next nodes",
+                        source.getHostAndArrowPort().toString(), e);
+                scratchExceptionForThrow = new 
RuntimeException(e.getMessage());
+            }
+        }
+
+        LOG.error("try all arrow nodes [{}], no other nodes left, sql:{}", 
source.getHostAndArrowPort(), queryStr);
+        throw scratchExceptionForThrow;
+    }
+
+    public static List<Pair<String, ByteBuffer>> getUriAndTickets(Pair<String, 
Integer> hostAndPort,
+                                                              String user, 
String psw, String sql) throws Exception {
+        List<Pair<String, ByteBuffer>> uniquePairs = new ArrayList<>();
+        FlightClient clientFE = null;
+        FlightSqlClient sqlClientFE = null;
+        try {
+            BufferAllocator allocatorFE = new RootAllocator(Integer.MAX_VALUE);
+            final Location clientLocationFE = new Location(
+                    new URI("grpc", null, hostAndPort.first,
+                        hostAndPort.second, null, null, null)
+            );
+
+            clientFE = FlightClient.builder(allocatorFE, 
clientLocationFE).build();
+            sqlClientFE = new FlightSqlClient(clientFE);
+
+            CredentialCallOption credentialCallOption = 
clientFE.authenticateBasicToken(user, psw).get();
+            FlightSqlClient.PreparedStatement preparedStatement = 
sqlClientFE.prepare(sql, credentialCallOption);
+            FlightInfo info = preparedStatement.execute(credentialCallOption);
+            List<FlightEndpoint> endpoints = info.getEndpoints();
+
+            Set<String> seenPairs = new HashSet<>();
+            for (FlightEndpoint endpoint : endpoints) {
+                ByteBuffer ticket = endpoint.getTicket().serialize();
+                String ticketStr = ticket.toString();
+
+                for (Location location : endpoint.getLocations()) {
+                    String uri = location.getUri().toString();
+                    String compositeKey = ticketStr + "|" + uri;
+
+                    if (seenPairs.add(compositeKey)) {
+                        uniquePairs.add(Pair.of(uri, ticket));
+                    }
+                }
+            }
+        } finally {
+            if (sqlClientFE != null) {
+                sqlClientFE.close();
+            }
+            if (clientFE != null) {
+                clientFE.close();
+            }
+        }
+
+        return uniquePairs;
+    }
+
+    private void createAdbcColumns() {
+        columns.clear();
+        for (SlotDescriptor slot : desc.getSlots()) {
+            if (!slot.isMaterialized()) {
+                continue;
+            }
+            Column col = slot.getColumn();
+            columns.add("`" + col.getName() + "`");
+        }
+        if (columns.isEmpty()) {
+            columns.add("*");
+        }
+    }
+
+    private String getAdbcQueryStr() {
+        StringBuilder sql = new StringBuilder("SELECT ");
+
+        if (source.getCatalog().enableParallelResultSink()) {
+            sql.append("/*+ SET_VAR(enable_parallel_result_sink=true) */ ");
+        }
+
+        sql.append(Joiner.on(", ").join(columns));
+
+        sql.append(" FROM 
").append(source.getTargetTable().getExternalTableName());
+
+        if (!filters.isEmpty()) {
+            sql.append(" WHERE (");
+            sql.append(Joiner.on(") AND (").join(filters));
+            sql.append(")");
+        }
+
+        if (limit != -1) {
+            sql.append(" LIMIT ").append(limit);
+        }
+
+        return sql.toString();
+    }
+
+    private void createAdbcFilters() {
+        if (conjuncts.isEmpty()) {
+            return;
+        }
+
+        List<SlotRef> slotRefs = Lists.newArrayList();
+        Expr.collectList(conjuncts, SlotRef.class, slotRefs);
+        ExprSubstitutionMap sMap = new ExprSubstitutionMap();
+        for (SlotRef slotRef : slotRefs) {
+            SlotRef slotRef1 = (SlotRef) slotRef.clone();
+            slotRef1.setTblName(null);
+            slotRef1.setLabel("`" + slotRef1.getColumnName() + "`");
+            sMap.put(slotRef, slotRef1);
+        }
+
+        ArrayList<Expr> conjunctsList = Expr.cloneList(conjuncts, sMap);
+        for (Expr expr : conjunctsList) {
+            String filter = conjunctExprToString(expr, desc.getTable());
+            filters.add(filter);
+        }
+    }
+
+    private String conjunctExprToString(Expr expr, TableIf tbl) {
+        if (expr.contains(DateLiteral.class) && expr instanceof 
BinaryPredicate) {
+            ArrayList<Expr> children = expr.getChildren();
+            String filter = 
children.get(0).toExternalSql(TableIf.TableType.DORIS_EXTERNAL_TABLE, tbl);
+            filter += " " + ((BinaryPredicate) expr).getOp().toString() + " ";
+
+            filter += 
children.get(1).toExternalSql(TableIf.TableType.DORIS_EXTERNAL_TABLE, tbl);
+
+            return filter;
+        }
+
+        // Only for old planner
+        if (expr.contains(BoolLiteral.class) && 
"1".equals(expr.getStringValue()) && expr.getChildren().isEmpty()) {
+            return "1 = 1";
+        }

Review Comment:
   This magic string '1' should be defined as a constant to improve code 
maintainability and make the logic clearer.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/doris/source/RemoteDorisScanNode.java:
##########
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.doris.source;
+
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.BoolLiteral;
+import org.apache.doris.analysis.DateLiteral;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ExprSubstitutionMap;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.FileQueryScanNode;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.spi.Split;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TRemoteDorisFileDesc;
+import org.apache.doris.thrift.TTableFormatFileDesc;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.grpc.CredentialCallOption;
+import org.apache.arrow.flight.sql.FlightSqlClient;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class RemoteDorisScanNode extends FileQueryScanNode {
+    private static final Logger LOG = 
LogManager.getLogger(RemoteDorisScanNode.class);
+
+    private static final int ADBC_EXEC_RETRY = 3;
+
+    private final List<String> columns = new ArrayList<String>();

Review Comment:
   Consider making the retry count configurable through a system property or 
configuration parameter instead of using a hard-coded magic number.
   ```suggestion
       private static final int DEFAULT_ADBC_EXEC_RETRY = 3;
       private static final int ADBC_EXEC_RETRY = getAdbcExecRetry();
   
       private static int getAdbcExecRetry() {
           String prop = System.getProperty("doris.adbc.exec.retry");
           if (prop != null) {
               try {
                   int value = Integer.parseInt(prop);
                   if (value > 0) {
                       return value;
                   }
               } catch (NumberFormatException e) {
                   // Ignore and use default
               }
           }
           return DEFAULT_ADBC_EXEC_RETRY;
       }
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/doris/RemoteDorisRestClient.java:
##########
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.doris;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.util.JsonUtil;
+import org.apache.doris.httpv2.entity.ResponseBody;
+import org.apache.doris.httpv2.rest.HealthAction;
+import org.apache.doris.httpv2.rest.RestApiStatusCode;
+import org.apache.doris.httpv2.rest.response.GsonSchemaResponse;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.gson.reflect.TypeToken;
+import okhttp3.Credentials;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.http.HttpHeaders;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.util.Strings;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+/**
+ * Use this restClient when the remote Doris cluster is the same version as 
the current cluster,
+ * ensuring complete tableSchema compatibility.
+ */
+public class RemoteDorisRestClient {
+    private static final Logger LOG = 
LogManager.getLogger(RemoteDorisRestClient.class);
+
+    private static final OkHttpClient networkClient = new OkHttpClient
+            .Builder().readTimeout(10, TimeUnit.SECONDS).build();
+
+    private static OkHttpClient sslNetworkClient;
+    private final Request.Builder builder;
+    private final List<String> feNodes;
+    private String currentNode;
+    private int currentNodeIndex = 0;
+    private final boolean httpSslEnable;
+
+    /**
+     * For DorisTable.
+     **/
+    public RemoteDorisRestClient(List<String> feNodes, String authUser, String 
authPassword, boolean httpSslEnable) {
+        this.feNodes = feNodes;
+        this.builder = new Request.Builder();
+        if (!Strings.isEmpty(authUser)) {
+            this.builder.addHeader(HttpHeaders.AUTHORIZATION,
+                    Credentials.basic(authUser, Strings.isEmpty(authPassword) 
? "" : authPassword));
+        }
+        this.currentNode = feNodes.get(currentNodeIndex);
+        this.httpSslEnable = httpSslEnable;
+    }
+
+    public List<String> getDatabaseNameList() {
+        return 
parseStringLists(execute("api/meta/namespaces/default_cluster/databases"));
+    }
+
+    public List<String> getTablesNameList(String dbName) {
+        return 
parseStringLists(execute("api/meta/namespaces/default_cluster/databases/" + 
dbName + "/tables"));
+    }
+
+    public boolean isTableExist(String dbName, String tableName) {
+        return parseSuccessResponse(execute("api/" + dbName + "/" + tableName 
+ "/_schema"));
+    }
+
+    public boolean health() {
+        int aliveBeNum = parseOnlineBeNum(execute("api/health"));
+        return aliveBeNum > 0;
+    }
+
+    public List<Column> getColumns(String dbName, String tableName) {
+        return parseColumns(execute("api/" + dbName + "/" + tableName + 
"/_gson_schema"));
+    }
+
+    public long getRowCount(String dbName, String tableName) {
+        return parseRowCount(execute("api/rowcount?db=" + dbName + "&table=" + 
tableName));
+    }
+
+    public static List<String> parseStringLists(String executeResult) {
+        ResponseBody<ArrayList<String>> databasesResponse = parseResponse(
+            new TypeToken<ArrayList<String>>() {},
+                executeResult);
+        if (successResponse(databasesResponse)) {
+            return databasesResponse.getData();
+        }
+        return new ArrayList<>();
+    }
+
+    public static boolean parseSuccessResponse(String executeResult) {
+        ObjectNode objectNode = JsonUtil.parseObject(executeResult);
+        Integer code = JsonUtil.safeGetAsInt(objectNode, "code");
+        return code != null && code == RestApiStatusCode.OK.code;
+    }
+
+    public static int parseOnlineBeNum(String executeResult) {
+        ResponseBody<HashMap<String, Integer>> healthResponse = parseResponse(
+            new TypeToken<HashMap<String, Integer>>() {},
+                executeResult);
+        if (successResponse(healthResponse)) {
+            return 
healthResponse.getData().get(HealthAction.ONLINE_BACKEND_NUM);
+        }
+        throw new RuntimeException("get doris table schema error, msg: " + 
healthResponse.getMsg());
+    }
+
+    public static List<Column> parseColumns(String executeResult) {
+        ResponseBody<GsonSchemaResponse> getColumnsResponse = parseResponse(
+            new TypeToken<GsonSchemaResponse>(){},
+                executeResult);
+        if (successResponse(getColumnsResponse)) {
+            return getColumnsResponse.getData().getJsonColumns().stream()
+                .map(json -> GsonUtils.GSON.fromJson(json, Column.class))
+                .collect(Collectors.toList());
+        }
+        throw new RuntimeException("get doris table schema error, msg: " + 
getColumnsResponse.getMsg());
+    }
+
+    public static long parseRowCount(String executeResult) {
+        ResponseBody<HashMap<String, Long>> rowCountResponse = parseResponse(
+            new TypeToken<HashMap<String, Long>>() {},
+                executeResult);
+        if (successResponse(rowCountResponse)) {
+            return rowCountResponse.getData().values().iterator().next();
+        }
+        throw new RuntimeException("get doris table row count error, msg: " + 
rowCountResponse.getMsg());
+    }
+
+    private void selectNextNode() {
+        currentNodeIndex++;
+        currentNodeIndex = currentNodeIndex % feNodes.size();
+        currentNode = feNodes.get(currentNodeIndex);
+    }
+
+    public OkHttpClient getClient() {
+        if (httpSslEnable) {
+            return getOrCreateSslNetworkClient();
+        }
+        return networkClient;
+    }
+
+    /**
+     * init ssl networkClient use lazy way
+     **/
+    private synchronized OkHttpClient getOrCreateSslNetworkClient() {
+        if (sslNetworkClient == null) {
+            sslNetworkClient = new OkHttpClient.Builder().readTimeout(10, 
TimeUnit.SECONDS)
+                .sslSocketFactory(createSSLSocketFactory(), new 
TrustAllCerts())
+                .hostnameVerifier(new 
RemoteDorisRestClient.TrustAllHostnameVerifier()).build();
+        }
+        return sslNetworkClient;
+    }
+
+    private Response executeResponse(OkHttpClient httpClient, String path) 
throws IOException {
+        currentNode = currentNode.trim();
+        if (!(currentNode.startsWith("http://") || 
currentNode.startsWith("https://"))) {
+            currentNode = "http://" + currentNode;
+        }
+        if (!currentNode.endsWith("/")) {
+            currentNode = currentNode + "/";
+        }
+        Request request = builder.get().url(currentNode + path).build();
+        if (LOG.isInfoEnabled()) {
+            LOG.info("doris rest client request URL: {}", 
request.url().toString());
+        }
+        return httpClient.newCall(request).execute();
+    }
+
+    /**
+     * execute request for specific path, it will try again nodes.length times 
if it fails
+     *
+     * @param path the path must not leading with '/'
+     * @return response
+     */
+    protected String execute(String path) {
+        // try 3 times for every node
+        int retrySize = feNodes.size() * 3;

Review Comment:
   The retry multiplier '3' should be defined as a named constant to improve 
code readability and maintainability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to