[ 
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16580784#comment-16580784
 ] 

ASF GitHub Bot commented on FLINK-10056:
----------------------------------------

asfgit closed pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer shows its
diff once it has been merged, so the diff is reproduced below:

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c47f4fd19ff..01cb2b6b099 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1655,4 +1655,9 @@ public void reportPayload(ResourceID resourceID, Void 
payload) {
        RestartStrategy getRestartStrategy() {
                return restartStrategy;
        }
+
+       @VisibleForTesting
+       ExecutionGraph getExecutionGraph() {
+               return executionGraph;
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 1d36fa5859a..0d603fc17b2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -97,13 +97,12 @@
        // 
------------------------------------------------------------------------
 
        /**
-        * Waits until the job has reached a certain state.
+        * Waits until the Job has reached a certain state.
         *
         * <p>This method is based on polling and might miss very fast state 
transitions!
         */
        public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus 
status, long maxWaitMillis)
                        throws TimeoutException {
-
                checkNotNull(eg);
                checkNotNull(status);
                checkArgument(maxWaitMillis >= 0);
@@ -118,7 +117,9 @@ public static void waitUntilJobStatus(ExecutionGraph eg, 
JobStatus status, long
                }
 
                if (System.nanoTime() >= deadline) {
-                       throw new TimeoutException("The job did not reach 
status " + status + " in time. Current status is " + eg.getState() + '.');
+                       throw new TimeoutException(
+                               String.format("The job did not reach status %s 
in time. Current status is %s.",
+                                       status, eg.getState()));
                }
        }
 
@@ -129,7 +130,6 @@ public static void waitUntilJobStatus(ExecutionGraph eg, 
JobStatus status, long
         */
        public static void waitUntilExecutionState(Execution execution, 
ExecutionState state, long maxWaitMillis)
                        throws TimeoutException {
-
                checkNotNull(execution);
                checkNotNull(state);
                checkArgument(maxWaitMillis >= 0);
@@ -144,7 +144,47 @@ public static void waitUntilExecutionState(Execution 
execution, ExecutionState s
                }
 
                if (System.nanoTime() >= deadline) {
-                       throw new TimeoutException();
+                       throw new TimeoutException(
+                               String.format("The execution did not reach 
state %s in time. Current state is %s.",
+                                       state, execution.getState()));
+               }
+       }
+
+       /**
+        * Waits until the ExecutionVertex has reached a certain state.
+        *
+        * <p>This method is based on polling and might miss very fast state 
transitions!
+        */
+       public static void waitUntilExecutionVertexState(ExecutionVertex 
executionVertex, ExecutionState state, long maxWaitMillis)
+               throws TimeoutException {
+               checkNotNull(executionVertex);
+               checkNotNull(state);
+               checkArgument(maxWaitMillis >= 0);
+
+               // this is a poor implementation - we may want to improve it 
eventually
+               final long deadline = maxWaitMillis == 0 ? Long.MAX_VALUE : 
System.nanoTime() + (maxWaitMillis * 1_000_000);
+
+               while (true) {
+                       Execution execution = 
executionVertex.getCurrentExecutionAttempt();
+
+                       if (execution == null || (execution.getState() != state 
&& System.nanoTime() < deadline)) {
+                               try {
+                                       Thread.sleep(2);
+                               } catch (InterruptedException ignored) { }
+                       } else {
+                               break;
+                       }
+
+                       if (System.nanoTime() >= deadline) {
+                               if (execution != null) {
+                                       throw new TimeoutException(
+                                               String.format("The execution 
vertex did not reach state %s in time. Current state is %s.",
+                                                       state, 
execution.getState()));
+                               } else {
+                                       throw new TimeoutException(
+                                               "Cannot get current execution 
attempt of " + executionVertex + '.');
+                               }
+                       }
                }
        }
 
@@ -201,7 +241,6 @@ public static void waitForAllExecutionsPredicate(
 
        public static void waitUntilFailoverRegionState(FailoverRegion region, 
JobStatus status, long maxWaitMillis)
                        throws TimeoutException {
-
                checkNotNull(region);
                checkNotNull(status);
                checkArgument(maxWaitMillis >= 0);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 891ff82c413..578c9066e95 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -24,8 +24,12 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
@@ -46,6 +50,9 @@
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
@@ -60,6 +67,7 @@
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -84,6 +92,7 @@
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.Matchers;
@@ -113,6 +122,7 @@
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -678,6 +688,133 @@ public void 
testResourceManagerConnectionAfterRegainingLeadership() throws Excep
                }
        }
 
+       @Test
+       public void testRequestNextInputSplit() throws Exception {
+               // build one node JobGraph
+               InputSplitSource<TestingInputSplit> inputSplitSource = new 
TestingInputSplitSource();
+
+               JobVertex source = new JobVertex("vertex1");
+               source.setParallelism(1);
+               source.setInputSplitSource(inputSplitSource);
+               source.setInvokableClass(AbstractInvokable.class);
+
+               final JobGraph jobGraph = new JobGraph(source);
+               jobGraph.setAllowQueuedScheduling(true);
+
+               
configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+               
configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
+
+               final JobManagerSharedServices jobManagerSharedServices =
+                       new TestingJobManagerSharedServicesBuilder()
+                               
.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+                               .build();
+
+               final JobMaster jobMaster = createJobMaster(
+                       configuration,
+                       jobGraph,
+                       haServices,
+                       jobManagerSharedServices);
+
+               CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
+
+               try {
+                       // wait for the start to complete
+                       startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+                       final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+                       ExecutionGraph eg = jobMaster.getExecutionGraph();
+                       ExecutionVertex ev = 
eg.getAllExecutionVertices().iterator().next();
+
+                       SerializedInputSplit serializedInputSplit1 = 
jobMasterGateway
+                               .requestNextInputSplit(
+                                       source.getID(),
+                                       
ev.getCurrentExecutionAttempt().getAttemptId())
+                               .get(1L, TimeUnit.SECONDS);
+                       InputSplit inputSplit1 = InstantiationUtil
+                               .deserializeObject(
+                                       
serializedInputSplit1.getInputSplitData(),
+                                       ClassLoader.getSystemClassLoader());
+                       assertEquals(0, inputSplit1.getSplitNumber());
+
+                       SerializedInputSplit serializedInputSplit2 = 
jobMasterGateway
+                               .requestNextInputSplit(
+                                       source.getID(),
+                                       
ev.getCurrentExecutionAttempt().getAttemptId())
+                               .get(1L, TimeUnit.SECONDS);
+                       InputSplit inputSplit2 = InstantiationUtil
+                               .deserializeObject(
+                                       
serializedInputSplit2.getInputSplitData(),
+                                       ClassLoader.getSystemClassLoader());
+                       assertEquals(1, inputSplit2.getSplitNumber());
+
+                       
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, 
ExecutionState.SCHEDULED, 2000L);
+
+                       eg.failGlobal(new Exception("Testing exception"));
+
+                       
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, 
ExecutionState.SCHEDULED, 2000L);
+
+                       SerializedInputSplit serializedInputSplit3 = 
jobMasterGateway
+                               .requestNextInputSplit(
+                                       source.getID(),
+                                       
ev.getCurrentExecutionAttempt().getAttemptId())
+                               .get(1L, TimeUnit.SECONDS);
+                       InputSplit inputSplit3 = InstantiationUtil
+                               .deserializeObject(
+                                       
serializedInputSplit3.getInputSplitData(),
+                                       ClassLoader.getSystemClassLoader());
+                       assertEquals(0, inputSplit3.getSplitNumber());
+
+                       SerializedInputSplit serializedInputSplit4 = 
jobMasterGateway
+                               .requestNextInputSplit(
+                                       source.getID(),
+                                       
ev.getCurrentExecutionAttempt().getAttemptId())
+                               .get(1L, TimeUnit.SECONDS);
+                       InputSplit inputSplit4 = InstantiationUtil
+                               .deserializeObject(
+                                       
serializedInputSplit4.getInputSplitData(),
+                                       ClassLoader.getSystemClassLoader());
+                       assertEquals(1, inputSplit4.getSplitNumber());
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);
+               }
+       }
+
+       private static final class TestingInputSplitSource implements 
InputSplitSource<TestingInputSplit> {
+               @Override
+               public TestingInputSplit[] createInputSplits(int minNumSplits) {
+                       return new TestingInputSplit[0];
+               }
+
+               @Override
+               public InputSplitAssigner 
getInputSplitAssigner(TestingInputSplit[] inputSplits) {
+                       return new TestingInputSplitAssigner();
+               }
+       }
+
+       private static final class TestingInputSplitAssigner implements 
InputSplitAssigner {
+
+               private int splitIndex = 0;
+
+               @Override
+               public InputSplit getNextInputSplit(String host, int taskId){
+                       return new TestingInputSplit(splitIndex++);
+               }
+       }
+
+       private static final class TestingInputSplit implements InputSplit {
+
+               private final int splitNumber;
+
+               TestingInputSplit(int number) {
+                       this.splitNumber = number;
+               }
+
+               public int getSplitNumber() {
+                       return splitNumber;
+               }
+       }
+
        /**
         * Tests the {@link 
JobMaster#requestPartitionState(IntermediateDataSetID, ResultPartitionID)}
         * call for a finished result partition.
@@ -708,9 +845,9 @@ public void testRequestPartitionState() throws Exception {
                        final CompletableFuture<TaskDeploymentDescriptor> 
tddFuture = new CompletableFuture<>();
                        final TestingTaskExecutorGateway 
testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                                
.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
-                    tddFuture.complete(taskDeploymentDescriptor);
-                    return 
CompletableFuture.completedFuture(Acknowledge.get());
-                })
+                                         
tddFuture.complete(taskDeploymentDescriptor);
+                                         return 
CompletableFuture.completedFuture(Acknowledge.get());
+                                 })
                                .createTestingTaskExecutorGateway();
                        
rpcService.registerGateway(testingTaskExecutorGateway.getAddress(), 
testingTaskExecutorGateway);
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add testRequestNextInputSplit
> -----------------------------
>
>                 Key: FLINK-10056
>                 URL: https://issues.apache.org/jira/browse/FLINK-10056
>             Project: Flink
>          Issue Type: Improvement
>          Components: JobManager, Tests
>    Affects Versions: 1.5.0
>            Reporter: 陈梓立
>            Assignee: 陈梓立
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> Add testRequestNextInputSplit to make sure JobMaster#requestNextInputSplit 
> works as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to