This is an automated email from the ASF dual-hosted git repository.

lide pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 17b1df5ca9a [Improvement](set) enable admin_set_frontend_config can apply to all fe (#34751)
17b1df5ca9a is described below

commit 17b1df5ca9a5936ba49b9bba5b1967c081ecabac
Author: Yulei-Yang <yulei.yang0...@gmail.com>
AuthorDate: Tue May 14 10:47:13 2024 +0800

    [Improvement](set) enable admin_set_frontend_config can apply to all fe (#34751)
---
 fe/fe-core/src/main/cup/sql_parser.cup             |  10 +-
 .../apache/doris/analysis/AdminSetConfigStmt.java  |  19 +-
 .../main/java/org/apache/doris/catalog/Env.java    |  19 +-
 .../java/org/apache/doris/qe/FEOpExecutor.java     | 203 +++++++++++++++++++++
 4 files changed, 248 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index 6159221f1e6..c6eb2ec7eab 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -7242,7 +7242,15 @@ admin_stmt ::=
     :}
     | KW_ADMIN KW_SET KW_FRONTEND KW_CONFIG opt_key_value_map:configs
     {:
-        RESULT = new 
AdminSetConfigStmt(AdminSetConfigStmt.ConfigType.FRONTEND, configs);
+        RESULT = new 
AdminSetConfigStmt(AdminSetConfigStmt.ConfigType.FRONTEND, configs, false);
+    :}
+    | KW_ADMIN KW_SET KW_ALL KW_FRONTENDS KW_CONFIG opt_key_value_map:configs
+    {:
+        RESULT = new 
AdminSetConfigStmt(AdminSetConfigStmt.ConfigType.FRONTEND, configs, true);
+    :}
+    | KW_ADMIN KW_SET KW_FRONTEND KW_CONFIG opt_key_value_map:configs KW_ALL
+    {:
+        RESULT = new 
AdminSetConfigStmt(AdminSetConfigStmt.ConfigType.FRONTEND, configs, true);
     :}
     // deprecated
     | KW_ADMIN KW_SHOW KW_FRONTEND KW_CONFIG opt_wild_where
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminSetConfigStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminSetConfigStmt.java
index 166f9a70096..1d2e22ee878 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminSetConfigStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminSetConfigStmt.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.OriginStatement;
 
 import com.google.common.collect.Maps;
 
@@ -38,22 +39,25 @@ public class AdminSetConfigStmt extends DdlStmt {
         BACKEND
     }
 
+    private boolean applyToAll;
     private ConfigType type;
     private Map<String, String> configs;
 
     private RedirectStatus redirectStatus = RedirectStatus.NO_FORWARD;
 
-    public AdminSetConfigStmt(ConfigType type, Map<String, String> configs) {
+    public AdminSetConfigStmt(ConfigType type, Map<String, String> configs, 
boolean applyToAll) {
         this.type = type;
         this.configs = configs;
         if (this.configs == null) {
             this.configs = Maps.newHashMap();
         }
+        this.applyToAll = applyToAll;
 
         // we have to analyze configs here to determine whether to forward it 
to master
         for (String key : this.configs.keySet()) {
             if (ConfigBase.checkIsMasterOnly(key)) {
                 redirectStatus = RedirectStatus.FORWARD_NO_SYNC;
+                this.applyToAll = false;
             }
         }
     }
@@ -66,6 +70,10 @@ public class AdminSetConfigStmt extends DdlStmt {
         return configs;
     }
 
+    /**
+     * Reports the value of the {@code applyToAll} flag held by this statement.
+     *
+     * @return the current {@code applyToAll} flag
+     */
+    public boolean isApplyToAll() {
+        return this.applyToAll;
+    }
+
     @Override
     public void analyze(Analyzer analyzer) throws AnalysisException, 
UserException {
         super.analyze(analyzer);
@@ -87,4 +95,13 @@ public class AdminSetConfigStmt extends DdlStmt {
     public RedirectStatus getRedirectStatus() {
         return redirectStatus;
     }
+
+    /**
+     * Builds an {@code ADMIN SET FRONTEND CONFIG ("key" = "value");} statement from the
+     * first entry of {@code configs}, reusing the statement index of the original statement.
+     *
+     * <p>Key and value are escaped for use inside a double-quoted SQL string literal so that
+     * a value containing {@code "} or {@code \} cannot terminate the literal early.
+     *
+     * @return a new {@link OriginStatement} carrying the rebuilt SQL text
+     * @throws IllegalStateException if {@code configs} is empty
+     */
+    public OriginStatement getLocalSetStmt() {
+        if (configs.isEmpty()) {
+            throw new IllegalStateException("no config item to build ADMIN SET FRONTEND CONFIG statement");
+        }
+        // Same element the original picked: the first one in the map's iteration order.
+        Map.Entry<String, String> first = configs.entrySet().iterator().next();
+        String sql = String.format("ADMIN SET FRONTEND CONFIG (\"%s\" = \"%s\");",
+                escapeForDoubleQuotes(first.getKey()), escapeForDoubleQuotes(first.getValue()));
+
+        return new OriginStatement(sql, this.getOrigStmt().idx);
+    }
+
+    // Escapes backslashes and double quotes so the text can sit inside a "..." SQL literal.
+    // NOTE(review): assumes the FE lexer un-escapes \\ and \" MySQL-style - confirm against sql_scanner.
+    private static String escapeForDoubleQuotes(String raw) {
+        // String.valueOf keeps the original "%s" rendering of a null value ("null").
+        return String.valueOf(raw).replace("\\", "\\\\").replace("\"", "\\\"");
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 50bbe3aba72..58802dbb21d 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -206,6 +206,7 @@ import org.apache.doris.plugin.PluginMgr;
 import org.apache.doris.policy.PolicyMgr;
 import org.apache.doris.qe.AuditEventProcessor;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.FEOpExecutor;
 import org.apache.doris.qe.GlobalVariable;
 import org.apache.doris.qe.JournalObservable;
 import org.apache.doris.qe.VariableMgr;
@@ -5070,7 +5071,7 @@ public class Env {
         globalFunctionMgr.replayDropFunction(functionSearchDesc);
     }
 
-    public void setConfig(AdminSetConfigStmt stmt) throws DdlException {
+    public void setConfig(AdminSetConfigStmt stmt) throws Exception {
         Map<String, String> configs = stmt.getConfigs();
         Preconditions.checkState(configs.size() == 1);
 
@@ -5081,6 +5082,22 @@ public class Env {
                 throw new DdlException(e.getMessage());
             }
         }
+
+        if (stmt.isApplyToAll()) {
+            for (Frontend fe : Env.getCurrentEnv().getFrontends(null /* all 
*/)) {
+                if (!fe.isAlive() || 
fe.getHost().equals(Env.getCurrentEnv().getSelfNode().getHost())) {
+                    continue;
+                }
+
+                TNetworkAddress feAddr = new TNetworkAddress(fe.getHost(), 
fe.getRpcPort());
+                FEOpExecutor executor = new FEOpExecutor(feAddr, 
stmt.getLocalSetStmt(), ConnectContext.get(), false);
+                executor.execute();
+                if (executor.getStatusCode() != TStatusCode.OK.getValue()) {
+                    throw new DdlException(String.format("failed to apply to 
fe %s:%s, error message: %s",
+                        fe.getHost(), fe.getRpcPort(), executor.getErrMsg()));
+                }
+            }
+        }
     }
 
     public void replayBackendReplicasInfo(BackendReplicasInfo 
backendReplicasInfo) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
new file mode 100644
index 00000000000..f1fa13b2585
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TExpr;
+import org.apache.doris.thrift.TExprNode;
+import org.apache.doris.thrift.TMasterOpRequest;
+import org.apache.doris.thrift.TMasterOpResult;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+
+import java.util.Map;
+
+/**
+ * Sends a statement to one specific frontend through the thrift {@code forward} RPC and keeps
+ * the returned {@link TMasterOpResult} so callers can read its status code and error message.
+ */
+public class FEOpExecutor {
+    private static final Logger LOG = LogManager.getLogger(FEOpExecutor.class);
+
+    private static final float RPC_TIMEOUT_COEFFICIENT = 1.2f;
+
+    private final OriginStatement originStmt;
+    private final ConnectContext ctx;
+    // Set by execute(); stays null until a forward call has returned.
+    private TMasterOpResult result;
+    private final TNetworkAddress feAddr;
+
+    // the total time of thrift connectTime, readTime and writeTime
+    private final int thriftTimeoutMs;
+
+    private final boolean shouldNotRetry;
+
+    /**
+     * @param feAddress  thrift address of the frontend the statement is sent to
+     * @param originStmt statement text and index to forward
+     * @param ctx        connect context supplying user, session and timeout information
+     * @param isQuery    when {@code false}, a failed RPC is never retried
+     */
+    public FEOpExecutor(TNetworkAddress feAddress, OriginStatement originStmt, ConnectContext ctx, boolean isQuery) {
+        this.feAddr = feAddress;
+        this.originStmt = originStmt;
+        this.ctx = ctx;
+        // Multiply in long so a large exec timeout cannot wrap around before the float scaling;
+        // the final narrowing cast from float saturates at Integer.MAX_VALUE.
+        this.thriftTimeoutMs = (int) (ctx.getExecTimeout() * 1000L * RPC_TIMEOUT_COEFFICIENT);
+        // if isQuery=false, we must not send the statement a second time after an exception,
+        // because the operation may not be idempotent
+        this.shouldNotRetry = !isQuery;
+    }
+
+    /**
+     * Forwards the statement to the configured frontend and stores the result.
+     *
+     * @throws Exception if the env is not ready, no client can be borrowed, or the RPC fails
+     */
+    public void execute() throws Exception {
+        result = forward(feAddr, buildStmtForwardParams());
+    }
+
+    // Send request to specific fe
+    private TMasterOpResult forward(TNetworkAddress thriftAddress, TMasterOpRequest params) throws Exception {
+        if (!ctx.getEnv().isReady()) {
+            throw new Exception("Env is not ready");
+        }
+
+        FrontendService.Client client;
+        try {
+            client = ClientPool.frontendPool.borrowObject(thriftAddress, thriftTimeoutMs);
+        } catch (Exception e) {
+            // may throw NullPointerException. add err msg
+            throw new Exception("Failed to get fe client: " + thriftAddress.toString(), e);
+        }
+        final StringBuilder forwardMsg = new StringBuilder("forward to FE " + thriftAddress.toString());
+        forwardMsg.append(", statement id: ").append(ctx.getStmtId());
+        LOG.info(forwardMsg.toString());
+
+        boolean isReturnToPool = false;
+        try {
+            final TMasterOpResult firstResult = client.forward(params);
+            isReturnToPool = true;
+            return firstResult;
+        } catch (TTransportException e) {
+            // wrap the raw exception.
+            forwardMsg.append(" : failed");
+            ForwardToFEException exception = new ForwardToFEException(forwardMsg.toString(), e);
+
+            // No retry will happen in these cases, so do not reopen a connection that the
+            // finally block is about to invalidate anyway.
+            if (shouldNotRetry || e.getType() == TTransportException.TIMED_OUT) {
+                throw exception;
+            }
+            if (!ClientPool.frontendPool.reopen(client, thriftTimeoutMs)) {
+                throw exception;
+            }
+
+            LOG.warn(forwardMsg.append(" twice").toString(), e);
+            try {
+                TMasterOpResult retryResult = client.forward(params);
+                isReturnToPool = true;
+                return retryResult;
+            } catch (TException ex) {
+                // Report the first failure, but keep the retry failure attached for diagnosis.
+                exception.addSuppressed(ex);
+                throw exception;
+            }
+        } finally {
+            if (isReturnToPool) {
+                ClientPool.frontendPool.returnObject(thriftAddress, client);
+            } else {
+                ClientPool.frontendPool.invalidateObject(thriftAddress, client);
+            }
+        }
+    }
+
+    // Copies identity, session and statement information from the connect context into the request.
+    private TMasterOpRequest buildStmtForwardParams() {
+        TMasterOpRequest params = new TMasterOpRequest();
+        // node ident
+        params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
+        params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
+        params.setSql(originStmt.originStmt);
+        params.setStmtIdx(originStmt.idx);
+        params.setUser(ctx.getQualifiedUser());
+        params.setDefaultCatalog(ctx.getDefaultCatalog());
+        params.setDefaultDatabase(ctx.getDatabase());
+        params.setDb(ctx.getDatabase());
+        params.setUserIp(ctx.getRemoteIP());
+        params.setStmtId(ctx.getStmtId());
+        params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
+
+        String cluster = ctx.getClusterName();
+        if (!Strings.isNullOrEmpty(cluster)) {
+            params.setCluster(cluster);
+        }
+
+        // query options
+        params.setQueryOptions(ctx.getSessionVariable().getQueryOptionVariables());
+        // session variables
+        params.setSessionVariables(ctx.getSessionVariable().getForwardVariables());
+        params.setUserVariables(getForwardUserVariables(ctx.getUserVars()));
+        if (null != ctx.queryId()) {
+            params.setQueryId(ctx.queryId());
+        }
+        return params;
+    }
+
+    /**
+     * @return the status code of the stored result, or the {@code ERR_UNKNOWN_ERROR} code when
+     *         there is no result yet or the result carries no status code
+     */
+    public int getStatusCode() {
+        if (result == null || !result.isSetStatusCode()) {
+            return ErrorCode.ERR_UNKNOWN_ERROR.getCode();
+        }
+        return result.getStatusCode();
+    }
+
+    /**
+     * @return the error message of the stored result; the {@code ERR_UNKNOWN_ERROR} message when
+     *         there is no result yet; an empty string when the result carries no message
+     */
+    public String getErrMsg() {
+        if (result == null) {
+            return ErrorCode.ERR_UNKNOWN_ERROR.getErrorMsg();
+        }
+        if (!result.isSetErrMessage()) {
+            return "";
+        }
+        return result.getErrMessage();
+    }
+
+    // Converts each user variable literal to the first node of its thrift expression tree.
+    private Map<String, TExprNode> getForwardUserVariables(Map<String, LiteralExpr> userVariables) {
+        Map<String, TExprNode> forwardVariables = Maps.newHashMap();
+        for (Map.Entry<String, LiteralExpr> entry : userVariables.entrySet()) {
+            LiteralExpr literalExpr = entry.getValue();
+            TExpr tExpr = literalExpr.treeToThrift();
+            TExprNode tExprNode = tExpr.nodes.get(0);
+            forwardVariables.put(entry.getKey(), tExprNode);
+        }
+        return forwardVariables;
+    }
+
+    /**
+     * Unchecked wrapper for a thrift transport failure that happened while forwarding to a
+     * frontend. The message names the transport error type; the original exception is kept
+     * as the cause.
+     */
+    public static class ForwardToFEException extends RuntimeException {
+
+        private static final String UNKNOWN_TYPE_MSG = "Unknown exception";
+
+        private static final Map<Integer, String> TYPE_MSG_MAP =
+                ImmutableMap.<Integer, String>builder()
+                .put(TTransportException.UNKNOWN, UNKNOWN_TYPE_MSG)
+                .put(TTransportException.NOT_OPEN, "Connection is not open")
+                .put(TTransportException.ALREADY_OPEN, "Connection has already opened up")
+                .put(TTransportException.TIMED_OUT, "Connection timeout")
+                .put(TTransportException.END_OF_FILE, "EOF")
+                .put(TTransportException.CORRUPTED_DATA, "Corrupted data")
+                .build();
+
+        private final String msg;
+
+        /**
+         * @param msg       description of the forward attempt
+         * @param exception transport failure being wrapped; recorded as the cause
+         */
+        public ForwardToFEException(String msg, TTransportException exception) {
+            // Chain the cause so the transport stack trace is not lost.
+            super(exception);
+            // Unmapped type codes fall back to the generic text instead of printing "null".
+            this.msg = msg + ", cause: " + TYPE_MSG_MAP.getOrDefault(exception.getType(), UNKNOWN_TYPE_MSG)
+                    + ", " + exception.getMessage();
+        }
+
+        @Override
+        public String getMessage() {
+            return msg;
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to