remm 2005/04/15 08:49:33 Modified: util/java/org/apache/tomcat/util/net AprEndpoint.java http11/src/java/org/apache/coyote/http11 Http11AprProcessor.java InternalAprInputBuffer.java Log: - Add configuration flags. - Better sync for the poller. - There's a problem with the poller in this commit under investigation, though. Revision Changes Path 1.4 +72 -46 jakarta-tomcat-connectors/util/java/org/apache/tomcat/util/net/AprEndpoint.java Index: AprEndpoint.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/util/java/org/apache/tomcat/util/net/AprEndpoint.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- AprEndpoint.java 15 Apr 2005 09:23:37 -0000 1.3 +++ AprEndpoint.java 15 Apr 2005 15:49:32 -0000 1.4 @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tomcat.jni.Address; +import org.apache.tomcat.jni.Error; import org.apache.tomcat.jni.Library; import org.apache.tomcat.jni.Poll; import org.apache.tomcat.jni.Pool; @@ -242,6 +243,23 @@ public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; } + /** + * Timeout on first request read before going to the poller, in ms. + */ + protected int firstReadPollerTimeout = 100; + public int getFirstReadPollerTimeout() { return firstReadPollerTimeout; } + public void setFirstReadPollerTimeout(int firstReadPollerTimeout) { this.firstReadPollerTimeout = firstReadPollerTimeout; } + + + /** + * Poll interval, in microseconds. The smaller the value, the more CPU the poller + * will use, but the more responsive to activity it will be. + */ + protected int pollTime = 100000; + public int getPollTime() { return pollTime; } + public void setPollTime(int pollTime) { this.pollTime = pollTime; } + + /** * The default is true - the created threads will be * in daemon mode. If set to false, the control thread @@ -356,7 +374,7 @@ // Create the APR address that will be bound String addressStr = null; if (address == null) { - addressStr = "0.0.0.0"; + addressStr = null; } else { addressStr = "" + address; } @@ -677,16 +695,18 @@ protected long serverPollset = 0; protected long pool = 0; protected long[] desc; + protected long[] sockets; public Poller(int size) { + pool = Pool.create(serverSockPool); try { - pool = Pool.create(serverSockPool); serverPollset = Poll.create(size, pool, 0, soTimeout * 1000); - desc = new long[size]; - } catch( Exception ex ) { + } catch (Error e) { // FIXME: more appropriate logging - ex.printStackTrace(); + e.printStackTrace(); } + desc = new long[size]; + sockets = new long[size]; } public synchronized void add(long socket, long pool) { @@ -696,7 +716,7 @@ } } - public synchronized void remove(long socket) { + public void remove(long socket) { int rv = Poll.remove(serverPollset, socket); if (rv == Status.APR_SUCCESS) { keepAliveCount--; @@ -730,50 +750,56 @@ } try { - // Pool for one second - // FIXME: Polling time could be configurable - int rv = Poll.poll(serverPollset, 100000, desc); - for (int n = 0; n < rv; n++) { - long socket = Poll.socket(desc[n]); - int pool = (int) Poll.data(desc[n]); - remove(socket); - - int events = Poll.events(desc[n]); - - if (((events & Poll.APR_POLLHUP) == Poll.APR_POLLHUP) - || ((events & Poll.APR_POLLERR) == Poll.APR_POLLERR)) { - // Close socket and clear pool - Pool.destroy(pool); - continue; - } - - if (!((events & Poll.APR_POLLIN) == Poll.APR_POLLIN)) { - // Close socket and clear pool - Pool.destroy(pool); - continue; + // Pool for the specified interval + int rv = Poll.poll(serverPollset, pollTime, desc); + if (rv > 0) { + synchronized (this) { + for (int n = 0; n < rv; n++) { + // Remove each socket from the poll right away + sockets[n] = Poll.socket(desc[n]); + remove(sockets[n]); + } } - - // Allocate a new worker thread - Worker workerThread = createWorkerThread(); - while (workerThread == null) { - try { - // Wait a little for load to go down: as a result, - // no accept will be made until the concurrency is - // lower than the specified maxThreads, and current - // connections will wait for a little bit instead of - // failing right away. - Thread.sleep(100); - } catch (InterruptedException e) { - // Ignore + for (int n = 0; n < rv; n++) { + // Get the socket pool + int pool = (int) Poll.data(desc[n]); + // Get retuned events for this socket + int events = Poll.events(desc[n]); + //System.out.println("Events: " + sockets[n] + " code: " + events + " OK: " + Poll.APR_POLLIN); + // Problem events + if (((events & Poll.APR_POLLHUP) == Poll.APR_POLLHUP) + || ((events & Poll.APR_POLLERR) == Poll.APR_POLLERR)) { + // Close socket and clear pool + Pool.destroy(pool); + continue; } - workerThread = createWorkerThread(); + // Anything non normal + if (!((events & Poll.APR_POLLIN) == Poll.APR_POLLIN)) { + // Close socket and clear pool + Pool.destroy(pool); + continue; + } + // Allocate a new worker thread + Worker workerThread = createWorkerThread(); + while (workerThread == null) { + try { + // Wait a little for load to go down: as a result, + // no accept will be made until the concurrency is + // lower than the specified maxThreads, and current + // connections will wait for a little bit instead of + // failing right away. + Thread.sleep(100); + } catch (InterruptedException e) { + // Ignore + } + workerThread = createWorkerThread(); + } + // Hand this socket off to an appropriate processor + //System.out.println("Process: " + sockets[n]); + workerThread.assign(sockets[n], pool); } - - // Hand this socket off to an appropriate processor - workerThread.assign(socket, pool); - } - } catch(Throwable t) { + } catch (Throwable t) { // FIXME: Proper logging t.printStackTrace(); } 1.3 +3 -2 jakarta-tomcat-connectors/http11/src/java/org/apache/coyote/http11/Http11AprProcessor.java Index: Http11AprProcessor.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/http11/src/java/org/apache/coyote/http11/Http11AprProcessor.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- Http11AprProcessor.java 15 Apr 2005 13:46:11 -0000 1.2 +++ Http11AprProcessor.java 15 Apr 2005 15:49:33 -0000 1.3 @@ -39,6 +39,7 @@ import org.apache.coyote.http11.filters.VoidInputFilter; import org.apache.coyote.http11.filters.VoidOutputFilter; import org.apache.coyote.http11.filters.BufferedInputFilter; +import org.apache.tomcat.jni.Address; import org.apache.tomcat.jni.Socket; import org.apache.tomcat.util.buf.Ascii; import org.apache.tomcat.util.buf.ByteChunk; @@ -75,7 +76,7 @@ request = new Request(); inputBuffer = new InternalAprInputBuffer(request, headerBufferSize, - endpoint.getSoTimeout()); + endpoint.getFirstReadPollerTimeout()); request.setInputBuffer(inputBuffer); response = new Response(); @@ -755,7 +756,7 @@ if( !disableUploadTimeout && keptAlive && soTimeout > 0 ) { Socket.timeoutSet(socket, soTimeout); } - if (!inputBuffer.parseRequestLine(keptAlive)) { + if (!inputBuffer.parseRequestLine()) { // This means that no data is available right now // (long keepalive), so that the processor should be recycled // and the method should return true 1.4 +17 -22 jakarta-tomcat-connectors/http11/src/java/org/apache/coyote/http11/InternalAprInputBuffer.java Index: InternalAprInputBuffer.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/http11/src/java/org/apache/coyote/http11/InternalAprInputBuffer.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- InternalAprInputBuffer.java 15 Apr 2005 13:46:11 -0000 1.3 +++ InternalAprInputBuffer.java 15 Apr 2005 15:49:33 -0000 1.4 @@ -48,7 +48,8 @@ /** * Alternate constructor. */ - public InternalAprInputBuffer(Request request, int headerBufferSize, long timeout) { + public InternalAprInputBuffer(Request request, int headerBufferSize, + long readTimeout) { this.request = request; headers = request.getMimeHeaders(); @@ -70,7 +71,7 @@ parsingHeader = true; swallowInput = true; - this.timeout = timeout; + this.readTimeout = readTimeout * 1000; } @@ -192,9 +193,10 @@ /** - * The socket timeout. + * The socket timeout used when reading the first block of the request + * header. */ - protected long timeout; + protected long readTimeout; // ------------------------------------------------------------- Properties @@ -221,8 +223,6 @@ */ public void addFilter(InputFilter filter) { - // FIXME: Check for null ? - InputFilter[] newFilterLibrary = new InputFilter[filterLibrary.length + 1]; for (int i = 0; i < filterLibrary.length; i++) { @@ -378,7 +378,7 @@ * @return true if data is properly fed; false if no data is available * immediately and thread should be freed */ - public boolean parseRequestLine(boolean keptAlive) + public boolean parseRequestLine() throws IOException { int start = 0; @@ -392,18 +392,13 @@ // Read new bytes if needed if (pos >= lastValid) { - if (keptAlive) { - // Do a simple read with a short timeout - int nRead = Socket.recvt(socket, buf, pos, buf.length - lastValid, 50000); - if (nRead > 0) { - lastValid = pos + nRead; - } - if (pos >= lastValid) { - return false; - } + // Do a simple read with a short timeout + int nRead = Socket.recvt + (socket, buf, pos, buf.length - lastValid, readTimeout); + if (nRead > 0) { + lastValid = pos + nRead; } else { - if (!fill()) - throw new EOFException(sm.getString("iib.eof.error")); + return false; } } @@ -416,13 +411,13 @@ // Mark the current buffer position start = pos; - if (keptAlive && pos >= lastValid) { + if (pos >= lastValid) { // Do a simple read with a short timeout - int nRead = Socket.recvt(socket, buf, pos, buf.length - lastValid, 50000); + int nRead = Socket.recvt + (socket, buf, pos, buf.length - lastValid, readTimeout); if (nRead > 0) { lastValid = pos + nRead; - } - if (pos >= lastValid) { + } else { return false; } }
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]