Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6147#discussion_r195089986 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -28,35 +33,51 @@ import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; +import org.apache.flink.runtime.util.ScalaUtils; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import akka.actor.AddressFromURIString; +import org.slf4j.Logger; + import javax.annotation.Nonnull; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.ObjectInputStream; +import java.net.InetSocketAddress; +import java.nio.file.Path; +import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; /** * This handler can be used to submit jobs to a Flink cluster. */ public final class JobSubmitHandler extends AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> { + private final Configuration config; + public JobSubmitHandler( CompletableFuture<String> localRestAddress, GatewayRetriever<? 
extends DispatcherGateway> leaderRetriever, Time timeout, - Map<String, String> headers) { + Map<String, String> headers, + Configuration config) { super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance()); + this.config = config; } @Override protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { + final JobSubmitRequestBody requestBody = request.getRequestBody(); JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); + try (ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(requestBody.serializedJobGraph))) { --- End diff -- Should we also send the `serializedJobGraph` as part of the POST request body, instead of passing it through Jackson to encode it in Base64?
---