Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4814#discussion_r144237816
  
    --- Diff: 
flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 ---
    @@ -96,90 +113,158 @@ protected AbstractServerBase(
                        final Integer bindPort,
                        final Integer numEventLoopThreads,
                        final Integer numQueryThreads) {
    +           this(
    +                           serverName,
    +                           bindAddress,
    +                           Collections.singleton(bindPort).iterator(),
    +                           numEventLoopThreads,
    +                           numQueryThreads
    +           );
    +   }
    +
    +   /**
    +    * Creates the {@link AbstractServerBase}.
    +    *
    +    * <p>The server needs to be started via {@link #start()}.
    +    *
    +    * @param serverName the name of the server
    +    * @param bindAddress address to bind to
    +    * @param bindPortIterator port to bind to
    +    * @param numEventLoopThreads number of event loop threads
    +    */
    +   protected AbstractServerBase(
    +                   final String serverName,
    +                   final InetAddress bindAddress,
    +                   final Iterator<Integer> bindPortIterator,
    +                   final Integer numEventLoopThreads,
    +                   final Integer numQueryThreads) {
     
    -           Preconditions.checkNotNull(bindAddress);
    -           Preconditions.checkArgument(bindPort >= 0 && bindPort <= 65536, 
"Port " + bindPort + " out of valid range (0-65536).");
    +           Preconditions.checkNotNull(bindPortIterator);
                Preconditions.checkArgument(numEventLoopThreads >= 1, 
"Non-positive number of event loop threads.");
                Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive 
number of query threads.");
     
                this.serverName = Preconditions.checkNotNull(serverName);
    -           this.queryExecutor = createQueryExecutor(numQueryThreads);
    -
    -           final NettyBufferPool bufferPool = new 
NettyBufferPool(numEventLoopThreads);
    -
    -           final ThreadFactory threadFactory = new ThreadFactoryBuilder()
    -                           .setDaemon(true)
    -                           .setNameFormat("Flink " + serverName + " 
EventLoop Thread %d")
    -                           .build();
    -
    -           final NioEventLoopGroup nioGroup = new 
NioEventLoopGroup(numEventLoopThreads, threadFactory);
    -
    -           bootstrap = new ServerBootstrap()
    -                           // Bind address and port
    -                           .localAddress(bindAddress, bindPort)
    -                           // NIO server channels
    -                           .group(nioGroup)
    -                           .channel(NioServerSocketChannel.class)
    -                           // AbstractServerBase channel Options
    -                           .option(ChannelOption.ALLOCATOR, bufferPool)
    -                           // Child channel options
    -                           .childOption(ChannelOption.ALLOCATOR, 
bufferPool)
    -                           
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK)
    -                           
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
    +           this.bindAddress = Preconditions.checkNotNull(bindAddress);
    +           this.numEventLoopThreads = numEventLoopThreads;
    +           this.numQueryThreads = numQueryThreads;
    +
    +           this.bindPortRange = new HashSet<>();
    +           while (bindPortIterator.hasNext()) {
    +                   int port = bindPortIterator.next();
    +                   Preconditions.checkArgument(port >= 0 && port <= 65535,
    +                                   "Invalid port configuration. Port must 
be between 0 and 65535, but was " + port + ".");
    +                   bindPortRange.add(port);
    +           }
        }
     
        /**
         * Creates a thread pool for the query execution.
    -    *
    -    * @param numQueryThreads Number of query threads.
         * @return Thread pool for query execution
         */
    -   private ExecutorService createQueryExecutor(int numQueryThreads) {
    +   private ExecutorService createQueryExecutor() {
                ThreadFactory threadFactory = new ThreadFactoryBuilder()
                                .setDaemon(true)
                                .setNameFormat("Flink " + getServerName() + " 
Thread %d")
                                .build();
    -
                return Executors.newFixedThreadPool(numQueryThreads, 
threadFactory);
        }
     
    +   /**
    +    * Returns the thread-pool responsible for processing incoming requests.
    +    */
        protected ExecutorService getQueryExecutor() {
                return queryExecutor;
        }
     
    +   /**
    +    * Gets the name of the server. This is useful for debugging.
    +    * @return The name of the server.
    +    */
        public String getServerName() {
                return serverName;
        }
     
    +   /**
    +    * Returns the {@link AbstractServerHandler handler} to be used for
    +    * serving the incoming requests.
    +    */
        public abstract AbstractServerHandler<REQ, RESP> initializeHandler();
     
        /**
    +    * Returns the address of this server.
    +    *
    +    * @return AbstractServerBase address
    +    * @throws IllegalStateException If server has not been started yet
    +    */
    +   public KvStateServerAddress getServerAddress() {
    +           Preconditions.checkState(serverAddress != null, "Server " + 
serverName + " has not been started.");
    +           return serverAddress;
    +   }
    +
    +   /**
         * Starts the server by binding to the configured bind address 
(blocking).
         * @throws InterruptedException If interrupted during the bind operation
         */
        public void start() throws InterruptedException {
                Preconditions.checkState(serverAddress == null,
    -                           "Server " + serverName + " has already been 
started @ " + serverAddress + '.');
    -
    -           this.handler = initializeHandler();
    -           bootstrap.childHandler(new ServerChannelInitializer<>(handler));
    -
    -           Channel channel = bootstrap.bind().sync().channel();
    -           InetSocketAddress localAddress = (InetSocketAddress) 
channel.localAddress();
    -           serverAddress = new 
KvStateServerAddress(localAddress.getAddress(), localAddress.getPort());
    +                           "Server " + serverName + " already running @ " 
+ serverAddress + '.');
    +
    +           Iterator<Integer> portIterator = bindPortRange.iterator();
    +           while (portIterator.hasNext() && serverAddress == null) {
    +                   final int port = portIterator.next();
    +                   try {
    +                           attemptToBind(port);
    +                   } catch (Exception e) {
    --- End diff --
    
    Is there a more specific exception that will be thrown if the port is 
taken? We could catch that one specifically in `attemptToBind()` and return 
true or false depending on whether binding was successful.


---

Reply via email to