[ https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527831#comment-16527831 ]
ASF GitHub Bot commented on FLINK-9280: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199199366 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java --- @@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws Exception { @Test public void testSuccessfulJobSubmission() throws Exception { + final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath(); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { + objectOut.writeObject(new JobGraph("testjob")); + } + } + DispatcherGateway mockGateway = mock(DispatcherGateway.class); + when(mockGateway.getHostname()).thenReturn("localhost"); + when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort())); when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class); JobSubmitHandler handler = new JobSubmitHandler( CompletableFuture.completedFuture("http://localhost:1234"), mockGatewayRetriever, RpcUtils.INF_TIMEOUT, - Collections.emptyMap()); + Collections.emptyMap(), + TestingUtils.defaultExecutor()); - JobGraph job = new JobGraph("testjob"); - JobSubmitRequestBody request = new JobSubmitRequestBody(job); + JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList(), Collections.emptyList()); - handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway) + handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Collections.singleton(jobGraphFile.toFile())), 
mockGateway) .get(); } + + @Test + public void testRejectionOnCountMismatch() throws Exception { + final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath(); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { + objectOut.writeObject(new JobGraph("testjob")); + } + } + final Path countExceedingFile = TEMPORARY_FOLDER.newFile().toPath(); + + DispatcherGateway mockGateway = mock(DispatcherGateway.class); + when(mockGateway.getHostname()).thenReturn("localhost"); + when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort())); + when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class); + + JobSubmitHandler handler = new JobSubmitHandler( + CompletableFuture.completedFuture("http://localhost:1234"), + mockGatewayRetriever, + RpcUtils.INF_TIMEOUT, + Collections.emptyMap(), + TestingUtils.defaultExecutor()); + + JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList(), Collections.emptyList()); + + try { + handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Arrays.asList(jobGraphFile.toFile(), countExceedingFile.toFile())), mockGateway) + .get(); + } catch (Exception e) { + ExceptionUtils.findThrowable(e, candidate -> candidate instanceof RestHandlerException && candidate.getMessage().contains("count")); + } + } + + @Test + public void testFileHandling() throws Exception { + final String dcEntryName = "entry"; + + CompletableFuture<JobGraph> submittedJobGraphFuture = new CompletableFuture<>(); + DispatcherGateway dispatcherGateway = new TestingDispatcherGateway.Builder() + .setBlobServerPort(blobServer.getPort()) 
+ .setSubmitFunction(submittedJobGraph -> { + submittedJobGraphFuture.complete(submittedJobGraph); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .build(); + + GatewayRetriever<DispatcherGateway> gatewayRetriever = new TestGatewayRetriever(dispatcherGateway); + + JobSubmitHandler handler = new JobSubmitHandler( + CompletableFuture.completedFuture("http://localhost:1234"), + gatewayRetriever, + RpcUtils.INF_TIMEOUT, + Collections.emptyMap(), + TestingUtils.defaultExecutor()); + + final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath(); + final Path jarFile = TEMPORARY_FOLDER.newFile().toPath(); + final Path artifactFile = TEMPORARY_FOLDER.newFile().toPath(); + + final JobGraph jobGraph = new JobGraph(); + // the entry that should be updated + jobGraph.addUserArtifact(dcEntryName, new DistributedCache.DistributedCacheEntry("random", false)); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { + objectOut.writeObject(jobGraph); + } + } + + JobSubmitRequestBody request = new JobSubmitRequestBody( + jobGraphFile.getFileName().toString(), + Collections.singletonList(jarFile.getFileName().toString()), + Collections.singleton(new JobSubmitRequestBody.DistributedCacheFile(dcEntryName, artifactFile.getFileName().toString()))); + + handler.handleRequest(new HandlerRequest<>( + request, + EmptyMessageParameters.getInstance(), + Collections.emptyMap(), + Collections.emptyMap(), + Arrays.asList(jobGraphFile.toFile(), jarFile.toFile(), artifactFile.toFile())), dispatcherGateway) + .get(); + + Assert.assertTrue("No JobGraph was submitted.", submittedJobGraphFuture.isDone()); + final JobGraph submittedJobGraph = submittedJobGraphFuture.get(); + Assert.assertEquals(1, submittedJobGraph.getUserJarBlobKeys().size()); + Assert.assertEquals(1, submittedJobGraph.getUserArtifacts().size()); + 
Assert.assertNotNull(submittedJobGraph.getUserArtifacts().get(dcEntryName).blobKey); --- End diff -- Just a side note: Hamcrest offers somewhat more expressive assertions, which in many cases generate better failure messages. E.g., `assertThat(submittedJobGraph.getUserArtifacts(), hasSize(1))`. > Extend JobSubmitHandler to accept jar files > ------------------------------------------- > > Key: FLINK-9280 > URL: https://issues.apache.org/jira/browse/FLINK-9280 > Project: Flink > Issue Type: New Feature > Components: Job-Submission, REST > Affects Versions: 1.5.0 > Reporter: Chesnay Schepler > Assignee: Chesnay Schepler > Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > The job submission through the CLI first uploads all required jars to the blob > server, sets the blob keys in the jobgraph, and then uploads this graph to > the {{JobSubmitHandler}} which submits it to the Dispatcher. > This process has the downside that it requires jars to be uploaded to the > blobserver before submitting the job graph, which does not happen via REST. > I propose an extension to the {{JobSubmitHandler}} to also accept an > optional list of jar files that were previously uploaded through the > {{JarUploadHandler}}. If present, the handler would upload these jars to the > blobserver and set the blob keys. -- This message was sent by Atlassian JIRA (v7.6.3#76005)