This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 38109c02530 Use ContextManager instead of PipelineContext (#36289)
38109c02530 is described below
commit 38109c025305f92f2c74786b43e1530176c66c69
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Aug 13 21:41:51 2025 +0800
Use ContextManager instead of PipelineContext (#36289)
* Use ContextManager instead of PipelineContext
* Use ContextManager instead of PipelineContext
---
.../pipeline/core/context/PipelineContext.java | 35 ----------------------
.../core/context/PipelineContextManager.java | 9 +++---
.../pipeline/core/job/api/PipelineAPIFactory.java | 8 ++---
.../PipelineContextManagerLifecycleListener.java | 3 +-
.../core/util/PipelineDistributedBarrier.java | 2 +-
.../data/pipeline/cdc/api/CDCJobAPI.java | 11 ++++---
.../pipeline/cdc/handler/CDCBackendHandler.java | 4 +--
.../scenario/migration/api/MigrationJobAPI.java | 4 +--
.../migration/preparer/MigrationJobPreparer.java | 8 ++---
.../repository/PipelineGovernanceFacadeTest.java | 4 +--
.../core/util/PipelineDistributedBarrierTest.java | 7 ++---
.../MigrationDataConsistencyCheckerTest.java | 4 +--
.../pipeline/core/util/PipelineContextUtils.java | 7 ++---
13 files changed, 29 insertions(+), 77 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java
deleted file mode 100644
index 8f76f79b8fc..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.context;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-
-/**
- * Pipeline context.
- */
-@RequiredArgsConstructor
-@Getter
-public final class PipelineContext {
-
- private final ModeConfiguration modeConfig;
-
- private final ContextManager contextManager;
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextManager.java
index 7c8279a0f5f..0d63c35fea5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContextManager.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.context;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.mode.manager.ContextManager;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -30,7 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class PipelineContextManager {
- private static final Map<PipelineContextKey, PipelineContext> CONTEXT_MAP
= new ConcurrentHashMap<>();
+ private static final Map<PipelineContextKey, ContextManager> CONTEXT_MAP =
new ConcurrentHashMap<>();
/**
* Get context.
@@ -38,7 +39,7 @@ public final class PipelineContextManager {
* @param key key
* @return context
*/
- public static PipelineContext getContext(final PipelineContextKey key) {
+ public static ContextManager getContext(final PipelineContextKey key) {
return CONTEXT_MAP.get(key);
}
@@ -47,7 +48,7 @@ public final class PipelineContextManager {
*
* @return context
*/
- public static PipelineContext getProxyContext() {
+ public static ContextManager getProxyContext() {
return CONTEXT_MAP.get(new PipelineContextKey(InstanceType.PROXY));
}
@@ -57,7 +58,7 @@ public final class PipelineContextManager {
* @param key key
* @param context context
*/
- public static void putContext(final PipelineContextKey key, final
PipelineContext context) {
+ public static void putContext(final PipelineContextKey key, final
ContextManager context) {
CONTEXT_MAP.put(key, context);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/api/PipelineAPIFactory.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/api/PipelineAPIFactory.java
index eba438ee6a4..f854bc268ea 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/api/PipelineAPIFactory.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/api/PipelineAPIFactory.java
@@ -22,7 +22,6 @@ import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
@@ -39,7 +38,6 @@ import
org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.statistics.S
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import java.util.Map;
@@ -65,8 +63,7 @@ public final class PipelineAPIFactory {
@Override
protected PipelineGovernanceFacade initialize() {
- ContextManager contextManager =
PipelineContextManager.getContext(contextKey).getContextManager();
- return new PipelineGovernanceFacade((ClusterPersistRepository)
contextManager.getPersistServiceFacade().getRepository());
+ return new PipelineGovernanceFacade((ClusterPersistRepository)
PipelineContextManager.getContext(contextKey).getPersistServiceFacade().getRepository());
}
}).get();
}
@@ -158,8 +155,7 @@ public final class PipelineAPIFactory {
private CoordinatorRegistryCenter createRegistryCenter(final
PipelineContextKey contextKey) {
CoordinatorRegistryCenterInitializer registryCenterInitializer =
new CoordinatorRegistryCenterInitializer();
- PipelineContext pipelineContext =
PipelineContextManager.getContext(contextKey);
- ModeConfiguration modeConfig = pipelineContext.getModeConfig();
+ ModeConfiguration modeConfig =
PipelineContextManager.getContext(contextKey).getComputeNodeInstanceContext().getModeConfiguration();
String elasticJobNamespace =
PipelineMetaDataNode.getElasticJobNamespace();
String clusterType = modeConfig.getRepository().getType();
ShardingSpherePreconditions.checkState("ZooKeeper".equals(clusterType), () ->
new IllegalArgumentException("Unsupported cluster type: " + clusterType));
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
index 26f9c69a0da..9b9c55d9704 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.listener;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
@@ -54,7 +53,7 @@ public final class PipelineContextManagerLifecycleListener
implements ContextMan
return;
}
PipelineContextKey contextKey = new
PipelineContextKey(preSelectedDatabaseName,
contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getType());
- PipelineContextManager.putContext(contextKey, new
PipelineContext(contextManager.getComputeNodeInstanceContext().getModeConfiguration(),
contextManager));
+ PipelineContextManager.putContext(contextKey, contextManager);
PipelineMetaDataNodeWatcher.getInstance(contextKey);
ElasticJobServiceLoader.registerTypedService(ElasticJobListener.class);
try {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
index 9b34b6f920b..01f4946de2b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
@@ -163,7 +163,7 @@ public final class PipelineDistributedBarrier {
@Override
protected ClusterPersistRepository initialize() {
- return (ClusterPersistRepository)
PipelineContextManager.getContext(contextKey).getContextManager().getPersistServiceFacade().getRepository();
+ return (ClusterPersistRepository)
PipelineContextManager.getContext(contextKey).getPersistServiceFacade().getRepository();
}
}
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 5bc6043bdda..eeec6867705 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -75,6 +75,7 @@ import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigur
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.Collections;
@@ -155,7 +156,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
sinkConfig.setSinkType(sinkType.name());
sinkConfig.setProps(sinkProps);
result.setSinkConfig(sinkConfig);
- ShardingSphereDatabase database =
PipelineContextManager.getContext(contextKey).getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabaseName());
+ ShardingSphereDatabase database =
PipelineContextManager.getContext(contextKey).getMetaDataContexts().getMetaData().getDatabase(param.getDatabaseName());
result.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
List<JobDataNodeLine> jobDataNodeLines =
JobDataNodeLineConvertUtils.convertDataNodesToLines(param.getTableAndDataNodesMap());
result.setJobShardingDataNodes(jobDataNodeLines.stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
@@ -282,7 +283,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
*/
public Collection<CDCJobItemInfo> getJobItemInfos(final String jobId) {
CDCJobConfiguration jobConfig = new
PipelineJobConfigurationManager(jobType).getJobConfiguration(jobId);
- ShardingSphereDatabase database =
PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(jobConfig.getDatabaseName());
+ ShardingSphereDatabase database =
PipelineContextManager.getProxyContext().getMetaDataContexts().getMetaData().getDatabase(jobConfig.getDatabaseName());
Collection<CDCJobItemInfo> result = new LinkedList<>();
for (TransmissionJobItemInfo each : new
TransmissionJobManager(jobType).getJobItemInfos(jobId)) {
TransmissionJobItemProgress jobItemProgress =
each.getJobItemProgress();
@@ -300,8 +301,10 @@ public final class CDCJobAPI implements TransmissionJobAPI
{
if (!queryCurrentPositionSQL.isPresent()) {
return "";
}
- try (Connection connection =
storageUnit.getDataSource().getConnection()) {
- ResultSet resultSet =
connection.createStatement().executeQuery(queryCurrentPositionSQL.get());
+ try (
+ Connection connection =
storageUnit.getDataSource().getConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet =
statement.executeQuery(queryCurrentPositionSQL.get())) {
resultSet.next();
return resultSet.getString(1);
} catch (final SQLException ex) {
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index c27351249bd..27436fda802 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -97,7 +97,7 @@ public final class CDCBackendHandler {
* @return CDC response
*/
public CDCResponse streamData(final String requestId, final
StreamDataRequestBody requestBody, final CDCConnectionContext
connectionContext, final Channel channel) {
- ShardingSphereDatabase database =
PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(requestBody.getDatabase());
+ ShardingSphereDatabase database =
PipelineContextManager.getProxyContext().getMetaDataContexts().getMetaData().getDatabase(requestBody.getDatabase());
ShardingSpherePreconditions.checkNotNull(database,
() -> new CDCExceptionWrapper(requestId, new
StreamDatabaseNotFoundException(String.format("%s database is not exists",
requestBody.getDatabase()))));
Map<String, Set<String>> schemaTableNameMap;
@@ -136,7 +136,7 @@ public final class CDCBackendHandler {
CDCJobConfiguration cdcJobConfig =
jobConfigManager.getJobConfiguration(jobId);
ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new
PipelineJobNotFoundException(jobId));
PipelineJobRegistry.stop(jobId);
- ShardingSphereDatabase database =
PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
+ ShardingSphereDatabase database =
PipelineContextManager.getProxyContext().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
jobAPI.start(jobId, new PipelineCDCSocketSink(channel, database,
cdcJobConfig.getSchemaTableNames()));
connectionContext.setJobId(jobId);
}
diff --git a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index c659b364d08..f923d79b470 100644
--- a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -154,7 +154,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
}
}
result.setSources(configSources);
- ShardingSphereDatabase targetDatabase =
PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(targetDatabaseName);
+ ShardingSphereDatabase targetDatabase =
PipelineContextManager.getProxyContext().getMetaDataContexts().getMetaData().getDatabase(targetDatabaseName);
PipelineDataSourceConfiguration targetPipelineDataSourceConfig =
buildTargetPipelineDataSourceConfiguration(targetDatabase);
result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(),
targetPipelineDataSourceConfig.getParameter()));
result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType());
@@ -302,7 +302,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
private void refreshTableMetadata(final String jobId, final String
databaseName) {
// TODO use origin database name for now. It can be reduce metadata
refresh scope after reloadDatabaseMetaData case-sensitive problem fixed.
- ContextManager contextManager =
PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager();
+ ContextManager contextManager =
PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId));
ShardingSphereDatabase database =
contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName);
contextManager.reloadDatabase(database);
}
diff --git a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 48a50a6e72a..8d080a0a8bd 100644
--- a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -67,7 +67,6 @@ import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePrecondition
import
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
-import org.apache.shardingsphere.mode.lock.LockContext;
import org.apache.shardingsphere.mode.lock.LockDefinition;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLockDefinition;
@@ -112,14 +111,13 @@ public final class MigrationJobPreparer implements
PipelineJobPreparer<Migration
MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
String jobId = jobConfig.getJobId();
PipelineContextKey contextKey =
PipelineJobIdUtils.parseContextKey(jobId);
- ContextManager contextManager =
PipelineContextManager.getContext(contextKey).getContextManager();
- LockContext lockContext = contextManager.getLockContext();
+ ContextManager contextManager =
PipelineContextManager.getContext(contextKey);
if (!jobItemManager.getProgress(jobId,
jobItemContext.getShardingItem()).isPresent()) {
jobItemManager.persistProgress(jobItemContext);
}
LockDefinition lockDefinition = new GlobalLockDefinition(new
MigrationPrepareLock(jobConfig.getJobId()));
long startTimeMillis = System.currentTimeMillis();
- if (lockContext.tryLock(lockDefinition, 600 * 1000L)) {
+ if (contextManager.getLockContext().tryLock(lockDefinition, 600 *
1000L)) {
log.info("Lock success, jobId={}, shardingItem={}, cost {} ms.",
jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() -
startTimeMillis);
try {
PipelineJobOffsetGovernanceRepository offsetRepository =
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getOffset();
@@ -132,7 +130,7 @@ public final class MigrationJobPreparer implements
PipelineJobPreparer<Migration
}
} finally {
log.info("Unlock, jobId={}, shardingItem={}, cost {} ms.",
jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() -
startTimeMillis);
- lockContext.unlock(lockDefinition);
+ contextManager.getLockContext().unlock(lockDefinition);
}
} else {
log.warn("Lock failed, jobId={}, shardingItem={}.", jobId,
jobItemContext.getShardingItem());
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
index cda2d95198f..06052dd6b68 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
@@ -36,7 +36,6 @@ import
org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
-import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
import
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
@@ -189,8 +188,7 @@ class PipelineGovernanceFacadeTest {
}
private ClusterPersistRepository getClusterPersistRepository() {
- ContextManager contextManager =
PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getContextManager();
- return (ClusterPersistRepository)
contextManager.getPersistServiceFacade().getRepository();
+ return (ClusterPersistRepository)
PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getPersistServiceFacade().getRepository();
}
private MigrationJobItemContext mockJobItemContext() {
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
index c5f3523ec86..5422ce2f32c 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.core.util;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.mode.spi.repository.PersistRepository;
import
org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
import
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
import org.junit.jupiter.api.BeforeAll;
@@ -45,8 +44,7 @@ class PipelineDistributedBarrierTest {
void assertRegisterAndRemove() throws ReflectiveOperationException {
String jobId =
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
PipelineContextKey contextKey = PipelineContextUtils.getContextKey();
- PersistRepository repository =
PipelineContextManager.getContext(contextKey).getContextManager().getPersistServiceFacade().getRepository();
- repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
+
PipelineContextManager.getContext(contextKey).getPersistServiceFacade().getRepository().persist(PipelineMetaDataNode.getJobRootPath(jobId),
"");
PipelineDistributedBarrier instance =
PipelineDistributedBarrier.getInstance(contextKey);
String parentPath = "/barrier";
instance.register(parentPath, 1);
@@ -61,8 +59,7 @@ class PipelineDistributedBarrierTest {
void assertAwait() {
String jobId =
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
PipelineContextKey contextKey = PipelineContextUtils.getContextKey();
- PersistRepository repository =
PipelineContextManager.getContext(contextKey).getContextManager().getPersistServiceFacade().getRepository();
- repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
+
PipelineContextManager.getContext(contextKey).getPersistServiceFacade().getRepository().persist(PipelineMetaDataNode.getJobRootPath(jobId),
"");
PipelineDistributedBarrier instance =
PipelineDistributedBarrier.getInstance(contextKey);
String barrierEnablePath =
PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
instance.register(barrierEnablePath, 1);
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 22e05eb57cf..04f4574ad55 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -32,7 +32,6 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.sw
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
import
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
@@ -88,8 +87,7 @@ class MigrationDataConsistencyCheckerTest {
}
private ClusterPersistRepository getClusterPersistRepository() {
- ContextManager contextManager =
PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getContextManager();
- return (ClusterPersistRepository)
contextManager.getPersistServiceFacade().getRepository();
+ return (ClusterPersistRepository)
PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getPersistServiceFacade().getRepository();
}
private ConsistencyCheckJobItemProgressContext
createConsistencyCheckJobItemProgressContext(final String jobId) {
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index d762ee4c9d2..fae73a7e974 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -20,7 +20,6 @@ package
org.apache.shardingsphere.test.it.data.pipeline.core.util;
import lombok.SneakyThrows;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
@@ -113,10 +112,8 @@ public final class PipelineContextUtils {
ClusterPersistRepository persistRepository =
getClusterPersistRepository(
(ClusterPersistRepositoryConfiguration)
contextManager.getComputeNodeInstanceContext().getModeConfiguration().getRepository());
MetaDataContexts metaDataContexts =
renewMetaDataContexts(contextManager.getMetaDataContexts(), new
MetaDataPersistFacade(persistRepository, true));
- PipelineContext pipelineContext = new
PipelineContext(contextManager.getComputeNodeInstanceContext().getModeConfiguration(),
- new ContextManager(metaDataContexts,
contextManager.getComputeNodeInstanceContext(),
- contextManager.getLockContext(),
contextManager.getPersistServiceFacade().getRepository()));
- PipelineContextManager.putContext(contextKey, pipelineContext);
+ PipelineContextManager.putContext(contextKey,
+ new ContextManager(metaDataContexts,
contextManager.getComputeNodeInstanceContext(),
contextManager.getLockContext(),
contextManager.getPersistServiceFacade().getRepository()));
}
@SneakyThrows({ReflectiveOperationException.class, SQLException.class})