This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ab04a458aa [Enhancement](export) cancel all running coordinators when
execute cancel-export statement. (#15801)
ab04a458aa is described below
commit ab04a458aa991b261f5778d220a8610bd724a1bc
Author: Xiangyu Wang <[email protected]>
AuthorDate: Sun Jan 22 23:11:32 2023 +0800
[Enhancement](export) cancel all running coordinators when execute
cancel-export statement. (#15801)
---
.../src/main/java/org/apache/doris/load/ExportJob.java | 17 ++++++++++++++++-
1 file changed, 16 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 7a8c9cbe19..3b79ca70de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -67,6 +67,7 @@ import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.rewrite.ExprRewriter;
@@ -658,7 +659,21 @@ public class ExportJob implements Writable {
failMsg = new ExportFailMsg(type, msg);
}
if (updateState(ExportJob.JobState.CANCELLED, false)) {
- releaseSnapshotPaths();
+ // cancel all running coordinators, so that the scheduler's worker
thread will be released
+ for (Coordinator coordinator : coordList) {
+ Coordinator registeredCoordinator =
QeProcessorImpl.INSTANCE.getCoordinator(coordinator.getQueryId());
+ if (registeredCoordinator != null) {
+ registeredCoordinator.cancel();
+ }
+ }
+
+ // release snapshot
+ Status releaseSnapshotStatus = releaseSnapshotPaths();
+ if (!releaseSnapshotStatus.ok()) {
+ // snapshot will be removed by GC thread on BE, finally.
+ LOG.warn("failed to release snapshot for export job: {}. err:
{}", id,
+ releaseSnapshotStatus.getErrorMsg());
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]