mymeiyi commented on code in PR #39986: URL: https://github.com/apache/doris/pull/39986#discussion_r1733770236
########## fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java: ########## @@ -102,21 +102,25 @@ public Object load(HttpServletRequest request, HttpServletResponse response, @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load", method = RequestMethod.PUT) public Object streamLoad(HttpServletRequest request, - HttpServletResponse response, - @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) { - LOG.info("streamload action, db: {}, tbl: {}, headers: {}", db, table, getAllHeaders(request)); + HttpServletResponse response, + @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) { + LOG.info("streamload action, db: {}, tbl: {}, headers: {}", db, table, getAllHeaders(request)); boolean groupCommit = false; String groupCommitStr = request.getHeader("group_commit"); - if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) { - groupCommit = true; - try { - if (isGroupCommitBlock(db, table)) { - String msg = "insert table " + table + GroupCommitPlanner.SCHEMA_CHANGE; - return new RestBaseResult(msg); + if (groupCommitStr != null) { + if (groupCommitStr.equalsIgnoreCase("async_mode")) { + groupCommit = true; + try { + if (isGroupCommitBlock(db, table)) { + String msg = "insert table " + table + GroupCommitPlanner.SCHEMA_CHANGE; + return new RestBaseResult(msg); + } + } catch (Exception e) { + LOG.info("exception:" + e); + return new RestBaseResult(e.getMessage()); } - } catch (Exception e) { - LOG.info("exception:" + e); - return new RestBaseResult(e.getMessage()); + } else if (groupCommitStr.equalsIgnoreCase("sync_mode")) { + groupCommit = true; } Review Comment: Throw an exception if the `group_commit` header is set but its value is neither `async_mode` nor `sync_mode`. ########## fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java: ########## @@ -647,4 +651,29 @@ private String getAllHeaders(HttpServletRequest request) { } return headers.toString(); } + 
+ private Backend selectBackendForGroupCommit(String clusterName, HttpServletRequest req, long tableId, + boolean isCloud) + throws LoadException { + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setThreadLocalInfo(); + ctx.setRemoteIP(req.getRemoteAddr()); + // We set this variable to fulfill required field 'user' in + // TMasterOpRequest(FrontendService.thrift) + ctx.setQualifiedUser(Auth.ADMIN_USER); + ctx.setThreadLocalInfo(); + if (isCloud) { + ctx.setCloudCluster(clusterName); + } + + Backend backend = null; + try { + backend = Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommit(tableId, ctx, isCloud); + } catch (DdlException e) { + throw new RuntimeException(e); Review Comment: Throw the DdlException directly; wrapping it in a RuntimeException is strange. ########## fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java: ########## @@ -147,21 +151,29 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse boolean groupCommit = false; long tableId = -1; String groupCommitStr = request.getHeader("group_commit"); - if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) { - groupCommit = true; - try { - String[] pair = parseDbAndTb(sql); - Database db = Env.getCurrentInternalCatalog() - .getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s)); - Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s)); - tableId = tbl.getId(); - if (isGroupCommitBlock(pair[0], pair[1])) { - String msg = "insert table " + pair[1] + GroupCommitPlanner.SCHEMA_CHANGE; - return new RestBaseResult(msg); + if (groupCommitStr != null) { + if (groupCommitStr.equalsIgnoreCase("async_mode")) { + try { + groupCommit = true; + String[] pair = parseDbAndTb(sql); + Database db = Env.getCurrentInternalCatalog() + .getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s)); + Table tbl = 
db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s)); + tableId = tbl.getId(); + + if (groupCommitStr.equalsIgnoreCase("async_mode")) { + if (isGroupCommitBlock(pair[0], pair[1])) { + String msg = "insert table " + pair[1] + GroupCommitPlanner.SCHEMA_CHANGE; + return new RestBaseResult(msg); + } + + } + } catch (Exception e) { + LOG.info("exception:" + e); + return new RestBaseResult(e.getMessage()); } - } catch (Exception e) { - LOG.info("exception:" + e); - return new RestBaseResult(e.getMessage()); + } else if (groupCommitStr.equalsIgnoreCase("sync_mode")) { + groupCommit = true; Review Comment: `tableId` is still -1 for sync_mode, because it is only assigned inside the async_mode branch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org