This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new daacf40ee6d Add test cases on PipelineTaskUtils (#33399)
daacf40ee6d is described below
commit daacf40ee6dd7ba20aa16bca7fb06172e9415128
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Oct 25 12:53:12 2024 +0800
Add test cases on PipelineTaskUtils (#33399)
---
.../pipeline/core/task/PipelineTaskUtilsTest.java | 36 ++++++++++++++++++++++
1 file changed, 36 insertions(+)
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtilsTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtilsTest.java
index 903e99ec83a..689db97ca99 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtilsTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtilsTest.java
@@ -19,10 +19,17 @@ package org.apache.shardingsphere.data.pipeline.core.task;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskDelay;
+import
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
class PipelineTaskUtilsTest {
@@ -33,4 +40,33 @@ class PipelineTaskUtilsTest {
dumperContext.setShardingItem(1);
assertThat(PipelineTaskUtils.generateInventoryTaskId(dumperContext),
is("foo_ds.foo_actual_tbl#1"));
}
+
+ @Test
+ void assertCreateIncrementalTaskProgressWithNullInitProgress() {
+ IncrementalTaskProgress actual =
PipelineTaskUtils.createIncrementalTaskProgress(mock(IngestPosition.class),
null);
+ assertThat(actual.getIncrementalTaskDelay().getLastEventTimestamps(),
is(0L));
+ }
+
+ @Test
+ void assertCreateIncrementalTaskProgressWithNullIncremental() {
+ IncrementalTaskProgress actual =
PipelineTaskUtils.createIncrementalTaskProgress(mock(IngestPosition.class),
mock(TransmissionJobItemProgress.class));
+ assertThat(actual.getIncrementalTaskDelay().getLastEventTimestamps(),
is(0L));
+ }
+
+ @Test
+ void assertCreateIncrementalTaskProgress() {
+ IncrementalTaskProgress actual =
PipelineTaskUtils.createIncrementalTaskProgress(mock(IngestPosition.class),
mockTransmissionJobItemProgress());
+ assertThat(actual.getIncrementalTaskDelay().getLastEventTimestamps(),
is(1L));
+ }
+
+ private static TransmissionJobItemProgress
mockTransmissionJobItemProgress() {
+ TransmissionJobItemProgress result =
mock(TransmissionJobItemProgress.class);
+ IncrementalTaskProgress incrementalTaskProgress = new
IncrementalTaskProgress(null);
+ IncrementalTaskDelay taskDelay = new IncrementalTaskDelay();
+ taskDelay.setLastEventTimestamps(1L);
+ incrementalTaskProgress.setIncrementalTaskDelay(taskDelay);
+ JobItemIncrementalTasksProgress itemIncrementalTasksProgress = new
JobItemIncrementalTasksProgress(incrementalTaskProgress);
+ when(result.getIncremental()).thenReturn(itemIncrementalTasksProgress);
+ return result;
+ }
}