Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4814#discussion_r144237816 --- Diff: flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -96,90 +113,158 @@ protected AbstractServerBase( final Integer bindPort, final Integer numEventLoopThreads, final Integer numQueryThreads) { + this( + serverName, + bindAddress, + Collections.singleton(bindPort).iterator(), + numEventLoopThreads, + numQueryThreads + ); + } + + /** + * Creates the {@link AbstractServerBase}. + * + * <p>The server needs to be started via {@link #start()}. + * + * @param serverName the name of the server + * @param bindAddress address to bind to + * @param bindPortIterator port to bind to + * @param numEventLoopThreads number of event loop threads + */ + protected AbstractServerBase( + final String serverName, + final InetAddress bindAddress, + final Iterator<Integer> bindPortIterator, + final Integer numEventLoopThreads, + final Integer numQueryThreads) { - Preconditions.checkNotNull(bindAddress); - Preconditions.checkArgument(bindPort >= 0 && bindPort <= 65536, "Port " + bindPort + " out of valid range (0-65536)."); + Preconditions.checkNotNull(bindPortIterator); Preconditions.checkArgument(numEventLoopThreads >= 1, "Non-positive number of event loop threads."); Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads."); this.serverName = Preconditions.checkNotNull(serverName); - this.queryExecutor = createQueryExecutor(numQueryThreads); - - final NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads); - - final ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("Flink " + serverName + " EventLoop Thread %d") - .build(); - - final NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory); - - bootstrap = new ServerBootstrap() - // Bind address and port - .localAddress(bindAddress, bindPort) - // NIO server channels - .group(nioGroup) - .channel(NioServerSocketChannel.class) - // AbstractServerBase channel Options - .option(ChannelOption.ALLOCATOR, bufferPool) - // Child channel options - .childOption(ChannelOption.ALLOCATOR, bufferPool) - .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK) - .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK); + this.bindAddress = Preconditions.checkNotNull(bindAddress); + this.numEventLoopThreads = numEventLoopThreads; + this.numQueryThreads = numQueryThreads; + + this.bindPortRange = new HashSet<>(); + while (bindPortIterator.hasNext()) { + int port = bindPortIterator.next(); + Preconditions.checkArgument(port >= 0 && port <= 65535, + "Invalid port configuration. Port must be between 0 and 65535, but was " + port + "."); + bindPortRange.add(port); + } } /** * Creates a thread pool for the query execution. - * - * @param numQueryThreads Number of query threads. * @return Thread pool for query execution */ - private ExecutorService createQueryExecutor(int numQueryThreads) { + private ExecutorService createQueryExecutor() { ThreadFactory threadFactory = new ThreadFactoryBuilder() .setDaemon(true) .setNameFormat("Flink " + getServerName() + " Thread %d") .build(); - return Executors.newFixedThreadPool(numQueryThreads, threadFactory); } + /** + * Returns the thread-pool responsible for processing incoming requests. + */ protected ExecutorService getQueryExecutor() { return queryExecutor; } + /** + * Gets the name of the server. This is useful for debugging. + * @return The name of the server. + */ public String getServerName() { return serverName; } + /** + * Returns the {@link AbstractServerHandler handler} to be used for + * serving the incoming requests. + */ public abstract AbstractServerHandler<REQ, RESP> initializeHandler(); /** + * Returns the address of this server. + * + * @return AbstractServerBase address + * @throws IllegalStateException If server has not been started yet + */ + public KvStateServerAddress getServerAddress() { + Preconditions.checkState(serverAddress != null, "Server " + serverName + " has not been started."); + return serverAddress; + } + + /** * Starts the server by binding to the configured bind address (blocking). * @throws InterruptedException If interrupted during the bind operation */ public void start() throws InterruptedException { Preconditions.checkState(serverAddress == null, - "Server " + serverName + " has already been started @ " + serverAddress + '.'); - - this.handler = initializeHandler(); - bootstrap.childHandler(new ServerChannelInitializer<>(handler)); - - Channel channel = bootstrap.bind().sync().channel(); - InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress(); - serverAddress = new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort()); + "Server " + serverName + " already running @ " + serverAddress + '.'); + + Iterator<Integer> portIterator = bindPortRange.iterator(); + while (portIterator.hasNext() && serverAddress == null) { + final int port = portIterator.next(); + try { + attemptToBind(port); + } catch (Exception e) { --- End diff -- Is there a more specific exception that will be thrown if the port is taken? We could catch specifically that one in `attemptToBind()` and return true or false depending on whether binding was succesful.
---