Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6147#discussion_r195107887 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java --- @@ -71,20 +108,192 @@ public void testSerializationFailureHandling() throws Exception { @Test public void testSuccessfulJobSubmission() throws Exception { - DispatcherGateway mockGateway = mock(DispatcherGateway.class); - when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + DispatcherGateway mockGateway = new JobGraphCapturingMockGateway(); GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class); JobSubmitHandler handler = new JobSubmitHandler( CompletableFuture.completedFuture("http://localhost:1234"), mockGatewayRetriever, RpcUtils.INF_TIMEOUT, - Collections.emptyMap()); + Collections.emptyMap(), + new Configuration()); JobGraph job = new JobGraph("testjob"); JobSubmitRequestBody request = new JobSubmitRequestBody(job); handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway) .get(); } + + @Test + public void testJarHandling() throws Exception { + final String jarName = "jar"; + + JobGraphCapturingMockGateway jobGraphCapturingMockGateway = new JobGraphCapturingMockGateway(); + GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class); + + JobSubmitHandler handler = new JobSubmitHandler( + CompletableFuture.completedFuture("http://localhost:1234"), + mockGatewayRetriever, + RpcUtils.INF_TIMEOUT, + Collections.emptyMap(), + new Configuration()); + + Path tmp = TMP_FOLDER.newFolder().toPath(); + Path clientStorageDirectory = Files.createDirectory(tmp.resolve("client-storage-directory")); + Path serverStorageDirectory = Files.createDirectory(tmp.resolve("server-storage-directory")); + + Path jar = Paths.get(jarName); + Files.createFile(clientStorageDirectory.resolve(jar)); + Files.createFile(serverStorageDirectory.resolve(jar)); + + JobGraph job = new JobGraph("testjob"); + job.addJar(new org.apache.flink.core.fs.Path(jar.toUri())); + JobSubmitRequestBody serializedJobGraphBody = new JobSubmitRequestBody(job); + JobSubmitRequestBody request = new JobSubmitRequestBody(serializedJobGraphBody.serializedJobGraph, Collections.singletonList(serverStorageDirectory.resolve(jar)), Collections.emptyList(), serverStorageDirectory); + + handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), jobGraphCapturingMockGateway) + .get(); + + JobGraph submittedJobGraph = jobGraphCapturingMockGateway.jobGraph; + List<org.apache.flink.core.fs.Path> userJars = submittedJobGraph.getUserJars(); + + // ensure we haven't changed the total number of jars + Assert.assertEquals(1, userJars.size()); + + // this entry should be changed, a replacement jar exists in the server storage directory + Assert.assertEquals(new org.apache.flink.core.fs.Path(serverStorageDirectory.resolve(jar).toUri()), userJars.get(0)); --- End diff -- I think updating `JobGraph#userJars` and `JobGraph#userArtifacts` is not really necessary. Maybe we should even mark them `transient` in order to emphasize that they won't be transmitted. Given that, I think we don't have to do these tests.
---