[ 
https://issues.apache.org/jira/browse/FLINK-7551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16601892#comment-16601892
 ] 

ASF GitHub Bot commented on FLINK-7551:
---------------------------------------

zentol closed pull request #6602:  [FLINK-7551][rest] Add versioning to REST API
URL: https://github.com/apache/flink/pull/6602
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/rest_dispatcher.html 
b/docs/_includes/generated/rest_v1_dispatcher.html
similarity index 100%
rename from docs/_includes/generated/rest_dispatcher.html
rename to docs/_includes/generated/rest_v1_dispatcher.html
diff --git a/docs/monitoring/rest_api.md b/docs/monitoring/rest_api.md
index ec92051b396..fdbaa705e59 100644
--- a/docs/monitoring/rest_api.md
+++ b/docs/monitoring/rest_api.md
@@ -52,13 +52,26 @@ To add new requests, one needs to
 A good example is the 
`org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler` that uses the 
`org.apache.flink.runtime.rest.messages.JobExceptionsHeaders`.
 
 
-## Available Requests
+## API
 
-### Dispatcher
+The REST API is versioned, with specific versions being queryable by prefixing 
the url with the version prefix. Prefixes are always of the form 
`v[version_number]`.
+For example, to access version 1 of `/foo/bar` one would query `/v1/foo/bar`.
 
-{% include generated/rest_dispatcher.html %}
+If no version is specified Flink will default to the *oldest* version 
supporting the request.
 
-## Legacy
+Querying unsupported/non-existing versions will return a 404 error.
+
+<span class="label label-danger">Attention</span> REST API versioning is *not* 
active if the cluster runs in [legacy mode](../ops/config.html#mode). For this 
case please refer to the legacy API below.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="v1" markdown="1">
+#### Dispatcher
+
+{% include generated/rest_v1_dispatcher.html %}
+</div>
+
+<div data-lang="legacy" markdown="1">
 
 This section is only relevant if the cluster runs in [legacy 
mode](../ops/config.html#mode).
 
@@ -90,7 +103,7 @@ Values in angle brackets are variables, for example 
`http://hostname:8081/jobs/<
   - `/jars/:jarid/run`
 
 
-### General
+#### General
 
 **`/config`**
 
@@ -126,7 +139,7 @@ Sample Result:
 }
 {% endhighlight %}
 
-### Overview of Jobs
+#### Overview of Jobs
 
 **`/jobs/overview`**
 
@@ -163,7 +176,7 @@ Sample Result:
 }
 {% endhighlight %}
 
-### Details of a Running or Completed Job
+#### Details of a Running or Completed Job
 
 **`/jobs/<jobid>`**
 
@@ -573,15 +586,15 @@ Sample Result:
 }
 {% endhighlight %}
 
-### Job Cancellation
+#### Job Cancellation
 
-#### Cancel Job
+##### Cancel Job
 
 `DELETE` request to **`/jobs/:jobid/cancel`**.
 
 Triggers job cancellation, result on success is `{}`.
 
-#### Cancel Job with Savepoint
+##### Cancel Job with Savepoint
 
 Triggers a savepoint and cancels the job after the savepoint succeeds.
 
@@ -601,7 +614,7 @@ Sample Trigger Result:
 }
 {% endhighlight %}
 
-##### Monitoring Progress
+###### Monitoring Progress
 
 The progress of the cancellation has to be monitored by the user at
 
@@ -611,7 +624,7 @@ The progress of the cancellation has to be monitored by the 
user at
 
 The request ID is returned by the trigger result.
 
-###### In-Progress
+####### In-Progress
 
 {% highlight json %}
 {
@@ -620,7 +633,7 @@ The request ID is returned by the trigger result.
 }
 {% endhighlight %}
 
-###### Success
+####### Success
 
 {% highlight json %}
 {
@@ -632,7 +645,7 @@ The request ID is returned by the trigger result.
 
 The `savepointPath` points to the external path of the savepoint, which can be 
used to resume the savepoint.
 
-###### Failed
+####### Failed
 
 {% highlight json %}
 {
@@ -642,11 +655,11 @@ The `savepointPath` points to the external path of the 
savepoint, which can be u
 }
 {% endhighlight %}
 
-### Submitting Programs
+#### Submitting Programs
 
 It is possible to upload, run, and list Flink programs via the REST APIs and 
web frontend.
 
-#### Upload a new JAR file
+##### Upload a new JAR file
 
 Send a `POST` request to `/jars/upload` with your jar file sent as multi-part 
data under the `jarfile` file.
 Also make sure that the multi-part data includes the `Content-Type` of the 
file itself, some http libraries do not add the header by default.
@@ -659,7 +672,7 @@ Content-Disposition: form-data; name="jarfile"; 
filename="YourFileName.jar"
 Content-Type: application/x-java-archive
 {% endhighlight %}
 
-#### Run a Program (POST)
+##### Run a Program (POST)
 
 Send a `POST` request to `/jars/:jarid/run`. The `jarid` parameter is the file 
name of the program JAR in the configured web frontend upload directory 
(configuration key `jobmanager.web.upload.dir`).
 
@@ -688,3 +701,7 @@ Response:
 {% endhighlight %}
 
 {% top %}
+</div>
+
+</div>
+
diff --git 
a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java 
b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 82fdeec7cf6..4df1d6ee71b 100644
--- 
a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ 
b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -37,6 +37,7 @@
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
@@ -123,13 +124,24 @@
        public static void main(String[] args) throws IOException {
                String outputDirectory = args[0];
 
-               createHtmlFile(new DocumentingDispatcherRestEndpoint(), 
Paths.get(outputDirectory, "rest_dispatcher.html"));
+               for (final RestAPIVersion apiVersion : RestAPIVersion.values()) 
{
+                       if (apiVersion == RestAPIVersion.V0) {
+                               // this version exists only for testing purposes
+                               continue;
+                       }
+                       createHtmlFile(
+                               new DocumentingDispatcherRestEndpoint(),
+                               apiVersion,
+                               Paths.get(outputDirectory, "rest_" + 
apiVersion.getURLVersionPrefix() + "_dispatcher.html"));
+               }
        }
 
-       private static void createHtmlFile(DocumentingRestEndpoint 
restEndpoint, Path outputFile) throws IOException {
+       private static void createHtmlFile(DocumentingRestEndpoint 
restEndpoint, RestAPIVersion apiVersion, Path outputFile) throws IOException {
                StringBuilder html = new StringBuilder();
 
-               List<MessageHeaders> specs = restEndpoint.getSpecs();
+               List<MessageHeaders> specs = restEndpoint.getSpecs().stream()
+                       .filter(spec -> 
spec.getSupportedAPIVersions().contains(apiVersion))
+                       .collect(Collectors.toList());
                specs.forEach(spec -> html.append(createHtmlEntry(spec)));
 
                Files.deleteIfExists(outputFile);
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
index b0c2102ed4f..d8542d69587 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
@@ -28,6 +28,8 @@
 
 import 
org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
 import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
@@ -57,6 +59,7 @@
 import java.nio.file.Files;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.Collections;
 import java.util.Date;
 import java.util.Locale;
 
@@ -158,7 +161,12 @@ private void respondWithFile(ChannelHandlerContext ctx, 
HttpRequest request, Str
                                } finally {
                                        if (!success) {
                                                LOG.debug("Unable to load 
requested file {} from classloader", pathToLoad);
-                                               
StaticFileServerHandler.sendError(ctx, NOT_FOUND);
+                                               HandlerUtils.sendErrorResponse(
+                                                       ctx,
+                                                       request,
+                                                       new 
ErrorResponseBody("File not found."),
+                                                       NOT_FOUND,
+                                                       Collections.emptyMap());
                                                return;
                                        }
                                }
@@ -166,12 +174,22 @@ private void respondWithFile(ChannelHandlerContext ctx, 
HttpRequest request, Str
                }
 
                if (!file.exists() || file.isHidden() || file.isDirectory() || 
!file.isFile()) {
-                       StaticFileServerHandler.sendError(ctx, NOT_FOUND);
+                       HandlerUtils.sendErrorResponse(
+                               ctx,
+                               request,
+                               new ErrorResponseBody("File not found."),
+                               NOT_FOUND,
+                               Collections.emptyMap());
                        return;
                }
 
                if 
(!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) {
-                       StaticFileServerHandler.sendError(ctx, NOT_FOUND);
+                       HandlerUtils.sendErrorResponse(
+                               ctx,
+                               request,
+                               new ErrorResponseBody("File not found."),
+                               NOT_FOUND,
+                               Collections.emptyMap());
                        return;
                }
 
@@ -204,7 +222,12 @@ private void respondWithFile(ChannelHandlerContext ctx, 
HttpRequest request, Str
                try {
                        raf = new RandomAccessFile(file, "r");
                } catch (FileNotFoundException e) {
-                       StaticFileServerHandler.sendError(ctx, NOT_FOUND);
+                       HandlerUtils.sendErrorResponse(
+                               ctx,
+                               request,
+                               new ErrorResponseBody("File not found."),
+                               NOT_FOUND,
+                               Collections.emptyMap());
                        return;
                }
 
@@ -244,7 +267,12 @@ private void respondWithFile(ChannelHandlerContext ctx, 
HttpRequest request, Str
                } catch (Exception e) {
                        raf.close();
                        LOG.error("Failed to serve file.", e);
-                       StaticFileServerHandler.sendError(ctx, 
INTERNAL_SERVER_ERROR);
+                       HandlerUtils.sendErrorResponse(
+                               ctx,
+                               request,
+                               new ErrorResponseBody("Internal server error."),
+                               INTERNAL_SERVER_ERROR,
+                               Collections.emptyMap());
                }
        }
 
@@ -252,7 +280,12 @@ private void respondWithFile(ChannelHandlerContext ctx, 
HttpRequest request, Str
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
{
                if (ctx.channel().isActive()) {
                        LOG.error("Caught exception", cause);
-                       StaticFileServerHandler.sendError(ctx, 
INTERNAL_SERVER_ERROR);
+                       HandlerUtils.sendErrorResponse(
+                               ctx,
+                               false,
+                               new ErrorResponseBody("Internal server error."),
+                               INTERNAL_SERVER_ERROR,
+                               Collections.emptyMap());
                }
        }
 }
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
index b08504d0758..19a3d52da58 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
@@ -30,6 +30,8 @@
 
 import java.io.File;
 
+import static org.hamcrest.CoreMatchers.containsString;
+
 /**
  * Tests for the HistoryServerStaticFileServerHandler.
  */
@@ -56,7 +58,7 @@ public void testRespondWithFile() throws Exception {
                try {
                        // verify that 404 message is returned when requesting 
a non-existent file
                        String notFound404 = 
HistoryServerTest.getFromHTTP("http://localhost:"; + port + "/hello");
-                       Assert.assertTrue(notFound404.contains("404 Not 
Found"));
+                       Assert.assertThat(notFound404, containsString("not 
found"));
 
                        // verify that a) a file can be loaded using the 
ClassLoader and b) that the HistoryServer
                        // index_hs.html is injected
@@ -71,12 +73,12 @@ public void testRespondWithFile() throws Exception {
                        File dir = new File(webDir, "dir.json");
                        dir.mkdirs();
                        String dirNotFound404 = 
HistoryServerTest.getFromHTTP("http://localhost:"; + port + "/dir");
-                       Assert.assertTrue(dirNotFound404.contains("404 Not 
Found"));
+                       Assert.assertTrue(dirNotFound404.contains("not found"));
 
                        // verify that a 404 message is returned when 
requesting a file outside the webDir
                        tmp.newFile("secret");
                        String x = 
HistoryServerTest.getFromHTTP("http://localhost:"; + port + "/../secret");
-                       Assert.assertTrue(x.contains("404 Not Found"));
+                       Assert.assertTrue(x.contains("not found"));
                } finally {
                        webUI.shutdown();
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 2e9de4c168d..a8557496b08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -31,6 +31,7 @@
 import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -85,6 +86,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE;
 
@@ -173,6 +175,24 @@ public void shutdown(Time timeout) {
                        U messageParameters,
                        R request,
                        Collection<FileUpload> fileUploads) throws IOException {
+               return sendRequest(
+                       targetAddress,
+                       targetPort,
+                       messageHeaders,
+                       messageParameters,
+                       request,
+                       fileUploads,
+                       
RestAPIVersion.getLatestVersion(messageHeaders.getSupportedAPIVersions()));
+       }
+
+       public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, 
R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+                       String targetAddress,
+                       int targetPort,
+                       M messageHeaders,
+                       U messageParameters,
+                       R request,
+                       Collection<FileUpload> fileUploads,
+                       RestAPIVersion apiVersion) throws IOException {
                Preconditions.checkNotNull(targetAddress);
                Preconditions.checkArgument(0 <= targetPort && targetPort < 
65536, "The target port " + targetPort + " is not in the range (0, 65536].");
                Preconditions.checkNotNull(messageHeaders);
@@ -181,7 +201,17 @@ public void shutdown(Time timeout) {
                Preconditions.checkNotNull(fileUploads);
                Preconditions.checkState(messageParameters.isResolved(), 
"Message parameters were not resolved.");
 
-               String targetUrl = 
MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), 
messageParameters);
+               if 
(!messageHeaders.getSupportedAPIVersions().contains(apiVersion)) {
+                       throw new IllegalArgumentException(String.format(
+                               "The requested version %s is not supported by 
the request (method=%s URL=%s). Supported versions are: %s.",
+                               apiVersion,
+                               messageHeaders.getHttpMethod(),
+                               messageHeaders.getTargetRestEndpointURL(),
+                               
messageHeaders.getSupportedAPIVersions().stream().map(RestAPIVersion::getURLVersionPrefix).collect(Collectors.joining(","))));
+               }
+
+               String versionedHandlerURL = "/" + 
apiVersion.getURLVersionPrefix() + messageHeaders.getTargetRestEndpointURL();
+               String targetUrl = 
MessageParameters.resolveUrl(versionedHandlerURL, messageParameters);
 
                LOG.debug("Sending request of class {} to {}:{}{}", 
request.getClass(), targetAddress, targetPort, targetUrl);
                // serialize payload
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index e836e357b5d..28af072a10c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -27,6 +27,7 @@
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.router.Router;
 import org.apache.flink.runtime.rest.handler.router.RouterHandler;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.Preconditions;
@@ -144,8 +145,7 @@ public final void start() throws Exception {
                                RestHandlerUrlComparator.INSTANCE);
 
                        handlers.forEach(handler -> {
-                               log.debug("Register handler {} under {}@{}.", 
handler.f1, handler.f0.getHttpMethod(), handler.f0.getTargetRestEndpointURL());
-                               registerHandler(router, handler);
+                               registerHandler(router, handler, log);
                        });
 
                        ChannelInitializer<SocketChannel> initializer = new 
ChannelInitializer<SocketChannel>() {
@@ -364,22 +364,37 @@ public String getRestBaseUrl() {
                }
        }
 
-       private static void registerHandler(Router router, 
Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler) {
-               switch (specificationHandler.f0.getHttpMethod()) {
+       private static void registerHandler(Router router, 
Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler, 
Logger log) {
+               final String handlerURL = 
specificationHandler.f0.getTargetRestEndpointURL();
+               // setup versioned urls
+               for (final RestAPIVersion supportedVersion : 
specificationHandler.f0.getSupportedAPIVersions()) {
+                       final String versionedHandlerURL = '/' + 
supportedVersion.getURLVersionPrefix() + handlerURL;
+                       log.debug("Register handler {} under {}@{}.", 
specificationHandler.f1, specificationHandler.f0.getHttpMethod(), 
versionedHandlerURL);
+                       registerHandler(router, versionedHandlerURL, 
specificationHandler.f0.getHttpMethod(), specificationHandler.f1);
+                       if (supportedVersion.isDefaultVersion()) {
+                               // setup unversioned url for convenience and 
backwards compatibility
+                               log.debug("Register handler {} under {}@{}.", 
specificationHandler.f1, specificationHandler.f0.getHttpMethod(), handlerURL);
+                               registerHandler(router, handlerURL, 
specificationHandler.f0.getHttpMethod(), specificationHandler.f1);
+                       }
+               }
+       }
+
+       private static void registerHandler(Router router, String handlerURL, 
HttpMethodWrapper httpMethod, ChannelInboundHandler handler) {
+               switch (httpMethod) {
                        case GET:
-                               
router.addGet(specificationHandler.f0.getTargetRestEndpointURL(), 
specificationHandler.f1);
+                               router.addGet(handlerURL, handler);
                                break;
                        case POST:
-                               
router.addPost(specificationHandler.f0.getTargetRestEndpointURL(), 
specificationHandler.f1);
+                               router.addPost(handlerURL, handler);
                                break;
                        case DELETE:
-                               
router.addDelete(specificationHandler.f0.getTargetRestEndpointURL(), 
specificationHandler.f1);
+                               router.addDelete(handlerURL, handler);
                                break;
                        case PATCH:
-                               
router.addPatch(specificationHandler.f0.getTargetRestEndpointURL(), 
specificationHandler.f1);
+                               router.addPatch(handlerURL, handler);
                                break;
                        default:
-                               throw new RuntimeException("Unsupported http 
method: " + specificationHandler.f0.getHttpMethod() + '.');
+                               throw new RuntimeException("Unsupported http 
method: " + httpMethod + '.');
                }
        }
 
@@ -437,13 +452,22 @@ private static synchronized void 
checkAndCreateUploadDir(final Path uploadDir, f
 
                private static final Comparator<String> CASE_INSENSITIVE_ORDER 
= new CaseInsensitiveOrderComparator();
 
+               private static final Comparator<RestAPIVersion> 
API_VERSION_ORDER = new RestAPIVersion.RestAPIVersionComparator();
+
                static final RestHandlerUrlComparator INSTANCE = new 
RestHandlerUrlComparator();
 
                @Override
                public int compare(
                                Tuple2<RestHandlerSpecification, 
ChannelInboundHandler> o1,
                                Tuple2<RestHandlerSpecification, 
ChannelInboundHandler> o2) {
-                       return 
CASE_INSENSITIVE_ORDER.compare(o1.f0.getTargetRestEndpointURL(), 
o2.f0.getTargetRestEndpointURL());
+                       final int urlComparisonResult = 
CASE_INSENSITIVE_ORDER.compare(o1.f0.getTargetRestEndpointURL(), 
o2.f0.getTargetRestEndpointURL());
+                       if (urlComparisonResult != 0) {
+                               return urlComparisonResult;
+                       } else {
+                               return API_VERSION_ORDER.compare(
+                                       
Collections.min(o1.f0.getSupportedAPIVersions()),
+                                       
Collections.min(o2.f0.getSupportedAPIVersions()));
+                       }
                }
 
                /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
index 4ebcd49c159..656167974b3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
@@ -19,6 +19,10 @@
 package org.apache.flink.runtime.rest.handler;
 
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+
+import java.util.Collection;
+import java.util.Collections;
 
 /**
  * Rest handler interface which all rest handler implementation have to 
implement.
@@ -38,4 +42,13 @@
         * @return endpoint url that this request should be sent to
         */
        String getTargetRestEndpointURL();
+
+       /**
+        * Returns the supported API versions that this request supports.
+        *
+        * @return Collection of supported API versions
+        */
+       default Collection<RestAPIVersion> getSupportedAPIVersions() {
+               return Collections.singleton(RestAPIVersion.V1);
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
index 62b94e50bc8..969945f749d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
@@ -29,11 +29,12 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.rest.handler.RedirectHandler;
 import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.handler.util.MimeTypes;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
@@ -50,7 +51,6 @@
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
-import org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -184,7 +184,12 @@ private void respondToRequest(ChannelHandlerContext ctx, 
HttpRequest request, St
                                } finally {
                                        if (!success) {
                                                logger.debug("Unable to load 
requested file {} from classloader", requestPath);
-                                               sendError(ctx, NOT_FOUND);
+                                               HandlerUtils.sendErrorResponse(
+                                                       ctx,
+                                                       request,
+                                                       new 
ErrorResponseBody(String.format("Unable to load requested file %s.", 
requestPath)),
+                                                       NOT_FOUND,
+                                                       responseHeaders);
                                                return;
                                        }
                                }
@@ -192,12 +197,22 @@ private void respondToRequest(ChannelHandlerContext ctx, 
HttpRequest request, St
                }
 
                if (!file.exists() || file.isHidden() || file.isDirectory() || 
!file.isFile()) {
-                       sendError(ctx, NOT_FOUND);
+                       HandlerUtils.sendErrorResponse(
+                               ctx,
+                               request,
+                               new ErrorResponseBody("File not found."),
+                               NOT_FOUND,
+                               responseHeaders);
                        return;
                }
 
                if 
(!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) {
-                       sendError(ctx, NOT_FOUND);
+                       HandlerUtils.sendErrorResponse(
+                               ctx,
+                               request,
+                               new ErrorResponseBody("File not found."),
+                               NOT_FOUND,
+                               responseHeaders);
                        return;
                }
 
@@ -231,7 +246,12 @@ private void respondToRequest(ChannelHandlerContext ctx, 
HttpRequest request, St
                        raf = new RandomAccessFile(file, "r");
                }
                catch (FileNotFoundException e) {
-                       sendError(ctx, NOT_FOUND);
+                       HandlerUtils.sendErrorResponse(
+                               ctx,
+                               request,
+                               new ErrorResponseBody("File not found."),
+                               HttpResponseStatus.NOT_FOUND,
+                               responseHeaders);
                        return;
                }
 
@@ -271,7 +291,12 @@ private void respondToRequest(ChannelHandlerContext ctx, 
HttpRequest request, St
                } catch (Exception e) {
                        raf.close();
                        logger.error("Failed to serve file.", e);
-                       sendError(ctx, INTERNAL_SERVER_ERROR);
+                       HandlerUtils.sendErrorResponse(
+                               ctx,
+                               request,
+                               new ErrorResponseBody("Internal server error."),
+                               INTERNAL_SERVER_ERROR,
+                               responseHeaders);
                }
        }
 
@@ -279,7 +304,12 @@ private void respondToRequest(ChannelHandlerContext ctx, 
HttpRequest request, St
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
{
                if (ctx.channel().isActive()) {
                        logger.error("Caught exception", cause);
-                       sendError(ctx, INTERNAL_SERVER_ERROR);
+                       HandlerUtils.sendErrorResponse(
+                               ctx,
+                               false,
+                               new ErrorResponseBody("Internal server error."),
+                               INTERNAL_SERVER_ERROR,
+                               Collections.emptyMap());
                }
        }
 
@@ -287,21 +317,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, 
Throwable cause) {
        //  Utilities to encode headers and responses
        // 
------------------------------------------------------------------------
 
-       /**
-        * Writes a simple  error response message.
-        *
-        * @param ctx    The channel context to write the response to.
-        * @param status The response status.
-        */
-       public static void sendError(ChannelHandlerContext ctx, 
HttpResponseStatus status) {
-               FullHttpResponse response = new DefaultFullHttpResponse(
-                               HTTP_1_1, status, 
Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
-               response.headers().set(CONTENT_TYPE, "text/plain; 
charset=UTF-8");
-
-               // close the connection as soon as the error message is sent.
-               
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
-       }
-
        /**
         * Send the "304 Not Modified" response. This response can be used when 
the
         * file timestamp is the same as what the browser is sending up.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java
new file mode 100644
index 00000000000..d6305635ec0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.versioning;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+
+/**
+ * An enum for all versions of the REST API.
+ *
+ * <p>REST API versions are global and thus apply to every REST component.
+ *
+ * <p>Changes that must result in an API version increment include but are not 
limited to:
+ * - modification of a handler url
+ * - addition of new mandatory parameters
+ * - removal of a handler/request
+ * - modifications to request/response bodies (excluding additions)
+ */
+public enum RestAPIVersion {
+       V0(0, false), // strictly for testing purposes
+       V1(1, true);
+
+       private final int versionNumber;
+
+       private final boolean isDefaultVersion;
+
+       RestAPIVersion(int versionNumber, boolean isDefaultVersion) {
+               this.versionNumber = versionNumber;
+               this.isDefaultVersion = isDefaultVersion;
+       }
+
+       /**
+        * Returns the URL version prefix (e.g. "v1") for this version.
+        *
+        * @return URL version prefix
+        */
+       public String getURLVersionPrefix() {
+               return name().toLowerCase();
+       }
+
+       /**
+        * Returns whether this version is the default REST API version.
+        *
+        * @return whether this version is the default
+        */
+       public boolean isDefaultVersion() {
+               return isDefaultVersion;
+       }
+
+       /**
+        * Converts the given URL version prefix (e.g "v1") to a {@link 
RestAPIVersion}.
+        *
+        * @param prefix prefix to converted
+        * @return REST API version matching the prefix
+        * @throws IllegalArgumentException if the prefix doesn't match any 
version
+        */
+       public static RestAPIVersion fromURLVersionPrefix(String prefix) {
+               return valueOf(prefix.toUpperCase());
+       }
+
+       /**
+        * Returns the latest version from the given collection.
+        *
+        * @param versions possible candidates
+        * @return latest version
+        */
+       public static RestAPIVersion 
getLatestVersion(Collection<RestAPIVersion> versions) {
+               return Collections.max(versions, new 
RestAPIVersionComparator());
+       }
+
+       /**
+        * Comparator for {@link RestAPIVersion} that sorts versions based on 
their version number, i.e. oldest to latest.
+        */
+       public static class RestAPIVersionComparator implements 
Comparator<RestAPIVersion> {
+
+               @Override
+               public int compare(RestAPIVersion o1, RestAPIVersion o2) {
+                       return Integer.compare(o1.versionNumber, 
o2.versionNumber);
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index 209f2d1d351..22cd6f62063 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -25,14 +25,18 @@
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -45,12 +49,13 @@
  */
 public class RestClientTest extends TestLogger {
 
+       private static final String unroutableIp = "10.255.255.1";
+
        @Test
        public void testConnectionTimeout() throws Exception {
                final Configuration config = new Configuration();
                config.setLong(RestOptions.CONNECTION_TIMEOUT, 1);
                final RestClient restClient = new 
RestClient(RestClientConfiguration.fromConfiguration(config), 
Executors.directExecutor());
-               final String unroutableIp = "10.255.255.1";
                try {
                        restClient.sendRequest(
                                unroutableIp,
@@ -66,6 +71,27 @@ public void testConnectionTimeout() throws Exception {
                }
        }
 
+       @Test
+       public void testInvalidVersionRejection() throws Exception {
+               final RestClient restClient = new 
RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), 
Executors.directExecutor());
+
+               try {
+                       CompletableFuture<EmptyResponseBody> 
invalidVersionResponse = restClient.sendRequest(
+                               unroutableIp,
+                               80,
+                               new TestMessageHeaders(),
+                               EmptyMessageParameters.getInstance(),
+                               EmptyRequestBody.getInstance(),
+                               Collections.emptyList(),
+                               RestAPIVersion.V0
+                       );
+                       Assert.fail("The request should have been rejected due 
to a version mismatch.");
+               } catch (IllegalArgumentException e) {
+                       // expected
+               }
+
+       }
+
        private static class TestMessageHeaders implements 
MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
 
                @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 31f78e36cf8..b017610aa3a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -43,6 +43,7 @@
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -56,8 +57,12 @@
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
 import org.apache.commons.io.IOUtils;
 import org.junit.After;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -85,6 +90,7 @@
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
@@ -184,6 +190,21 @@ public void setup() throws Exception {
                        mockGatewayRetriever,
                        RpcUtils.INF_TIMEOUT);
 
+               TestVersionHandler testVersionHandler = new TestVersionHandler(
+                       CompletableFuture.completedFuture(restAddress),
+                       mockGatewayRetriever,
+                       RpcUtils.INF_TIMEOUT);
+
+               TestVersionSelectionHandler1 testVersionSelectionHandler1 = new 
TestVersionSelectionHandler1(
+                       CompletableFuture.completedFuture(restAddress),
+                       mockGatewayRetriever,
+                       RpcUtils.INF_TIMEOUT);
+
+               TestVersionSelectionHandler2 testVersionSelectionHandler2 = new 
TestVersionSelectionHandler2(
+                       CompletableFuture.completedFuture(restAddress),
+                       mockGatewayRetriever,
+                       RpcUtils.INF_TIMEOUT);
+
                testUploadHandler = new TestUploadHandler(
                        CompletableFuture.completedFuture(restAddress),
                        mockGatewayRetriever,
@@ -198,6 +219,9 @@ public void setup() throws Exception {
                final List<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> handlers = Arrays.asList(
                        Tuple2.of(new TestHeaders(), testHandler),
                        Tuple2.of(TestUploadHeaders.INSTANCE, 
testUploadHandler),
+                       Tuple2.of(testVersionHandler.getMessageHeaders(), 
testVersionHandler),
+                       
Tuple2.of(testVersionSelectionHandler1.getMessageHeaders(), 
testVersionSelectionHandler1),
+                       
Tuple2.of(testVersionSelectionHandler2.getMessageHeaders(), 
testVersionSelectionHandler2),
                        Tuple2.of(WebContentHandlerSpecification.getInstance(), 
staticFileServerHandler));
 
                serverEndpoint = new TestRestServerEndpoint(serverConfig, 
handlers);
@@ -415,6 +439,88 @@ public void testStaticFileServerHandler() throws Exception 
{
                assertEquals("foobar", fileContents.trim());
        }
 
+       @Test
+       public void testVersioning() throws Exception {
+               CompletableFuture<EmptyResponseBody> unspecifiedVersionResponse 
= restClient.sendRequest(
+                       serverAddress.getHostName(),
+                       serverAddress.getPort(),
+                       TestVersionHeaders.INSTANCE,
+                       EmptyMessageParameters.getInstance(),
+                       EmptyRequestBody.getInstance(),
+                       Collections.emptyList()
+               );
+
+               unspecifiedVersionResponse.get(5, TimeUnit.SECONDS);
+
+               CompletableFuture<EmptyResponseBody> specifiedVersionResponse = 
restClient.sendRequest(
+                       serverAddress.getHostName(),
+                       serverAddress.getPort(),
+                       TestVersionHeaders.INSTANCE,
+                       EmptyMessageParameters.getInstance(),
+                       EmptyRequestBody.getInstance(),
+                       Collections.emptyList(),
+                       RestAPIVersion.V1
+               );
+
+               specifiedVersionResponse.get(5, TimeUnit.SECONDS);
+       }
+
+       @Test
+       public void testVersionSelection() throws Exception {
+               CompletableFuture<EmptyResponseBody> version1Response = 
restClient.sendRequest(
+                       serverAddress.getHostName(),
+                       serverAddress.getPort(),
+                       TestVersionSelectionHeaders1.INSTANCE,
+                       EmptyMessageParameters.getInstance(),
+                       EmptyRequestBody.getInstance(),
+                       Collections.emptyList(),
+                       RestAPIVersion.V0
+               );
+
+               try {
+                       version1Response.get(5, TimeUnit.SECONDS);
+                       fail();
+               } catch (ExecutionException ee) {
+                       RestClientException rce = (RestClientException) 
ee.getCause();
+                       assertEquals(HttpResponseStatus.OK, 
rce.getHttpResponseStatus());
+               }
+
+               CompletableFuture<EmptyResponseBody> version2Response = 
restClient.sendRequest(
+                       serverAddress.getHostName(),
+                       serverAddress.getPort(),
+                       TestVersionSelectionHeaders2.INSTANCE,
+                       EmptyMessageParameters.getInstance(),
+                       EmptyRequestBody.getInstance(),
+                       Collections.emptyList(),
+                       RestAPIVersion.V1
+               );
+
+               try {
+                       version2Response.get(5, TimeUnit.SECONDS);
+                       fail();
+               } catch (ExecutionException ee) {
+                       RestClientException rce = (RestClientException) 
ee.getCause();
+                       assertEquals(HttpResponseStatus.ACCEPTED, 
rce.getHttpResponseStatus());
+               }
+       }
+
+       @Test
+       public void testDefaultVersionRouting() throws Exception {
+               Assume.assumeFalse(
+                       "Ignoring SSL-enabled test to keep OkHttp usage 
simple.",
+                       config.getBoolean(SecurityOptions.SSL_REST_ENABLED));
+
+               OkHttpClient client = new OkHttpClient();
+
+               final Request request = new Request.Builder()
+                       .url(serverEndpoint.getRestBaseUrl() + 
TestVersionSelectionHeaders2.INSTANCE.getTargetRestEndpointURL())
+                       .build();
+
+               try (final Response response = 
client.newCall(request).execute()) {
+                       assertEquals(HttpResponseStatus.ACCEPTED.code(), 
response.code());
+               }
+       }
+
        private HttpURLConnection openHttpConnectionForUpload(final String 
boundary) throws IOException {
                final HttpURLConnection connection =
                        (HttpURLConnection) new 
URL(serverEndpoint.getRestBaseUrl() + "/upload").openConnection();
@@ -697,6 +803,151 @@ private TestUploadHandler(
                }
        }
 
+       private static class TestVersionHandler extends 
AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, 
EmptyMessageParameters> {
+
+               private TestVersionHandler(
+                       final CompletableFuture<String> localRestAddress,
+                       final GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       final Time timeout) {
+                       super(localRestAddress, leaderRetriever, timeout, 
Collections.emptyMap(), TestVersionHeaders.INSTANCE);
+               }
+
+               @Override
+               protected CompletableFuture<EmptyResponseBody> 
handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> 
request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+                       return 
CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+               }
+       }
+
+       private enum TestVersionHeaders implements 
MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+               INSTANCE;
+
+               @Override
+               public Class<EmptyRequestBody> getRequestClass() {
+                       return EmptyRequestBody.class;
+               }
+
+               @Override
+               public HttpMethodWrapper getHttpMethod() {
+                       return HttpMethodWrapper.GET;
+               }
+
+               @Override
+               public String getTargetRestEndpointURL() {
+                       return "/test/versioning";
+               }
+
+               @Override
+               public Class<EmptyResponseBody> getResponseClass() {
+                       return EmptyResponseBody.class;
+               }
+
+               @Override
+               public HttpResponseStatus getResponseStatusCode() {
+                       return HttpResponseStatus.OK;
+               }
+
+               @Override
+               public String getDescription() {
+                       return null;
+               }
+
+               @Override
+               public EmptyMessageParameters getUnresolvedMessageParameters() {
+                       return EmptyMessageParameters.getInstance();
+               }
+
+               @Override
+               public Collection<RestAPIVersion> getSupportedAPIVersions() {
+                       return Collections.singleton(RestAPIVersion.V1);
+               }
+       }
+
+       private interface TestVersionSelectionHeadersBase extends 
MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+               @Override
+               default Class<EmptyRequestBody> getRequestClass() {
+                       return EmptyRequestBody.class;
+               }
+
+               @Override
+               default HttpMethodWrapper getHttpMethod() {
+                       return HttpMethodWrapper.GET;
+               }
+
+               @Override
+               default String getTargetRestEndpointURL() {
+                       return "/test/select-version";
+               }
+
+               @Override
+               default Class<EmptyResponseBody> getResponseClass() {
+                       return EmptyResponseBody.class;
+               }
+
+               @Override
+               default HttpResponseStatus getResponseStatusCode() {
+                       return HttpResponseStatus.OK;
+               }
+
+               @Override
+               default String getDescription() {
+                       return null;
+               }
+
+               @Override
+               default EmptyMessageParameters getUnresolvedMessageParameters() 
{
+                       return EmptyMessageParameters.getInstance();
+               }
+       }
+
+       private enum TestVersionSelectionHeaders1 implements 
TestVersionSelectionHeadersBase {
+               INSTANCE;
+
+               @Override
+               public Collection<RestAPIVersion> getSupportedAPIVersions() {
+                       return Collections.singleton(RestAPIVersion.V0);
+               }
+       }
+
+       private enum TestVersionSelectionHeaders2 implements 
TestVersionSelectionHeadersBase {
+               INSTANCE;
+
+               @Override
+               public Collection<RestAPIVersion> getSupportedAPIVersions() {
+                       return Collections.singleton(RestAPIVersion.V1);
+               }
+       }
+
+       private static class TestVersionSelectionHandler1 extends 
AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, 
EmptyMessageParameters> {
+
+               private TestVersionSelectionHandler1(
+                       final CompletableFuture<String> localRestAddress,
+                       final GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       final Time timeout) {
+                       super(localRestAddress, leaderRetriever, timeout, 
Collections.emptyMap(), TestVersionSelectionHeaders1.INSTANCE);
+               }
+
+               @Override
+               protected CompletableFuture<EmptyResponseBody> 
handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> 
request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+                       throw new RestHandlerException("test failure 1", 
HttpResponseStatus.OK);
+               }
+       }
+
+       private static class TestVersionSelectionHandler2 extends 
AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, 
EmptyMessageParameters> {
+
+               private TestVersionSelectionHandler2(
+                       final CompletableFuture<String> localRestAddress,
+                       final GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       final Time timeout) {
+                       super(localRestAddress, leaderRetriever, timeout, 
Collections.emptyMap(), TestVersionSelectionHeaders2.INSTANCE);
+               }
+
+               @Override
+               protected CompletableFuture<EmptyResponseBody> 
handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> 
request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
+                       throw new RestHandlerException("test failure 2", 
HttpResponseStatus.ACCEPTED);
+               }
+       }
+
        private enum TestUploadHeaders implements 
MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
                INSTANCE;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java
new file mode 100644
index 00000000000..4f60da1716e
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.versioning;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Tests for {@link RestAPIVersion}.
+ */
+public class RestAPIVersionTest extends TestLogger {
+       @Test
+       public void testGetLatest() {
+               Collection<RestAPIVersion> candidates = 
Arrays.asList(RestAPIVersion.V0, RestAPIVersion.V1);
+               Assert.assertEquals(RestAPIVersion.V1, 
RestAPIVersion.getLatestVersion(candidates));
+       }
+
+       @Test
+       public void testSingleDefaultVersion() {
+               final List<RestAPIVersion> defaultVersions = 
Arrays.stream(RestAPIVersion.values())
+                       .filter(RestAPIVersion::isDefaultVersion)
+                       .collect(Collectors.toList());
+
+               Assert.assertEquals(
+                       "Only one RestAPIVersion should be marked as the 
default. Defaults: " + defaultVersions,
+                       1,
+                       defaultVersions.size());
+       }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add VERSION to the REST urls. 
> ------------------------------
>
>                 Key: FLINK-7551
>                 URL: https://issues.apache.org/jira/browse/FLINK-7551
>             Project: Flink
>          Issue Type: Improvement
>          Components: REST
>    Affects Versions: 1.4.0
>            Reporter: Kostas Kloudas
>            Assignee: Chesnay Schepler
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> This is to guarantee that we can update the REST API without breaking 
> existing third-party clients.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to