Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4767#discussion_r142712091
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java ---
    @@ -325,6 +406,98 @@ public TestParameters getUnresolvedMessageParameters() {
                }
        }
     
    +   private static class TestWebSocketOperation {
    +
    +           private static class WsParameters extends MessageParameters {
    +                   private final JobIDPathParameter jobIDPathParameter = new JobIDPathParameter();
    +
    +                   @Override
    +                   public Collection<MessagePathParameter<?>> getPathParameters() {
    +                           return Collections.singleton(jobIDPathParameter);
    +                   }
    +
    +                   @Override
    +                   public Collection<MessageQueryParameter<?>> getQueryParameters() {
    +                           return Collections.emptyList();
    +                   }
    +           }
    +
    +           static class WsHeaders implements MessageHeaders<EmptyRequestBody, WebSocketUpgradeResponseBody, WsParameters> {
    +
    +                   @Override
    +                   public HttpMethodWrapper getHttpMethod() {
    +                           return HttpMethodWrapper.GET;
    +                   }
    +
    +                   @Override
    +                   public String getTargetRestEndpointURL() {
    +                           return "/test/:jobid/subscribe";
    +                   }
    +
    +                   @Override
    +                   public Class<EmptyRequestBody> getRequestClass() {
    +                           return EmptyRequestBody.class;
    +                   }
    +
    +                   @Override
    +                   public Class<WebSocketUpgradeResponseBody> getResponseClass() {
    +                           return WebSocketUpgradeResponseBody.class;
    +                   }
    +
    +                   @Override
    +                   public HttpResponseStatus getResponseStatusCode() {
    +                           return HttpResponseStatus.OK;
    +                   }
    +
    +                   @Override
    +                   public WsParameters getUnresolvedMessageParameters() {
    +                           return new WsParameters();
    +                   }
    +           }
    +
    +           static class WsRestHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, WebSocketUpgradeResponseBody, WsParameters> {
    +
    +                   private final TestEventProvider eventProvider;
    +
    +                   WsRestHandler(
    +                           CompletableFuture<String> localAddressFuture,
    +                           GatewayRetriever<RestfulGateway> leaderRetriever,
    +                           TestEventProvider eventProvider,
    +                           Time timeout) {
    +                           super(localAddressFuture, leaderRetriever, timeout, new WsHeaders());
    +                           this.eventProvider = eventProvider;
    +                   }
    +
    +                   @Override
    +                   protected CompletableFuture<WebSocketUpgradeResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, WsParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
    +                           JobID jobID = request.getPathParameter(JobIDPathParameter.class);
    +                           Assert.assertEquals(PATH_JOB_ID, jobID);
    +                           ChannelHandler messageHandler = new WsMessageHandler(eventProvider, jobID);
    --- End diff --
    
    In this scenario, the main value of `AbstractRestHandler` is that it decodes the HTTP request into a `HandlerRequest`. If we factored that decoding into a `MessageToMessageDecoder`, we could reuse it here and would not need `AbstractRestHandler` at all.
    
    In other words, the 'Netty way' would be to use a pipeline of handlers, 
which is more flexible than an inheritance hierarchy in my opinion.
    
    Normal operation: `[HttpCodec] -> [RestRequestDecoder] -> [RestHandler]`
    WebSocket operation: `[HttpCodec] -> [RestRequestDecoder] -> 
[WebSocketHandler]`
    
    We could go further by encapsulating each operation in a handler that 
simply adds the appropriate child handlers, similar to how `HttpCodec` simply 
adds an encoder and decoder to the pipeline.   WDYT?
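
To make the suggestion concrete, here is a minimal sketch of the idea in plain Java. It deliberately does not use Netty or the Flink classes: `PipelineSketch`, `Pipeline`, `DecodedRequest`, and the handler methods are hypothetical stand-ins for a Netty pipeline, `HandlerRequest`, and the REST/WebSocket handlers. The only point it tries to show is that both operations can share one decoding stage and differ only in the final handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class PipelineSketch {

    /** Hypothetical stand-in for a decoded HandlerRequest (path parameters only). */
    public static final class DecodedRequest {
        public final String jobId;

        DecodedRequest(String jobId) {
            this.jobId = jobId;
        }
    }

    /** Toy pipeline: each stage receives the previous stage's output. */
    public static final class Pipeline {
        private final List<Function<Object, Object>> stages = new ArrayList<>();

        public Pipeline addLast(Function<Object, Object> stage) {
            stages.add(stage);
            return this;
        }

        public Object fire(Object msg) {
            for (Function<Object, Object> stage : stages) {
                msg = stage.apply(msg);
            }
            return msg;
        }
    }

    /** Shared decoding stage, playing the role of [RestRequestDecoder]. */
    static Object decode(Object raw) {
        // "/test/<jobid>/subscribe" splits into ["", "test", "<jobid>", "subscribe"].
        String[] parts = ((String) raw).split("/");
        return new DecodedRequest(parts[2]);
    }

    /** Terminal stage for normal operation, playing the role of [RestHandler]. */
    static Object restHandler(Object msg) {
        return "REST response for job " + ((DecodedRequest) msg).jobId;
    }

    /** Terminal stage for WebSocket operation, playing the role of [WebSocketHandler]. */
    static Object webSocketHandler(Object msg) {
        return "WebSocket upgrade for job " + ((DecodedRequest) msg).jobId;
    }

    /** An "operation" that only installs its child stages into a fresh pipeline. */
    public static Pipeline restPipeline() {
        return new Pipeline()
            .addLast(PipelineSketch::decode)
            .addLast(PipelineSketch::restHandler);
    }

    public static Pipeline webSocketPipeline() {
        return new Pipeline()
            .addLast(PipelineSketch::decode)
            .addLast(PipelineSketch::webSocketHandler);
    }

    public static void main(String[] args) {
        System.out.println(restPipeline().fire("/test/abc/subscribe"));
        System.out.println(webSocketPipeline().fire("/test/abc/subscribe"));
    }
}
```

In a real implementation the stages would be Netty `ChannelHandler`s and the decoder would extend `MessageToMessageDecoder`; the sketch only illustrates the composition, not the Netty API.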
    


