This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 38109c02530 Use ContextManager instead of PipelineContext (#36289)
38109c02530 is described below

commit 38109c025305f92f2c74786b43e1530176c66c69
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Aug 13 21:41:51 2025 +0800

    Use ContextManager instead of PipelineContext (#36289)
    
    * Use ContextManager instead of PipelineContext
    
    * Use ContextManager instead of PipelineContext
---
 .../pipeline/core/context/PipelineContext.java     | 35 ----------------------
 .../core/context/PipelineContextManager.java       |  9 +++---
 .../pipeline/core/job/api/PipelineAPIFactory.java  |  8 ++---
 .../PipelineContextManagerLifecycleListener.java   |  3 +-
 .../core/util/PipelineDistributedBarrier.java      |  2 +-
 .../data/pipeline/cdc/api/CDCJobAPI.java           | 11 ++++---
 .../pipeline/cdc/handler/CDCBackendHandler.java    |  4 +--
 .../scenario/migration/api/MigrationJobAPI.java    |  4 +--
 .../migration/preparer/MigrationJobPreparer.java   |  8 ++---
 .../repository/PipelineGovernanceFacadeTest.java   |  4 +--
 .../core/util/PipelineDistributedBarrierTest.java  |  7 ++---
 .../MigrationDataConsistencyCheckerTest.java       |  4 +--
 .../pipeline/core/util/PipelineContextUtils.java   |  7 ++---
 13 files changed, 29 insertions(+), 77 deletions(-)
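
In short, PipelineContextManager now stores and returns ContextManager directly instead of wrapping it in the removed PipelineContext class. A minimal sketch of the caller-facing change, using only identifiers that appear in the diff below (illustration only, not part of the patch):

    // Before this commit, callers unwrapped the ContextManager and ModeConfiguration from PipelineContext:
    //   ContextManager contextManager = PipelineContextManager.getContext(contextKey).getContextManager();
    //   ModeConfiguration modeConfig = PipelineContextManager.getContext(contextKey).getModeConfig();
    // After this commit, getContext(contextKey) returns the ContextManager itself:
    ContextManager contextManager = PipelineContextManager.getContext(contextKey);
    ModeConfiguration modeConfig = contextManager.getComputeNodeInstanceContext().getModeConfiguration();
    ClusterPersistRepository repository = (ClusterPersistRepository) contextManager.getPersistServiceFacade().getRepository();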

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java
deleted file mode 100644
index 8f76f79b8fc..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.context;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-
-/**
- * Pipeline context.
- */
-@RequiredArgsConstructor
-@Getter
-public final class PipelineContext {
-    
-    private final ModeConfiguration modeConfig;
-    
-    private final ContextManager contextManager;
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextManager.java
index 7c8279a0f5f..0d63c35fea5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextManager.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.context;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.mode.manager.ContextManager;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -30,7 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class PipelineContextManager {
     
-    private static final Map<PipelineContextKey, PipelineContext> CONTEXT_MAP = new ConcurrentHashMap<>();
+    private static final Map<PipelineContextKey, ContextManager> CONTEXT_MAP = new ConcurrentHashMap<>();
     
     /**
      * Get context.
@@ -38,7 +39,7 @@ public final class PipelineContextManager {
      * @param key key
      * @return context
      */
-    public static PipelineContext getContext(final PipelineContextKey key) {
+    public static ContextManager getContext(final PipelineContextKey key) {
         return CONTEXT_MAP.get(key);
     }
     
@@ -47,7 +48,7 @@ public final class PipelineContextManager {
      *
      * @return context
      */
-    public static PipelineContext getProxyContext() {
+    public static ContextManager getProxyContext() {
         return CONTEXT_MAP.get(new PipelineContextKey(InstanceType.PROXY));
     }
     
@@ -57,7 +58,7 @@ public final class PipelineContextManager {
      * @param key key
      * @param context context
      */
-    public static void putContext(final PipelineContextKey key, final PipelineContext context) {
+    public static void putContext(final PipelineContextKey key, final ContextManager context) {
         CONTEXT_MAP.put(key, context);
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/api/PipelineAPIFactory.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/api/PipelineAPIFactory.java
index eba438ee6a4..f854bc268ea 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/api/PipelineAPIFactory.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/api/PipelineAPIFactory.java
@@ -22,7 +22,6 @@ import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.LazyInitializer;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
@@ -39,7 +38,6 @@ import org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.statistics.S
 import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
 import java.util.Map;
@@ -65,8 +63,7 @@ public final class PipelineAPIFactory {
             
             @Override
             protected PipelineGovernanceFacade initialize() {
-                ContextManager contextManager = PipelineContextManager.getContext(contextKey).getContextManager();
-                return new PipelineGovernanceFacade((ClusterPersistRepository) contextManager.getPersistServiceFacade().getRepository());
+                return new PipelineGovernanceFacade((ClusterPersistRepository) PipelineContextManager.getContext(contextKey).getPersistServiceFacade().getRepository());
             }
         }).get();
     }
@@ -158,8 +155,7 @@ public final class PipelineAPIFactory {
         
         private CoordinatorRegistryCenter createRegistryCenter(final PipelineContextKey contextKey) {
             CoordinatorRegistryCenterInitializer registryCenterInitializer = new CoordinatorRegistryCenterInitializer();
-            PipelineContext pipelineContext = PipelineContextManager.getContext(contextKey);
-            ModeConfiguration modeConfig = pipelineContext.getModeConfig();
+            ModeConfiguration modeConfig = PipelineContextManager.getContext(contextKey).getComputeNodeInstanceContext().getModeConfiguration();
             String elasticJobNamespace = PipelineMetaDataNode.getElasticJobNamespace();
             String clusterType = modeConfig.getRepository().getType();
             ShardingSpherePreconditions.checkState("ZooKeeper".equals(clusterType), () -> new IllegalArgumentException("Unsupported cluster type: " + clusterType));
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
index 26f9c69a0da..9b9c55d9704 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.core.listener;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
@@ -54,7 +53,7 @@ public final class PipelineContextManagerLifecycleListener implements ContextMan
             return;
         }
         PipelineContextKey contextKey = new PipelineContextKey(preSelectedDatabaseName, contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getType());
-        PipelineContextManager.putContext(contextKey, new PipelineContext(contextManager.getComputeNodeInstanceContext().getModeConfiguration(), contextManager));
+        PipelineContextManager.putContext(contextKey, contextManager);
         PipelineMetaDataNodeWatcher.getInstance(contextKey);
         ElasticJobServiceLoader.registerTypedService(ElasticJobListener.class);
         try {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
index 9b34b6f920b..01f4946de2b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
@@ -163,7 +163,7 @@ public final class PipelineDistributedBarrier {
         
         @Override
         protected ClusterPersistRepository initialize() {
-            return (ClusterPersistRepository) PipelineContextManager.getContext(contextKey).getContextManager().getPersistServiceFacade().getRepository();
+            return (ClusterPersistRepository) PipelineContextManager.getContext(contextKey).getPersistServiceFacade().getRepository();
         }
     }
 }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 5bc6043bdda..eeec6867705 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -75,6 +75,7 @@ import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigur
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.time.LocalDateTime;
 import java.util.Collection;
 import java.util.Collections;
@@ -155,7 +156,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
         sinkConfig.setSinkType(sinkType.name());
         sinkConfig.setProps(sinkProps);
         result.setSinkConfig(sinkConfig);
-        ShardingSphereDatabase database = PipelineContextManager.getContext(contextKey).getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabaseName());
+        ShardingSphereDatabase database = PipelineContextManager.getContext(contextKey).getMetaDataContexts().getMetaData().getDatabase(param.getDatabaseName());
         result.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
         List<JobDataNodeLine> jobDataNodeLines = JobDataNodeLineConvertUtils.convertDataNodesToLines(param.getTableAndDataNodesMap());
         result.setJobShardingDataNodes(jobDataNodeLines.stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
@@ -282,7 +283,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
      */
     public Collection<CDCJobItemInfo> getJobItemInfos(final String jobId) {
         CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(jobType).getJobConfiguration(jobId);
-        ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(jobConfig.getDatabaseName());
+        ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getMetaDataContexts().getMetaData().getDatabase(jobConfig.getDatabaseName());
         Collection<CDCJobItemInfo> result = new LinkedList<>();
         for (TransmissionJobItemInfo each : new TransmissionJobManager(jobType).getJobItemInfos(jobId)) {
             TransmissionJobItemProgress jobItemProgress = each.getJobItemProgress();
@@ -300,8 +301,10 @@ public final class CDCJobAPI implements TransmissionJobAPI {
         if (!queryCurrentPositionSQL.isPresent()) {
             return "";
         }
-        try (Connection connection = storageUnit.getDataSource().getConnection()) {
-            ResultSet resultSet = connection.createStatement().executeQuery(queryCurrentPositionSQL.get());
+        try (
+                Connection connection = storageUnit.getDataSource().getConnection();
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(queryCurrentPositionSQL.get())) {
             resultSet.next();
             return resultSet.getString(1);
         } catch (final SQLException ex) {
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index c27351249bd..27436fda802 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -97,7 +97,7 @@ public final class CDCBackendHandler {
      * @return CDC response
      */
     public CDCResponse streamData(final String requestId, final StreamDataRequestBody requestBody, final CDCConnectionContext connectionContext, final Channel channel) {
-        ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(requestBody.getDatabase());
+        ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getMetaDataContexts().getMetaData().getDatabase(requestBody.getDatabase());
         ShardingSpherePreconditions.checkNotNull(database,
                 () -> new CDCExceptionWrapper(requestId, new StreamDatabaseNotFoundException(String.format("%s database is not exists", requestBody.getDatabase()))));
         Map<String, Set<String>> schemaTableNameMap;
@@ -136,7 +136,7 @@ public final class CDCBackendHandler {
         CDCJobConfiguration cdcJobConfig = jobConfigManager.getJobConfiguration(jobId);
         ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId));
         PipelineJobRegistry.stop(jobId);
-        ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
+        ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
         jobAPI.start(jobId, new PipelineCDCSocketSink(channel, database, cdcJobConfig.getSchemaTableNames()));
         connectionContext.setJobId(jobId);
     }
diff --git a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index c659b364d08..f923d79b470 100644
--- a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -154,7 +154,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
             }
         }
         result.setSources(configSources);
-        ShardingSphereDatabase targetDatabase = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(targetDatabaseName);
+        ShardingSphereDatabase targetDatabase = PipelineContextManager.getProxyContext().getMetaDataContexts().getMetaData().getDatabase(targetDatabaseName);
         PipelineDataSourceConfiguration targetPipelineDataSourceConfig = buildTargetPipelineDataSourceConfiguration(targetDatabase);
         result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(), targetPipelineDataSourceConfig.getParameter()));
         result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType());
@@ -302,7 +302,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
     
     private void refreshTableMetadata(final String jobId, final String databaseName) {
         // TODO use origin database name for now. It can be reduce metadata refresh scope after reloadDatabaseMetaData case-sensitive problem fixed.
-        ContextManager contextManager = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager();
+        ContextManager contextManager = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId));
         ShardingSphereDatabase database = contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName);
         contextManager.reloadDatabase(database);
     }
diff --git a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 48a50a6e72a..8d080a0a8bd 100644
--- a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -67,7 +67,6 @@ import org.apache.shardingsphere.infra.exception.core.ShardingSpherePrecondition
 import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
-import org.apache.shardingsphere.mode.lock.LockContext;
 import org.apache.shardingsphere.mode.lock.LockDefinition;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLockDefinition;
@@ -112,14 +111,13 @@ public final class MigrationJobPreparer implements PipelineJobPreparer<Migration
         MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
         String jobId = jobConfig.getJobId();
         PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
-        ContextManager contextManager = PipelineContextManager.getContext(contextKey).getContextManager();
-        LockContext lockContext = contextManager.getLockContext();
+        ContextManager contextManager = PipelineContextManager.getContext(contextKey);
         if (!jobItemManager.getProgress(jobId, jobItemContext.getShardingItem()).isPresent()) {
             jobItemManager.persistProgress(jobItemContext);
         }
         LockDefinition lockDefinition = new GlobalLockDefinition(new MigrationPrepareLock(jobConfig.getJobId()));
         long startTimeMillis = System.currentTimeMillis();
-        if (lockContext.tryLock(lockDefinition, 600 * 1000L)) {
+        if (contextManager.getLockContext().tryLock(lockDefinition, 600 * 1000L)) {
             log.info("Lock success, jobId={}, shardingItem={}, cost {} ms.", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
             try {
                 PipelineJobOffsetGovernanceRepository offsetRepository = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getOffset();
@@ -132,7 +130,7 @@ public final class MigrationJobPreparer implements PipelineJobPreparer<Migration
                 }
             } finally {
                 log.info("Unlock, jobId={}, shardingItem={}, cost {} ms.", 
jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - 
startTimeMillis);
-                lockContext.unlock(lockDefinition);
+                contextManager.getLockContext().unlock(lockDefinition);
             }
         } else {
             log.warn("Lock failed, jobId={}, shardingItem={}.", jobId, 
jobItemContext.getShardingItem());
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
index cda2d95198f..06052dd6b68 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
@@ -36,7 +36,6 @@ import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.event.DataChangedEvent;
 import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
-import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
@@ -189,8 +188,7 @@ class PipelineGovernanceFacadeTest {
     }
     
     private ClusterPersistRepository getClusterPersistRepository() {
-        ContextManager contextManager = PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getContextManager();
-        return (ClusterPersistRepository) contextManager.getPersistServiceFacade().getRepository();
+        return (ClusterPersistRepository) PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getPersistServiceFacade().getRepository();
     }
     
     private MigrationJobItemContext mockJobItemContext() {
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
index c5f3523ec86..5422ce2f32c 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.core.util;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.mode.spi.repository.PersistRepository;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
 import org.junit.jupiter.api.BeforeAll;
@@ -45,8 +44,7 @@ class PipelineDistributedBarrierTest {
     void assertRegisterAndRemove() throws ReflectiveOperationException {
         String jobId = JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
         PipelineContextKey contextKey = PipelineContextUtils.getContextKey();
-        PersistRepository repository = PipelineContextManager.getContext(contextKey).getContextManager().getPersistServiceFacade().getRepository();
-        repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
+        PipelineContextManager.getContext(contextKey).getPersistServiceFacade().getRepository().persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
         PipelineDistributedBarrier instance = PipelineDistributedBarrier.getInstance(contextKey);
         String parentPath = "/barrier";
         instance.register(parentPath, 1);
@@ -61,8 +59,7 @@ class PipelineDistributedBarrierTest {
     void assertAwait() {
         String jobId = JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
         PipelineContextKey contextKey = PipelineContextUtils.getContextKey();
-        PersistRepository repository = PipelineContextManager.getContext(contextKey).getContextManager().getPersistServiceFacade().getRepository();
-        repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
+        PipelineContextManager.getContext(contextKey).getPersistServiceFacade().getRepository().persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
         PipelineDistributedBarrier instance = PipelineDistributedBarrier.getInstance(contextKey);
         String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
         instance.register(barrierEnablePath, 1);
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 22e05eb57cf..04f4574ad55 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -32,7 +32,6 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.sw
 import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
@@ -88,8 +87,7 @@ class MigrationDataConsistencyCheckerTest {
     }
     
     private ClusterPersistRepository getClusterPersistRepository() {
-        ContextManager contextManager = PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getContextManager();
-        return (ClusterPersistRepository) contextManager.getPersistServiceFacade().getRepository();
+        return (ClusterPersistRepository) PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getPersistServiceFacade().getRepository();
     }
     
     private ConsistencyCheckJobItemProgressContext createConsistencyCheckJobItemProgressContext(final String jobId) {
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index d762ee4c9d2..fae73a7e974 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.util;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
@@ -113,10 +112,8 @@ public final class PipelineContextUtils {
         ClusterPersistRepository persistRepository = getClusterPersistRepository(
                 (ClusterPersistRepositoryConfiguration) contextManager.getComputeNodeInstanceContext().getModeConfiguration().getRepository());
         MetaDataContexts metaDataContexts = renewMetaDataContexts(contextManager.getMetaDataContexts(), new MetaDataPersistFacade(persistRepository, true));
-        PipelineContext pipelineContext = new PipelineContext(contextManager.getComputeNodeInstanceContext().getModeConfiguration(),
-                new ContextManager(metaDataContexts, contextManager.getComputeNodeInstanceContext(),
-                        contextManager.getLockContext(), contextManager.getPersistServiceFacade().getRepository()));
-        PipelineContextManager.putContext(contextKey, pipelineContext);
+        PipelineContextManager.putContext(contextKey,
+                new ContextManager(metaDataContexts, contextManager.getComputeNodeInstanceContext(), contextManager.getLockContext(), contextManager.getPersistServiceFacade().getRepository()));
     }
     
     @SneakyThrows({ReflectiveOperationException.class, SQLException.class})
