    @@ -0,0 +1,304 @@
    +package org.apache.flink.mesos.util;
    +import io.netty.bootstrap.ServerBootstrap;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.Channel;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandler;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelInitializer;
    +import io.netty.channel.DefaultFileRegion;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import io.netty.channel.nio.NioEventLoopGroup;
    +import io.netty.channel.socket.SocketChannel;
    +import io.netty.channel.socket.nio.NioServerSocketChannel;
    +import io.netty.handler.codec.http.DefaultFullHttpResponse;
    +import io.netty.handler.codec.http.DefaultHttpResponse;
    +import io.netty.handler.codec.http.FullHttpResponse;
    +import io.netty.handler.codec.http.HttpHeaders;
    +import io.netty.handler.codec.http.HttpRequest;
    +import io.netty.handler.codec.http.HttpResponse;
    +import io.netty.handler.codec.http.HttpResponseStatus;
    +import io.netty.handler.codec.http.HttpServerCodec;
    +import io.netty.handler.codec.http.LastHttpContent;
    +import io.netty.handler.codec.http.router.Handler;
    +import io.netty.handler.codec.http.router.Routed;
    +import io.netty.handler.codec.http.router.Router;
    +import io.netty.util.CharsetUtil;
    +import org.jets3t.service.utils.Mimetypes;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import java.io.File;
    +import java.io.FileNotFoundException;
    +import java.io.RandomAccessFile;
    +import java.net.InetSocketAddress;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
    +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    +import static io.netty.handler.codec.http.HttpMethod.GET;
    +import static io.netty.handler.codec.http.HttpMethod.HEAD;
    +import static io.netty.handler.codec.http.HttpResponseStatus.GONE;
    +import static 
    +import static 
    +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
    +import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    +/**
    + * A generic Mesos artifact server, designed specifically for use by the Mesos Fetcher.
    + *
    + * More information:
    + * http://mesos.apache.org/documentation/latest/fetcher/
    + * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/
    + */
    +public class MesosArtifactServer {
    +   private static final Logger LOG = 
    +   private final Router router;
    +   private ServerBootstrap bootstrap;
    +   private Channel serverChannel;
    +   private URL baseURL;
    +   public MesosArtifactServer(String sessionID, String serverHostname, int 
configuredPort) throws Exception { // validates the port, builds the Netty HTTP pipeline, binds, then records the base URL
    +           if (configuredPort < 0 || configuredPort > 0xFFFF) { // accepts 0..65535 inclusive
    +                   throw new IllegalArgumentException("File server port is 
invalid: " + configuredPort);
    +           }
    +           router = new Router(); // route table shared by every connection's Handler; addFile registers routes on it
    +           ChannelInitializer<SocketChannel> initializer = new 
ChannelInitializer<SocketChannel>() {
    +                   @Override
    +                   protected void initChannel(SocketChannel ch) {
    +                           Handler handler = new Handler(router);
    +                           ch.pipeline()
    +                                   .addLast(new HttpServerCodec())
    +                                   .addLast(handler.name(), handler)
    +                                   .addLast(new UnknownFileHandler()); // placed after the routing handler; presumably answers requests no route matched (class not shown here)
    +                   }
    +           };
    +           NioEventLoopGroup bossGroup   = new NioEventLoopGroup(1); // one thread; passed as the parent group below
    +           NioEventLoopGroup workerGroup = new NioEventLoopGroup(); // passed as the child group below
    +           this.bootstrap = new ServerBootstrap();
    +           this.bootstrap
    +                   .group(bossGroup, workerGroup)
    +                   .channel(NioServerSocketChannel.class)
    +                   .childHandler(initializer);
    +           Channel ch = this.bootstrap.bind(serverHostname, // NOTE(review): the rest of this statement is missing from the excerpt
    +           this.serverChannel = ch;
    +           InetSocketAddress bindAddress = (InetSocketAddress) // NOTE(review): the rest of this statement is missing from the excerpt
    +           String address = bindAddress.getAddress().getHostAddress();
    +           int port = bindAddress.getPort(); // port comes from bindAddress, not from configuredPort
    +           baseURL = new URL("http", serverHostname, port, "/" + sessionID 
+ "/"); // base URL uses the given hostname (not the resolved address) and the path "/<sessionID>/"
    +           LOG.info("Mesos artifact server listening at " + address + ':' 
+ port);
    +   }
    +   /**
    +    * Get the server port on which the artifact server is listening.
    +    *
    +    * @return the port obtained inside the try block below, or -1 when the
    +    *         server channel is null (for example after {@link #stop()}) or
    +    *         when that lookup throws; the failure is logged, not rethrown.
    +    */
    +   public synchronized int getServerPort() {
    +           Channel server = this.serverChannel;
    +           if (server != null) {
    +                   try {
    +                           return ((InetSocketAddress) // NOTE(review): the rest of this return expression is missing from the excerpt
    +                   } catch (Exception e) {
    +                           LOG.error("Cannot access local server port", e);
    +                   }
    +           }
    +           return -1; // reached when there is no channel or the lookup above failed
    +   }
    +   /**
    +    * Adds a file to the artifact server.
    +    *
    +    * <p>The remote path is resolved against the server's base URL using
    +    * {@link URL#URL(URL, String)}, and a route is registered on the router
    +    * under the path component of the resulting URL.
    +    *
    +    * @param localFile the local file to serve.
    +    * @param remoteFile the remote path with which to locate the file.
    +    * @return the fully-qualified remote path to the file.
    +    * @throws MalformedURLException if the remote path is invalid.
    +    */
    +   public synchronized URL addFile(File localFile, String remoteFile) 
throws MalformedURLException {
    +           URL fileURL = new URL(baseURL, remoteFile);
    +           router.ANY(fileURL.getPath(), new // NOTE(review): the rest of this statement is missing from the excerpt; presumably it constructs the handler for localFile
    +           return fileURL;
    +   }
    +   /**
    +    * Stops the artifact server.
    +    *
    +    * <p>Closes the listening channel, waiting uninterruptibly for the close
    +    * to complete, and then requests a graceful shutdown of both event loop
    +    * groups attached to the bootstrap. The channel and bootstrap fields are
    +    * cleared afterwards, so calling this method again does nothing.
    +    *
    +    * @throws Exception kept in the signature for existing callers.
    +    */
    +   public synchronized void stop() throws Exception {
    +           if (this.serverChannel != null) {
    +                   this.serverChannel.close().awaitUninterruptibly();
    +                   this.serverChannel = null;
    +           }
    +           if (bootstrap != null) {
    +                   if (bootstrap.group() != null) {
    +                           bootstrap.group().shutdownGracefully();
    +                   }
    +                   // group() returns only the parent (accept) group. The worker group
    +                   // must be shut down as well, otherwise its threads keep running
    +                   // after the server has been stopped.
    +                   if (bootstrap.childGroup() != null) {
    +                           bootstrap.childGroup().shutdownGracefully();
    +                   }
    +                   bootstrap = null;
    +           }
    +   }
    +   /**
    +    * Handle HEAD and GET requests for a specific file.
    +    */
    +   @ChannelHandler.Sharable
    +   public static class VirtualFileServerHandler extends 
SimpleChannelInboundHandler<Routed> {
    +           private File file;
> Integrate Flink with Apache Mesos
> ---------------------------------
>                 Key: FLINK-1984
>                 URL: https://issues.apache.org/jira/browse/FLINK-1984
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management
>            Reporter: Robert Metzger
>            Assignee: Eron Wright 
>            Priority: Minor
>         Attachments: 251.patch
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: 
> https://github.com/apache/flink/pull/251
> Update (May '16):  a new effort is now underway, building on the recent 
> ResourceManager work.
> Design document:  ([google 
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])

