Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4767#discussion_r142712091

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java ---
    @@ -325,6 +406,98 @@ public TestParameters getUnresolvedMessageParameters() {
     		}
     	}
    +	private static class TestWebSocketOperation {
    +
    +		private static class WsParameters extends MessageParameters {
    +			private final JobIDPathParameter jobIDPathParameter = new JobIDPathParameter();
    +
    +			@Override
    +			public Collection<MessagePathParameter<?>> getPathParameters() {
    +				return Collections.singleton(jobIDPathParameter);
    +			}
    +
    +			@Override
    +			public Collection<MessageQueryParameter<?>> getQueryParameters() {
    +				return Collections.emptyList();
    +			}
    +		}
    +
    +		static class WsHeaders implements MessageHeaders<EmptyRequestBody, WebSocketUpgradeResponseBody, WsParameters> {
    +
    +			@Override
    +			public HttpMethodWrapper getHttpMethod() {
    +				return HttpMethodWrapper.GET;
    +			}
    +
    +			@Override
    +			public String getTargetRestEndpointURL() {
    +				return "/test/:jobid/subscribe";
    +			}
    +
    +			@Override
    +			public Class<EmptyRequestBody> getRequestClass() {
    +				return EmptyRequestBody.class;
    +			}
    +
    +			@Override
    +			public Class<WebSocketUpgradeResponseBody> getResponseClass() {
    +				return WebSocketUpgradeResponseBody.class;
    +			}
    +
    +			@Override
    +			public HttpResponseStatus getResponseStatusCode() {
    +				return HttpResponseStatus.OK;
    +			}
    +
    +			@Override
    +			public WsParameters getUnresolvedMessageParameters() {
    +				return new WsParameters();
    +			}
    +		}
    +
    +		static class WsRestHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, WebSocketUpgradeResponseBody, WsParameters> {
    +
    +			private final TestEventProvider eventProvider;
    +
    +			WsRestHandler(
    +				CompletableFuture<String> localAddressFuture,
    +				GatewayRetriever<RestfulGateway> leaderRetriever,
    +				TestEventProvider eventProvider,
    +				Time timeout) {
    +				super(localAddressFuture, leaderRetriever, timeout, new WsHeaders());
    +				this.eventProvider = eventProvider;
    +			}
    +
    +			@Override
    +			protected CompletableFuture<WebSocketUpgradeResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, WsParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
    +				JobID jobID = request.getPathParameter(JobIDPathParameter.class);
    +				Assert.assertEquals(PATH_JOB_ID, jobID);
    +				ChannelHandler messageHandler = new WsMessageHandler(eventProvider, jobID);
    --- End diff --

The main value of `AbstractRestHandler` in this scenario is in decoding the HTTP request into a `HandlerRequest`. By factoring that code into a `MessageToMessageDecoder` we could reuse it and avoid the need for `AbstractRestHandler` in this scenario. In other words, the 'Netty way' would be to use a pipeline of handlers, which is more flexible than an inheritance hierarchy, in my opinion.

Normal operation:
`[HttpCodec] -> [RestRequestDecoder] -> [RestHandler]`

WebSocket operation:
`[HttpCodec] -> [RestRequestDecoder] -> [WebSocketHandler]`

We could go further by encapsulating each operation in a handler that simply adds the appropriate child handlers, similar to how `HttpCodec` simply adds an encoder and decoder to the pipeline.

WDYT?
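To make the composition idea concrete, here is a rough plain-Java model of the two pipelines. It is only a sketch under stated assumptions: it does not use Netty's `ChannelPipeline` or any Flink class, and `Stage`, `DecodedRequest`, `run`, and the handler bodies are hypothetical stand-ins. The HTTP codec stage is left out. The point it tries to show is that the decoding stage is written once and shared, while only the final stage differs per operation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Plain-Java model of the handler-pipeline idea; none of these types are Netty or Flink classes. */
final class PipelineSketch {

    /** One pipeline stage: takes a message and returns the message for the next stage. */
    interface Stage extends Function<Object, Object> {}

    /** Hypothetical stand-in for HandlerRequest: holds the decoded path parameter. */
    static final class DecodedRequest {
        final String jobId;

        DecodedRequest(String jobId) {
            this.jobId = jobId;
        }
    }

    /** Shared decoding stage, playing the role of the proposed RestRequestDecoder. */
    static final class RestRequestDecoder implements Stage {
        @Override
        public Object apply(Object msg) {
            // Assumed URL shape from the test above: /test/:jobid/subscribe
            String[] parts = ((String) msg).split("/");
            return new DecodedRequest(parts[2]);
        }
    }

    /** Terminal stage for normal REST operation (illustrative response only). */
    static final class RestHandler implements Stage {
        @Override
        public Object apply(Object msg) {
            return "response for job " + ((DecodedRequest) msg).jobId;
        }
    }

    /** Terminal stage for WebSocket operation (illustrative response only). */
    static final class WebSocketHandler implements Stage {
        @Override
        public Object apply(Object msg) {
            return "websocket upgrade for job " + ((DecodedRequest) msg).jobId;
        }
    }

    /** Both pipelines reuse the same decoder type and differ only in the last stage. */
    static List<Stage> normalPipeline() {
        return Arrays.asList(new RestRequestDecoder(), new RestHandler());
    }

    static List<Stage> webSocketPipeline() {
        return Arrays.asList(new RestRequestDecoder(), new WebSocketHandler());
    }

    /** Passes the message through each stage in order and returns the final result. */
    static Object run(List<Stage> pipeline, Object msg) {
        Object current = msg;
        for (Stage stage : pipeline) {
            current = stage.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        System.out.println(run(normalPipeline(), "/test/abc/subscribe"));
        System.out.println(run(webSocketPipeline(), "/test/abc/subscribe"));
    }
}
```

In a real Netty pipeline the stages would be `ChannelHandler`s added to the channel's pipeline rather than entries in a list, but the composition would have the same shape: swapping the last handler changes the operation without touching the decoder.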
---