azagrebin commented on a change in pull request #8386: [FLINK-12181][Tests] Port ExecutionGraphRestartTest to new codebase URL: https://github.com/apache/flink/pull/8386#discussion_r285550467
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ########## @@ -733,79 +658,114 @@ public void testFailureWhileRestarting() throws Exception { // Utilities // ------------------------------------------------------------------------ - private Scheduler createSchedulerWithInstances(int num, TaskManagerGateway taskManagerGateway) { - final Scheduler scheduler = new Scheduler(executor); - final Instance[] instances = new Instance[num]; + private static class TestingExecutionGraphBuilder { + private RestartStrategy restartStrategy = new NoRestartStrategy(); + private JobGraph jobGraph = createJobGraph(); + private int tasksNum = NUM_TASKS; + private TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); - for (int i = 0; i < instances.length; i++) { - instances[i] = createInstance(taskManagerGateway, 55443 + i); - scheduler.newInstanceAvailable(instances[i]); + public TestingExecutionGraphBuilder setRestartStrategy(RestartStrategy restartStrategy) { + this.restartStrategy = restartStrategy; + return this; } - return scheduler; - } + public TestingExecutionGraphBuilder setJobGraph(JobGraph jobGraph) { + this.jobGraph = jobGraph; + return this; + } - private static Instance createInstance(TaskManagerGateway taskManagerGateway, int port) { - final HardwareDescription resources = new HardwareDescription(4, 1_000_000_000, 500_000_000, 400_000_000); - final TaskManagerLocation location = new TaskManagerLocation( - ResourceID.generate(), InetAddress.getLoopbackAddress(), port); - return new Instance(taskManagerGateway, location, new InstanceID(), resources, 1); - } + TestingExecutionGraphBuilder setTasksNum2() { + this.tasksNum = 2; + return this; + } - // ------------------------------------------------------------------------ + public TestingExecutionGraphBuilder setTaskManagerLocation(TaskManagerLocation taskManagerLocation) { + this.taskManagerLocation = taskManagerLocation; + return 
this; + } - private Tuple2<ExecutionGraph, Instance> createExecutionGraph(RestartStrategy restartStrategy) throws Exception { - Instance instance = ExecutionGraphTestUtils.getInstance( - new SimpleAckingTaskManagerGateway(), - NUM_TASKS); + private static TestingExecutionGraphBuilder newBuilder() { + return new TestingExecutionGraphBuilder(); + } - Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); - scheduler.newInstanceAvailable(instance); + private ExecutionGraph build(SlotPool slotPool) throws Exception { + final Scheduler scheduler = createSchedulerWithSlots(tasksNum, slotPool, taskManagerLocation); + final ExecutionGraph eg = createSimpleExecutionGraph(restartStrategy, scheduler, jobGraph); - ExecutionGraph eg = createSimpleExecutionGraph(restartStrategy, scheduler); + assertEquals(JobStatus.CREATED, eg.getState()); - assertEquals(JobStatus.CREATED, eg.getState()); + eg.scheduleForExecution(); Review comment: OK, true. I will rename `build` to `buildAndScheduleForExecution`, since we always need a scheduled graph here. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services