This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ea76dbfa84 Add CDCJobItemContextTest (#37381)
3ea76dbfa84 is described below

commit 3ea76dbfa84d449a4226a1d7fdc562a0a61aa0dc
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 14 13:49:38 2025 +0800

    Add CDCJobItemContextTest (#37381)
---
 .../cdc/context/CDCJobItemContextTest.java         | 112 +++++++++++++++++++++
 1 file changed, 112 insertions(+)

diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContextTest.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContextTest.java
new file mode 100644
index 00000000000..aed58846e62
--- /dev/null
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContextTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.context;
+
+import org.apache.shardingsphere.data.pipeline.cdc.config.CDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.config.CDCTaskConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.ActualAndLogicTableNameMapper;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
+import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+class CDCJobItemContextTest {
+    
+    @Test
+    void assertContextWithInitProgressAndLazyLoaders() {
+        PipelineDataSource pipelineDataSource = mock(PipelineDataSource.class);
+        PipelineDataSourceManager dataSourceManager = 
mock(PipelineDataSourceManager.class);
+        
when(dataSourceManager.getDataSource(any())).thenReturn(pipelineDataSource);
+        TransmissionJobItemProgress initProgress = new 
TransmissionJobItemProgress();
+        initProgress.setProcessedRecordsCount(5L);
+        initProgress.setInventoryRecordsCount(7L);
+        CDCJobItemContext context = new 
CDCJobItemContext(createJobConfiguration(), 1, initProgress, 
mock(TransmissionProcessContext.class), createTaskConfiguration(), 
dataSourceManager, mock());
+        assertThat(context.getJobId(), is("foo_job"));
+        assertThat(context.getDataSourceName(), is("foo_ds"));
+        assertThat(context.getProcessedRecordsCount(), is(5L));
+        assertThat(context.getInventoryRecordsCount(), is(7L));
+        assertThat(context.getSourceDataSource(), is(pipelineDataSource));
+        assertNotNull(context.getSourceMetaDataLoader());
+        assertFalse(context.isStopping());
+        assertThat(context.getStatus(), is(JobStatus.RUNNING));
+    }
+    
+    @Test
+    void assertProgressAndInventoryUpdate() {
+        try (MockedStatic<PipelineJobProgressPersistService> persistService = 
mockStatic(PipelineJobProgressPersistService.class)) {
+            CDCJobItemContext context = new 
CDCJobItemContext(createJobConfiguration(), 2, null, mock(), 
createTaskConfiguration(), mock(), mock());
+            context.onProgressUpdated(new PipelineJobUpdateProgress(3));
+            assertThat(context.getProcessedRecordsCount(), is(3L));
+            persistService.verify(() -> 
PipelineJobProgressPersistService.notifyPersist("foo_job", 2));
+            context.updateInventoryRecordsCount(4L);
+            assertThat(context.getInventoryRecordsCount(), is(4L));
+        }
+    }
+    
+    @Test
+    void assertInitWithoutInitialProgress() {
+        PipelineSink sink = mock(PipelineSink.class);
+        CDCJobItemContext context = new 
CDCJobItemContext(createJobConfiguration(), 3, null, mock(), 
createTaskConfiguration(), mock(), sink);
+        assertThat(context.getProcessedRecordsCount(), is(0L));
+        assertThat(context.getInventoryRecordsCount(), is(0L));
+        assertTrue(context.getInventoryTasks().isEmpty());
+        assertTrue(context.getIncrementalTasks().isEmpty());
+        assertThat(context.getJobConfig().getJobId(), is("foo_job"));
+        assertThat(context.getSink(), is(sink));
+    }
+    
+    private CDCJobConfiguration createJobConfiguration() {
+        return new CDCJobConfiguration(
+                "foo_job", "foo_db", Collections.emptyList(), true, 
TypedSPILoader.getService(DatabaseType.class, "FIXTURE"), mock(), null, 
Collections.emptyList(), true, null, 1, 0);
+    }
+    
+    private CDCTaskConfiguration createTaskConfiguration() {
+        DumperCommonContext commonContext = new DumperCommonContext("foo_ds", 
mock(), new ActualAndLogicTableNameMapper(Collections.emptyMap()), new 
TableAndSchemaNameMapper(Collections.emptyMap()));
+        IncrementalDumperContext dumperContext = new 
IncrementalDumperContext(commonContext, "foo_job", true);
+        Map<ShardingSphereIdentifier, Collection<String>> 
tableAndRequiredColumns = Collections.singletonMap(new 
ShardingSphereIdentifier("t_order"), Collections.singleton("id"));
+        ImporterConfiguration importerConfig = new 
ImporterConfiguration(mock(), tableAndRequiredColumns, new 
TableAndSchemaNameMapper(Collections.emptyMap()), 1, null, 1, 1);
+        return new CDCTaskConfiguration(dumperContext, importerConfig);
+    }
+}

Reply via email to