This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a9cdcf8b7b2 Add test cases on IncrementalTaskPositionManager (#33409)
a9cdcf8b7b2 is described below

commit a9cdcf8b7b216fda6170eafe095b7ae188ffca3d
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Oct 26 14:14:41 2024 +0800

    Add test cases on IncrementalTaskPositionManager (#33409)
---
 .../IncrementalTaskPositionManager.java            |  18 ++-
 .../IncrementalTaskPositionManagerTest.java        | 140 +++++++++++++++++++++
 2 files changed, 147 insertions(+), 11 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
index 8367dd211bf..32514b9fa4c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
@@ -82,28 +82,24 @@ public final class IncrementalTaskPositionManager {
         final long startTimeMillis = System.currentTimeMillis();
         log.info("Cleanup position, database type: {}, pipeline data source type: {}", databaseType.getType(), pipelineDataSourceConfig.getType());
         if (pipelineDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
-            destroyPosition(jobId, (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig, dialectPositionManager);
+            destroyPosition(jobId, (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig);
         } else if (pipelineDataSourceConfig instanceof StandardPipelineDataSourceConfiguration) {
-            destroyPosition(jobId, (StandardPipelineDataSourceConfiguration) pipelineDataSourceConfig, dialectPositionManager);
+            destroyPosition(jobId, (StandardPipelineDataSourceConfiguration) pipelineDataSourceConfig);
         }
         log.info("Destroy position cost {} ms.", System.currentTimeMillis() - startTimeMillis);
     }
     
-    private void destroyPosition(final String jobId,
-                                 final ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig, final DialectIncrementalPositionManager positionInitializer) throws SQLException {
+    private void destroyPosition(final String jobId, final ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
         for (DataSourcePoolProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePoolPropertiesMap(pipelineDataSourceConfig.getRootConfig()).values()) {
             try (PipelineDataSource dataSource = new PipelineDataSource(DataSourcePoolCreator.create(each), databaseType)) {
-                positionInitializer.destroy(dataSource, jobId);
+                dialectPositionManager.destroy(dataSource, jobId);
             }
         }
     }
     
-    private void destroyPosition(final String jobId, final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig,
-                                 final DialectIncrementalPositionManager positionInitializer) throws SQLException {
-        try (
-                PipelineDataSource dataSource = new PipelineDataSource(
-                        DataSourcePoolCreator.create((DataSourcePoolProperties) pipelineDataSourceConfig.getDataSourceConfiguration()), databaseType)) {
-            positionInitializer.destroy(dataSource, jobId);
+    private void destroyPosition(final String jobId, final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
+        try (PipelineDataSource dataSource = new PipelineDataSource(DataSourcePoolCreator.create((DataSourcePoolProperties) pipelineDataSourceConfig.getDataSourceConfiguration()), databaseType)) {
+            dialectPositionManager.destroy(dataSource, jobId);
         }
     }
 }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManagerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManagerTest.java
new file mode 100644
index 00000000000..3b7385afeeb
--- /dev/null
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManagerTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.preparer.incremental;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIncrementalPositionManager;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import org.apache.shardingsphere.test.fixture.jdbc.MockedDataSource;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.internal.configuration.plugins.Plugins;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class IncrementalTaskPositionManagerTest {
+    
+    private final DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "H2");
+    
+    @Mock
+    private DialectIncrementalPositionManager dialectPositionManager;
+    
+    private IncrementalTaskPositionManager incrementalTaskPositionManager;
+    
+    @SneakyThrows(ReflectiveOperationException.class)
+    @BeforeEach
+    void setUp() {
+        incrementalTaskPositionManager = new IncrementalTaskPositionManager(databaseType);
+        Plugins.getMemberAccessor().set(IncrementalTaskPositionManager.class.getDeclaredField("dialectPositionManager"), incrementalTaskPositionManager, dialectPositionManager);
+    }
+    
+    @Test
+    void assertGetPositionWithInitialProgress() throws SQLException {
+        JobItemIncrementalTasksProgress initialProgress = mock(JobItemIncrementalTasksProgress.class);
+        IngestPosition position = mock(IngestPosition.class);
+        when(initialProgress.getIncrementalPosition()).thenReturn(Optional.of(position));
+        IncrementalDumperContext dumperContext = mockIncrementalDumperContext();
+        assertThat(incrementalTaskPositionManager.getPosition(initialProgress, dumperContext, mock(PipelineDataSourceManager.class)), is(position));
+    }
+    
+    @Test
+    void assertGetPositionWithoutIncrementalProgress() throws SQLException {
+        JobItemIncrementalTasksProgress initialProgress = mock(JobItemIncrementalTasksProgress.class);
+        when(initialProgress.getIncrementalPosition()).thenReturn(Optional.empty());
+        IncrementalDumperContext dumperContext = mockIncrementalDumperContext();
+        PipelineDataSourceManager dataSourceManager = mock(PipelineDataSourceManager.class);
+        PipelineDataSource dataSource = mock(PipelineDataSource.class);
+        when(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig())).thenReturn(dataSource);
+        IngestPosition position = mock(IngestPosition.class);
+        when(dialectPositionManager.init(dataSource, dumperContext.getJobId())).thenReturn(position);
+        assertThat(incrementalTaskPositionManager.getPosition(initialProgress, dumperContext, dataSourceManager), is(position));
+    }
+    
+    @Test
+    void assertGetPositionWithoutInitialProgress() throws SQLException {
+        IncrementalDumperContext dumperContext = mockIncrementalDumperContext();
+        PipelineDataSourceManager dataSourceManager = mock(PipelineDataSourceManager.class);
+        PipelineDataSource dataSource = mock(PipelineDataSource.class);
+        when(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig())).thenReturn(dataSource);
+        IngestPosition position = mock(IngestPosition.class);
+        when(dialectPositionManager.init(dataSource, dumperContext.getJobId())).thenReturn(position);
+        assertThat(incrementalTaskPositionManager.getPosition(null, dumperContext, dataSourceManager), is(position));
+    }
+    
+    private IncrementalDumperContext mockIncrementalDumperContext() {
+        IncrementalDumperContext result = mock(IncrementalDumperContext.class, RETURNS_DEEP_STUBS);
+        PipelineDataSourceConfiguration dataSourceConfig = mock(PipelineDataSourceConfiguration.class);
+        when(result.getCommonContext().getDataSourceConfig()).thenReturn(dataSourceConfig);
+        return result;
+    }
+    
+    @Test
+    void assertDestroyPositionWithShardingSpherePipelineDataSourceConfiguration() throws SQLException {
+        YamlRootConfiguration rootConfig = new YamlRootConfiguration();
+        Map<String, Object> dataSourceProps = new HashMap<>();
+        dataSourceProps.put("dataSourceClassName", MockedDataSource.class.getName());
+        dataSourceProps.put("url", "jdbc:mock://127.0.0.1/foo_ds");
+        rootConfig.getDataSources().put("foo_ds", dataSourceProps);
+        ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig = new ShardingSpherePipelineDataSourceConfiguration(rootConfig);
+        incrementalTaskPositionManager.destroyPosition("foo_job", pipelineDataSourceConfig);
+        verify(dialectPositionManager).destroy(any(), eq("foo_job"));
+    }
+    
+    @Test
+    void assertDestroyPositionWithStandardPipelineDataSourceConfiguration() throws SQLException {
+        Map<String, Object> dataSourceProps = new HashMap<>();
+        dataSourceProps.put("dataSourceClassName", MockedDataSource.class.getName());
+        dataSourceProps.put("url", "jdbc:mock://127.0.0.1/foo_ds");
+        StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new StandardPipelineDataSourceConfiguration(dataSourceProps);
+        incrementalTaskPositionManager.destroyPosition("foo_job", pipelineDataSourceConfig);
+        verify(dialectPositionManager).destroy(any(), eq("foo_job"));
+    }
+    
+    @Test
+    void assertDestroyPositionWithUnknownPipelineDataSourceConfiguration() throws SQLException {
+        incrementalTaskPositionManager.destroyPosition("foo_job", mock(PipelineDataSourceConfiguration.class));
+        verify(dialectPositionManager, times(0)).destroy(any(), eq("foo_job"));
+    }
+}
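
For quick reference, here is a minimal caller-side sketch of the two entry points exercised by the new test class. The constructor and method signatures are taken from the diff above; the variable names and surrounding context are illustrative only, and it is assumed the entry points are accessible to the caller (the tests invoke them from the same package).

    // Hypothetical caller: databaseType, jobId, initialProgress, dumperContext,
    // dataSourceManager and pipelineDataSourceConfig are assumed to come from the pipeline job context.
    IncrementalTaskPositionManager positionManager = new IncrementalTaskPositionManager(databaseType);
    // Resolves the incremental ingest position, initializing it via the dialect position manager
    // when the supplied progress carries no position (or is null).
    IngestPosition position = positionManager.getPosition(initialProgress, dumperContext, dataSourceManager);
    // Cleans up stored positions: dispatches on the concrete PipelineDataSourceConfiguration type
    // and delegates to DialectIncrementalPositionManager.destroy(...); may throw SQLException.
    positionManager.destroyPosition(jobId, pipelineDataSourceConfig);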
