zhuzhurk commented on a change in pull request #18462:
URL: https://github.com/apache/flink/pull/18462#discussion_r791884986
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
##########
@@ -190,10 +191,24 @@ private void maybeSetParallelism(final ExecutionJobVertex jobVertex) {
                     jobVertex.getName(),
                     parallelism);

-            jobVertex.setParallelism(parallelism);
+            changeJobVertexParallelism(jobVertex, parallelism);
         }
     }

+    private void changeJobVertexParallelism(ExecutionJobVertex jobVertex, int parallelism) {
+        // update PlanJson

Review comment:
       Would you add a comment explaining why we need to update the plan JSON? E.g. "it is needed to enable REST APIs to return the latest parallelism of job vertices."


##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
##########
@@ -676,7 +727,7 @@ private void notifyCoordinatorOfCancellation(ExecutionVertex vertex) {
     @Override
     public ResourceProfile getResourceProfile(final ExecutionVertexID executionVertexId) {
-        return getExecutionVertex(executionVertexId).getResourceProfile();
+        return getExecutionJobVertex(executionVertexId.getJobVertexId()).getResourceProfile();

Review comment:
       What's this change for?


##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -762,6 +762,11 @@ public void setInternalTaskFailuresListener(
     // Actions
     // --------------------------------------------------------------------------------------------

+    @Override
+    public void notifyNewJobVertexInitialized(List<ExecutionJobVertex> vertices) {

Review comment:
       Maybe `notifyNewlyInitializedJobVertices()`? The job vertices are just newly initialized rather than newly added.
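A sketch of how the requested comment could read inside `changeJobVertexParallelism`; the `JsonPlanGenerator`/`setJsonPlan` call and the `getJobGraph()` helper are only assumptions about how the plan JSON might be refreshed, not necessarily what the PR does:

```java
private void changeJobVertexParallelism(ExecutionJobVertex jobVertex, int parallelism) {
    jobVertex.setParallelism(parallelism);

    // Regenerate the plan JSON after the parallelism has been decided at runtime:
    // it is needed to enable REST APIs (and the web UI backed by them) to return the
    // latest parallelism of job vertices instead of the placeholder value from the
    // originally built JobGraph.
    getExecutionGraph()
            .setJsonPlan(JsonPlanGenerator.generatePlan(getJobGraph())); // assumed helpers
}
```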
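Since `DefaultExecutionGraph` carries an `@Override`, the method presumably comes from an interface; if the rename is adopted, the declaration would simply keep the same shape under the new name (a sketch, the Javadoc wording is only a suggestion):

```java
/**
 * Notifies that the given job vertices have just been initialized. They already existed in the
 * (dynamically built) execution graph; they are newly initialized, not newly added.
 */
void notifyNewlyInitializedJobVertices(List<ExecutionJobVertex> vertices);
```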
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
##########
@@ -0,0 +1,190 @@
[... new file: license header, imports and the body of AdaptiveBatchSchedulerFactory#createInstance omitted; the comment is on the precondition below ...]
+    private void checkIsAllBlockingGraph(final JobGraph jobGraph) {
+        for (JobVertex jobVertex : jobGraph.getVertices()) {
+            for (IntermediateDataSet dataSet : jobVertex.getProducedDataSets()) {
+                checkState(
+                        dataSet.getResultType().isBlocking(),
+                        "Adaptive batch scheduler currently only supports ALL_EDGES_BLOCKING jobs.");

Review comment:
       Let's refine the message to point users to the config `execution.batch-shuffle-mode` and note that the required value is `ALL_EXCHANGES_BLOCKING`.


##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/SlotSelectionStrategyUtils.java
##########
@@ -0,0 +1,68 @@
[... new file: license header and imports omitted ...]
+public class SlotSelectionStrategyUtils {

Review comment:
       Javadoc is needed for this class.
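A minimal sketch of the class-level Javadoc the comment asks for; the wording is just a suggestion based on what the class's imports and its use in the factory hint at (job type, local recovery, evenly spread-out slots):

```java
/**
 * Utility class for selecting the {@link SlotSelectionStrategy} to use for a job, based on the
 * {@link JobType} and the cluster configuration (e.g. whether local recovery or evenly
 * spread-out slot allocation is enabled).
 */
public class SlotSelectionStrategyUtils {
    // the existing static selectSlotSelectionStrategy(JobType, Configuration) stays unchanged
}
```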
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
##########
@@ -0,0 +1,284 @@
[... new file: license header and imports omitted ...]
+/**
+ * This scheduler decides the parallelism of JobVertex according to the data volume it consumes. A
+ * dynamically built up ExecutionGraph is used for this purpose.
+ */
+public class AdaptiveBatchScheduler extends DefaultScheduler implements SchedulerOperations {
+
+    private final Logger log;

Review comment:
       This field is not needed because there is a `DefaultScheduler#log` already.
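If the duplicate field is removed, the class keeps only its own state and logs through the logger it inherits; a sketch, assuming `DefaultScheduler#log` is visible to subclasses as the comment implies:

```java
public class AdaptiveBatchScheduler extends DefaultScheduler implements SchedulerOperations {

    private final DefaultLogicalTopology logicalTopology;

    private final VertexParallelismDecider vertexParallelismDecider;

    // No `private final Logger log;` field and no `this.log = log;` in the constructor:
    // calls such as log.info(...) then resolve to the logger inherited from DefaultScheduler.
}
```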
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
##########
@@ -0,0 +1,190 @@
[... same new file as above; license header, imports and createInstance(...) omitted ...]
+    private void checkIsAllBlockingGraph(final JobGraph jobGraph) {

Review comment:
       Maybe `checkAllExchangesBlocking`?
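Taking this rename together with the earlier suggestion to point users at `execution.batch-shuffle-mode`, the helper might end up roughly like the sketch below. `ExecutionOptions.BATCH_SHUFFLE_MODE` and `BatchShuffleMode.ALL_EXCHANGES_BLOCKING` are assumed to be the option and value behind that config key, and the exact message wording is only a suggestion:

```java
private void checkAllExchangesBlocking(final JobGraph jobGraph) {
    for (JobVertex jobVertex : jobGraph.getVertices()) {
        for (IntermediateDataSet dataSet : jobVertex.getProducedDataSets()) {
            checkState(
                    dataSet.getResultType().isBlocking(),
                    "The adaptive batch scheduler currently requires all exchanges to be blocking. "
                            + "Please set '%s' to '%s'.",
                    ExecutionOptions.BATCH_SHUFFLE_MODE.key(),
                    BatchShuffleMode.ALL_EXCHANGES_BLOCKING);
        }
    }
}
```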
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
##########
@@ -0,0 +1,284 @@
[... same new file as above; license header, imports, constructor and the scheduling hooks omitted ...]
+    private void maybeSetParallelism(final ExecutionJobVertex jobVertex) {
+        if (jobVertex.isParallelismDecided()) {
+            return;
+        }
+
+        List<BlockingResultInfo> consumedResultsInfo = tryGetConsumedResultsInfo(jobVertex);
+        if (consumedResultsInfo != null) {
+            int parallelism =
+                    vertexParallelismDecider.decideParallelismForVertex(consumedResultsInfo);
+
+            log.info(
+                    "JobVertex: {} parallelism is decided as: {}.",
+                    jobVertex.getName(),
+                    parallelism);
+
+            jobVertex.setParallelism(parallelism);
+        }
+    }
+
+    /** Get information of consumable results. */
+    @Nullable
+    private List<BlockingResultInfo> tryGetConsumedResultsInfo(final ExecutionJobVertex jobVertex) {

Review comment:
       According to previous discussion on the Flink dev ML, it is better to return an `Optional` than a `@Nullable` value.
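A sketch of what the `Optional`-based variant could look like, on both the producer and the call-site side; only the signature and the null check change, the lookup logic itself stays as in the PR:

```java
/** Get information about the consumed results, if they are all ready to be consumed. */
private Optional<List<BlockingResultInfo>> tryGetConsumedResultsInfo(
        final ExecutionJobVertex jobVertex) {
    final List<BlockingResultInfo> consumedResultsInfo = new ArrayList<>();
    // ... collect the info of each consumed result, returning Optional.empty()
    //     as soon as one of them is not yet consumable ...
    return Optional.of(consumedResultsInfo);
}

private void maybeSetParallelism(final ExecutionJobVertex jobVertex) {
    if (jobVertex.isParallelismDecided()) {
        return;
    }
    tryGetConsumedResultsInfo(jobVertex)
            .ifPresent(
                    consumedResultsInfo -> {
                        final int parallelism =
                                vertexParallelismDecider.decideParallelismForVertex(
                                        consumedResultsInfo);
                        changeJobVertexParallelism(jobVertex, parallelism);
                    });
}
```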
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
##########
@@ -0,0 +1,284 @@
[... same new file as above; the quoted context is the log statement inside maybeSetParallelism(...) ...]
+            log.info(
+                    "JobVertex: {} parallelism is decided as: {}.",

Review comment:
       Maybe refine the log a bit? E.g.
           "Parallelism of JobVertex: {} ({}) is decided to be {}.",
           jobVertex.getName(),
           jobVertex.getJobVertexId(),
           parallelism
       The ID can be useful to distinguish vertices with the same name.
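For reference, the refined call the comment proposes, written out in full (a direct transcription of the suggestion, not a new API):

```java
log.info(
        "Parallelism of JobVertex: {} ({}) is decided to be {}.",
        jobVertex.getName(),
        jobVertex.getJobVertexId(),
        parallelism);
```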