morningman commented on code in PR #44944: URL: https://github.com/apache/doris/pull/44944#discussion_r1869857965
########## fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java: ########## @@ -108,26 +108,24 @@ public void addExportJobAndRegisterTask(ExportJob job) throws Exception { } } unprotectAddJob(job); - // delete existing files - if (Config.enable_delete_existing_files && Boolean.parseBoolean(job.getDeleteExistingFiles())) { - if (job.getBrokerDesc() == null) { - throw new AnalysisException("Local file system does not support delete existing files"); - } - String fullPath = job.getExportPath(); - BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, fullPath.lastIndexOf('/') + 1), - job.getBrokerDesc()); - } - Env.getCurrentEnv().getEditLog().logExportCreate(job); - // ATTN: Must add task after edit log, otherwise the job may finish before adding job. - job.getCopiedTaskExecutors().forEach(executor -> { - Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor); - }); - LOG.info("add export job. {}", job); - } finally { writeUnlock(); } - + // delete existing files + if (Config.enable_delete_existing_files && Boolean.parseBoolean(job.getDeleteExistingFiles())) { + if (job.getBrokerDesc() == null) { + throw new AnalysisException("Local file system does not support delete existing files"); + } + String fullPath = job.getExportPath(); + BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, fullPath.lastIndexOf('/') + 1), + job.getBrokerDesc()); + } + Env.getCurrentEnv().getEditLog().logExportCreate(job); Review Comment: I think `Env.getCurrentEnv().getEditLog().logExportCreate(job);` should be done within the lock. 1. unprotectAddJob(job); 2. Env.getCurrentEnv().getEditLog().logExportCreate(job); 3. delete existing files 4. 
addMemoryTask ########## fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java: ########## @@ -119,15 +120,15 @@ public void tryPublish(Long jobId, Long taskId, TaskType taskType) { * * @param taskId task id */ - public void tryPublishTask(Long taskId) { + public void tryPublishTask(Long taskId) throws JobException { if (isClosed) { log.info("tryPublish failed, disruptor is closed, taskId: {}", taskId); return; } - try { + if (disruptor.getRingBuffer().hasAvailableCapacity(2)) { Review Comment: Add a comment to explain what `2` means. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org