Solved. This is the correct code to deploy a job programmatically via the REST API.
Thanks

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.URL;

// The class name is only a wrapper so that the snippet compiles on its own.
public class UploadJar {
    public static void main(String[] args) throws Exception {
        URL serverUrl = new URL("http://192.168.149.130:8081/jars/upload");
        HttpURLConnection urlConnection = (HttpURLConnection) serverUrl.openConnection();

        String boundaryString = "------BoundaryXXXX";
        String crlf = "\r\n";
        String fileUrl = "Test-1.jar";
        File jarToUpload = new File(fileUrl);

        urlConnection.setUseCaches(false);
        urlConnection.setDoOutput(true);
        urlConnection.setDoInput(true);
        urlConnection.setRequestMethod("POST");
        urlConnection.setRequestProperty("Connection", "Keep-Alive");
        urlConnection.setRequestProperty("Cache-Control", "no-cache");
        urlConnection.addRequestProperty("Content-Type",
                "multipart/form-data; boundary=" + boundaryString);

        OutputStream outputStreamToRequestBody = urlConnection.getOutputStream();
        BufferedWriter httpRequestBodyWriter =
                new BufferedWriter(new OutputStreamWriter(outputStreamToRequestBody));

        // Include the section to describe the file.
        // The part delimiter is "--" followed by the boundary string.
        String payloadString = "--" + boundaryString + crlf
                + "Content-Disposition: form-data;"
                + " name=\"jarfile\";"
                + " filename=\"Test-1.jar\"" + crlf
                + "Content-Type: application/x-java-archive" + crlf + crlf;
        System.out.println(payloadString);
        httpRequestBodyWriter.write(payloadString);
        // Flush the header before writing raw bytes to the underlying stream.
        httpRequestBodyWriter.flush();

        // Write the actual file contents
        FileInputStream inputStream = new FileInputStream(jarToUpload);
        int bytesRead;
        byte[] dataBuffer = new byte[1024];
        while ((bytesRead = inputStream.read(dataBuffer)) != -1) {
            outputStreamToRequestBody.write(dataBuffer, 0, bytesRead);
        }
        outputStreamToRequestBody.flush();

        // Closing delimiter: "--" + boundary + "--"
        httpRequestBodyWriter.write(crlf + "--" + boundaryString + "--" + crlf);
        httpRequestBodyWriter.flush();

        inputStream.close();
        outputStreamToRequestBody.close();
        httpRequestBodyWriter.close();

        // Print the server response
        BufferedReader httpResponseReader =
                new BufferedReader(new InputStreamReader(urlConnection.getInputStream()));
        String lineRead;
        while ((lineRead = httpResponseReader.readLine()) != null) {
            System.out.println(lineRead);
        }
    }
}

Best Regards
Luigi

2018-01-18 17:02 GMT+01:00 Luigi Sgaglione <luigi.sgagli...@gmail.com>:
> Hi Timo,
>
> I think that the REST API is the most suitable solution. Thanks.
>
> So, I'm trying to use the Flink REST API. I'm able to perform GET
> requests but not POST requests.
>
> In particular, when I issue a POST to upload the jar I receive this error
> from the server: {"error": "Failed to upload the file."}
>
> This is the code I used:
>
> URL serverUrl = new URL("http://192.168.149.130:8081/jars/upload");
> HttpURLConnection urlConnection = (HttpURLConnection) serverUrl.openConnection();
>
> String boundaryString = "------BoundaryXXXX";
> String crlf = "\r\n";
> String fileUrl = "Test-1.jar";
> File jarToUpload = new File(fileUrl);
>
> urlConnection.setDoOutput(true);
> urlConnection.setRequestMethod("POST");
> urlConnection.setRequestProperty("Connection", "Keep-Alive");
> urlConnection.setRequestProperty("Cache-Control", "no-cache");
> urlConnection.addRequestProperty("Content-Type",
>         "multipart/form-data; boundary=" + boundaryString);
>
> OutputStream outputStreamToRequestBody = urlConnection.getOutputStream();
> BufferedWriter httpRequestBodyWriter =
>         new BufferedWriter(new OutputStreamWriter(outputStreamToRequestBody));
>
> String payloadString = boundaryString + crlf
>         + "Content-Disposition: form-data;"
>         + " name=\"jarfile\";"
>         + " filename=\"Test-1.jar\"" + crlf
>         + "Content-Type: application/x-java-archive" + crlf + crlf;
> System.out.println(payloadString);
> httpRequestBodyWriter.write(payloadString);
> httpRequestBodyWriter.flush();
>
> // Write the actual file contents
> FileInputStream inputStream = new FileInputStream(jarToUpload);
>
> int bytesRead;
> byte[] dataBuffer = new byte[1024];
> while ((bytesRead = inputStream.read(dataBuffer)) != -1) {
>     outputStreamToRequestBody.write(dataBuffer, 0, bytesRead);
> }
>
> outputStreamToRequestBody.flush();
>
> httpRequestBodyWriter.write(boundaryString + crlf);
> httpRequestBodyWriter.flush();
>
> inputStream.close();
> outputStreamToRequestBody.close();
> httpRequestBodyWriter.close();
>
> BufferedReader httpResponseReader =
>         new BufferedReader(new InputStreamReader(urlConnection.getInputStream()));
> String lineRead;
> while ((lineRead = httpResponseReader.readLine()) != null) {
>     System.out.println(lineRead);
> }
>
> The documentation of the Flink REST API is not very detailed; in
> particular, it doesn't include a clear example.
>
> Do you have any idea how to solve the error?
>
> Thanks
>
> 2018-01-18 12:54 GMT+01:00 Timo Walther <twal...@apache.org>:
>
>> Hi Luigi,
>>
>> I'm also working on a solution for submitting jobs programmatically. You
>> can look into my working branch [1]. As far as I know, the best and most
>> stable solution is using the ClusterClient. But this is an internal API
>> and might change.
>>
>> You could also use Flink's REST API for submitting a job [2].
>>
>> Regards,
>> Timo
>>
>> [1] https://github.com/twalthr/flink/blob/FLINK-7594_rebased/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#submitting-programs
>>
>> On 1/18/18 at 11:41 AM, Luigi Sgaglione wrote:
>>
>> Hi Timo,
>> my objective is to create a web interface that allows me to edit and
>> deploy jobs on Flink.
>>
>> To do so I'm evaluating all the possibilities provided by the Flink APIs.
>>
>> Which do you think is the best solution?
>>
>> Thanks
>> Luigi
>>
>> On 18 Jan 2018 at 09:39, "Timo Walther" <twal...@apache.org> wrote:
>>
>>> Hi Luigi,
>>>
>>> can you try to load an entire configuration file via
>>> GlobalConfiguration.loadConfiguration(flinkConfigDir)? Maybe you can
>>> tell us a little bit about what you want to achieve?
>>>
>>> Is the programmatic submission a requirement for you? Did you consider
>>> using the RemoteStreamEnvironment?
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 1/17/18 at 5:08 PM, Luigi Sgaglione wrote:
>>>
>>> Hi,
>>>
>>> I am a beginner in Flink and I'm trying to deploy a simple example using
>>> a Java client on a remote Flink server (1.4.0).
>>>
>>> I'm using org.apache.flink.client.program.Client
>>>
>>> This is the code I used:
>>>
>>> Configuration config = new Configuration();
>>> config.setString("jobmanager.rpc.address", "192.168.149.130");
>>> config.setInteger("jobmanager.rpc.port", 6123);
>>>
>>> Client c = new Client(config);
>>>
>>> PackagedProgram prg = new PackagedProgram(new File("target/Test-1.jar"));
>>> c.runDetached(prg, 1);
>>>
>>>
>>> but when I try to deploy the jar I receive the following error:
>>>
>>> 16:03:20,035 INFO org.apache.flink.client.program.Client - Looking up JobManager
>>> Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: *Failed to retrieve the JobManager gateway.*
>>>     at org.apache.flink.client.program.Client.runDetached(Client.java:380)
>>>     at org.apache.flink.client.program.Client.runDetached(Client.java:355)
>>>     at org.apache.flink.client.program.Client.runDetached(Client.java:340)
>>>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:74)
>>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:804)
>>>     at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
>>>     at org.apache.flink.api.java.DataSet.print(DataSet.java:1495)
>>>     at flink.Job.main(Job.java:67)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>>     at java.lang.reflect.Method.invoke(Unknown Source)
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>>>     at org.apache.flink.client.program.Client.runDetached(Client.java:279)
>>>     at flink.DeployJob.main(DeployJob.java:24)
>>> Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: *Could not retrieve the leader gateway*
>>>     at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:102)
>>>     at org.apache.flink.client.program.Client.getJobManagerGateway(Client.java:567)
>>>     at org.apache.flink.client.program.Client.runDetached(Client.java:378)
>>>     ... 15 more
>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]
>>>     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:116)
>>>     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>>     at scala.concurrent.Await$.result(package.scala:116)
>>>     at scala.concurrent.Await.result(package.scala)
>>>     at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:100)
>>>     ... 17 more
>>>
>>>
>>> Maybe I missed some configuration of the client.
>>> Can you help me to solve the problem?
>>>
>>> Thanks
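
For reference, the main difference between the failing upload and the working one at the top of this thread appears to be the multipart framing: each part starts with "--" plus the boundary, and the body ends with CRLF, "--", the boundary and a final "--". Below is a minimal sketch that builds such a body in memory so the framing can be inspected without a running server. The class name MultipartBody and its build method are hypothetical helpers made up for this illustration, not part of Flink or the JDK; the field name "jarfile" and the content type are taken from the working code.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not a Flink or JDK class): builds a single-file
// multipart/form-data body with the framing used in the working upload.
final class MultipartBody {
    private static final String CRLF = "\r\n";

    static byte[] build(String boundary, String fieldName, String fileName, byte[] content) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        // Part header: the delimiter is "--" followed by the boundary that
        // is announced in the Content-Type request header.
        String header = "--" + boundary + CRLF
                + "Content-Disposition: form-data; name=\"" + fieldName + "\";"
                + " filename=\"" + fileName + "\"" + CRLF
                + "Content-Type: application/x-java-archive" + CRLF
                + CRLF;
        write(out, header.getBytes(StandardCharsets.UTF_8));

        // Raw file bytes, copied without any character conversion.
        write(out, content);

        // Closing delimiter: CRLF, "--", boundary, "--".
        write(out, (CRLF + "--" + boundary + "--" + CRLF).getBytes(StandardCharsets.UTF_8));
        return out.toByteArray();
    }

    private static void write(ByteArrayOutputStream out, byte[] bytes) {
        // The three-argument write of ByteArrayOutputStream throws no checked exception.
        out.write(bytes, 0, bytes.length);
    }

    public static void main(String[] args) {
        // Two placeholder bytes stand in for the jar contents.
        byte[] body = build("------BoundaryXXXX", "jarfile", "Test-1.jar", new byte[] {0x50, 0x4B});
        System.out.println(new String(body, StandardCharsets.ISO_8859_1));
    }
}
```

The returned bytes could be written to urlConnection.getOutputStream() in a single call, which would avoid interleaving a Writer and the raw stream on the same connection.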