This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5555e8f7275 Simplify lock for scaling job (#18935)
5555e8f7275 is described below
commit 5555e8f727572096a5c9a89213c7b6f382fbbf5c
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Jul 7 17:34:34 2022 +0800
Simplify lock for scaling job (#18935)
---
.../data/pipeline/api/RuleAlteredJobAPI.java | 10 ++--
.../data/pipeline/spi/lock/JobLock.java | 40 --------------
.../data/pipeline/spi/lock/RowBasedJobLock.java | 26 ---------
.../pipeline/spi/lock/RowBasedJobLockFactory.java | 43 ---------------
.../data/pipeline/spi/lock/RuleBasedJobLock.java | 26 ---------
.../pipeline/spi/lock/RuleBasedJobLockFactory.java | 43 ---------------
.../api/fixture/RuleAlteredJobAPIFixture.java | 4 +-
.../spi/fixture/RowBasedJobLockFixture.java | 31 -----------
.../spi/fixture/RuleBasedJobLockFixture.java | 31 -----------
.../spi/lock/RowBasedJobLockFactoryTest.java | 32 ------------
.../spi/lock/RuleBasedJobLockFactoryTest.java | 32 ------------
...ngsphere.data.pipeline.spi.lock.RowBasedJobLock | 18 -------
...gsphere.data.pipeline.spi.lock.RuleBasedJobLock | 18 -------
.../core/api/impl/RuleAlteredJobAPIImpl.java | 14 ++---
.../data/pipeline/core/job/FinishedCheckJob.java | 30 +++--------
.../scenario/rulealtered/RuleAlteredContext.java | 10 ----
.../spi/DefaultMetadataCheckoutLock.java | 35 -------------
.../spi/DefaultSourceWritingStopLock.java | 44 ----------------
...ngsphere.data.pipeline.spi.lock.RowBasedJobLock | 18 -------
...gsphere.data.pipeline.spi.lock.RuleBasedJobLock | 18 -------
.../core/api/impl/RuleAlteredJobAPIImplTest.java | 12 ++---
.../spi/DefaultSourceWritingStopLockTest.java | 61 ----------------------
22 files changed, 24 insertions(+), 572 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
index 976670b146e..db201f9a5b0 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
@@ -76,10 +76,9 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI,
RequiredSPI {
/**
* Stop cluster writing.
*
- * @param databaseName database name
- * @param jobId job id
+ * @param jobConfig job configuration
*/
- void stopClusterWriteDB(String databaseName, String jobId);
+ void stopClusterWriteDB(RuleAlteredJobConfiguration jobConfig);
/**
* Restore cluster writing.
@@ -91,10 +90,9 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI,
RequiredSPI {
/**
* Restore cluster writing.
*
- * @param databaseName database name
- * @param jobId job id
+ * @param jobConfig job configuration
*/
- void restoreClusterWriteDB(String databaseName, String jobId);
+ void restoreClusterWriteDB(RuleAlteredJobConfiguration jobConfig);
/**
* List all data consistency check algorithms from SPI.
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/lock/JobLock.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/lock/JobLock.java
deleted file mode 100644
index 35c1d1b7378..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/lock/JobLock.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.lock;
-
-/**
- * Job lock.
- */
-public interface JobLock {
-
- /**
- * Lock.
- *
- * @param databaseName database name
- * @param jobId job id
- */
- void lock(String databaseName, String jobId);
-
- /**
- * Release lock.
- *
- * @param databaseName database name
- * @param jobId job id
- */
- void releaseLock(String databaseName, String jobId);
-}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/lock/RowBasedJobLock.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/lock/RowBasedJobLock.java
deleted file mode 100644
index a1b0b8ad6cc..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/lock/RowBasedJobLock.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.lock;
-
-import org.apache.shardingsphere.spi.type.required.RequiredSPI;
-
-/**
- * Row based job lock.
- */
-public interface RowBasedJobLock extends JobLock, RequiredSPI {
-}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/lock/RowBasedJobLockFactory.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/lock/RowBasedJobLockFactory.java
deleted file mode 100644
index d09b49b3b95..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/lock/RowBasedJobLockFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.lock;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.spi.type.required.RequiredSPIRegistry;
-
-/**
- * Row based job lock factory.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class RowBasedJobLockFactory {
-
- static {
- ShardingSphereServiceLoader.register(RowBasedJobLock.class);
- }
-
- /**
- * Get instance of row based job lock.
- *
- * @return got instance
- */
- public static RowBasedJobLock getInstance() {
- return RequiredSPIRegistry.getRegisteredService(RowBasedJobLock.class);
- }
-}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/lock/RuleBasedJobLock.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/lock/RuleBasedJobLock.java
deleted file mode 100644
index d07779b5e9f..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/lock/RuleBasedJobLock.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.lock;
-
-import org.apache.shardingsphere.spi.type.required.RequiredSPI;
-
-/**
- * Rule based job lock.
- */
-public interface RuleBasedJobLock extends JobLock, RequiredSPI {
-}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/lock/RuleBasedJobLockFactory.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/lock/RuleBasedJobLockFactory.java
deleted file mode 100644
index a2b1f008980..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/lock/RuleBasedJobLockFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.lock;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.spi.type.required.RequiredSPIRegistry;
-
-/**
- * Rule based job lock factory.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class RuleBasedJobLockFactory {
-
- static {
- ShardingSphereServiceLoader.register(RuleBasedJobLock.class);
- }
-
- /**
- * Get instance of rule based job lock.
- *
- * @return got instance
- */
- public static RuleBasedJobLock getInstance() {
- return
RequiredSPIRegistry.getRegisteredService(RuleBasedJobLock.class);
- }
-}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
index 94918f5202e..bfb05d6ae54 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
@@ -69,7 +69,7 @@ public final class RuleAlteredJobAPIFixture implements
RuleAlteredJobAPI {
}
@Override
- public void stopClusterWriteDB(final String databaseName, final String
jobId) {
+ public void stopClusterWriteDB(final RuleAlteredJobConfiguration
jobConfig) {
}
@Override
@@ -77,7 +77,7 @@ public final class RuleAlteredJobAPIFixture implements
RuleAlteredJobAPI {
}
@Override
- public void restoreClusterWriteDB(final String databaseName, final String
jobId) {
+ public void restoreClusterWriteDB(final RuleAlteredJobConfiguration
jobConfig) {
}
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RowBasedJobLockFixture.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RowBasedJobLockFixture.java
deleted file mode 100644
index 0b4c30e419d..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RowBasedJobLockFixture.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.fixture;
-
-import org.apache.shardingsphere.data.pipeline.spi.lock.RowBasedJobLock;
-
-public final class RowBasedJobLockFixture implements RowBasedJobLock {
-
- @Override
- public void lock(final String databaseName, final String jobId) {
- }
-
- @Override
- public void releaseLock(final String databaseName, final String jobId) {
- }
-}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleBasedJobLockFixture.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleBasedJobLockFixture.java
deleted file mode 100644
index 2921ca1dec3..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleBasedJobLockFixture.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.fixture;
-
-import org.apache.shardingsphere.data.pipeline.spi.lock.RuleBasedJobLock;
-
-public final class RuleBasedJobLockFixture implements RuleBasedJobLock {
-
- @Override
- public void lock(final String databaseName, final String jobId) {
- }
-
- @Override
- public void releaseLock(final String databaseName, final String jobId) {
- }
-}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/lock/RowBasedJobLockFactoryTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/lock/RowBasedJobLockFactoryTest.java
deleted file mode 100644
index abfd2df8632..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/lock/RowBasedJobLockFactoryTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.lock;
-
-import
org.apache.shardingsphere.data.pipeline.spi.fixture.RowBasedJobLockFixture;
-import org.junit.Test;
-
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertThat;
-
-public final class RowBasedJobLockFactoryTest {
-
- @Test
- public void assertGetInstance() {
- assertThat(RowBasedJobLockFactory.getInstance(),
instanceOf(RowBasedJobLockFixture.class));
- }
-}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/lock/RuleBasedJobLockFactoryTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/lock/RuleBasedJobLockFactoryTest.java
deleted file mode 100644
index 77f3ca05222..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/lock/RuleBasedJobLockFactoryTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.lock;
-
-import
org.apache.shardingsphere.data.pipeline.spi.fixture.RuleBasedJobLockFixture;
-import org.junit.Test;
-
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertThat;
-
-public final class RuleBasedJobLockFactoryTest {
-
- @Test
- public void assertGetInstance() {
- assertThat(RuleBasedJobLockFactory.getInstance(),
instanceOf(RuleBasedJobLockFixture.class));
- }
-}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.lock.RowBasedJobLock
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.lock.RowBasedJobLock
deleted file mode 100644
index 293ac3c26fa..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.lock.RowBasedJobLock
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.spi.fixture.RowBasedJobLockFixture
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.lock.RuleBasedJobLock
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.lock.RuleBasedJobLock
deleted file mode 100644
index c07852c1565..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.lock.RuleBasedJobLock
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.spi.fixture.RuleBasedJobLockFixture
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 03a755a9986..eeb7427263c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -172,12 +172,12 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
verifyManualMode(jobConfig);
verifyJobNotStopped(jobConfigPOJO);
verifyJobNotCompleted(jobConfig);
- String databaseName = jobConfig.getDatabaseName();
- stopClusterWriteDB(databaseName, jobId);
+ stopClusterWriteDB(jobConfig);
}
@Override
- public void stopClusterWriteDB(final String databaseName, final String
jobId) {
+ public void stopClusterWriteDB(final RuleAlteredJobConfiguration
jobConfig) {
+ String databaseName = jobConfig.getDatabaseName();
LockContext lockContext =
PipelineContext.getContextManager().getInstanceContext().getLockContext();
LockNameDefinition lockNameDefinition =
LockNameDefinitionFactory.newDatabaseDefinition(databaseName);
if (lockContext.isLocked(lockNameDefinition)) {
@@ -198,16 +198,16 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
RuleAlteredJobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
verifyManualMode(jobConfig);
- String databaseName = jobConfig.getDatabaseName();
- restoreClusterWriteDB(databaseName, jobId);
+ restoreClusterWriteDB(jobConfig);
}
@Override
- public void restoreClusterWriteDB(final String databaseName, final String
jobId) {
+ public void restoreClusterWriteDB(final RuleAlteredJobConfiguration
jobConfig) {
+ String databaseName = jobConfig.getDatabaseName();
LockContext lockContext =
PipelineContext.getContextManager().getInstanceContext().getLockContext();
LockNameDefinition lockNameDefinition =
LockNameDefinitionFactory.newDatabaseDefinition(databaseName);
if (lockContext.isLocked(lockNameDefinition)) {
- log.info("restoreClusterWriteDB, before releaseLock,
databaseName={}, jobId={}", databaseName, jobId);
+ log.info("restoreClusterWriteDB, before releaseLock,
databaseName={}, jobId={}", databaseName, jobConfig.getJobId());
lockContext.releaseLock(lockNameDefinition);
return;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index 9ae3f7dfbe7..c403895f9d8 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -28,8 +28,6 @@ import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
-import org.apache.shardingsphere.data.pipeline.spi.lock.RowBasedJobLock;
-import org.apache.shardingsphere.data.pipeline.spi.lock.RuleBasedJobLock;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -58,6 +56,7 @@ public final class FinishedCheckJob implements SimpleJob {
log.info("check not completed for job {}, ignore", jobId);
continue;
}
+ // TODO merge to CompletionDetectAlgorithm
if (isNotAllowDataCheck(jobId)) {
continue;
}
@@ -75,12 +74,8 @@ public final class FinishedCheckJob implements SimpleJob {
continue;
}
log.info("scaling job {} almost finished.", jobId);
- RowBasedJobLock rowBasedJobLock =
ruleAlteredContext.getRowBasedJobLock();
- String databaseName = jobConfig.getDatabaseName();
try {
- if (null != rowBasedJobLock) {
- rowBasedJobLock.lock(databaseName, jobId + "");
- }
+ ruleAlteredJobAPI.stopClusterWriteDB(jobConfig);
if
(!ruleAlteredJobAPI.isDataConsistencyCheckNeeded(jobConfig)) {
log.info("DataConsistencyCalculatorAlgorithm is not
configured, data consistency check is ignored.");
ruleAlteredJobAPI.switchClusterConfiguration(jobConfig);
@@ -90,12 +85,9 @@ public final class FinishedCheckJob implements SimpleJob {
log.error("data consistency check failed, job {}",
jobId);
continue;
}
- RuleBasedJobLock ruleBasedJobLock =
ruleAlteredContext.getRuleBasedJobLock();
- switchClusterConfiguration(databaseName, jobConfig,
ruleBasedJobLock);
+ switchClusterConfiguration(jobConfig);
} finally {
- if (null != rowBasedJobLock) {
- rowBasedJobLock.releaseLock(databaseName, jobId + "");
- }
+ ruleAlteredJobAPI.restoreClusterWriteDB(jobConfig);
}
log.info("job {} finished", jobId);
// CHECKSTYLE:OFF
@@ -124,17 +116,7 @@ public final class FinishedCheckJob implements SimpleJob {
return ruleAlteredJobAPI.aggregateDataConsistencyCheckResults(jobId,
ruleAlteredJobAPI.dataConsistencyCheck(jobConfig));
}
- private void switchClusterConfiguration(final String databaseName, final
RuleAlteredJobConfiguration jobConfig, final RuleBasedJobLock ruleBasedJobLock)
{
- String jobId = jobConfig.getJobId();
- try {
- if (null != ruleBasedJobLock) {
- ruleBasedJobLock.lock(databaseName, jobId + "");
- }
- ruleAlteredJobAPI.switchClusterConfiguration(jobConfig);
- } finally {
- if (null != ruleBasedJobLock) {
- ruleBasedJobLock.releaseLock(databaseName, jobId + "");
- }
- }
+ private void switchClusterConfiguration(final RuleAlteredJobConfiguration
jobConfig) {
+ ruleAlteredJobAPI.switchClusterConfiguration(jobConfig);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index 0888dd9cd7e..34792f7f4bf 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -28,10 +28,6 @@ import
org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlg
import
org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithmFactory;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreatorFactory;
-import org.apache.shardingsphere.data.pipeline.spi.lock.RowBasedJobLock;
-import org.apache.shardingsphere.data.pipeline.spi.lock.RowBasedJobLockFactory;
-import org.apache.shardingsphere.data.pipeline.spi.lock.RuleBasedJobLock;
-import
org.apache.shardingsphere.data.pipeline.spi.lock.RuleBasedJobLockFactory;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithmFactory;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
@@ -74,10 +70,6 @@ public final class RuleAlteredContext {
private final DataConsistencyCalculateAlgorithm
dataConsistencyCalculateAlgorithm;
- private final RowBasedJobLock rowBasedJobLock;
-
- private final RuleBasedJobLock ruleBasedJobLock;
-
private final ExecuteEngine inventoryDumperExecuteEngine;
private final ExecuteEngine incrementalDumperExecuteEngine;
@@ -102,8 +94,6 @@ public final class RuleAlteredContext {
dataConsistencyCalculateAlgorithm = null !=
dataConsistencyCheckerConfig
?
DataConsistencyCalculateAlgorithmFactory.newInstance(dataConsistencyCheckerConfig.getType(),
dataConsistencyCheckerConfig.getProps())
: null;
- rowBasedJobLock = RowBasedJobLockFactory.getInstance();
- ruleBasedJobLock = RuleBasedJobLockFactory.getInstance();
inventoryDumperExecuteEngine =
ExecuteEngine.newFixedThreadInstance(inputConfig.getWorkerThread(),
INVENTORY_THREAD_PREFIX + jobId);
incrementalDumperExecuteEngine =
ExecuteEngine.newCachedThreadInstance(INCREMENTAL_THREAD_PREFIX + jobId);
importerExecuteEngine =
ExecuteEngine.newFixedThreadInstance(outputConfig.getWorkerThread(),
IMPORTER_THREAD_PREFIX + jobId);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/DefaultMetadataCheckoutLock.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/DefaultMetadataCheckoutLock.java
deleted file mode 100644
index 4d1ac534dd4..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/DefaultMetadataCheckoutLock.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered.spi;
-
-import org.apache.shardingsphere.data.pipeline.spi.lock.RuleBasedJobLock;
-
-/**
- * Default metadata checkout lock.
- */
-public final class DefaultMetadataCheckoutLock implements RuleBasedJobLock {
-
- // TODO to be finished
- @Override
- public void lock(final String databaseName, final String jobId) {
- }
-
- @Override
- public void releaseLock(final String databaseName, final String jobId) {
- }
-}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/DefaultSourceWritingStopLock.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/DefaultSourceWritingStopLock.java
deleted file mode 100644
index 05c18c394e1..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/DefaultSourceWritingStopLock.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered.spi;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.spi.lock.RowBasedJobLock;
-
-/**
- * Default source writing stop lock.
- */
-@Slf4j
-public final class DefaultSourceWritingStopLock implements RowBasedJobLock {
-
- private final RuleAlteredJobAPI ruleAlteredJobAPI =
RuleAlteredJobAPIFactory.getInstance();
-
- @Override
- public void lock(final String databaseName, final String jobId) {
- log.info("lock, databaseName={}, jobId={}", databaseName, jobId);
- ruleAlteredJobAPI.stopClusterWriteDB(databaseName, jobId);
- }
-
- @Override
- public void releaseLock(final String databaseName, final String jobId) {
- log.info("releaseLock, databaseName={}, jobId={}", databaseName,
jobId);
- ruleAlteredJobAPI.restoreClusterWriteDB(databaseName, jobId);
- }
-}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.lock.RowBasedJobLock
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.lock.RowBasedJobLock
deleted file mode 100644
index 3e108a101e3..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.lock.RowBasedJobLock
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.scenario.rulealtered.spi.DefaultSourceWritingStopLock
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.lock.RuleBasedJobLock
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.lock.RuleBasedJobLock
deleted file mode 100644
index 358c51c22b8..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.lock.RuleBasedJobLock
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.scenario.rulealtered.spi.DefaultMetadataCheckoutLock
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
index b0c5553cdaf..66612a0371c 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.data.pipeline.core.api.impl;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
@@ -136,10 +136,9 @@ public final class RuleAlteredJobAPIImplTest {
log.error("source is null, jobConfig={}",
YamlEngine.marshal(jobConfig));
}
initTableData(jobConfig);
- String databaseName = jobConfig.getDatabaseName();
- ruleAlteredJobAPI.stopClusterWriteDB(databaseName, jobId.get());
+ ruleAlteredJobAPI.stopClusterWriteDB(jobConfig);
Map<String, DataConsistencyCheckResult> checkResultMap =
ruleAlteredJobAPI.dataConsistencyCheck(jobId.get());
- ruleAlteredJobAPI.restoreClusterWriteDB(databaseName, jobId.get());
+ ruleAlteredJobAPI.restoreClusterWriteDB(jobConfig);
assertThat(checkResultMap.size(), is(1));
}
@@ -149,10 +148,9 @@ public final class RuleAlteredJobAPIImplTest {
assertTrue(jobId.isPresent());
RuleAlteredJobConfiguration jobConfig =
ruleAlteredJobAPI.getJobConfig(jobId.get());
initTableData(jobConfig);
- String databaseName = jobConfig.getDatabaseName();
- ruleAlteredJobAPI.stopClusterWriteDB(databaseName, jobId.get());
+ ruleAlteredJobAPI.stopClusterWriteDB(jobConfig);
Map<String, DataConsistencyCheckResult> checkResultMap =
ruleAlteredJobAPI.dataConsistencyCheck(jobId.get(), "FIXTURE", null);
- ruleAlteredJobAPI.restoreClusterWriteDB(databaseName, jobId.get());
+ ruleAlteredJobAPI.restoreClusterWriteDB(jobConfig);
assertThat(checkResultMap.size(), is(1));
assertTrue(checkResultMap.get("t_order").getCountCheckResult().isMatched());
assertThat(checkResultMap.get("t_order").getCountCheckResult().getTargetRecordsCount(),
is(2L));
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/DefaultSourceWritingStopLockTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/DefaultSourceWritingStopLockTest.java
deleted file mode 100644
index 2de0b5aa5ca..00000000000
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/DefaultSourceWritingStopLockTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered.spi;
-
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.junit.Before;
-import org.junit.Test;
-
-public final class DefaultSourceWritingStopLockTest {
-
- private final DefaultSourceWritingStopLock defaultSourceWritingStopLock =
new DefaultSourceWritingStopLock();
-
- private final String lockName = "lock1";
-
- private final String jobId = "jobId1";
-
- @Before
- public void setup() {
- PipelineContextUtil.mockModeConfigAndContextManager();
- }
-
- @Test
- public void assertSuccessLockReleaseLock() {
- defaultSourceWritingStopLock.lock(lockName, jobId);
- defaultSourceWritingStopLock.releaseLock(lockName, jobId);
- }
-
- @Test
- public void assertSuccessLockTwiceReleaseLock() {
- defaultSourceWritingStopLock.lock(lockName, jobId);
- defaultSourceWritingStopLock.lock(lockName, jobId);
- defaultSourceWritingStopLock.releaseLock(lockName, jobId);
- }
-
- @Test
- public void assertSuccessLockReleaseLockTwice() {
- defaultSourceWritingStopLock.lock(lockName, jobId);
- defaultSourceWritingStopLock.releaseLock(lockName, jobId);
- defaultSourceWritingStopLock.releaseLock(lockName, jobId);
- }
-
- @Test
- public void assertSuccessReleaseNullLock() {
- defaultSourceWritingStopLock.releaseLock(lockName, jobId);
- }
-}