This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c290aace53 [Improve][Zeta] Fix JobMaster reset app classloader twice 
(#7063)
c290aace53 is described below

commit c290aace5329cacfeb3bdf4fbd2d7d53ab3ebc76
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Wed Jun 26 20:42:17 2024 +0800

    [Improve][Zeta] Fix JobMaster reset app classloader twice (#7063)
---
 .../seatunnel/engine/server/master/JobMaster.java  | 63 +++++++++++-----------
 1 file changed, 31 insertions(+), 32 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 1b7bf6bdad..ae1f1f0de3 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -222,45 +222,44 @@ public class JobMaster {
                         nodeEngine.getSerializationService(),
                         classLoader,
                         jobImmutableInformation.getLogicalDag());
-        if (!restart
-                && !logicalDag.isStartWithSavePoint()
-                && 
ReadonlyConfig.fromMap(logicalDag.getJobConfig().getEnvOptions())
-                        .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION)
-                        .equals(SaveModeExecuteLocation.CLUSTER)) {
-            try {
-                Thread.currentThread().setContextClassLoader(classLoader);
+        try {
+            Thread.currentThread().setContextClassLoader(classLoader);
+            if (!restart
+                    && !logicalDag.isStartWithSavePoint()
+                    && 
ReadonlyConfig.fromMap(logicalDag.getJobConfig().getEnvOptions())
+                            .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION)
+                            .equals(SaveModeExecuteLocation.CLUSTER)) {
                 logicalDag.getLogicalVertexMap().values().stream()
                         .map(LogicalVertex::getAction)
                         .filter(action -> action instanceof SinkAction)
                         .map(sink -> ((SinkAction<?, ?, ?, ?>) sink).getSink())
                         .forEach(JobMaster::handleSaveMode);
-            } finally {
-                Thread.currentThread().setContextClassLoader(appClassLoader);
             }
-        }
 
-        final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple =
-                PlanUtils.fromLogicalDAG(
-                        logicalDag,
-                        nodeEngine,
-                        jobImmutableInformation,
-                        initializationTimestamp,
-                        executorService,
-                        flakeIdGenerator,
-                        runningJobStateIMap,
-                        runningJobStateTimestampsIMap,
-                        engineConfig.getQueueType(),
-                        engineConfig);
-        seaTunnelServer
-                .getClassLoaderService()
-                .releaseClassLoader(
-                        jobImmutableInformation.getJobId(),
-                        jobImmutableInformation.getPluginJarsUrls());
-        // revert to app class loader, it may be changed by 
PlanUtils.fromLogicalDAG
-        Thread.currentThread().setContextClassLoader(appClassLoader);
-        this.physicalPlan = planTuple.f0();
-        this.physicalPlan.setJobMaster(this);
-        this.checkpointPlanMap = planTuple.f1();
+            final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple 
=
+                    PlanUtils.fromLogicalDAG(
+                            logicalDag,
+                            nodeEngine,
+                            jobImmutableInformation,
+                            initializationTimestamp,
+                            executorService,
+                            flakeIdGenerator,
+                            runningJobStateIMap,
+                            runningJobStateTimestampsIMap,
+                            engineConfig.getQueueType(),
+                            engineConfig);
+            this.physicalPlan = planTuple.f0();
+            this.physicalPlan.setJobMaster(this);
+            this.checkpointPlanMap = planTuple.f1();
+        } finally {
+            // revert to app class loader, it may be changed by 
PlanUtils.fromLogicalDAG
+            Thread.currentThread().setContextClassLoader(appClassLoader);
+            seaTunnelServer
+                    .getClassLoaderService()
+                    .releaseClassLoader(
+                            jobImmutableInformation.getJobId(),
+                            jobImmutableInformation.getPluginJarsUrls());
+        }
         Exception initException = null;
         try {
             this.initCheckPointManager(restart);

Reply via email to