remm 2004/10/01 16:46:57 Modified: util/java/org/apache/tomcat/util/net PoolTcpEndpoint.java http11/src/java/org/apache/coyote/http11 Http11Protocol.java Added: util/java/org/apache/tomcat/util/net LeaderFollowerWorkerThread.java MasterSlaveWorkerThread.java Log: - Hack in an alternate thread pool strategy (which is basically the TC 4.0 thread pool). - The problem is that there are some environments where the default thread pool doesn't work well, and there's some black magic involved with it. - Another advantage of the thread pool is that its simple design allows server socket restart to mostly work (at least in TC 4.0 it worked), so I think this is a good endpoint for not-that-stable VMs/OS combinations. A drawback is that the thread pool won't scale back (OTOH, scaling back is dangerous as it could cause memory leaks depending on what the application is doing). - I think in the future we could try to use the Java 5 thread pool (although for now, I haven't figured out a way to use it efficiently with our stuff). - Similarly, I haven't found a way to use ThreadPool efficiently with a dedicated socket listener thread. So the code is in PTcpEndpoint. - From an efficiency standpoint, there's no measureable difference between the two thread pools on a 1 CPU machine (no big surprise). - The default obviously remains the current thread pool. Revision Changes Path 1.39 +227 -159 jakarta-tomcat-connectors/util/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java Index: PoolTcpEndpoint.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/util/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java,v retrieving revision 1.38 retrieving revision 1.39 diff -u -r1.38 -r1.39 --- PoolTcpEndpoint.java 13 Jul 2004 09:43:59 -0000 1.38 +++ PoolTcpEndpoint.java 1 Oct 2004 23:46:57 -0000 1.39 @@ -24,6 +24,8 @@ import java.net.Socket; import java.net.SocketException; import java.security.AccessControlException; +import java.util.Stack; +import java.util.Vector; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,7 +56,9 @@ * @author Gal Shachor [EMAIL PROTECTED] * @author Yoav Shapira <[EMAIL PROTECTED]> */ -public class PoolTcpEndpoint { // implements Endpoint { +public class PoolTcpEndpoint implements Runnable { // implements Endpoint { + + static Log log=LogFactory.getLog(PoolTcpEndpoint.class ); private StringManager sm = StringManager.getManager("org.apache.tomcat.util.net.res"); @@ -64,33 +68,46 @@ private final Object threadSync = new Object(); - private boolean isPool = true; - private int backlog = BACKLOG; private int serverTimeout = TIMEOUT; - TcpConnectionHandler handler; - private InetAddress inet; private int port; private ServerSocketFactory factory; private ServerSocket serverSocket; - ThreadPoolRunnable listener; private volatile boolean running = false; private volatile boolean paused = false; private boolean initialized = false; private boolean reinitializing = false; static final int debug=0; - ThreadPool tp; - - static Log log=LogFactory.getLog(PoolTcpEndpoint.class ); - protected boolean tcpNoDelay=false; protected int linger=100; protected int socketTimeout=-1; + private boolean lf = false; + + + // ------ Leader follower fields + + + TcpConnectionHandler handler; + ThreadPoolRunnable listener; + ThreadPool tp; + + + // ------ Master slave fields + + /* The background thread. */ + private Thread thread = null; + /* Available processors. */ + private Stack workerThreads = new Stack(); + private int curThreads = 0; + private int maxThreads = 20; + /* All processors which have been created. */ + private Vector created = new Vector(); + public PoolTcpEndpoint() { tp = new ThreadPool(); @@ -102,14 +119,6 @@ // -------------------- Configuration -------------------- - public void setPoolOn(boolean isPool) { - this.isPool = isPool; - } - - public boolean isPoolOn() { - return isPool; - } - public void setMaxThreads(int maxThreads) { if( maxThreads > 0) tp.setMaxThreads(maxThreads); @@ -247,13 +256,37 @@ serverTimeout=i; } + public String getStrategy() { + if (lf) { + return "lf"; + } else { + return "ms"; + } + } + + public void setStrategy(String strategy) { + if ("ms".equals(strategy)) { + lf = false; + } else { + lf = true; + } + } + + public int getCurrentThreadCount() { + return curThreads; + } + + public int getCurrentThreadsBusy() { + return curThreads - workerThreads.size(); + } + // -------------------- Public methods -------------------- public void initEndpoint() throws IOException, InstantiationException { - try { - if(factory==null) - factory=ServerSocketFactory.getDefault(); - if(serverSocket==null) { + try { + if(factory==null) + factory=ServerSocketFactory.getDefault(); + if(serverSocket==null) { try { if (inet == null) { serverSocket = factory.createSocket(port, backlog); @@ -263,34 +296,33 @@ } catch ( BindException be ) { throw new BindException(be.getMessage() + ":" + port); } - } + } if( serverTimeout >= 0 ) - serverSocket.setSoTimeout( serverTimeout ); - } catch( IOException ex ) { - // log("couldn't start endpoint", ex, Logger.DEBUG); + serverSocket.setSoTimeout( serverTimeout ); + } catch( IOException ex ) { throw ex; - } catch( InstantiationException ex1 ) { - // log("couldn't start endpoint", ex1, Logger.DEBUG); + } catch( InstantiationException ex1 ) { throw ex1; - } + } initialized = true; } - + public void startEndpoint() throws IOException, InstantiationException { if (!initialized) { initEndpoint(); } - if(isPool) { - tp.start(); - } - running = true; + if (lf) { + tp.start(); + } + running = true; paused = false; - if(isPool) { - listener = new TcpWorkerThread(this); + if (lf) { + listener = new LeaderFollowerWorkerThread(this); tp.runIt(listener); } else { - log.error("XXX Error - need pool !"); - } + maxThreads = getMaxThreads(); + threadStart(); + } } public void pauseEndpoint() { @@ -307,13 +339,17 @@ } public void stopEndpoint() { - if (running) { - tp.shutdown(); - running = false; + if (running) { + if (lf) { + tp.shutdown(); + } else { + threadStop(); + } + running = false; if (serverSocket != null) { closeServerSocket(); } - } + } } protected void closeServerSocket() { @@ -456,33 +492,6 @@ return accepted; } - /** @deprecated - */ - public void log(String msg) - { - log.info(msg); - } - - /** @deprecated - */ - public void log(String msg, Throwable t) - { - log.error( msg, t ); - } - - /** @deprecated - */ - public void log(String msg, int level) - { - log.info( msg ); - } - - /** @deprecated - */ - public void log(String msg, Throwable t, int level) { - log.error( msg, t ); - } - void setSocketOptions(Socket socket) throws SocketException { if(linger >= 0 ) @@ -493,120 +502,179 @@ socket.setSoTimeout( socketTimeout ); } -} + + void processSocket(Socket s, TcpConnection con, Object[] threadData) { + // Process the connection + int step = 1; + try { + + // 1: Set socket options: timeout, linger, etc + setSocketOptions(s); + + // 2: SSL handshake + step = 2; + if (getServerSocketFactory() != null) { + getServerSocketFactory().handshake(s); + } + + // 3: Process the connection + step = 3; + con.setEndpoint(this); + con.setSocket(s); + getConnectionHandler().processConnection(con, threadData); + + } catch (SocketException se) { + PoolTcpEndpoint.log.error( + "Remote Host " + + s.getInetAddress() + + " SocketException: " + + se.getMessage()); + // Try to close the socket + try { + s.close(); + } catch (IOException e) { + } + } catch (Throwable t) { + if (step == 2) { + PoolTcpEndpoint.log.debug("Handshake failed", t); + } else { + PoolTcpEndpoint.log.error("Unexpected error", t); + } + // Try to close the socket + try { + s.close(); + } catch (IOException e) { + } + } finally { + if (con != null) { + con.recycle(); + } + } + } + -// -------------------- Threads -------------------- + // -------------------------------------------------- Master Slave Methods -/* - * I switched the threading model here. - * - * We used to have a "listener" thread and a "connection" - * thread, this results in code simplicity but also a needless - * thread switch. - * - * Instead I am now using a pool of threads, all the threads are - * simmetric in their execution and no thread switch is needed. - */ -class TcpWorkerThread implements ThreadPoolRunnable { - /* This is not a normal Runnable - it gets attached to an existing - thread, runs and when run() ends - the thread keeps running. - It's better to keep the name ThreadPoolRunnable - avoid confusion. - We also want to use per/thread data and avoid sync wherever possible. - */ - PoolTcpEndpoint endpoint; + /** + * Create (or allocate) and return an available processor for use in + * processing a specific HTTP request, if possible. If the maximum + * allowed processors have already been created and are in use, return + * <code>null</code> instead. + */ + private MasterSlaveWorkerThread createWorkerThread() { + + synchronized (workerThreads) { + if (workerThreads.size() > 0) { + return ((MasterSlaveWorkerThread) workerThreads.pop()); + } + if ((maxThreads > 0) && (curThreads < maxThreads)) { + return (newWorkerThread()); + } else { + if (maxThreads < 0) { + return (newWorkerThread()); + } else { + return (null); + } + } + } + + } + - public TcpWorkerThread(PoolTcpEndpoint endpoint) { - this.endpoint = endpoint; + /** + * Create and return a new processor suitable for processing HTTP + * requests and returning the corresponding responses. + */ + private MasterSlaveWorkerThread newWorkerThread() { + + MasterSlaveWorkerThread workerThread = + new MasterSlaveWorkerThread(this, tp.getName() + "-" + (++curThreads)); + workerThread.start(); + created.addElement(workerThread); + return (workerThread); + } - public Object[] getInitData() { - // no synchronization overhead, but 2 array access - Object obj[]=new Object[2]; - obj[1]= endpoint.getConnectionHandler().init(); - obj[0]=new TcpConnection(); - return obj; + + /** + * Recycle the specified Processor so that it can be used again. + * + * @param processor The processor to be recycled + */ + void recycleWorkerThread(MasterSlaveWorkerThread workerThread) { + workerThreads.push(workerThread); } + - public void runIt(Object perThrData[]) { + /** + * The background thread that listens for incoming TCP/IP connections and + * hands them off to an appropriate processor. + */ + public void run() { - // Create per-thread cache - if (endpoint.isRunning()) { + // Loop until we receive a shutdown command + while (running) { // Loop if endpoint is paused - while (endpoint.isPaused()) { + while (paused) { try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore } } - - // Accept a new connection - Socket s = null; - try { - s = endpoint.acceptSocket(); - } finally { - // Continue accepting on another thread... - if (endpoint.isRunning()) { - endpoint.tp.runIt(this); + + // Accept the next incoming connection from the server socket + Socket socket = acceptSocket(); + + // Hand this socket off to an appropriate processor + MasterSlaveWorkerThread workerThread = createWorkerThread(); + if (workerThread == null) { + try { + log.warn(sm.getString("endpoint.noProcessor")); + socket.close(); + } catch (IOException e) { + ; } + continue; } + workerThread.assign(socket); - // Process the connection - if (null != s) { - TcpConnection con = null; - int step = 1; - try { + // The processor will recycle itself when it finishes - // 1: Set socket options: timeout, linger, etc - endpoint.setSocketOptions(s); + } - // 2: SSL handshake - step = 2; - if (endpoint.getServerSocketFactory() != null) { - endpoint.getServerSocketFactory().handshake(s); - } + // Notify the threadStop() method that we have shut ourselves down + synchronized (threadSync) { + threadSync.notifyAll(); + } - // 3: Process the connection - step = 3; - con = (TcpConnection) perThrData[0]; - con.setEndpoint(endpoint); - con.setSocket(s); - endpoint.getConnectionHandler().processConnection( - con, - (Object[]) perThrData[1]); - - } catch (SocketException se) { - PoolTcpEndpoint.log.error( - "Remote Host " - + s.getInetAddress() - + " SocketException: " - + se.getMessage()); - // Try to close the socket - try { - s.close(); - } catch (IOException e) { - } - } catch (Throwable t) { - if (step == 2) { - PoolTcpEndpoint.log.debug("Handshake failed", t); - } else { - PoolTcpEndpoint.log.error("Unexpected error", t); - } - // Try to close the socket - try { - s.close(); - } catch (IOException e) { - } - } finally { - if (con != null) { - con.recycle(); - } - } - } + } + + + /** + * Start the background processing thread. + */ + private void threadStart() { + thread = new Thread(this, tp.getName()); + thread.setPriority(getThreadPriority()); + thread.setDaemon(true); + thread.start(); + } + + /** + * Stop the background processing thread. + */ + private void threadStop() { + try { + threadSync.wait(5000); + } catch (InterruptedException e) { + ; } + thread = null; } - + + } 1.1 jakarta-tomcat-connectors/util/java/org/apache/tomcat/util/net/LeaderFollowerWorkerThread.java Index: LeaderFollowerWorkerThread.java =================================================================== /* * Copyright 1999-2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.tomcat.util.net; import java.net.Socket; import org.apache.tomcat.util.threads.ThreadPoolRunnable; /* * I switched the threading model here. * * We used to have a "listener" thread and a "connection" * thread, this results in code simplicity but also a needless * thread switch. * * Instead I am now using a pool of threads, all the threads are * simmetric in their execution and no thread switch is needed. */ class LeaderFollowerWorkerThread implements ThreadPoolRunnable { /* This is not a normal Runnable - it gets attached to an existing thread, runs and when run() ends - the thread keeps running. It's better to keep the name ThreadPoolRunnable - avoid confusion. We also want to use per/thread data and avoid sync wherever possible. */ PoolTcpEndpoint endpoint; public LeaderFollowerWorkerThread(PoolTcpEndpoint endpoint) { this.endpoint = endpoint; } public Object[] getInitData() { // no synchronization overhead, but 2 array access Object obj[]=new Object[2]; obj[1]= endpoint.getConnectionHandler().init(); obj[0]=new TcpConnection(); return obj; } public void runIt(Object perThrData[]) { // Create per-thread cache if (endpoint.isRunning()) { // Loop if endpoint is paused while (endpoint.isPaused()) { try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore } } // Accept a new connection Socket s = null; try { s = endpoint.acceptSocket(); } finally { // Continue accepting on another thread... if (endpoint.isRunning()) { endpoint.tp.runIt(this); } } // Process the connection if (null != s) { endpoint.processSocket(s, (TcpConnection) perThrData[0], (Object[]) perThrData[1]); } } } } 1.1 jakarta-tomcat-connectors/util/java/org/apache/tomcat/util/net/MasterSlaveWorkerThread.java Index: MasterSlaveWorkerThread.java =================================================================== /* * Copyright 1999-2004 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.tomcat.util.net; import java.net.Socket; import org.apache.tomcat.util.threads.ThreadWithAttributes; /** * Regular master slave thread pool. Slave threads will wait for work. */ class MasterSlaveWorkerThread implements Runnable { protected PoolTcpEndpoint endpoint; protected String threadName; protected boolean stopped = false; private Object threadSync = new Object(); private Thread thread = null; private boolean available = false; private Socket socket = null; private TcpConnection con = new TcpConnection(); private Object[] threadData = null; public MasterSlaveWorkerThread(PoolTcpEndpoint endpoint, String threadName) { this.endpoint = endpoint; this.threadName = threadName; } /** * Process an incoming TCP/IP connection on the specified socket. Any * exception that occurs during processing must be logged and swallowed. * <b>NOTE</b>: This method is called from our Connector's thread. We * must assign it to our own thread so that multiple simultaneous * requests can be handled. * * @param socket TCP socket to process */ synchronized void assign(Socket socket) { // Wait for the Processor to get the previous Socket while (available) { try { wait(); } catch (InterruptedException e) { } } // Store the newly available Socket and notify our thread this.socket = socket; available = true; notifyAll(); } /** * Await a newly assigned Socket from our Connector, or <code>null</code> * if we are supposed to shut down. */ private synchronized Socket await() { // Wait for the Connector to provide a new Socket while (!available) { try { wait(); } catch (InterruptedException e) { } } // Notify the Connector that we have received this Socket Socket socket = this.socket; available = false; notifyAll(); return (socket); } /** * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ public void run() { // Process requests until we receive a shutdown signal while (!stopped) { // Wait for the next socket to be assigned Socket socket = await(); if (socket == null) continue; // Process the request from this socket endpoint.processSocket(socket, con, threadData); // Finish up this request endpoint.recycleWorkerThread(this); } // Tell threadStop() we have shut ourselves down successfully synchronized (threadSync) { threadSync.notifyAll(); } } /** * Start the background processing thread. */ public void start() { threadData = endpoint.getConnectionHandler().init(); thread = new ThreadWithAttributes(null, this); thread.setName(threadName); thread.setDaemon(true); thread.start(); } /** * Stop the background processing thread. */ public void stop() { stopped = true; assign(null); thread = null; threadData = null; } } 1.58 +16 -11 jakarta-tomcat-connectors/http11/src/java/org/apache/coyote/http11/Http11Protocol.java Index: Http11Protocol.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/http11/src/java/org/apache/coyote/http11/Http11Protocol.java,v retrieving revision 1.57 retrieving revision 1.58 diff -u -r1.57 -r1.58 --- Http11Protocol.java 29 Sep 2004 09:54:28 -0000 1.57 +++ Http11Protocol.java 1 Oct 2004 23:46:57 -0000 1.58 @@ -154,8 +154,13 @@ // XXX It should be possible to use a single TP tpOname=new ObjectName (domain + ":" + "type=ThreadPool,name=" + getName()); - Registry.getRegistry(null, null) - .registerComponent(tp, tpOname, null ); + if ("ms".equals(getStrategy())) { + Registry.getRegistry(null, null) + .registerComponent(ep, tpOname, null ); + } else { + Registry.getRegistry(null, null) + .registerComponent(tp, tpOname, null ); + } tp.setName(getName()); tp.setDaemon(false); tp.addThreadPoolListener(new MXPoolListener(this, tp)); @@ -242,15 +247,6 @@ // -------------------- Pool setup -------------------- - public boolean getPools(){ - return ep.isPoolOn(); - } - - public void setPools( boolean t ) { - ep.setPoolOn(t); - setAttribute("pools", "" + t); - } - public int getMaxThreads() { return ep.getMaxThreads(); } @@ -286,6 +282,15 @@ public int getThreadPriority() { return ep.getThreadPriority(); } + + public void setStrategy(String strategy) { + ep.setStrategy(strategy); + setAttribute("strategy", strategy); + } + + public String getStrategy() { + return ep.getStrategy(); + } // -------------------- Tcp setup --------------------
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]