This is an automated email from the ASF dual-hosted git repository. kirs pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 00be5d4a209 [fix](export) fix potential export concurrency issue (#43109) 00be5d4a209 is described below commit 00be5d4a2098eef19f9fdb2936f4269a34b56a41 Author: Mingyu Chen (Rayner) <morning...@163.com> AuthorDate: Sat Nov 2 08:35:44 2024 +0800 [fix](export) fix potential export concurrency issue (#43109) ### What problem does this PR solve? Problem Summary: ``` 2024-11-01 19:42:52,521 WARN (mysql-nio-pool-117|9514) [StmtExecutor.execute():616] Analyze failed. stmt[250257, 59c581a512e7468f-b1cfd7d4b63fed33] org.apache.doris.common.NereidsException: errCode = 2, detailMessage = java.util.ConcurrentModificationException at org.apache.doris.qe.StmtExecutor.executeByNereids(StmtExecutor.java:780) ~[doris-fe.jar:1.2-SNAPSHOT] at org.apache.doris.qe.StmtExecutor.execute(StmtExecutor.java:601) ~[doris-fe.jar:1.2-SNAPSHOT] at org.apache.doris.qe.StmtExecutor.queryRetry(StmtExecutor.java:564) ~[doris-fe.jar:1.2-SNAPSHOT] at org.apache.doris.qe.StmtExecutor.execute(StmtExecutor.java:554) ~[doris-fe.jar:1.2-SNAPSHOT] at org.apache.doris.qe.ConnectProcessor.executeQuery(ConnectProcessor.java:340) ~[doris-fe.jar:1.2-SNAPSHOT] at org.apache.doris.qe.ConnectProcessor.handleQuery(ConnectProcessor.java:243) ~[doris-fe.jar:1.2-SNAPSHOT] at org.apache.doris.qe.MysqlConnectProcessor.handleQuery(MysqlConnectProcessor.java:208) ~[doris-fe.jar:1.2-SNAPSHOT] at org.apache.doris.qe.MysqlConnectProcessor.dispatch(MysqlConnectProcessor.java:236) ~[doris-fe.jar:1.2-SNAPSHOT] at org.apache.doris.qe.MysqlConnectProcessor.processOnce(MysqlConnectProcessor.java:413) ~[doris-fe.jar:1.2-SNAPSHOT] at org.apache.doris.mysql.ReadListener.lambda$handleEvent$0(ReadListener.java:52) ~[doris-fe.jar:1.2-SNAPSHOT] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?] at java.lang.Thread.run(Thread.java:840) ~[?:?] Caused by: org.apache.doris.common.AnalysisException: errCode = 2, detailMessage = java.util.ConcurrentModificationException ... 13 more Caused by: java.util.ConcurrentModificationException at java.util.ArrayList.forEach(ArrayList.java:1513) ~[?:?] at org.apache.doris.load.ExportMgr.addExportJobAndRegisterTask(ExportMgr.java:120) ~[doris-fe.jar:1.2-SNAPSHOT] at org.apache.doris.nereids.trees.plans.commands.ExportCommand.run(ExportCommand.java:149) ~[doris-fe.jar:1.2-SNAPSHOT] at org.apache.doris.qe.StmtExecutor.executeByNereids(StmtExecutor.java:749) ~[doris-fe.jar:1.2-SNAPSHOT] ... 12 more ``` ### Check List (For Committer) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [x] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [x] Previous test can cover this change. - [ ] No colde files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [x] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [ ] No. - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 --> - Release note <!-- bugfix, feat, behavior changed need a release note --> <!-- Add one line release note for this PR. --> None ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into --> --- fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java | 2 +- fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java | 4 ++-- fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java | 4 +--- .../org/apache/doris/nereids/trees/plans/commands/ExportCommand.java | 2 +- 4 files changed, 5 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java index a9ce85b2d3e..ba7aa50ec69 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java @@ -208,7 +208,7 @@ public class ExportStmt extends StatementBase implements NotFallbackInParser { } private void setJob() throws UserException { - exportJob = new ExportJob(); + exportJob = new ExportJob(Env.getCurrentEnv().getNextId()); Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(this.tblName.getDb()); exportJob.setDbId(db.getId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index 5fe9c482633..9841ae2b11d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -396,8 +396,8 @@ public class ExportJob implements Writable { return statementBase; } - public List<? extends TransientTaskExecutor> getTaskExecutors() { - return jobExecutorList; + public List<? extends TransientTaskExecutor> getCopiedTaskExecutors() { + return Lists.newArrayList(jobExecutorList); } private void generateExportJobExecutor() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index 80f738a4cdf..94f43d531bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -95,8 +95,6 @@ public class ExportMgr { } public void addExportJobAndRegisterTask(ExportJob job) throws Exception { - long jobId = Env.getCurrentEnv().getNextId(); - job.setId(jobId); writeLock(); try { if (dbTolabelToExportJobId.containsKey(job.getDbId()) @@ -117,7 +115,7 @@ public class ExportMgr { BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, fullPath.lastIndexOf('/') + 1), job.getBrokerDesc()); } - job.getTaskExecutors().forEach(executor -> { + job.getCopiedTaskExecutors().forEach(executor -> { Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor); }); Env.getCurrentEnv().getEditLog().logExportCreate(job); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java index dbf6cf7067e..38083e406b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java @@ -242,7 +242,7 @@ public class ExportCommand extends Command implements ForwardWithSync { private ExportJob generateExportJob(ConnectContext ctx, Map<String, String> fileProperties, TableName tblName) throws UserException { - ExportJob exportJob = new ExportJob(); + ExportJob exportJob = new ExportJob(Env.getCurrentEnv().getNextId()); // set export job and check catalog/db/table CatalogIf catalog = ctx.getEnv().getCatalogMgr().getCatalogOrAnalysisException(tblName.getCtl()); DatabaseIf db = catalog.getDbOrAnalysisException(tblName.getDb()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org