[ https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16580784#comment-16580784 ]
ASF GitHub Bot commented on FLINK-10056: ---------------------------------------- asfgit closed pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index c47f4fd19ff..01cb2b6b099 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -1655,4 +1655,9 @@ public void reportPayload(ResourceID resourceID, Void payload) { RestartStrategy getRestartStrategy() { return restartStrategy; } + + @VisibleForTesting + ExecutionGraph getExecutionGraph() { + return executionGraph; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 1d36fa5859a..0d603fc17b2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -97,13 +97,12 @@ // ------------------------------------------------------------------------ /** - * Waits until the job has reached a certain state. + * Waits until the Job has reached a certain state. * * <p>This method is based on polling and might miss very fast state transitions! 
*/ public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus status, long maxWaitMillis) throws TimeoutException { - checkNotNull(eg); checkNotNull(status); checkArgument(maxWaitMillis >= 0); @@ -118,7 +117,9 @@ public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus status, long } if (System.nanoTime() >= deadline) { - throw new TimeoutException("The job did not reach status " + status + " in time. Current status is " + eg.getState() + '.'); + throw new TimeoutException( + String.format("The job did not reach status %s in time. Current status is %s.", + status, eg.getState())); } } @@ -129,7 +130,6 @@ public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus status, long */ public static void waitUntilExecutionState(Execution execution, ExecutionState state, long maxWaitMillis) throws TimeoutException { - checkNotNull(execution); checkNotNull(state); checkArgument(maxWaitMillis >= 0); @@ -144,7 +144,47 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s } if (System.nanoTime() >= deadline) { - throw new TimeoutException(); + throw new TimeoutException( + String.format("The execution did not reach state %s in time. Current state is %s.", + state, execution.getState())); + } + } + + /** + * Waits until the ExecutionVertex has reached a certain state. + * + * <p>This method is based on polling and might miss very fast state transitions! + */ + public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis) + throws TimeoutException { + checkNotNull(executionVertex); + checkNotNull(state); + checkArgument(maxWaitMillis >= 0); + + // this is a poor implementation - we may want to improve it eventually + final long deadline = maxWaitMillis == 0 ? 
Long.MAX_VALUE : System.nanoTime() + (maxWaitMillis * 1_000_000); + + while (true) { + Execution execution = executionVertex.getCurrentExecutionAttempt(); + + if (execution == null || (execution.getState() != state && System.nanoTime() < deadline)) { + try { + Thread.sleep(2); + } catch (InterruptedException ignored) { } + } else { + break; + } + + if (System.nanoTime() >= deadline) { + if (execution != null) { + throw new TimeoutException( + String.format("The execution vertex did not reach state %s in time. Current state is %s.", + state, execution.getState())); + } else { + throw new TimeoutException( + "Cannot get current execution attempt of " + executionVertex + '.'); + } + } } } @@ -201,7 +241,6 @@ public static void waitForAllExecutionsPredicate( public static void waitUntilFailoverRegionState(FailoverRegion region, JobStatus status, long maxWaitMillis) throws TimeoutException { - checkNotNull(region); checkNotNull(status); checkArgument(maxWaitMillis >= 0); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 891ff82c413..578c9066e95 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -24,8 +24,12 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.core.io.InputSplitSource; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.blob.BlobServer; import 
org.apache.flink.runtime.blob.VoidBlobStore; @@ -46,6 +50,9 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; @@ -60,6 +67,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmanager.OnCompletionActions; @@ -84,6 +92,7 @@ import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; import org.hamcrest.Matchers; @@ -113,6 +122,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -678,6 +688,133 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep } } + @Test + public void testRequestNextInputSplit() throws Exception { + // build one node JobGraph + InputSplitSource<TestingInputSplit> 
inputSplitSource = new TestingInputSplitSource(); + + JobVertex source = new JobVertex("vertex1"); + source.setParallelism(1); + source.setInputSplitSource(inputSplitSource); + source.setInvokableClass(AbstractInvokable.class); + + final JobGraph jobGraph = new JobGraph(source); + jobGraph.setAllowQueuedScheduling(true); + + configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1); + configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); + + final JobManagerSharedServices jobManagerSharedServices = + new TestingJobManagerSharedServicesBuilder() + .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration)) + .build(); + + final JobMaster jobMaster = createJobMaster( + configuration, + jobGraph, + haServices, + jobManagerSharedServices); + + CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout); + + try { + // wait for the start to complete + startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + ExecutionGraph eg = jobMaster.getExecutionGraph(); + ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next(); + + SerializedInputSplit serializedInputSplit1 = jobMasterGateway + .requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit1 = InstantiationUtil + .deserializeObject( + serializedInputSplit1.getInputSplitData(), + ClassLoader.getSystemClassLoader()); + assertEquals(0, inputSplit1.getSplitNumber()); + + SerializedInputSplit serializedInputSplit2 = jobMasterGateway + .requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit2 = InstantiationUtil + .deserializeObject( + serializedInputSplit2.getInputSplitData(), + 
ClassLoader.getSystemClassLoader()); + assertEquals(1, inputSplit2.getSplitNumber()); + + ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L); + + eg.failGlobal(new Exception("Testing exception")); + + ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L); + + SerializedInputSplit serializedInputSplit3 = jobMasterGateway + .requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit3 = InstantiationUtil + .deserializeObject( + serializedInputSplit3.getInputSplitData(), + ClassLoader.getSystemClassLoader()); + assertEquals(0, inputSplit3.getSplitNumber()); + + SerializedInputSplit serializedInputSplit4 = jobMasterGateway + .requestNextInputSplit( + source.getID(), + ev.getCurrentExecutionAttempt().getAttemptId()) + .get(1L, TimeUnit.SECONDS); + InputSplit inputSplit4 = InstantiationUtil + .deserializeObject( + serializedInputSplit4.getInputSplitData(), + ClassLoader.getSystemClassLoader()); + assertEquals(1, inputSplit4.getSplitNumber()); + } finally { + RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); + } + } + + private static final class TestingInputSplitSource implements InputSplitSource<TestingInputSplit> { + @Override + public TestingInputSplit[] createInputSplits(int minNumSplits) { + return new TestingInputSplit[0]; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(TestingInputSplit[] inputSplits) { + return new TestingInputSplitAssigner(); + } + } + + private static final class TestingInputSplitAssigner implements InputSplitAssigner { + + private int splitIndex = 0; + + @Override + public InputSplit getNextInputSplit(String host, int taskId){ + return new TestingInputSplit(splitIndex++); + } + } + + private static final class TestingInputSplit implements InputSplit { + + private final int splitNumber; + + TestingInputSplit(int number) { + this.splitNumber = number; + } + + 
public int getSplitNumber() { + return splitNumber; + } + } + /** * Tests the {@link JobMaster#requestPartitionState(IntermediateDataSetID, ResultPartitionID)} * call for a finished result partition. @@ -708,9 +845,9 @@ public void testRequestPartitionState() throws Exception { final CompletableFuture<TaskDeploymentDescriptor> tddFuture = new CompletableFuture<>(); final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder() .setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> { - tddFuture.complete(taskDeploymentDescriptor); - return CompletableFuture.completedFuture(Acknowledge.get()); - }) + tddFuture.complete(taskDeploymentDescriptor); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) .createTestingTaskExecutorGateway(); rpcService.registerGateway(testingTaskExecutorGateway.getAddress(), testingTaskExecutorGateway); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add testRequestNextInputSplit > ----------------------------- > > Key: FLINK-10056 > URL: https://issues.apache.org/jira/browse/FLINK-10056 > Project: Flink > Issue Type: Improvement > Components: JobManager, Tests > Affects Versions: 1.5.0 > Reporter: 陈梓立 > Assignee: 陈梓立 > Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Add testRequestNextInputSplit to make sure JobMaster#requestNextInputSplit > works as expected. -- This message was sent by Atlassian JIRA (v7.6.3#76005)