This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new c290aace53 [Improve][Zeta] Fix JobMaster reset app classloader twice (#7063) c290aace53 is described below commit c290aace5329cacfeb3bdf4fbd2d7d53ab3ebc76 Author: Jia Fan <fanjiaemi...@qq.com> AuthorDate: Wed Jun 26 20:42:17 2024 +0800 [Improve][Zeta] Fix JobMaster reset app classloader twice (#7063) --- .../seatunnel/engine/server/master/JobMaster.java | 63 +++++++++++----------- 1 file changed, 31 insertions(+), 32 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 1b7bf6bdad..ae1f1f0de3 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -222,45 +222,44 @@ public class JobMaster { nodeEngine.getSerializationService(), classLoader, jobImmutableInformation.getLogicalDag()); - if (!restart - && !logicalDag.isStartWithSavePoint() - && ReadonlyConfig.fromMap(logicalDag.getJobConfig().getEnvOptions()) - .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION) - .equals(SaveModeExecuteLocation.CLUSTER)) { - try { - Thread.currentThread().setContextClassLoader(classLoader); + try { + Thread.currentThread().setContextClassLoader(classLoader); + if (!restart + && !logicalDag.isStartWithSavePoint() + && ReadonlyConfig.fromMap(logicalDag.getJobConfig().getEnvOptions()) + .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION) + .equals(SaveModeExecuteLocation.CLUSTER)) { logicalDag.getLogicalVertexMap().values().stream() .map(LogicalVertex::getAction) .filter(action -> action instanceof SinkAction) .map(sink -> ((SinkAction<?, ?, ?, ?>) sink).getSink()) .forEach(JobMaster::handleSaveMode); - } finally { - 
Thread.currentThread().setContextClassLoader(appClassLoader); } - } - final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple = - PlanUtils.fromLogicalDAG( - logicalDag, - nodeEngine, - jobImmutableInformation, - initializationTimestamp, - executorService, - flakeIdGenerator, - runningJobStateIMap, - runningJobStateTimestampsIMap, - engineConfig.getQueueType(), - engineConfig); - seaTunnelServer - .getClassLoaderService() - .releaseClassLoader( - jobImmutableInformation.getJobId(), - jobImmutableInformation.getPluginJarsUrls()); - // revert to app class loader, it may be changed by PlanUtils.fromLogicalDAG - Thread.currentThread().setContextClassLoader(appClassLoader); - this.physicalPlan = planTuple.f0(); - this.physicalPlan.setJobMaster(this); - this.checkpointPlanMap = planTuple.f1(); + final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple = + PlanUtils.fromLogicalDAG( + logicalDag, + nodeEngine, + jobImmutableInformation, + initializationTimestamp, + executorService, + flakeIdGenerator, + runningJobStateIMap, + runningJobStateTimestampsIMap, + engineConfig.getQueueType(), + engineConfig); + this.physicalPlan = planTuple.f0(); + this.physicalPlan.setJobMaster(this); + this.checkpointPlanMap = planTuple.f1(); + } finally { + // revert to app class loader, it may be changed by PlanUtils.fromLogicalDAG + Thread.currentThread().setContextClassLoader(appClassLoader); + seaTunnelServer + .getClassLoaderService() + .releaseClassLoader( + jobImmutableInformation.getJobId(), + jobImmutableInformation.getPluginJarsUrls()); + } Exception initException = null; try { this.initCheckPointManager(restart);