This is an automated email from the ASF dual-hosted git repository.

jarvis pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 67c8b7932d [improve] improve TaskLocation/TaskLocationGroup info (#8862) 67c8b7932d is described below commit 67c8b7932d43df8d242c3139d3a163c6a0ac7501 Author: Jarvis <jar...@apache.org> AuthorDate: Thu Mar 6 21:24:49 2025 +0800 [improve] improve TaskLocation/TaskLocationGroup info (#8862) --- .../server/dag/physical/PhysicalPlanGenerator.java | 57 +++++----------- .../engine/server/dag/physical/PhysicalVertex.java | 34 +++------- .../engine/server/execution/TaskLocation.java | 21 +++--- .../seatunnel/engine/server/dag/TaskTest.java | 79 ++++++++++++++++++++++ 4 files changed, 119 insertions(+), 72 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java index 5bd7e642a3..0bc7e46d5f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java @@ -91,7 +91,7 @@ public class PhysicalPlanGenerator { private final List<Pipeline> pipelines; - private final IdGenerator idGenerator = new IdGenerator(); + private final IdGenerator taskGroupIdGenerator = new IdGenerator(); private final JobImmutableInformation jobImmutableInformation; @@ -274,15 +274,14 @@ public class PhysicalPlanGenerator { } // if sinkAggregatedCommitter is empty, don't create task. 
if (sinkAggregatedCommitter.isPresent()) { - long taskGroupID = idGenerator.getNextId(); - long taskTypeId = idGenerator.getNextId(); + long taskGroupID = taskGroupIdGenerator.getNextId(); TaskGroupLocation taskGroupLocation = new TaskGroupLocation( jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID); TaskLocation taskLocation = - new TaskLocation(taskGroupLocation, taskTypeId, 0); + new TaskLocation(taskGroupLocation, 0, 0); SinkAggregatedCommitterTask<?, ?> t = new SinkAggregatedCommitterTask( jobImmutableInformation.getJobId(), @@ -342,6 +341,7 @@ public class PhysicalPlanGenerator { if (shuffleStrategy instanceof ShuffleMultipleRowStrategy) { ShuffleMultipleRowStrategy shuffleMultipleRowStrategy = (ShuffleMultipleRowStrategy) shuffleStrategy; + AtomicInteger atomicInteger = new AtomicInteger(0); for (Flow nextFlow : flow.getNext()) { PhysicalExecutionFlow sinkFlow = (PhysicalExecutionFlow) nextFlow; @@ -349,10 +349,7 @@ public class PhysicalPlanGenerator { String sinkTableId = sinkAction.getConfig().getTablePath().toString(); - long taskIDPrefix = idGenerator.getNextId(); - long taskGroupIDPrefix = idGenerator.getNextId(); - int parallelismIndex = 0; - + int parallelismIndex = atomicInteger.getAndIncrement(); ShuffleStrategy shuffleStrategyOfSinkFlow = shuffleMultipleRowStrategy .toBuilder() @@ -363,7 +360,6 @@ public class PhysicalPlanGenerator { .toBuilder() .shuffleStrategy(shuffleStrategyOfSinkFlow) .build(); - long shuffleActionId = idGenerator.getNextId(); String shuffleActionName = String.format( "%s -> %s -> %s", @@ -372,7 +368,7 @@ public class PhysicalPlanGenerator { sinkAction.getName()); ShuffleAction shuffleActionOfSinkFlow = new ShuffleAction( - shuffleActionId, + parallelismIndex, shuffleActionName, shuffleConfigOfSinkFlow); shuffleActionOfSinkFlow.setParallelism(1); @@ -382,9 +378,7 @@ public class PhysicalPlanGenerator { Collections.singletonList(sinkFlow)); setFlowConfig(shuffleFlow); - long taskGroupID = - mixIDPrefixAndIndex( - 
taskGroupIDPrefix, parallelismIndex); + long taskGroupID = taskGroupIdGenerator.getNextId(); TaskGroupLocation taskGroupLocation = new TaskGroupLocation( jobImmutableInformation.getJobId(), @@ -392,9 +386,7 @@ public class PhysicalPlanGenerator { taskGroupID); TaskLocation taskLocation = new TaskLocation( - taskGroupLocation, - taskIDPrefix, - parallelismIndex); + taskGroupLocation, 0, parallelismIndex); SeaTunnelTask seaTunnelTask = new TransformSeaTunnelTask( jobImmutableInformation.getJobId(), @@ -428,17 +420,15 @@ public class PhysicalPlanGenerator { runningJobStateTimestampsIMap)); } } else { - long taskIDPrefix = idGenerator.getNextId(); - long taskGroupIDPrefix = idGenerator.getNextId(); for (int i = 0; i < flow.getAction().getParallelism(); i++) { - long taskGroupID = mixIDPrefixAndIndex(taskGroupIDPrefix, i); + long taskGroupID = taskGroupIdGenerator.getNextId(); TaskGroupLocation taskGroupLocation = new TaskGroupLocation( jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID); TaskLocation taskLocation = - new TaskLocation(taskGroupLocation, taskIDPrefix, i); + new TaskLocation(taskGroupLocation, 0, i); setFlowConfig(flow); SeaTunnelTask seaTunnelTask = new TransformSeaTunnelTask( @@ -483,15 +473,13 @@ public class PhysicalPlanGenerator { return sources.stream() .map( sourceAction -> { - long taskGroupID = idGenerator.getNextId(); - long taskTypeId = idGenerator.getNextId(); + long taskGroupID = taskGroupIdGenerator.getNextId(); TaskGroupLocation taskGroupLocation = new TaskGroupLocation( jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID); - TaskLocation taskLocation = - new TaskLocation(taskGroupLocation, taskTypeId, 0); + TaskLocation taskLocation = new TaskLocation(taskGroupLocation, 0, 0); SourceSplitEnumeratorTask<?> t = new SourceSplitEnumeratorTask<>( jobImmutableInformation.getJobId(), @@ -541,32 +529,25 @@ public class PhysicalPlanGenerator { if (sourceWithSink(flow)) { flows.addAll(splitSinkFromFlow(flow)); } - long 
taskGroupIDPrefix = idGenerator.getNextId(); - Map<Long, Long> flowTaskIDPrefixMap = new HashMap<>(); for (int i = 0; i < flow.getAction().getParallelism(); i++) { + long taskGroupId = taskGroupIdGenerator.getNextId(); int finalParallelismIndex = i; - long taskGroupID = mixIDPrefixAndIndex(taskGroupIDPrefix, i); TaskGroupLocation taskGroupLocation = new TaskGroupLocation( jobImmutableInformation.getJobId(), pipelineIndex, - taskGroupID); + taskGroupId); + AtomicInteger taskInTaskGroupIndex = new AtomicInteger(0); List<SeaTunnelTask> taskList = flows.stream() .map( f -> { setFlowConfig(f); - long taskIDPrefix = - flowTaskIDPrefixMap - .computeIfAbsent( - f.getFlowID(), - id -> - idGenerator - .getNextId()); final TaskLocation taskLocation = new TaskLocation( taskGroupLocation, - taskIDPrefix, + taskInTaskGroupIndex + .getAndIncrement(), finalParallelismIndex); if (f instanceof @@ -768,10 +749,6 @@ public class PhysicalPlanGenerator { .contains(true); } - private long mixIDPrefixAndIndex(long idPrefix, int index) { - return idPrefix * 10000 + index; - } - private List<Flow> getNextWrapper(List<ExecutionEdge> edges, Action start) { List<Action> actions = edges.stream() diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java index b65b3cc56a..435cdc1e3c 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java @@ -162,30 +162,16 @@ public class PhysicalVertex { this.currExecutionState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation); this.nodeEngine = nodeEngine; - if (log.isDebugEnabled() || log.isTraceEnabled()) { - this.taskFullName = - 
String.format( - "Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)], taskGroupLocation: [%s]", - jobImmutableInformation.getJobConfig().getName(), - jobImmutableInformation.getJobId(), - pipelineId, - totalPipelineNum, - taskGroup.getTaskGroupName(), - subTaskGroupIndex + 1, - parallelism, - taskGroupLocation); - } else { - this.taskFullName = - String.format( - "Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)]", - jobImmutableInformation.getJobConfig().getName(), - jobImmutableInformation.getJobId(), - pipelineId, - totalPipelineNum, - taskGroup.getTaskGroupName(), - subTaskGroupIndex + 1, - parallelism); - } + this.taskFullName = + String.format( + "Job (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)], taskGroupLocation: [%s]", + jobImmutableInformation.getJobId(), + pipelineId, + totalPipelineNum, + taskGroup.getTaskGroupName(), + subTaskGroupIndex + 1, + parallelism, + taskGroupLocation); this.taskFuture = new CompletableFuture<>(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java index 5469b55556..0a136742c7 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java @@ -37,16 +37,21 @@ public class TaskLocation implements IdentifiedDataSerializable, Serializable { private long taskID; private int index; + private static final long SUB_PIPELINE_ID_FACTORY = 10000L * 10000L * 10000L; + private static final long GROUP_ID_FACTOR = 10000L * 10000L; + private static final long TASK_GROUP_FACTOR = 10000L; + public TaskLocation() {} - public TaskLocation(TaskGroupLocation taskGroupLocation, long idPrefix, int index) { + public TaskLocation( + TaskGroupLocation 
taskGroupLocation, long taskInGroupIndex, int taskParallelismIndex) { this.taskGroupLocation = taskGroupLocation; - this.taskID = mixIDPrefixAndIndex(idPrefix, index); - this.index = index; - } - - private long mixIDPrefixAndIndex(long idPrefix, int index) { - return idPrefix * 10000 + index; + this.taskID = + taskGroupLocation.getPipelineId() * SUB_PIPELINE_ID_FACTORY + + taskGroupLocation.getTaskGroupId() * GROUP_ID_FACTOR + + taskInGroupIndex * TASK_GROUP_FACTOR + + taskParallelismIndex; + this.index = taskParallelismIndex; } public TaskGroupLocation getTaskGroupLocation() { @@ -66,7 +71,7 @@ public class TaskLocation implements IdentifiedDataSerializable, Serializable { } public long getTaskVertexId() { - return taskID / 10000; + return taskID; } public int getTaskIndex() { diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java index aa75fa563b..911bdc169d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java @@ -50,7 +50,11 @@ import org.apache.seatunnel.engine.core.job.JobImmutableInformation; import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest; import org.apache.seatunnel.engine.server.TestUtils; import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan; +import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex; import org.apache.seatunnel.engine.server.dag.physical.PlanUtils; +import org.apache.seatunnel.engine.server.dag.physical.SubPlan; +import org.apache.seatunnel.engine.server.execution.Task; +import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -228,6 +232,81 @@ 
public class TaskTest extends AbstractSeaTunnelServerTest { Sets.newHashSet(new URL("file:///console.jar"))); } + @Test + public void testTaskGroupAndTaskLocationInfos() { + Long jobId = 1L; + LogicalDag testLogicalDag = + TestUtils.createTestLogicalPlan( + "stream_fake_to_console.conf", "test_task_group_info", jobId); + JobImmutableInformation jobImmutableInformation = + new JobImmutableInformation( + jobId, + "Test", + nodeEngine.getSerializationService(), + testLogicalDag, + Collections.emptyList(), + Collections.emptyList()); + IMap<Object, Object> runningJobState = + nodeEngine.getHazelcastInstance().getMap("testRunningJobState"); + IMap<Object, Long[]> runningJobStateTimestamp = + nodeEngine.getHazelcastInstance().getMap("testRunningJobStateTimestamp"); + PhysicalPlan physicalPlan = + PlanUtils.fromLogicalDAG( + testLogicalDag, + nodeEngine, + jobImmutableInformation, + System.currentTimeMillis(), + Executors.newCachedThreadPool(), + server.getClassLoaderService(), + instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME), + runningJobState, + runningJobStateTimestamp, + QueueType.BLOCKINGQUEUE, + new EngineConfig()) + .f0(); + Assertions.assertEquals(2, physicalPlan.getPipelineList().size()); + for (int i = 0; i < physicalPlan.getPipelineList().size(); i++) { + SubPlan subPlan = physicalPlan.getPipelineList().get(i); + int pipelineId = subPlan.getPipelineId(); + + for (int j = 0; j < subPlan.getCoordinatorVertexList().size(); j++) { + PhysicalVertex physicalVertex = subPlan.getCoordinatorVertexList().get(j); + TaskGroupLocation taskGroupLocation = physicalVertex.getTaskGroupLocation(); + List<Task> physicalTasks = + new ArrayList<>(physicalVertex.getTaskGroup().getTasks()); + for (int taskInGroupIndex = 0; + taskInGroupIndex < physicalTasks.size(); + taskInGroupIndex++) { + Task task = physicalTasks.get(taskInGroupIndex); + long expectedTaskId = + pipelineId * 10000L * 10000L * 10000L + + taskGroupLocation.getTaskGroupId() * 10000L * 10000L + + 
taskInGroupIndex * 10000L; + Assertions.assertEquals(expectedTaskId, task.getTaskID()); + } + } + + for (int j = 0; j < subPlan.getPhysicalVertexList().size(); j++) { + PhysicalVertex physicalVertex = subPlan.getPhysicalVertexList().get(j); + TaskGroupLocation taskGroupLocation = physicalVertex.getTaskGroupLocation(); + List<Task> physicalTasks = + new ArrayList<>(physicalVertex.getTaskGroup().getTasks()); + for (int taskInGroupIndex = 0; + taskInGroupIndex < physicalTasks.size(); + taskInGroupIndex++) { + Task task = physicalTasks.get(taskInGroupIndex); + // can't get job parallel index, use prefix check + long expectedTaskIdPrefix = + pipelineId * 10000L * 10000L * 10000L + + taskGroupLocation.getTaskGroupId() * 10000L * 10000L + + taskInGroupIndex * 10000L; + Assertions.assertEquals( + expectedTaskIdPrefix / 10000L, task.getTaskID() / 10000L); + } + } + } + } + private static FakeSource createFakeSource() { Config fakeSourceConfig = ConfigFactory.parseMap(