mymeiyi commented on code in PR #63594:
URL: https://github.com/apache/doris/pull/63594#discussion_r3296908189


##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -2779,13 +2784,86 @@ private void recordFinishedLoadJobRequestImpl(String 
label, long txnId, String d
                 EtlJobType.INSERT, createTime, failMsg, trackingUrl, 
firstErrorMsg, userIdentity, -1);
     }
 
+    private static int nextGroupCommitFollowerIndex(int followerCount) {
+        return Math.floorMod(GROUP_COMMIT_FOLLOWER_INDEX.getAndIncrement(), 
followerCount);
+    }
+
+    private TStreamLoadPutResult 
forwardGroupCommitStreamLoad(TStreamLoadPutRequest request) {
+        HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
+        List<Frontend> followers = 
Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER).stream()
+                .filter(fe -> fe.isAlive() && 
!(fe.getHost().equals(selfNode.getHost())
+                        && fe.getEditLogPort() == selfNode.getPort())).collect(
+                        Collectors.toList());
+        if (CollectionUtils.isEmpty(followers)) {
+            return null;
+        }
+
+        // check table enable light_schema_change and group commit does not 
block for schema change
+        TStreamLoadPutResult result = new TStreamLoadPutResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+        try {
+            Database db = 
Env.getCurrentInternalCatalog().getDbOrDdlException(request.getDb());
+            OlapTable table = (OlapTable) 
db.getTableOrDdlException(request.getTbl());
+            if (!table.getTableProperty().getUseSchemaLightChange()) {
+                status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+                status.addToErrorMsgs(
+                        "table light_schema_change is false, can't do stream 
load with group commit mode");
+                return result;
+            }
+            if 
(Env.getCurrentEnv().getGroupCommitManager().isBlock(table.getId())) {
+                String msg = "insert table " + table.getId() + 
GroupCommitPlanner.SCHEMA_CHANGE;
+                LOG.info(msg);
+                status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+                status.addToErrorMsgs(msg);
+                return result;
+            }
+        } catch (Exception e) {
+            LOG.warn("failed to pre-check group commit stream load, fallback 
to local. db={}, tbl={}",
+                    request.getDb(), request.getTbl(), e);
+            return null;
+        }
+
+        int idx = nextGroupCommitFollowerIndex(followers.size());
+        Frontend follower = followers.get(idx);
+        TNetworkAddress address = new TNetworkAddress(follower.getHost(), 
follower.getRpcPort());
+        LOG.info("forward group commit stream load put to follower {}, db={}, 
tbl={}, groupCommitMode={}",
+                address, request.getDb(), request.getTbl(), 
request.getGroupCommitMode());
+        FrontendService.Client client = null;
+        boolean ok = false;
+        try {
+            client = ClientPool.frontendPool.borrowObject(address);
+            TStreamLoadPutResult streamLoadPutResult = 
client.streamLoadPut(request);
+            ok = true;

Review Comment:
   StreamLoadHandler.generatePlan() does not call 
GroupCommitManager.selectBackendForGroupCommit()



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to