This is an automated email from the ASF dual-hosted git repository.

jarvis pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 67c8b7932d [improve] improve TaskLocation/TaskLocationGroup info 
(#8862)
67c8b7932d is described below

commit 67c8b7932d43df8d242c3139d3a163c6a0ac7501
Author: Jarvis <jar...@apache.org>
AuthorDate: Thu Mar 6 21:24:49 2025 +0800

    [improve] improve TaskLocation/TaskLocationGroup info (#8862)
---
 .../server/dag/physical/PhysicalPlanGenerator.java | 57 +++++-----------
 .../engine/server/dag/physical/PhysicalVertex.java | 34 +++-------
 .../engine/server/execution/TaskLocation.java      | 21 +++---
 .../seatunnel/engine/server/dag/TaskTest.java      | 79 ++++++++++++++++++++++
 4 files changed, 119 insertions(+), 72 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 5bd7e642a3..0bc7e46d5f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -91,7 +91,7 @@ public class PhysicalPlanGenerator {
 
     private final List<Pipeline> pipelines;
 
-    private final IdGenerator idGenerator = new IdGenerator();
+    private final IdGenerator taskGroupIdGenerator = new IdGenerator();
 
     private final JobImmutableInformation jobImmutableInformation;
 
@@ -274,15 +274,14 @@ public class PhysicalPlanGenerator {
                             }
                             // if sinkAggregatedCommitter is empty, don't 
create task.
                             if (sinkAggregatedCommitter.isPresent()) {
-                                long taskGroupID = idGenerator.getNextId();
-                                long taskTypeId = idGenerator.getNextId();
+                                long taskGroupID = 
taskGroupIdGenerator.getNextId();
                                 TaskGroupLocation taskGroupLocation =
                                         new TaskGroupLocation(
                                                 
jobImmutableInformation.getJobId(),
                                                 pipelineIndex,
                                                 taskGroupID);
                                 TaskLocation taskLocation =
-                                        new TaskLocation(taskGroupLocation, 
taskTypeId, 0);
+                                        new TaskLocation(taskGroupLocation, 0, 
0);
                                 SinkAggregatedCommitterTask<?, ?> t =
                                         new SinkAggregatedCommitterTask(
                                                 
jobImmutableInformation.getJobId(),
@@ -342,6 +341,7 @@ public class PhysicalPlanGenerator {
                             if (shuffleStrategy instanceof 
ShuffleMultipleRowStrategy) {
                                 ShuffleMultipleRowStrategy 
shuffleMultipleRowStrategy =
                                         (ShuffleMultipleRowStrategy) 
shuffleStrategy;
+                                AtomicInteger atomicInteger = new 
AtomicInteger(0);
                                 for (Flow nextFlow : flow.getNext()) {
                                     PhysicalExecutionFlow sinkFlow =
                                             (PhysicalExecutionFlow) nextFlow;
@@ -349,10 +349,7 @@ public class PhysicalPlanGenerator {
                                     String sinkTableId =
                                             
sinkAction.getConfig().getTablePath().toString();
 
-                                    long taskIDPrefix = 
idGenerator.getNextId();
-                                    long taskGroupIDPrefix = 
idGenerator.getNextId();
-                                    int parallelismIndex = 0;
-
+                                    int parallelismIndex = 
atomicInteger.getAndIncrement();
                                     ShuffleStrategy shuffleStrategyOfSinkFlow =
                                             shuffleMultipleRowStrategy
                                                     .toBuilder()
@@ -363,7 +360,6 @@ public class PhysicalPlanGenerator {
                                                     .toBuilder()
                                                     
.shuffleStrategy(shuffleStrategyOfSinkFlow)
                                                     .build();
-                                    long shuffleActionId = 
idGenerator.getNextId();
                                     String shuffleActionName =
                                             String.format(
                                                     "%s -> %s -> %s",
@@ -372,7 +368,7 @@ public class PhysicalPlanGenerator {
                                                     sinkAction.getName());
                                     ShuffleAction shuffleActionOfSinkFlow =
                                             new ShuffleAction(
-                                                    shuffleActionId,
+                                                    parallelismIndex,
                                                     shuffleActionName,
                                                     shuffleConfigOfSinkFlow);
                                     shuffleActionOfSinkFlow.setParallelism(1);
@@ -382,9 +378,7 @@ public class PhysicalPlanGenerator {
                                                     
Collections.singletonList(sinkFlow));
                                     setFlowConfig(shuffleFlow);
 
-                                    long taskGroupID =
-                                            mixIDPrefixAndIndex(
-                                                    taskGroupIDPrefix, 
parallelismIndex);
+                                    long taskGroupID = 
taskGroupIdGenerator.getNextId();
                                     TaskGroupLocation taskGroupLocation =
                                             new TaskGroupLocation(
                                                     
jobImmutableInformation.getJobId(),
@@ -392,9 +386,7 @@ public class PhysicalPlanGenerator {
                                                     taskGroupID);
                                     TaskLocation taskLocation =
                                             new TaskLocation(
-                                                    taskGroupLocation,
-                                                    taskIDPrefix,
-                                                    parallelismIndex);
+                                                    taskGroupLocation, 0, 
parallelismIndex);
                                     SeaTunnelTask seaTunnelTask =
                                             new TransformSeaTunnelTask(
                                                     
jobImmutableInformation.getJobId(),
@@ -428,17 +420,15 @@ public class PhysicalPlanGenerator {
                                                     
runningJobStateTimestampsIMap));
                                 }
                             } else {
-                                long taskIDPrefix = idGenerator.getNextId();
-                                long taskGroupIDPrefix = 
idGenerator.getNextId();
                                 for (int i = 0; i < 
flow.getAction().getParallelism(); i++) {
-                                    long taskGroupID = 
mixIDPrefixAndIndex(taskGroupIDPrefix, i);
+                                    long taskGroupID = 
taskGroupIdGenerator.getNextId();
                                     TaskGroupLocation taskGroupLocation =
                                             new TaskGroupLocation(
                                                     
jobImmutableInformation.getJobId(),
                                                     pipelineIndex,
                                                     taskGroupID);
                                     TaskLocation taskLocation =
-                                            new 
TaskLocation(taskGroupLocation, taskIDPrefix, i);
+                                            new 
TaskLocation(taskGroupLocation, 0, i);
                                     setFlowConfig(flow);
                                     SeaTunnelTask seaTunnelTask =
                                             new TransformSeaTunnelTask(
@@ -483,15 +473,13 @@ public class PhysicalPlanGenerator {
         return sources.stream()
                 .map(
                         sourceAction -> {
-                            long taskGroupID = idGenerator.getNextId();
-                            long taskTypeId = idGenerator.getNextId();
+                            long taskGroupID = 
taskGroupIdGenerator.getNextId();
                             TaskGroupLocation taskGroupLocation =
                                     new TaskGroupLocation(
                                             jobImmutableInformation.getJobId(),
                                             pipelineIndex,
                                             taskGroupID);
-                            TaskLocation taskLocation =
-                                    new TaskLocation(taskGroupLocation, 
taskTypeId, 0);
+                            TaskLocation taskLocation = new 
TaskLocation(taskGroupLocation, 0, 0);
                             SourceSplitEnumeratorTask<?> t =
                                     new SourceSplitEnumeratorTask<>(
                                             jobImmutableInformation.getJobId(),
@@ -541,32 +529,25 @@ public class PhysicalPlanGenerator {
                             if (sourceWithSink(flow)) {
                                 flows.addAll(splitSinkFromFlow(flow));
                             }
-                            long taskGroupIDPrefix = idGenerator.getNextId();
-                            Map<Long, Long> flowTaskIDPrefixMap = new 
HashMap<>();
                             for (int i = 0; i < 
flow.getAction().getParallelism(); i++) {
+                                long taskGroupId = 
taskGroupIdGenerator.getNextId();
                                 int finalParallelismIndex = i;
-                                long taskGroupID = 
mixIDPrefixAndIndex(taskGroupIDPrefix, i);
                                 TaskGroupLocation taskGroupLocation =
                                         new TaskGroupLocation(
                                                 
jobImmutableInformation.getJobId(),
                                                 pipelineIndex,
-                                                taskGroupID);
+                                                taskGroupId);
+                                AtomicInteger taskInTaskGroupIndex = new 
AtomicInteger(0);
                                 List<SeaTunnelTask> taskList =
                                         flows.stream()
                                                 .map(
                                                         f -> {
                                                             setFlowConfig(f);
-                                                            long taskIDPrefix =
-                                                                    
flowTaskIDPrefixMap
-                                                                            
.computeIfAbsent(
-                                                                               
     f.getFlowID(),
-                                                                               
     id ->
-                                                                               
             idGenerator
-                                                                               
                     .getNextId());
                                                             final TaskLocation 
taskLocation =
                                                                     new 
TaskLocation(
                                                                             
taskGroupLocation,
-                                                                            
taskIDPrefix,
+                                                                            
taskInTaskGroupIndex
+                                                                               
     .getAndIncrement(),
                                                                             
finalParallelismIndex);
                                                             if (f
                                                                     instanceof
@@ -768,10 +749,6 @@ public class PhysicalPlanGenerator {
                         .contains(true);
     }
 
-    private long mixIDPrefixAndIndex(long idPrefix, int index) {
-        return idPrefix * 10000 + index;
-    }
-
     private List<Flow> getNextWrapper(List<ExecutionEdge> edges, Action start) 
{
         List<Action> actions =
                 edges.stream()
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index b65b3cc56a..435cdc1e3c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -162,30 +162,16 @@ public class PhysicalVertex {
         this.currExecutionState = (ExecutionState) 
runningJobStateIMap.get(taskGroupLocation);
 
         this.nodeEngine = nodeEngine;
-        if (log.isDebugEnabled() || log.isTraceEnabled()) {
-            this.taskFullName =
-                    String.format(
-                            "Job %s (%s), Pipeline: [(%d/%d)], task: [%s 
(%d/%d)], taskGroupLocation: [%s]",
-                            jobImmutableInformation.getJobConfig().getName(),
-                            jobImmutableInformation.getJobId(),
-                            pipelineId,
-                            totalPipelineNum,
-                            taskGroup.getTaskGroupName(),
-                            subTaskGroupIndex + 1,
-                            parallelism,
-                            taskGroupLocation);
-        } else {
-            this.taskFullName =
-                    String.format(
-                            "Job %s (%s), Pipeline: [(%d/%d)], task: [%s 
(%d/%d)]",
-                            jobImmutableInformation.getJobConfig().getName(),
-                            jobImmutableInformation.getJobId(),
-                            pipelineId,
-                            totalPipelineNum,
-                            taskGroup.getTaskGroupName(),
-                            subTaskGroupIndex + 1,
-                            parallelism);
-        }
+        this.taskFullName =
+                String.format(
+                        "Job (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)], 
taskGroupLocation: [%s]",
+                        jobImmutableInformation.getJobId(),
+                        pipelineId,
+                        totalPipelineNum,
+                        taskGroup.getTaskGroupName(),
+                        subTaskGroupIndex + 1,
+                        parallelism,
+                        taskGroupLocation);
 
         this.taskFuture = new CompletableFuture<>();
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
index 5469b55556..0a136742c7 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
@@ -37,16 +37,21 @@ public class TaskLocation implements 
IdentifiedDataSerializable, Serializable {
     private long taskID;
     private int index;
 
+    private static final long SUB_PIPELINE_ID_FACTORY = 10000L * 10000L * 
10000L;
+    private static final long GROUP_ID_FACTOR = 10000L * 10000L;
+    private static final long TASK_GROUP_FACTOR = 10000L;
+
     public TaskLocation() {}
 
-    public TaskLocation(TaskGroupLocation taskGroupLocation, long idPrefix, 
int index) {
+    public TaskLocation(
+            TaskGroupLocation taskGroupLocation, long taskInGroupIndex, int 
taskParallelismIndex) {
         this.taskGroupLocation = taskGroupLocation;
-        this.taskID = mixIDPrefixAndIndex(idPrefix, index);
-        this.index = index;
-    }
-
-    private long mixIDPrefixAndIndex(long idPrefix, int index) {
-        return idPrefix * 10000 + index;
+        this.taskID =
+                taskGroupLocation.getPipelineId() * SUB_PIPELINE_ID_FACTORY
+                        + taskGroupLocation.getTaskGroupId() * GROUP_ID_FACTOR
+                        + taskInGroupIndex * TASK_GROUP_FACTOR
+                        + taskParallelismIndex;
+        this.index = taskParallelismIndex;
     }
 
     public TaskGroupLocation getTaskGroupLocation() {
@@ -66,7 +71,7 @@ public class TaskLocation implements 
IdentifiedDataSerializable, Serializable {
     }
 
     public long getTaskVertexId() {
-        return taskID / 10000;
+        return taskID;
     }
 
     public int getTaskIndex() {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index aa75fa563b..911bdc169d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -50,7 +50,11 @@ import 
org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 import org.apache.seatunnel.engine.server.TestUtils;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
 import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
+import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
+import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -228,6 +232,81 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
                 Sets.newHashSet(new URL("file:///console.jar")));
     }
 
+    @Test
+    public void testTaskGroupAndTaskLocationInfos() {
+        Long jobId = 1L;
+        LogicalDag testLogicalDag =
+                TestUtils.createTestLogicalPlan(
+                        "stream_fake_to_console.conf", "test_task_group_info", 
jobId);
+        JobImmutableInformation jobImmutableInformation =
+                new JobImmutableInformation(
+                        jobId,
+                        "Test",
+                        nodeEngine.getSerializationService(),
+                        testLogicalDag,
+                        Collections.emptyList(),
+                        Collections.emptyList());
+        IMap<Object, Object> runningJobState =
+                
nodeEngine.getHazelcastInstance().getMap("testRunningJobState");
+        IMap<Object, Long[]> runningJobStateTimestamp =
+                
nodeEngine.getHazelcastInstance().getMap("testRunningJobStateTimestamp");
+        PhysicalPlan physicalPlan =
+                PlanUtils.fromLogicalDAG(
+                                testLogicalDag,
+                                nodeEngine,
+                                jobImmutableInformation,
+                                System.currentTimeMillis(),
+                                Executors.newCachedThreadPool(),
+                                server.getClassLoaderService(),
+                                
instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
+                                runningJobState,
+                                runningJobStateTimestamp,
+                                QueueType.BLOCKINGQUEUE,
+                                new EngineConfig())
+                        .f0();
+        Assertions.assertEquals(2, physicalPlan.getPipelineList().size());
+        for (int i = 0; i < physicalPlan.getPipelineList().size(); i++) {
+            SubPlan subPlan = physicalPlan.getPipelineList().get(i);
+            int pipelineId = subPlan.getPipelineId();
+
+            for (int j = 0; j < subPlan.getCoordinatorVertexList().size(); 
j++) {
+                PhysicalVertex physicalVertex = 
subPlan.getCoordinatorVertexList().get(j);
+                TaskGroupLocation taskGroupLocation = 
physicalVertex.getTaskGroupLocation();
+                List<Task> physicalTasks =
+                        new 
ArrayList<>(physicalVertex.getTaskGroup().getTasks());
+                for (int taskInGroupIndex = 0;
+                        taskInGroupIndex < physicalTasks.size();
+                        taskInGroupIndex++) {
+                    Task task = physicalTasks.get(taskInGroupIndex);
+                    long expectedTaskId =
+                            pipelineId * 10000L * 10000L * 10000L
+                                    + taskGroupLocation.getTaskGroupId() * 
10000L * 10000L
+                                    + taskInGroupIndex * 10000L;
+                    Assertions.assertEquals(expectedTaskId, task.getTaskID());
+                }
+            }
+
+            for (int j = 0; j < subPlan.getPhysicalVertexList().size(); j++) {
+                PhysicalVertex physicalVertex = 
subPlan.getPhysicalVertexList().get(j);
+                TaskGroupLocation taskGroupLocation = 
physicalVertex.getTaskGroupLocation();
+                List<Task> physicalTasks =
+                        new 
ArrayList<>(physicalVertex.getTaskGroup().getTasks());
+                for (int taskInGroupIndex = 0;
+                        taskInGroupIndex < physicalTasks.size();
+                        taskInGroupIndex++) {
+                    Task task = physicalTasks.get(taskInGroupIndex);
+                    // can't get job parallel index, use prefix check
+                    long expectedTaskIdPrefix =
+                            pipelineId * 10000L * 10000L * 10000L
+                                    + taskGroupLocation.getTaskGroupId() * 
10000L * 10000L
+                                    + taskInGroupIndex * 10000L;
+                    Assertions.assertEquals(
+                            expectedTaskIdPrefix / 10000L, task.getTaskID() / 
10000L);
+                }
+            }
+        }
+    }
+
     private static FakeSource createFakeSource() {
         Config fakeSourceConfig =
                 ConfigFactory.parseMap(

Reply via email to