mymeiyi commented on code in PR #39986:
URL: https://github.com/apache/doris/pull/39986#discussion_r1733770236


##########
fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java:
##########
@@ -102,21 +102,25 @@ public Object load(HttpServletRequest request, 
HttpServletResponse response,
 
     @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + 
"}/_stream_load", method = RequestMethod.PUT)
     public Object streamLoad(HttpServletRequest request,
-                             HttpServletResponse response,
-                             @PathVariable(value = DB_KEY) String db, 
@PathVariable(value = TABLE_KEY) String table) {
-        LOG.info("streamload action, db: {}, tbl: {}, headers: {}", db, table, 
 getAllHeaders(request));
+            HttpServletResponse response,
+            @PathVariable(value = DB_KEY) String db, @PathVariable(value = 
TABLE_KEY) String table) {
+        LOG.info("streamload action, db: {}, tbl: {}, headers: {}", db, table, 
getAllHeaders(request));
         boolean groupCommit = false;
         String groupCommitStr = request.getHeader("group_commit");
-        if (groupCommitStr != null && 
groupCommitStr.equalsIgnoreCase("async_mode")) {
-            groupCommit = true;
-            try {
-                if (isGroupCommitBlock(db, table)) {
-                    String msg = "insert table " + table + 
GroupCommitPlanner.SCHEMA_CHANGE;
-                    return new RestBaseResult(msg);
+        if (groupCommitStr != null) {
+            if (groupCommitStr.equalsIgnoreCase("async_mode")) {
+                groupCommit = true;
+                try {
+                    if (isGroupCommitBlock(db, table)) {
+                        String msg = "insert table " + table + 
GroupCommitPlanner.SCHEMA_CHANGE;
+                        return new RestBaseResult(msg);
+                    }
+                } catch (Exception e) {
+                    LOG.info("exception:" + e);
+                    return new RestBaseResult(e.getMessage());
                 }
-            } catch (Exception e) {
-                LOG.info("exception:" + e);
-                return new RestBaseResult(e.getMessage());
+            } else if (groupCommitStr.equalsIgnoreCase("sync_mode")) {
+                groupCommit = true;
             }

Review Comment:
   throw exception if group commit is set but none of async_mode or sync_mode



##########
fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java:
##########
@@ -647,4 +651,29 @@ private String getAllHeaders(HttpServletRequest request) {
         }
         return headers.toString();
     }
+
+    private Backend selectBackendForGroupCommit(String clusterName, 
HttpServletRequest req, long tableId,
+            boolean isCloud)
+            throws LoadException {
+        ConnectContext ctx = new ConnectContext();
+        ctx.setEnv(Env.getCurrentEnv());
+        ctx.setThreadLocalInfo();
+        ctx.setRemoteIP(req.getRemoteAddr());
+        // We set this variable to fulfill required field 'user' in
+        // TMasterOpRequest(FrontendService.thrift)
+        ctx.setQualifiedUser(Auth.ADMIN_USER);
+        ctx.setThreadLocalInfo();
+        if (isCloud) {
+            ctx.setCloudCluster(clusterName);
+        }
+
+        Backend backend = null;
+        try {
+            backend = Env.getCurrentEnv().getGroupCommitManager()
+                    .selectBackendForGroupCommit(tableId, ctx, isCloud);
+        } catch (DdlException e) {
+            throw new RuntimeException(e);

Review Comment:
   throw DdlException directly, RuntimeException is strange



##########
fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java:
##########
@@ -147,21 +151,29 @@ public Object streamLoadWithSql(HttpServletRequest 
request, HttpServletResponse
         boolean groupCommit = false;
         long tableId = -1;
         String groupCommitStr = request.getHeader("group_commit");
-        if (groupCommitStr != null && 
groupCommitStr.equalsIgnoreCase("async_mode")) {
-            groupCommit = true;
-            try {
-                String[] pair = parseDbAndTb(sql);
-                Database db = Env.getCurrentInternalCatalog()
-                        .getDbOrException(pair[0], s -> new 
TException("database is invalid for dbName: " + s));
-                Table tbl = db.getTableOrException(pair[1], s -> new 
TException("table is invalid: " + s));
-                tableId = tbl.getId();
-                if (isGroupCommitBlock(pair[0], pair[1])) {
-                    String msg = "insert table " + pair[1] + 
GroupCommitPlanner.SCHEMA_CHANGE;
-                    return new RestBaseResult(msg);
+        if (groupCommitStr != null) {
+            if (groupCommitStr.equalsIgnoreCase("async_mode")) {
+                try {
+                    groupCommit = true;
+                    String[] pair = parseDbAndTb(sql);
+                    Database db = Env.getCurrentInternalCatalog()
+                            .getDbOrException(pair[0], s -> new 
TException("database is invalid for dbName: " + s));
+                    Table tbl = db.getTableOrException(pair[1], s -> new 
TException("table is invalid: " + s));
+                    tableId = tbl.getId();
+
+                    if (groupCommitStr.equalsIgnoreCase("async_mode")) {
+                        if (isGroupCommitBlock(pair[0], pair[1])) {
+                            String msg = "insert table " + pair[1] + 
GroupCommitPlanner.SCHEMA_CHANGE;
+                            return new RestBaseResult(msg);
+                        }
+
+                    }
+                } catch (Exception e) {
+                    LOG.info("exception:" + e);
+                    return new RestBaseResult(e.getMessage());
                 }
-            } catch (Exception e) {
-                LOG.info("exception:" + e);
-                return new RestBaseResult(e.getMessage());
+            } else if (groupCommitStr.equalsIgnoreCase("sync_mode")) {
+                groupCommit = true;

Review Comment:
   table_id is -1 for sync_mode



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to