remm        2005/04/15 08:49:33

  Modified:    util/java/org/apache/tomcat/util/net AprEndpoint.java
               http11/src/java/org/apache/coyote/http11
                        Http11AprProcessor.java InternalAprInputBuffer.java
  Log:
  - Add configuration flags.
  - Better sync for the poller.
  - There's a problem with the poller in this commit, which is still under
    investigation.
  
  Revision  Changes    Path
  1.4       +72 -46    
jakarta-tomcat-connectors/util/java/org/apache/tomcat/util/net/AprEndpoint.java
  
  Index: AprEndpoint.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-connectors/util/java/org/apache/tomcat/util/net/AprEndpoint.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- AprEndpoint.java  15 Apr 2005 09:23:37 -0000      1.3
  +++ AprEndpoint.java  15 Apr 2005 15:49:32 -0000      1.4
  @@ -23,6 +23,7 @@
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
   import org.apache.tomcat.jni.Address;
  +import org.apache.tomcat.jni.Error;
   import org.apache.tomcat.jni.Library;
   import org.apache.tomcat.jni.Poll;
   import org.apache.tomcat.jni.Pool;
  @@ -242,6 +243,23 @@
       public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; }
   
   
  +    /**
  +     * Timeout on first request read before going to the poller, in ms.
  +     */
  +    protected int firstReadPollerTimeout = 100;
  +    public int getFirstReadPollerTimeout() { return firstReadPollerTimeout; }
  +    public void setFirstReadPollerTimeout(int firstReadPollerTimeout) { 
this.firstReadPollerTimeout = firstReadPollerTimeout; }
  +
  +
  +    /**
  +     * Poll interval, in microseconds. The smaller the value, the more CPU 
the poller
  +     * will use, but the more responsive to activity it will be.
  +     */
  +    protected int pollTime = 100000;
  +    public int getPollTime() { return pollTime; }
  +    public void setPollTime(int pollTime) { this.pollTime = pollTime; }
  +
  +
       /** 
        * The default is true - the created threads will be
        *  in daemon mode. If set to false, the control thread
  @@ -356,7 +374,7 @@
               // Create the APR address that will be bound
               String addressStr = null;
               if (address == null) {
  -                addressStr = "0.0.0.0";
  +                addressStr = null;
               } else {
                   addressStr = "" + address;
               }
  @@ -677,16 +695,18 @@
           protected long serverPollset = 0;
           protected long pool = 0;
           protected long[] desc;
  +        protected long[] sockets;
   
           public Poller(int size) {
  +            pool = Pool.create(serverSockPool);
               try {
  -                pool = Pool.create(serverSockPool);
                   serverPollset = Poll.create(size, pool, 0, soTimeout * 1000);
  -                desc = new long[size];
  -            } catch( Exception ex ) {
  +            } catch (Error e) {
                   // FIXME: more appropriate logging
  -                ex.printStackTrace();
  +                e.printStackTrace();
               }
  +            desc = new long[size];
  +            sockets = new long[size];
           }
           
           public synchronized void add(long socket, long pool) {
  @@ -696,7 +716,7 @@
               }
           }
           
  -        public synchronized void remove(long socket) {
  +        public void remove(long socket) {
               int rv = Poll.remove(serverPollset, socket);
               if (rv == Status.APR_SUCCESS) {
                   keepAliveCount--;
  @@ -730,50 +750,56 @@
                   }
   
                   try {
  -                    // Pool for one second
  -                    // FIXME: Polling time could be configurable
  -                    int rv = Poll.poll(serverPollset, 100000, desc);
  -                    for (int n = 0; n < rv; n++) {
  -                        long socket = Poll.socket(desc[n]);
  -                        int pool = (int) Poll.data(desc[n]);
  -                        remove(socket);
  -                        
  -                        int events = Poll.events(desc[n]);
  -                        
  -                        if (((events & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
  -                                || ((events & Poll.APR_POLLERR) == 
Poll.APR_POLLERR)) {
  -                            // Close socket and clear pool
  -                            Pool.destroy(pool);
  -                            continue;
  -                        }
  -
  -                        if (!((events & Poll.APR_POLLIN) == 
Poll.APR_POLLIN)) {
  -                            // Close socket and clear pool
  -                            Pool.destroy(pool);
  -                            continue;
  +                    // Poll for the specified interval
  +                    int rv = Poll.poll(serverPollset, pollTime, desc);
  +                    if (rv > 0) {
  +                        synchronized (this) {
  +                            for (int n = 0; n < rv; n++) {
  +                                // Remove each socket from the poll right 
away
  +                                sockets[n] = Poll.socket(desc[n]);
  +                                remove(sockets[n]);
  +                            }
                           }
  -                        
  -                        // Allocate a new worker thread
  -                        Worker workerThread = createWorkerThread();
  -                        while (workerThread == null) {
  -                            try {
  -                                // Wait a little for load to go down: as a 
result, 
  -                                // no accept will be made until the 
concurrency is
  -                                // lower than the specified maxThreads, and 
current
  -                                // connections will wait for a little bit 
instead of
  -                                // failing right away.
  -                                Thread.sleep(100);
  -                            } catch (InterruptedException e) {
  -                                // Ignore
  +                        for (int n = 0; n < rv; n++) {
  +                            // Get the socket pool
  +                            int pool = (int) Poll.data(desc[n]);
  +                            // Get returned events for this socket
  +                            int events = Poll.events(desc[n]);
  +                            //System.out.println("Events: " + sockets[n] + " 
code: " + events + " OK: " + Poll.APR_POLLIN);
  +                            // Problem events
  +                            if (((events & Poll.APR_POLLHUP) == 
Poll.APR_POLLHUP)
  +                                    || ((events & Poll.APR_POLLERR) == 
Poll.APR_POLLERR)) {
  +                                // Close socket and clear pool
  +                                Pool.destroy(pool);
  +                                continue;
                               }
  -                            workerThread = createWorkerThread();
  +                            // Anything non normal
  +                            if (!((events & Poll.APR_POLLIN) == 
Poll.APR_POLLIN)) {
  +                                // Close socket and clear pool
  +                                Pool.destroy(pool);
  +                                continue;
  +                            }
  +                            // Allocate a new worker thread
  +                            Worker workerThread = createWorkerThread();
  +                            while (workerThread == null) {
  +                                try {
  +                                    // Wait a little for load to go down: as 
a result, 
  +                                    // no accept will be made until the 
concurrency is
  +                                    // lower than the specified maxThreads, 
and current
  +                                    // connections will wait for a little 
bit instead of
  +                                    // failing right away.
  +                                    Thread.sleep(100);
  +                                } catch (InterruptedException e) {
  +                                    // Ignore
  +                                }
  +                                workerThread = createWorkerThread();
  +                            }
  +                            // Hand this socket off to an appropriate 
processor
  +                            //System.out.println("Process: " + sockets[n]);
  +                            workerThread.assign(sockets[n], pool);
                           }
  -                        
  -                        // Hand this socket off to an appropriate processor
  -                        workerThread.assign(socket, pool);
  -
                       }
  -                } catch(Throwable t) {
  +                } catch (Throwable t) {
                       // FIXME: Proper logging
                       t.printStackTrace();
                   }
  
  
  
  1.3       +3 -2      
jakarta-tomcat-connectors/http11/src/java/org/apache/coyote/http11/Http11AprProcessor.java
  
  Index: Http11AprProcessor.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-connectors/http11/src/java/org/apache/coyote/http11/Http11AprProcessor.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- Http11AprProcessor.java   15 Apr 2005 13:46:11 -0000      1.2
  +++ Http11AprProcessor.java   15 Apr 2005 15:49:33 -0000      1.3
  @@ -39,6 +39,7 @@
   import org.apache.coyote.http11.filters.VoidInputFilter;
   import org.apache.coyote.http11.filters.VoidOutputFilter;
   import org.apache.coyote.http11.filters.BufferedInputFilter;
  +import org.apache.tomcat.jni.Address;
   import org.apache.tomcat.jni.Socket;
   import org.apache.tomcat.util.buf.Ascii;
   import org.apache.tomcat.util.buf.ByteChunk;
  @@ -75,7 +76,7 @@
           
           request = new Request();
           inputBuffer = new InternalAprInputBuffer(request, headerBufferSize, 
  -                endpoint.getSoTimeout());
  +                endpoint.getFirstReadPollerTimeout());
           request.setInputBuffer(inputBuffer);
   
           response = new Response();
  @@ -755,7 +756,7 @@
                   if( !disableUploadTimeout && keptAlive && soTimeout > 0 ) {
                       Socket.timeoutSet(socket, soTimeout);
                   }
  -                if (!inputBuffer.parseRequestLine(keptAlive)) {
  +                if (!inputBuffer.parseRequestLine()) {
                       // This means that no data is available right now
                       // (long keepalive), so that the processor should be 
recycled
                       // and the method should return true
  
  
  
  1.4       +17 -22    
jakarta-tomcat-connectors/http11/src/java/org/apache/coyote/http11/InternalAprInputBuffer.java
  
  Index: InternalAprInputBuffer.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-connectors/http11/src/java/org/apache/coyote/http11/InternalAprInputBuffer.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- InternalAprInputBuffer.java       15 Apr 2005 13:46:11 -0000      1.3
  +++ InternalAprInputBuffer.java       15 Apr 2005 15:49:33 -0000      1.4
  @@ -48,7 +48,8 @@
       /**
        * Alternate constructor.
        */
  -    public InternalAprInputBuffer(Request request, int headerBufferSize, 
long timeout) {
  +    public InternalAprInputBuffer(Request request, int headerBufferSize, 
  +                                  long readTimeout) {
   
           this.request = request;
           headers = request.getMimeHeaders();
  @@ -70,7 +71,7 @@
           parsingHeader = true;
           swallowInput = true;
           
  -        this.timeout = timeout;
  +        this.readTimeout = readTimeout * 1000;
   
       }
   
  @@ -192,9 +193,10 @@
   
   
       /**
  -     * The socket timeout.
  +     * The socket timeout used when reading the first block of the request
  +     * header.
        */
  -    protected long timeout;
  +    protected long readTimeout;
       
       
       // ------------------------------------------------------------- 
Properties
  @@ -221,8 +223,6 @@
        */
       public void addFilter(InputFilter filter) {
   
  -        // FIXME: Check for null ?
  -
           InputFilter[] newFilterLibrary = 
               new InputFilter[filterLibrary.length + 1];
           for (int i = 0; i < filterLibrary.length; i++) {
  @@ -378,7 +378,7 @@
        * @return true if data is properly fed; false if no data is available 
        * immediately and thread should be freed
        */
  -    public boolean parseRequestLine(boolean keptAlive)
  +    public boolean parseRequestLine()
           throws IOException {
   
           int start = 0;
  @@ -392,18 +392,13 @@
   
               // Read new bytes if needed
               if (pos >= lastValid) {
  -                if (keptAlive) {
  -                    // Do a simple read with a short timeout
  -                    int nRead = Socket.recvt(socket, buf, pos, buf.length - 
lastValid, 50000);
  -                    if (nRead > 0) {
  -                        lastValid = pos + nRead;
  -                    }
  -                    if (pos >= lastValid) {
  -                        return false;
  -                    }
  +                // Do a simple read with a short timeout
  +                int nRead = Socket.recvt
  +                    (socket, buf, pos, buf.length - lastValid, readTimeout);
  +                if (nRead > 0) {
  +                    lastValid = pos + nRead;
                   } else {
  -                    if (!fill())
  -                        throw new 
EOFException(sm.getString("iib.eof.error"));
  +                    return false;
                   }
               }
   
  @@ -416,13 +411,13 @@
           // Mark the current buffer position
           start = pos;
   
  -        if (keptAlive && pos >= lastValid) {
  +        if (pos >= lastValid) {
               // Do a simple read with a short timeout
  -            int nRead = Socket.recvt(socket, buf, pos, buf.length - 
lastValid, 50000);
  +            int nRead = Socket.recvt
  +                (socket, buf, pos, buf.length - lastValid, readTimeout);
               if (nRead > 0) {
                   lastValid = pos + nRead;
  -            }
  -            if (pos >= lastValid) {
  +            } else {
                   return false;
               }
           }
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to