mturk 2005/04/18 06:57:12 Modified: util/java/org/apache/tomcat/util/net AprEndpoint.java Log: Fix pool usage. Use indexed descriptors for obtaining poll params and data. It spares 3 JNI calls for each polled socket. Revision Changes Path 1.8 +83 -91 jakarta-tomcat-connectors/util/java/org/apache/tomcat/util/net/AprEndpoint.java Index: AprEndpoint.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/util/java/org/apache/tomcat/util/net/AprEndpoint.java,v retrieving revision 1.7 retrieving revision 1.8 diff -u -r1.7 -r1.8 --- AprEndpoint.java 15 Apr 2005 17:19:52 -0000 1.7 +++ AprEndpoint.java 18 Apr 2005 13:57:12 -0000 1.8 @@ -53,7 +53,7 @@ protected static Log log = LogFactory.getLog(AprEndpoint.class); - protected static StringManager sm = + protected static StringManager sm = StringManager.getManager("org.apache.tomcat.util.net.res"); @@ -70,27 +70,27 @@ * The acceptor thread. */ protected Thread acceptorThread = null; - - + + /** * The socket poller. */ protected Poller poller = null; - - + + /** * The socket poller thread. */ protected Thread pollerThread = null; - - + + /** * The sendfile thread. */ // FIXME: Add senfile support protected Thread sendfileThread = null; - - + + /** * Available processors. */ @@ -108,14 +108,14 @@ * Running state of the endpoint. */ protected volatile boolean running = false; - - + + /** * Will be set to true whenever the endpoint is paused. */ protected volatile boolean paused = false; - - + + /** * Track the initialization state of the endpoint. */ @@ -126,14 +126,14 @@ * Current worker threads busy count. */ protected int curThreadsBusy = 0; - + /** * Current worker threads count. */ protected int curThreads = 0; - - + + /** * Sequence number used to generate thread names. */ @@ -144,14 +144,14 @@ * Root APR memory pool. */ protected long rootPool = 0; - - + + /** * Server socket "pointer". */ protected long serverSock = 0; - - + + /** * APR memory pool for the server socket. */ @@ -167,8 +167,8 @@ protected int maxThreads = 20; public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; } public int getMaxThreads() { return maxThreads; } - - + + /** * Priority of the acceptor and poller threads. */ @@ -192,7 +192,7 @@ public int getPort() { return port; } public void setPort(int port ) { this.port=port; } - + /** * Address for the server socket. */ @@ -200,7 +200,7 @@ public InetAddress getAddress() { return address; } public void setAddress(InetAddress address) { this.address = address; } - + /** * Handling of accepted sockets. */ @@ -260,7 +260,7 @@ public void setPollTime(int pollTime) { this.pollTime = pollTime; } - /** + /** * The default is true - the created threads will be * in daemon mode. If set to false, the control thread * will not be daemon - and will keep the process alive. @@ -277,15 +277,15 @@ public void setName(String name) { this.name = name; } public String getName() { return name; } - + /** * Number of keepalive sockets. */ protected int keepAliveCount = 0; public int getKeepAliveCount() { return keepAliveCount; } public void setKeepAliveCount(int keepAliveCount) { this.keepAliveCount = keepAliveCount; } - - + + /** * Dummy maxSpareThreads property. */ @@ -303,9 +303,9 @@ /** * Return the APR memory pool for the server socket, to be used by handler - * which would need to allocate things like pollers, while having + * which would need to allocate things like pollers, while having * consistent resource handling. - * + * * @return the id for the server socket pool */ public long getServerSocketPool() { @@ -315,7 +315,7 @@ /** * Return the amount of threads that are managed by the pool. - * + * * @return the amount of threads that are managed by the pool */ public int getCurrentThreadCount() { @@ -325,17 +325,17 @@ /** * Return the amount of threads currently busy. - * + * * @return the amount of threads currently busy */ public int getCurrentThreadsBusy() { return curThreadsBusy; } - + /** * Return the state of the endpoint. - * + * * @return true if the endpoint is running, false otherwise */ public boolean isRunning() { @@ -345,14 +345,14 @@ /** * Return the state of the endpoint. - * + * * @return true if the endpoint is paused, false otherwise */ public boolean isPaused() { return paused; } - + // ----------------------------------------------- Public Lifecycle Methods @@ -360,10 +360,10 @@ * Initialize the endpoint. */ public void init() throws Exception { - + if (initialized) return; - + try { // Initialize APR Library.initialize(null); @@ -379,10 +379,10 @@ addressStr = "" + address; } long inetAddress = Address.info(addressStr, Socket.APR_INET, - port, 0, serverSockPool); + port, 0, rootPool); // Create the APR server socket serverSock = Socket.create(Socket.APR_INET, Socket.SOCK_STREAM, - Socket.APR_PROTO_TCP, serverSockPool); + Socket.APR_PROTO_TCP, rootPool); // Bind the server socket Socket.bind(serverSock, inetAddress); // Start listening on the server socket @@ -392,7 +392,7 @@ throw e; } initialized = true; - + } @@ -417,7 +417,7 @@ pollerThread.setPriority(getThreadPriority()); pollerThread.setDaemon(true); pollerThread.start(); - + // Start sendfile thread // FIXME: Implement sendfile support } @@ -451,14 +451,17 @@ if (running) { stop(); } + Pool.destroy(serverSockPool); + serverSockPool = 0; // Close server socket Socket.close(serverSock); // Close all APR memory pools and resources Pool.destroy(rootPool); + rootPool = 0; initialized = false ; } - + // ------------------------------------------------------ Protected Methods @@ -500,14 +503,14 @@ } } - + /** * Set options on a newly accepted socket. - * + * * @param socket "pointer" to the accepted socket */ protected void setSocketOptions(long socket) { - if (soLinger >= 0) + if (soLinger >= 0) Socket.optSet(socket, Socket.APR_SO_LINGER, soLinger); if (tcpNoDelay) Socket.optSet(socket, Socket.APR_TCP_NODELAY, (tcpNoDelay ? 1 : 0)); @@ -515,16 +518,16 @@ Socket.timeoutSet(socket, soTimeout); } - + protected boolean processSocket(long s) { // Process the connection int step = 1; boolean result = true; try { - + // 1: Set socket options: timeout, linger, etc setSocketOptions(s); - + // 2: SSL handshake step = 2; // FIXME: SSL implementation so that Bill is happy @@ -533,11 +536,11 @@ getServerSocketFactory().handshake(s); } */ - + // 3: Process the connection step = 3; result = getHandler().process(s); - + } catch (Throwable t) { if (step == 2) { if (log.isDebugEnabled()) { @@ -584,7 +587,7 @@ } - + /** * Create and return a new processor suitable for processing HTTP * requests and returning the corresponding responses. @@ -598,7 +601,7 @@ } - + /** * Return a new worker thread, and block while to worker is available. */ @@ -607,7 +610,7 @@ Worker workerThread = createWorkerThread(); while (workerThread == null) { try { - // Wait a little for load to go down: as a result, + // Wait a little for load to go down: as a result, // no accept will be made until the concurrency is // lower than the specified maxThreads, and current // connections will wait for a little bit instead of @@ -620,7 +623,7 @@ } return workerThread; } - + /** * Recycle the specified Processor so that it can be used again. @@ -642,8 +645,8 @@ * Server socket acceptor thread. */ protected class Acceptor implements Runnable { - - + + /** * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. @@ -664,7 +667,7 @@ // Allocate a new worker thread Worker workerThread = getWorkerThread(); - + // Accept the next incoming connection from the server socket long socket = 0; long pool = 0; @@ -697,17 +700,14 @@ /** * Poller class. - * + * * FIXME: Windows support using 64 sized pollers */ protected class Poller implements Runnable { - + protected long serverPollset = 0; protected long pool = 0; protected long[] desc; - protected long[] sockets; - protected long[] events; - protected long[] pools; public Poller(int size) { pool = Pool.create(serverSockPool); @@ -717,26 +717,23 @@ // FIXME: more appropriate logging e.printStackTrace(); } - desc = new long[size]; - sockets = new long[size]; - events = new long[size]; - pools = new long[size]; + desc = new long[size * 4]; } - + public synchronized void add(long socket, long pool) { int rv = Poll.add(serverPollset, socket, pool, Poll.APR_POLLIN); if (rv == Status.APR_SUCCESS) { keepAliveCount++; } } - + public void remove(long socket) { int rv = Poll.remove(serverPollset, socket); if (rv == Status.APR_SUCCESS) { keepAliveCount--; } } - + /** * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. @@ -767,34 +764,28 @@ // Pool for the specified interval int rv = Poll.poll(serverPollset, pollTime, desc); if (rv > 0) { - synchronized (this) { - for (int n = 0; n < rv; n++) { - sockets[n] = Poll.socket(desc[n]); - // Get the socket pool - pools[n] = Poll.data(desc[n]); - // Get retuned events for this socket - events[n] = Poll.events(desc[n]); - // Remove each socket from the poll right away - remove(sockets[n]); - } - } for (int n = 0; n < rv; n++) { - // Problem events - if (((events[n] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP) - || ((events[n] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) { + // Remove the socket from the pollset + remove(desc[n*4+1]); + // Check for failed sockets + if (((desc[n*4] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP) + || ((desc[n*4] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) { // Close socket and clear pool - Pool.destroy(pools[n]); + Pool.destroy(desc[n*4+2]); continue; } // Hand this socket off to a worker - getWorkerThread().assign(sockets[n], pools[n]); + getWorkerThread().assign(desc[n*4+1], desc[n*4+2]); } } + else if (rv < 0) { + // TODO: Poll is probably unusable. So it should bail out. + } } catch (Throwable t) { // FIXME: Proper logging t.printStackTrace(); } - + } // Notify the threadStop() method that we have shut ourselves down @@ -806,7 +797,7 @@ } - + // ----------------------------------------------------- Worker Inner Class @@ -895,6 +886,7 @@ } else { // Close socket and pool Pool.destroy(pool); + pool = 0; } // Finish up this request @@ -929,12 +921,12 @@ /** * Bare bones interface used for socket processing. Per thread data is to be - * stored in the ThreadWithAttributes extra folders, or alternately in + * stored in the ThreadWithAttributes extra folders, or alternately in * thread local fields. */ public interface Handler { public boolean process(long socket); } - - + + }
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]