zentol commented on a change in pull request #11469: URL: https://github.com/apache/flink/pull/11469#discussion_r473362720
########## File path: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java ########## @@ -458,6 +466,123 @@ public void testRetriableSendOperationIfConnectionErrorOrServiceUnavailable() th } } + private class TestJobExecutionResultHandler extends TestHandler<EmptyRequestBody, JobExecutionResultResponseBody, JobMessageParameters> { + + private final Iterator<Object> jobExecutionResults; + + private Object lastJobExecutionResult; + + private TestJobExecutionResultHandler( + final Object... jobExecutionResults) { + super(JobExecutionResultHeaders.getInstance()); + checkArgument(Arrays.stream(jobExecutionResults) + .allMatch(object -> object instanceof JobExecutionResultResponseBody + || object instanceof RestHandlerException)); + this.jobExecutionResults = Arrays.asList(jobExecutionResults).iterator(); + } + + @Override + protected CompletableFuture<JobExecutionResultResponseBody> handleRequest( + @Nonnull HandlerRequest<EmptyRequestBody, JobMessageParameters> request, + @Nonnull DispatcherGateway gateway) { + if (jobExecutionResults.hasNext()) { + lastJobExecutionResult = jobExecutionResults.next(); + } + checkState(lastJobExecutionResult != null); + if (lastJobExecutionResult instanceof JobExecutionResultResponseBody) { + return CompletableFuture.completedFuture((JobExecutionResultResponseBody) lastJobExecutionResult); + } else if (lastJobExecutionResult instanceof RestHandlerException) { + return FutureUtils.completedExceptionally((RestHandlerException) lastJobExecutionResult); + } else { + throw new AssertionError(); + } + } + } + + @Test + public void testSubmitJobAndWaitForExecutionResult() throws Exception { + final TestJobExecutionResultHandler testJobExecutionResultHandler = + new TestJobExecutionResultHandler( + new RestHandlerException("should trigger retry", HttpResponseStatus.SERVICE_UNAVAILABLE), + JobExecutionResultResponseBody.inProgress(), + JobExecutionResultResponseBody.created(new JobResult.Builder() + .applicationStatus(ApplicationStatus.SUCCEEDED) + .jobId(jobId) + .netRuntime(Long.MAX_VALUE) + .accumulatorResults(Collections.singletonMap("testName", new SerializedValue<>(OptionalFailure.of(1.0)))) + .build()), + JobExecutionResultResponseBody.created(new JobResult.Builder() + .applicationStatus(ApplicationStatus.FAILED) + .jobId(jobId) + .netRuntime(Long.MAX_VALUE) + .serializedThrowable(new SerializedThrowable(new RuntimeException("expected"))) + .build())); + + // fail first HTTP polling attempt, which should not be a problem because of the retries + final AtomicBoolean firstPollFailed = new AtomicBoolean(); + failHttpRequest = (messageHeaders, messageParameters, requestBody) -> + messageHeaders instanceof JobExecutionResultHeaders && !firstPollFailed.getAndSet(true); + + try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint( + testJobExecutionResultHandler, + new TestJobSubmitHandler())) { + + try (RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) { + final JobExecutionResult jobExecutionResult = restClusterClient.submitJob(jobGraph) + .thenCompose(restClusterClient::requestJobResult) + .get() + .toJobExecutionResult(ClassLoader.getSystemClassLoader()); + assertThat(jobExecutionResult.getJobID(), equalTo(jobId)); + assertThat(jobExecutionResult.getNetRuntime(), equalTo(Long.MAX_VALUE)); + assertThat( + jobExecutionResult.getAllAccumulatorResults(), + equalTo(Collections.singletonMap("testName", 1.0))); + + try { + restClusterClient.submitJob(jobGraph) + .thenCompose(restClusterClient::requestJobResult) + .get() + .toJobExecutionResult(ClassLoader.getSystemClassLoader()); + fail("Expected exception not thrown."); + } catch (final Exception e) { + final Optional<RuntimeException> cause = ExceptionUtils.findThrowable(e, RuntimeException.class); + assertThat(cause.isPresent(), is(true)); + assertThat(cause.get().getMessage(), equalTo("expected")); + } + } + } + } + + @Test + public void testJobSubmissionFailureThrowsProgramInvocationException() throws Exception { Review comment: ```suggestion public void testJobSubmissionFailureCauseForwardedToClient() throws Exception { ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org