mturk       2005/04/18 06:57:12

  Modified:    util/java/org/apache/tomcat/util/net AprEndpoint.java
  Log:
  Fix pool usage.
  Use indexed descriptors for obtaining poll params and data.
  It spares 3 JNI calls for each polled socket.
  
  Revision  Changes    Path
  1.8       +83 -91    jakarta-tomcat-connectors/util/java/org/apache/tomcat/util/net/AprEndpoint.java
  
  Index: AprEndpoint.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-connectors/util/java/org/apache/tomcat/util/net/AprEndpoint.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- AprEndpoint.java  15 Apr 2005 17:19:52 -0000      1.7
  +++ AprEndpoint.java  18 Apr 2005 13:57:12 -0000      1.8
  @@ -53,7 +53,7 @@
   
       protected static Log log = LogFactory.getLog(AprEndpoint.class);
   
  -    protected static StringManager sm = 
  +    protected static StringManager sm =
           StringManager.getManager("org.apache.tomcat.util.net.res");
   
   
  @@ -70,27 +70,27 @@
        * The acceptor thread.
        */
       protected Thread acceptorThread = null;
  -    
  -    
  +
  +
       /**
        * The socket poller.
        */
       protected Poller poller = null;
  -    
  -    
  +
  +
       /**
        * The socket poller thread.
        */
       protected Thread pollerThread = null;
  -    
  -    
  +
  +
       /**
        * The sendfile thread.
        */
       // FIXME: Add senfile support
       protected Thread sendfileThread = null;
  -    
  -    
  +
  +
       /**
        * Available processors.
        */
  @@ -108,14 +108,14 @@
        * Running state of the endpoint.
        */
       protected volatile boolean running = false;
  -    
  -    
  +
  +
       /**
        * Will be set to true whenever the endpoint is paused.
        */
       protected volatile boolean paused = false;
  -    
  -    
  +
  +
       /**
        * Track the initialization state of the endpoint.
        */
  @@ -126,14 +126,14 @@
        * Current worker threads busy count.
        */
       protected int curThreadsBusy = 0;
  -    
  +
   
       /**
        * Current worker threads count.
        */
       protected int curThreads = 0;
  -    
  -    
  +
  +
       /**
        * Sequence number used to generate thread names.
        */
  @@ -144,14 +144,14 @@
        * Root APR memory pool.
        */
       protected long rootPool = 0;
  -    
  -    
  +
  +
       /**
        * Server socket "pointer".
        */
       protected long serverSock = 0;
  -    
  -    
  +
  +
       /**
        * APR memory pool for the server socket.
        */
  @@ -167,8 +167,8 @@
       protected int maxThreads = 20;
       public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; }
       public int getMaxThreads() { return maxThreads; }
  -    
  -    
  +
  +
       /**
        * Priority of the acceptor and poller threads.
        */
  @@ -192,7 +192,7 @@
       public int getPort() { return port; }
       public void setPort(int port ) { this.port=port; }
   
  -    
  +
       /**
        * Address for the server socket.
        */
  @@ -200,7 +200,7 @@
       public InetAddress getAddress() { return address; }
       public void setAddress(InetAddress address) { this.address = address; }
   
  -    
  +
       /**
        * Handling of accepted sockets.
        */
  @@ -260,7 +260,7 @@
       public void setPollTime(int pollTime) { this.pollTime = pollTime; }
   
   
  -    /** 
  +    /**
        * The default is true - the created threads will be
        *  in daemon mode. If set to false, the control thread
        *  will not be daemon - and will keep the process alive.
  @@ -277,15 +277,15 @@
       public void setName(String name) { this.name = name; }
       public String getName() { return name; }
   
  -    
  +
       /**
        * Number of keepalive sockets.
        */
       protected int keepAliveCount = 0;
       public int getKeepAliveCount() { return keepAliveCount; }
       public void setKeepAliveCount(int keepAliveCount) { this.keepAliveCount = keepAliveCount; }
  -    
  -    
  +
  +
       /**
        * Dummy maxSpareThreads property.
        */
  @@ -303,9 +303,9 @@
   
       /**
        * Return the APR memory pool for the server socket, to be used by handler
  -     * which would need to allocate things like pollers, while having 
  +     * which would need to allocate things like pollers, while having
        * consistent resource handling.
  -     * 
  +     *
        * @return the id for the server socket pool
        */
       public long getServerSocketPool() {
  @@ -315,7 +315,7 @@
   
       /**
        * Return the amount of threads that are managed by the pool.
  -     * 
  +     *
        * @return the amount of threads that are managed by the pool
        */
       public int getCurrentThreadCount() {
  @@ -325,17 +325,17 @@
   
       /**
        * Return the amount of threads currently busy.
  -     * 
  +     *
        * @return the amount of threads currently busy
        */
       public int getCurrentThreadsBusy() {
           return curThreadsBusy;
       }
  -    
  +
   
       /**
        * Return the state of the endpoint.
  -     * 
  +     *
        * @return true if the endpoint is running, false otherwise
        */
       public boolean isRunning() {
  @@ -345,14 +345,14 @@
   
       /**
        * Return the state of the endpoint.
  -     * 
  +     *
        * @return true if the endpoint is paused, false otherwise
        */
       public boolean isPaused() {
           return paused;
       }
   
  -    
  +
       // ----------------------------------------------- Public Lifecycle Methods
   
   
  @@ -360,10 +360,10 @@
        * Initialize the endpoint.
        */
       public void init() throws Exception {
  -        
  +
           if (initialized)
               return;
  -        
  +
           try {
               // Initialize APR
               Library.initialize(null);
  @@ -379,10 +379,10 @@
                   addressStr = "" + address;
               }
               long inetAddress = Address.info(addressStr, Socket.APR_INET,
  -                                       port, 0, serverSockPool);
  +                                       port, 0, rootPool);
               // Create the APR server socket
               serverSock = Socket.create(Socket.APR_INET, Socket.SOCK_STREAM,
  -                                       Socket.APR_PROTO_TCP, serverSockPool);
  +                                       Socket.APR_PROTO_TCP, rootPool);
               // Bind the server socket
               Socket.bind(serverSock, inetAddress);
               // Start listening on the server socket
  @@ -392,7 +392,7 @@
               throw e;
           }
           initialized = true;
  -        
  +
       }
   
   
  @@ -417,7 +417,7 @@
               pollerThread.setPriority(getThreadPriority());
               pollerThread.setDaemon(true);
               pollerThread.start();
  -            
  +
               // Start sendfile thread
               // FIXME: Implement sendfile support
           }
  @@ -451,14 +451,17 @@
           if (running) {
               stop();
           }
  +        Pool.destroy(serverSockPool);
  +        serverSockPool = 0;
           // Close server socket
           Socket.close(serverSock);
           // Close all APR memory pools and resources
           Pool.destroy(rootPool);
  +        rootPool = 0;
           initialized = false ;
       }
   
  -    
  +
       // ------------------------------------------------------ Protected Methods
   
   
  @@ -500,14 +503,14 @@
           }
       }
   
  -    
  +
       /**
        * Set options on a newly accepted socket.
  -     * 
  +     *
        * @param socket "pointer" to the accepted socket
        */
       protected void setSocketOptions(long socket) {
  -        if (soLinger >= 0) 
  +        if (soLinger >= 0)
               Socket.optSet(socket, Socket.APR_SO_LINGER, soLinger);
           if (tcpNoDelay)
               Socket.optSet(socket, Socket.APR_TCP_NODELAY, (tcpNoDelay ? 1 : 0));
  @@ -515,16 +518,16 @@
               Socket.timeoutSet(socket, soTimeout);
       }
   
  -    
  +
       protected boolean processSocket(long s) {
           // Process the connection
           int step = 1;
           boolean result = true;
           try {
  -            
  +
               // 1: Set socket options: timeout, linger, etc
               setSocketOptions(s);
  -            
  +
               // 2: SSL handshake
               step = 2;
               // FIXME: SSL implementation so that Bill is happy
  @@ -533,11 +536,11 @@
                   getServerSocketFactory().handshake(s);
               }
               */
  -            
  +
               // 3: Process the connection
               step = 3;
               result = getHandler().process(s);
  -            
  +
           } catch (Throwable t) {
               if (step == 2) {
                   if (log.isDebugEnabled()) {
  @@ -584,7 +587,7 @@
   
       }
   
  -    
  +
       /**
        * Create and return a new processor suitable for processing HTTP
        * requests and returning the corresponding responses.
  @@ -598,7 +601,7 @@
   
       }
   
  -    
  +
       /**
        * Return a new worker thread, and block while to worker is available.
        */
  @@ -607,7 +610,7 @@
           Worker workerThread = createWorkerThread();
           while (workerThread == null) {
               try {
  -                // Wait a little for load to go down: as a result, 
  +                // Wait a little for load to go down: as a result,
                   // no accept will be made until the concurrency is
                   // lower than the specified maxThreads, and current
                   // connections will wait for a little bit instead of
  @@ -620,7 +623,7 @@
           }
           return workerThread;
       }
  -    
  +
   
       /**
        * Recycle the specified Processor so that it can be used again.
  @@ -642,8 +645,8 @@
        * Server socket acceptor thread.
        */
       protected class Acceptor implements Runnable {
  -        
  -        
  +
  +
           /**
            * The background thread that listens for incoming TCP/IP connections and
            * hands them off to an appropriate processor.
  @@ -664,7 +667,7 @@
   
                   // Allocate a new worker thread
                   Worker workerThread = getWorkerThread();
  -                
  +
                   // Accept the next incoming connection from the server socket
                   long socket = 0;
                   long pool = 0;
  @@ -697,17 +700,14 @@
   
       /**
        * Poller class.
  -     * 
  +     *
        * FIXME: Windows support using 64 sized pollers
        */
       protected class Poller implements Runnable {
  -        
  +
           protected long serverPollset = 0;
           protected long pool = 0;
           protected long[] desc;
  -        protected long[] sockets;
  -        protected long[] events;
  -        protected long[] pools;
   
           public Poller(int size) {
               pool = Pool.create(serverSockPool);
  @@ -717,26 +717,23 @@
                   // FIXME: more appropriate logging
                   e.printStackTrace();
               }
  -            desc = new long[size];
  -            sockets = new long[size];
  -            events = new long[size];
  -            pools = new long[size];
  +            desc = new long[size * 4];
           }
  -        
  +
           public synchronized void add(long socket, long pool) {
               int rv = Poll.add(serverPollset, socket, pool, Poll.APR_POLLIN);
               if (rv == Status.APR_SUCCESS) {
                   keepAliveCount++;
               }
           }
  -        
  +
           public void remove(long socket) {
               int rv = Poll.remove(serverPollset, socket);
               if (rv == Status.APR_SUCCESS) {
                   keepAliveCount--;
               }
           }
  -        
  +
           /**
            * The background thread that listens for incoming TCP/IP connections and
            * hands them off to an appropriate processor.
  @@ -767,34 +764,28 @@
                       // Pool for the specified interval
                       int rv = Poll.poll(serverPollset, pollTime, desc);
                       if (rv > 0) {
  -                        synchronized (this) {
  -                            for (int n = 0; n < rv; n++) {
  -                                sockets[n] = Poll.socket(desc[n]);
  -                                // Get the socket pool
  -                                pools[n] = Poll.data(desc[n]);
  -                                // Get retuned events for this socket
  -                                events[n] = Poll.events(desc[n]);
  -                                // Remove each socket from the poll right away
  -                                remove(sockets[n]);
  -                            }
  -                        }
                           for (int n = 0; n < rv; n++) {
  -                            // Problem events
  -                            if (((events[n] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
  -                                    || ((events[n] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) {
  +                            // Remove the socket from the pollset
  +                            remove(desc[n*4+1]);
  +                            // Check for failed sockets
  +                            if (((desc[n*4] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
  +                                    || ((desc[n*4] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) {
                                   // Close socket and clear pool
  -                                Pool.destroy(pools[n]);
  +                                Pool.destroy(desc[n*4+2]);
                                   continue;
                               }
                               // Hand this socket off to a worker
  -                            getWorkerThread().assign(sockets[n], pools[n]);
  +                            getWorkerThread().assign(desc[n*4+1], desc[n*4+2]);
                           }
                       }
  +                    else if (rv < 0) {
  +                        // TODO: Poll is probably unusable. So it should bail out.
  +                    }
                   } catch (Throwable t) {
                       // FIXME: Proper logging
                       t.printStackTrace();
                   }
  -                
  +
               }
   
               // Notify the threadStop() method that we have shut ourselves down
  @@ -806,7 +797,7 @@
   
       }
   
  -    
  +
       // ----------------------------------------------------- Worker Inner Class
   
   
  @@ -895,6 +886,7 @@
                   } else {
                       // Close socket and pool
                       Pool.destroy(pool);
  +                    pool = 0;
                   }
   
                   // Finish up this request
  @@ -929,12 +921,12 @@
   
       /**
        * Bare bones interface used for socket processing. Per thread data is to be
  -     * stored in the ThreadWithAttributes extra folders, or alternately in 
  +     * stored in the ThreadWithAttributes extra folders, or alternately in
        * thread local fields.
        */
       public interface Handler {
           public boolean process(long socket);
       }
  -    
  -    
  +
  +
   }
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to