[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15424547#comment-15424547 ]
ASF GitHub Bot commented on FLINK-1984: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75126311 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java --- @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.util; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Handler; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.codec.http.router.Router; +import io.netty.util.CharsetUtil; +import org.jets3t.service.utils.Mimetypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.RandomAccessFile; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URL; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpMethod.GET; +import static io.netty.handler.codec.http.HttpMethod.HEAD; +import static 
io.netty.handler.codec.http.HttpResponseStatus.GONE; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + + +/** + * A generic Mesos artifact server, designed specifically for use by the Mesos Fetcher. + * + * More information: + * http://mesos.apache.org/documentation/latest/fetcher/ + * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/ + */ +public class MesosArtifactServer { + + private static final Logger LOG = LoggerFactory.getLogger(MesosArtifactServer.class); + + private final Router router; + + private ServerBootstrap bootstrap; + + private Channel serverChannel; + + private URL baseURL; + + public MesosArtifactServer(String sessionID, String serverHostname, int configuredPort) throws Exception { + if (configuredPort < 0 || configuredPort > 0xFFFF) { + throw new IllegalArgumentException("File server port is invalid: " + configuredPort); + } + + router = new Router(); + + ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() { + + @Override + protected void initChannel(SocketChannel ch) { + Handler handler = new Handler(router); + + ch.pipeline() + .addLast(new HttpServerCodec()) + .addLast(handler.name(), handler) + .addLast(new UnknownFileHandler()); + } + }; + + NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); + NioEventLoopGroup workerGroup = new NioEventLoopGroup(); + + this.bootstrap = new ServerBootstrap(); + this.bootstrap + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(initializer); + + Channel ch = this.bootstrap.bind(serverHostname, configuredPort).sync().channel(); + this.serverChannel = ch; + + InetSocketAddress bindAddress = 
(InetSocketAddress) ch.localAddress(); + String address = bindAddress.getAddress().getHostAddress(); + int port = bindAddress.getPort(); + + baseURL = new URL("http", serverHostname, port, "/" + sessionID + "/"); + + LOG.info("Mesos artifact server listening at " + address + ':' + port); + } + + /** + * Get the server port on which the artifact server is listening. + */ + public synchronized int getServerPort() { + Channel server = this.serverChannel; + if (server != null) { + try { + return ((InetSocketAddress) server.localAddress()).getPort(); + } catch (Exception e) { + LOG.error("Cannot access local server port", e); + } + } + return -1; + } + + /** + * Adds a file to the artifact server. + * @param localFile the local file to serve. + * @param remoteFile the remote path with which to locate the file. + * @return the fully-qualified remote path to the file. + * @throws MalformedURLException if the remote path is invalid. + */ + public synchronized URL addFile(File localFile, String remoteFile) throws MalformedURLException { + URL fileURL = new URL(baseURL, remoteFile); + router.ANY(fileURL.getPath(), new VirtualFileServerHandler(localFile)); + return fileURL; + } + + /** + * Stops the artifact server. + * @throws Exception + */ + public synchronized void stop() throws Exception { + if (this.serverChannel != null) { + this.serverChannel.close().awaitUninterruptibly(); + this.serverChannel = null; + } + if (bootstrap != null) { + if (bootstrap.group() != null) { + bootstrap.group().shutdownGracefully(); + } + bootstrap = null; + } + } + + /** + * Handle HEAD and GET requests for a specific file. 
+ */
+ @ChannelHandler.Sharable
+ public static class VirtualFileServerHandler extends SimpleChannelInboundHandler<Routed> {
+
+ private File file;
--- End diff --

final

> Integrate Flink with Apache Mesos
> ---------------------------------
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management
> Reporter: Robert Metzger
> Assignee: Eron Wright
> Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-:
> https://github.com/apache/flink/pull/251
> Update (May '16): a new effort is now underway, building on the recent
> ResourceManager work.
> Design document: ([google
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])

--
This message was sent by Atlassian JIRA (v6.3.4#6332)