morningman commented on code in PR #21911: URL: https://github.com/apache/doris/pull/21911#discussion_r1278521853
########## fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java: ########## @@ -266,17 +278,101 @@ private void generateQueryStmt() { } } - List<TableRef> tableRefList = Lists.newArrayList(); - tableRefList.add(this.tableRef); - FromClause fromClause = new FromClause(tableRefList); + ArrayList<ArrayList<TableRef>> tableRefListPerQuery = splitTablets(stmt); + LOG.info("Export task is split into {} outfile statements.", tableRefListPerQuery.size()); - SelectStmt selectStmt = new SelectStmt(list, fromClause, this.whereExpr, null, - null, null, LimitElement.NO_LIMIT); - // generate outfile clause - OutFileClause outfile = new OutFileClause(this.exportPath, this.format, convertOutfileProperties()); - selectStmt.setOutFileClause(outfile); - selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0)); - selectStmtList.add(selectStmt); + if (LOG.isDebugEnabled()) { + for (int i = 0; i < tableRefListPerQuery.size(); i++) { + LOG.debug("Outfile clause {} is responsible for tables: {}", i, + tableRefListPerQuery.get(i).get(0).getSampleTabletIds()); + } + } + + for (ArrayList<TableRef> tableRefList : tableRefListPerQuery) { + FromClause fromClause = new FromClause(tableRefList); + // generate outfile clause + OutFileClause outfile = new OutFileClause(this.exportPath, this.format, convertOutfileProperties()); + SelectStmt selectStmt = new SelectStmt(list, fromClause, this.whereExpr, null, + null, null, LimitElement.NO_LIMIT); + selectStmt.setOutFileClause(outfile); + selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0)); + selectStmtList.add(selectStmt); + } + stmtExecutorList = Arrays.asList(new StmtExecutor[selectStmtList.size()]); + if (LOG.isDebugEnabled()) { + for (int i = 0; i < selectStmtList.size(); i++) { + LOG.debug("Outfile clause {} is: {}", i, selectStmtList.get(i).toSql()); + } + } + } + + private ArrayList<ArrayList<TableRef>> splitTablets(ExportStmt stmt) throws UserException { + // get tablets + Database db = 
Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(stmt.getTblName().getDb()); + OlapTable table = db.getOlapTableOrAnalysisException(stmt.getTblName().getTbl()); + List<Long> tabletIdList = Lists.newArrayList(); + table.readLock(); + try { + Collection<Partition> partitions = new ArrayList<Partition>(); + // get partitions + // user specifies partitions, already checked in ExportStmt + if (this.partitionNames != null) { + if (partitionNames.size() > Config.maximum_number_of_export_partitions) { + throw new UserException("The partitions number of this export job is larger than the maximum number" + + " of partitions allowed by a export job"); + } + for (String partName : this.partitionNames) { + partitions.add(table.getPartition(partName)); + } + } else { + if (table.getPartitions().size() > Config.maximum_number_of_export_partitions) { + throw new UserException("The partitions number of this export job is larger than the maximum number" + + " of partitions allowed by a export job"); + } + partitions = table.getPartitions(); + } + + // get tablets + for (Partition partition : partitions) { + partition.getVisibleVersion(); Review Comment: Unused code? 
########## fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java: ########## @@ -266,17 +278,101 @@ private void generateQueryStmt() { } } - List<TableRef> tableRefList = Lists.newArrayList(); - tableRefList.add(this.tableRef); - FromClause fromClause = new FromClause(tableRefList); + ArrayList<ArrayList<TableRef>> tableRefListPerQuery = splitTablets(stmt); + LOG.info("Export task is split into {} outfile statements.", tableRefListPerQuery.size()); - SelectStmt selectStmt = new SelectStmt(list, fromClause, this.whereExpr, null, - null, null, LimitElement.NO_LIMIT); - // generate outfile clause - OutFileClause outfile = new OutFileClause(this.exportPath, this.format, convertOutfileProperties()); - selectStmt.setOutFileClause(outfile); - selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0)); - selectStmtList.add(selectStmt); + if (LOG.isDebugEnabled()) { + for (int i = 0; i < tableRefListPerQuery.size(); i++) { + LOG.debug("Outfile clause {} is responsible for tables: {}", i, + tableRefListPerQuery.get(i).get(0).getSampleTabletIds()); + } + } + + for (ArrayList<TableRef> tableRefList : tableRefListPerQuery) { + FromClause fromClause = new FromClause(tableRefList); + // generate outfile clause + OutFileClause outfile = new OutFileClause(this.exportPath, this.format, convertOutfileProperties()); + SelectStmt selectStmt = new SelectStmt(list, fromClause, this.whereExpr, null, + null, null, LimitElement.NO_LIMIT); + selectStmt.setOutFileClause(outfile); + selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0)); + selectStmtList.add(selectStmt); + } + stmtExecutorList = Arrays.asList(new StmtExecutor[selectStmtList.size()]); + if (LOG.isDebugEnabled()) { + for (int i = 0; i < selectStmtList.size(); i++) { + LOG.debug("Outfile clause {} is: {}", i, selectStmtList.get(i).toSql()); + } + } + } + + private ArrayList<ArrayList<TableRef>> splitTablets(ExportStmt stmt) throws UserException { + // get tablets + Database db = 
Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(stmt.getTblName().getDb()); + OlapTable table = db.getOlapTableOrAnalysisException(stmt.getTblName().getTbl()); + List<Long> tabletIdList = Lists.newArrayList(); + table.readLock(); + try { + Collection<Partition> partitions = new ArrayList<Partition>(); + // get partitions + // user specifies partitions, already checked in ExportStmt + if (this.partitionNames != null) { + if (partitionNames.size() > Config.maximum_number_of_export_partitions) { + throw new UserException("The partitions number of this export job is larger than the maximum number" + + " of partitions allowed by a export job"); + } + for (String partName : this.partitionNames) { + partitions.add(table.getPartition(partName)); + } + } else { + if (table.getPartitions().size() > Config.maximum_number_of_export_partitions) { + throw new UserException("The partitions number of this export job is larger than the maximum number" + + " of partitions allowed by a export job"); + } + partitions = table.getPartitions(); + } + + // get tablets + for (Partition partition : partitions) { + partition.getVisibleVersion(); + partitionToVersion.put(partition.getName(), partition.getVisibleVersion()); + for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) { + tabletIdList.addAll(index.getTabletIdsInOrder()); + } + } + } finally { + table.readUnlock(); + } + + Integer tabletsAllNum = tabletIdList.size(); + Integer tabletsNumPerQuery = tabletsAllNum / this.parallelNum; + Integer tabletsNumPerQueryRemainder = tabletsAllNum - tabletsNumPerQuery * this.parallelNum; + + Integer start = 0; + + ArrayList<ArrayList<TableRef>> tableRefListPerQuery = Lists.newArrayList(); + + int outfileNum = this.parallelNum; + if (tabletsAllNum < this.parallelNum) { + outfileNum = tabletsAllNum; + LOG.warn("The number of tablets is smaller than parallelism, set parallelism to tablets num."); Review Comment: This log does not contains any useful 
info, e.g., job ID, number of tablets, parallelism, etc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org