GJL commented on a change in pull request #8928: [FLINK-12997][coordination] Release partitions when a vertex is reset URL: https://github.com/apache/flink/pull/8928#discussion_r298966001
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ########## @@ -537,4 +529,137 @@ public static void verifyGeneratedExecutionJobVertex( subtaskIndex++; } } + + /** + * Builder for {@link ExecutionGraph}. + */ + public static class TestingExecutionGraphBuilder { + + private ShuffleMaster<?> shuffleMaster = NettyShuffleMaster.INSTANCE; + private Time allocationTimeout = Time.seconds(10L); + private BlobWriter blobWriter = VoidBlobWriter.getInstance(); + private MetricGroup metricGroup = new UnregisteredMetricsGroup(); + private RestartStrategy restartStrategy = new NoRestartStrategy(); + private Time rpcTimeout = AkkaUtils.getDefaultTimeout(); + private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory(); + private ClassLoader classLoader = getClass().getClassLoader(); + private SlotProvider slotProvider = new TestingSlotProvider(slotRequestId -> CompletableFuture.completedFuture(new TestingLogicalSlot())); + private Executor ioExecutor = TestingUtils.defaultExecutor(); + private ScheduledExecutorService futureExecutor = TestingUtils.defaultExecutor(); + private Configuration jobMasterConfig = new Configuration(); + private ExecutionGraph priorExecutionGraph = null; + private JobGraph jobGraph; + private PartitionTracker partitionTracker = NoOpPartitionTracker.INSTANCE; + + public TestingExecutionGraphBuilder(final JobVertex ... jobVertices) { + this(new JobID(), "test job", jobVertices); + } + + public TestingExecutionGraphBuilder(final JobID jobId, final JobVertex ... jobVertices) { + this(jobId, "test job", jobVertices); + } + + public TestingExecutionGraphBuilder(final String jobName, final JobVertex ... jobVertices) { + this(new JobID(), jobName, jobVertices); + } + + public TestingExecutionGraphBuilder(final JobID jobId, final String jobName, final JobVertex ... 
jobVertices) { + this(new JobGraph(jobId, jobName, jobVertices)); + } + + public TestingExecutionGraphBuilder(final JobGraph jobGraph) { + this.jobGraph = jobGraph; + } + + public TestingExecutionGraphBuilder setPriorExecutionGraph(final ExecutionGraph priorExecutionGraph) { + this.priorExecutionGraph = priorExecutionGraph; + return this; + } + + public TestingExecutionGraphBuilder setJobMasterConfig(final Configuration jobMasterConfig) { + this.jobMasterConfig = jobMasterConfig; + return this; + } + + public TestingExecutionGraphBuilder setFutureExecutor(final ScheduledExecutorService futureExecutor) { + this.futureExecutor = futureExecutor; + return this; + } + + public TestingExecutionGraphBuilder setIoExecutor(final Executor ioExecutor) { + this.ioExecutor = ioExecutor; + return this; + } + + public TestingExecutionGraphBuilder setSlotProvider(final SlotProvider slotProvider) { + this.slotProvider = slotProvider; + return this; + } + + public TestingExecutionGraphBuilder setClassLoader(final ClassLoader classLoader) { + this.classLoader = classLoader; + return this; + } + + public TestingExecutionGraphBuilder setCheckpointRecoveryFactory(final CheckpointRecoveryFactory checkpointRecoveryFactory) { + this.checkpointRecoveryFactory = checkpointRecoveryFactory; + return this; + } + + public TestingExecutionGraphBuilder setRpcTimeout(final Time rpcTimeout) { + this.rpcTimeout = rpcTimeout; + return this; + } + + public TestingExecutionGraphBuilder setRestartStrategy(final RestartStrategy restartStrategy) { + this.restartStrategy = restartStrategy; + return this; + } + + public TestingExecutionGraphBuilder setMetricGroup(final MetricGroup metricGroup) { + this.metricGroup = metricGroup; + return this; + } + + public TestingExecutionGraphBuilder setBlobWriter(final BlobWriter blobWriter) { + this.blobWriter = blobWriter; + return this; + } + + public TestingExecutionGraphBuilder setAllocationTimeout(final Time allocationTimeout) { + this.allocationTimeout = 
allocationTimeout; + return this; + } + + public TestingExecutionGraphBuilder setShuffleMaster(final ShuffleMaster<?> shuffleMaster) { + this.shuffleMaster = shuffleMaster; + return this; + } + + public TestingExecutionGraphBuilder setPartitionTracker(final PartitionTracker partitionTracker) { + this.partitionTracker = partitionTracker; + return this; + } + + public ExecutionGraph build() throws JobException, JobExecutionException { + return ExecutionGraphBuilder.buildGraph( + priorExecutionGraph,
Review comment: I am not sure we need to include `priorExecutionGraph` in the builder; I cannot find any usage of `buildGraph()` where `prior` is non-null.
----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
With regards,
Apache Git Services