morningman commented on code in PR #18325: URL: https://github.com/apache/doris/pull/18325#discussion_r1164813455
########## fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java: ########## @@ -547,6 +546,7 @@ private Env(boolean isCheckpointCatalog) { this.routineLoadManager = new RoutineLoadManager(); this.sqlBlockRuleMgr = new SqlBlockRuleMgr(); this.exportMgr = new ExportMgr(); + this.exportMgr.start(); Review Comment: Move this to `startMasterOnlyDaemonThreads()` ########## fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java: ########## @@ -67,7 +72,12 @@ public class ExportMgr { private Map<Long, ExportJob> idToJob = Maps.newHashMap(); // exportJobId to exportJob private Map<String, Long> labelToJobId = Maps.newHashMap(); + private MasterTaskExecutor exportingExecutor; + public ExportMgr() { + int poolSize = Config.export_running_job_num_limit == 0 ? 5 : Config.export_running_job_num_limit; + exportingExecutor = new MasterTaskExecutor("export-exporting-job", poolSize, true); + exportingExecutor.start(); Review Comment: Override the `start()` of `MasterDaemon` and move this `exportingExecutor.start()` to `start()` ########## fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java: ########## @@ -86,6 +96,44 @@ private void writeUnlock() { lock.writeLock().unlock(); } + @Override + protected void runAfterCatalogReady() { + List<ExportJob> jobs = getExportJobs(JobState.PENDING); + // Because exportJob may be replayed from log + // we also need handle EXPORTING state exportJob. + jobs.addAll(getExportJobs(JobState.EXPORTING)); Review Comment: I think we don't need to handle `exporting` state jobs here. All jobs replayed from log should be reset to PENDING. ########## fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java: ########## @@ -86,6 +96,44 @@ private void writeUnlock() { lock.writeLock().unlock(); } + @Override + protected void runAfterCatalogReady() { + List<ExportJob> jobs = getExportJobs(JobState.PENDING); + // Because exportJob may be replayed from log + // we also need handle EXPORTING state exportJob. 
+ jobs.addAll(getExportJobs(JobState.EXPORTING)); + int runningJobNumLimit = Config.export_running_job_num_limit; + if (runningJobNumLimit > 0 && !jobs.isEmpty()) { Review Comment: No need to calculate the remaining job number. The number of threads in the executor pool is limited to 5, and submitted jobs will be queued when all threads are taken. So here, we can simply do: 1. change the job's state to EXPORTING, so that it won't be scheduled again. 2. submit the job to the executor. ########## fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java: ########## @@ -281,67 +199,78 @@ private void registerProfile() { ProfileManager.getInstance().pushProfile(profile); } - private Status moveTmpFiles() { - FsBroker broker = null; - try { - String localIP = FrontendOptions.getLocalHostAddress(); - broker = Env.getCurrentEnv().getBrokerMgr().getBroker(job.getBrokerDesc().getName(), localIP); - } catch (AnalysisException e) { - String failMsg = "get broker failed. export job: " + job.getId() + ". msg: " + e.getMessage(); - LOG.warn(failMsg); - return new Status(TStatusCode.CANCELLED, failMsg); + private void handlePendingState() { + long dbId = job.getDbId(); + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); + if (db == null) { + job.cancel(ExportFailMsg.CancelType.RUN_FAIL, "database does not exist"); + return; } - TNetworkAddress address = new TNetworkAddress(broker.ip, broker.port); - TPaloBrokerService.Client client = null; - try { - client = ClientPool.brokerPool.borrowObject(address); - } catch (Exception e) { - try { - client = ClientPool.brokerPool.borrowObject(address); - } catch (Exception e1) { - String failMsg = "create connection to broker(" + address + ") failed"; - LOG.warn(failMsg); - return new Status(TStatusCode.CANCELLED, failMsg); - } + + if (job.isReplayed()) { + // If the job is created from replay thread, all plan info will be lost. + // so the job has to be cancelled. 
+ String failMsg = "FE restarted or Master changed during exporting. Job must be cancelled."; + job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg); + return; + } + + // make snapshots + Status snapshotStatus = makeSnapshots(); + if (!snapshotStatus.ok()) { + job.cancel(ExportFailMsg.CancelType.RUN_FAIL, snapshotStatus.getErrorMsg()); + return; + } + + if (job.updateState(ExportJob.JobState.EXPORTING)) { Review Comment: No need to persist `EXPORTING` state. The state should already be changed before submitting to the executor thread pool ########## fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java: ########## @@ -281,67 +199,78 @@ private void registerProfile() { ProfileManager.getInstance().pushProfile(profile); } - private Status moveTmpFiles() { - FsBroker broker = null; - try { - String localIP = FrontendOptions.getLocalHostAddress(); - broker = Env.getCurrentEnv().getBrokerMgr().getBroker(job.getBrokerDesc().getName(), localIP); - } catch (AnalysisException e) { - String failMsg = "get broker failed. export job: " + job.getId() + ". msg: " + e.getMessage(); - LOG.warn(failMsg); - return new Status(TStatusCode.CANCELLED, failMsg); + private void handlePendingState() { + long dbId = job.getDbId(); + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); + if (db == null) { + job.cancel(ExportFailMsg.CancelType.RUN_FAIL, "database does not exist"); + return; } - TNetworkAddress address = new TNetworkAddress(broker.ip, broker.port); - TPaloBrokerService.Client client = null; - try { - client = ClientPool.brokerPool.borrowObject(address); - } catch (Exception e) { - try { - client = ClientPool.brokerPool.borrowObject(address); - } catch (Exception e1) { - String failMsg = "create connection to broker(" + address + ") failed"; - LOG.warn(failMsg); - return new Status(TStatusCode.CANCELLED, failMsg); - } + + if (job.isReplayed()) { + // If the job is created from replay thread, all plan info will be lost. 
+ // so the job has to be cancelled. + String failMsg = "FE restarted or Master changed during exporting. Job must be cancelled."; + job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg); + return; + } + + // make snapshots + Status snapshotStatus = makeSnapshots(); + if (!snapshotStatus.ok()) { + job.cancel(ExportFailMsg.CancelType.RUN_FAIL, snapshotStatus.getErrorMsg()); + return; + } + + if (job.updateState(ExportJob.JobState.EXPORTING)) { + LOG.info("Exchange pending status to exporting status success. job: {}", job); + return; } - boolean failed = false; - Set<String> exportedFiles = job.getExportedFiles(); - List<String> newFiles = Lists.newArrayList(); - String exportPath = job.getExportPath(); - for (String exportedFile : exportedFiles) { - // move exportPath/__doris_tmp/file to exportPath/file - String file = exportedFile.substring(exportedFile.lastIndexOf("/") + 1); - String destPath = exportPath + "/" + file; - LOG.debug("rename {} to {}, export job: {}", exportedFile, destPath, job.getId()); - String failMsg = ""; - try { - TBrokerRenamePathRequest request = new TBrokerRenamePathRequest( - TBrokerVersion.VERSION_ONE, exportedFile, destPath, job.getBrokerDesc().getProperties()); - TBrokerOperationStatus tBrokerOperationStatus = null; - tBrokerOperationStatus = client.renamePath(request); - if (tBrokerOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) { - failed = true; - failMsg = "Broker renamePath failed. srcPath=" + exportedFile + ", destPath=" + destPath - + ", broker=" + address + ", msg=" + tBrokerOperationStatus.getMessage(); - return new Status(TStatusCode.CANCELLED, failMsg); - } else { - newFiles.add(destPath); + } + + private Status makeSnapshots() { Review Comment: Currently, we can comment out this `makeSnapshots()` method because it is not used. 
It can be re-enabled after we implement exporting tablets one by one. ########## fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java: ########## @@ -308,13 +340,17 @@ private void checkProperties(Map<String, String> properties) throws UserExceptio properties.put(TABLET_NUMBER_PER_TASK_PROP, String.valueOf(Config.export_tablet_num_per_task)); } + // max_file_size + this.maxFileSize = properties.getOrDefault(OutFileClause.PROP_MAX_FILE_SIZE, ""); + if (properties.containsKey(LABEL)) { FeNameFormat.checkLabel(properties.get(LABEL)); } else { // generate a random label - String label = "export_" + UUID.randomUUID().toString(); + String label = "export_" + UUID.randomUUID(); properties.put(LABEL, label); } + lable = properties.get(LABEL); Review Comment: ```suggestion label = properties.get(LABEL); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org