zentol commented on a change in pull request #11469:
URL: https://github.com/apache/flink/pull/11469#discussion_r473362720



##########
File path: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
##########
@@ -458,6 +466,123 @@ public void 
testRetriableSendOperationIfConnectionErrorOrServiceUnavailable() th
                }
        }
 
+       private class TestJobExecutionResultHandler extends 
TestHandler<EmptyRequestBody, JobExecutionResultResponseBody, 
JobMessageParameters> {
+
+               private final Iterator<Object> jobExecutionResults;
+
+               private Object lastJobExecutionResult;
+
+               private TestJobExecutionResultHandler(
+                       final Object... jobExecutionResults) {
+                       super(JobExecutionResultHeaders.getInstance());
+                       checkArgument(Arrays.stream(jobExecutionResults)
+                               .allMatch(object -> object instanceof 
JobExecutionResultResponseBody
+                                       || object instanceof 
RestHandlerException));
+                       this.jobExecutionResults = 
Arrays.asList(jobExecutionResults).iterator();
+               }
+
+               @Override
+               protected CompletableFuture<JobExecutionResultResponseBody> 
handleRequest(
+                       @Nonnull HandlerRequest<EmptyRequestBody, 
JobMessageParameters> request,
+                       @Nonnull DispatcherGateway gateway) {
+                       if (jobExecutionResults.hasNext()) {
+                               lastJobExecutionResult = 
jobExecutionResults.next();
+                       }
+                       checkState(lastJobExecutionResult != null);
+                       if (lastJobExecutionResult instanceof 
JobExecutionResultResponseBody) {
+                               return 
CompletableFuture.completedFuture((JobExecutionResultResponseBody) 
lastJobExecutionResult);
+                       } else if (lastJobExecutionResult instanceof 
RestHandlerException) {
+                               return 
FutureUtils.completedExceptionally((RestHandlerException) 
lastJobExecutionResult);
+                       } else {
+                               throw new AssertionError();
+                       }
+               }
+       }
+
+       @Test
+       public void testSubmitJobAndWaitForExecutionResult() throws Exception {
+               final TestJobExecutionResultHandler 
testJobExecutionResultHandler =
+                       new TestJobExecutionResultHandler(
+                               new RestHandlerException("should trigger 
retry", HttpResponseStatus.SERVICE_UNAVAILABLE),
+                               JobExecutionResultResponseBody.inProgress(),
+                               JobExecutionResultResponseBody.created(new 
JobResult.Builder()
+                                       
.applicationStatus(ApplicationStatus.SUCCEEDED)
+                                       .jobId(jobId)
+                                       .netRuntime(Long.MAX_VALUE)
+                                       
.accumulatorResults(Collections.singletonMap("testName", new 
SerializedValue<>(OptionalFailure.of(1.0))))
+                                       .build()),
+                               JobExecutionResultResponseBody.created(new 
JobResult.Builder()
+                                       
.applicationStatus(ApplicationStatus.FAILED)
+                                       .jobId(jobId)
+                                       .netRuntime(Long.MAX_VALUE)
+                                       .serializedThrowable(new 
SerializedThrowable(new RuntimeException("expected")))
+                                       .build()));
+
+               // fail first HTTP polling attempt, which should not be a 
problem because of the retries
+               final AtomicBoolean firstPollFailed = new AtomicBoolean();
+               failHttpRequest = (messageHeaders, messageParameters, 
requestBody) ->
+                       messageHeaders instanceof JobExecutionResultHeaders && 
!firstPollFailed.getAndSet(true);
+
+               try (TestRestServerEndpoint restServerEndpoint = 
createRestServerEndpoint(
+                       testJobExecutionResultHandler,
+                       new TestJobSubmitHandler())) {
+
+                       try (RestClusterClient<?> restClusterClient = 
createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
+                               final JobExecutionResult jobExecutionResult = 
restClusterClient.submitJob(jobGraph)
+                                       
.thenCompose(restClusterClient::requestJobResult)
+                                       .get()
+                                       
.toJobExecutionResult(ClassLoader.getSystemClassLoader());
+                               assertThat(jobExecutionResult.getJobID(), 
equalTo(jobId));
+                               assertThat(jobExecutionResult.getNetRuntime(), 
equalTo(Long.MAX_VALUE));
+                               assertThat(
+                                       
jobExecutionResult.getAllAccumulatorResults(),
+                                       
equalTo(Collections.singletonMap("testName", 1.0)));
+
+                               try {
+                                       restClusterClient.submitJob(jobGraph)
+                                               
.thenCompose(restClusterClient::requestJobResult)
+                                               .get()
+                                               
.toJobExecutionResult(ClassLoader.getSystemClassLoader());
+                                       fail("Expected exception not thrown.");
+                               } catch (final Exception e) {
+                                       final Optional<RuntimeException> cause 
= ExceptionUtils.findThrowable(e, RuntimeException.class);
+                                       assertThat(cause.isPresent(), is(true));
+                                       assertThat(cause.get().getMessage(), 
equalTo("expected"));
+                               }
+                       }
+               }
+       }
+
+       @Test
+       public void testJobSubmissionFailureThrowsProgramInvocationException() 
throws Exception {

Review comment:
       ```suggestion
        public void testJobSubmissionFailureCauseForwardedToClient() throws Exception {
       ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to