billbarker 2005/04/23 21:27:42 Modified: jk/java/org/apache/jk/common ChannelNioSocket.java Log: Give up on switching between blocking/non-blocking Sockets, also move the Accecpt into the Poller instead of its own thread. This is still very much experimental, and nobody should even dream of using it in production. Testing on Windows, it's very flakey. On Solaris, it's stable enough, but ChannelSocket is about 25% faster. Revision Changes Path 1.2 +231 -80 jakarta-tomcat-connectors/jk/java/org/apache/jk/common/ChannelNioSocket.java Index: ChannelNioSocket.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jk/java/org/apache/jk/common/ChannelNioSocket.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- ChannelNioSocket.java 17 Apr 2005 03:41:08 -0000 1.1 +++ ChannelNioSocket.java 24 Apr 2005 04:27:42 -0000 1.2 @@ -18,19 +18,22 @@ import java.util.Set; import java.util.Iterator; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.nio.channels.Selector; import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; import java.nio.channels.ClosedSelectorException; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedChannelException; import java.net.URLEncoder; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; -import java.net.SocketException; import javax.management.ListenerNotFoundException; import javax.management.MBeanNotificationInfo; @@ -92,10 +95,10 @@ int maxPort=8019; // 0 for backward compat. int port=startPort; InetAddress inet; - int serverTimeout; + int serverTimeout = 0; boolean tcpNoDelay=true; // nodelay to true by default int linger=100; - int socketTimeout; + int socketTimeout = 0; private Selector selector = null; long requestCount=0; @@ -105,7 +108,6 @@ flush() is honored ( on my test, I got 367->433 RPS and 52->35ms average time with a simple servlet ) */ - static final boolean BUFFER_WRITE=false; ThreadPool tp=ThreadPool.createThreadPool(true); @@ -271,12 +273,12 @@ final int isNote=2; final int osNote=3; final int notifNote=4; - boolean paused = true; + boolean paused = false; public void pause() throws Exception { synchronized(this) { paused = true; - unLockSocket(); + //unLockSocket(); } } @@ -299,10 +301,11 @@ } } } - Socket s=sSocket.accept(); + SocketChannel sc=sSocket.getChannel().accept(); + Socket s = sc.socket(); ep.setNote( socketNote, s ); if(log.isDebugEnabled() ) - log.debug("Accepted socket " + s ); + log.debug("Accepted socket " + s +" channel " + sc.isBlocking()); if( linger > 0 ) s.setSoLinger( true, linger); if( socketTimeout > 0 ) @@ -312,12 +315,9 @@ requestCount++; - InputStream is=new BufferedInputStream(s.getInputStream()); - OutputStream os; - if( BUFFER_WRITE ) - os = new BufferedOutputStream( s.getOutputStream()); - else - os = s.getOutputStream(); + sc.configureBlocking(false); + InputStream is=new SocketInputStream(sc); + OutputStream os = new SocketOutputStream(sc); ep.setNote( isNote, is ); ep.setNote( osNote, os ); ep.setControl( tp ); @@ -349,19 +349,24 @@ } if (maxPort < startPort) maxPort = startPort; + ServerSocketChannel ssc = ServerSocketChannel.open(); + ssc.configureBlocking(false); for( int i=startPort; i<=maxPort; i++ ) { try { + InetSocketAddress iddr = null; if( inet == null ) { - sSocket = new ServerSocket( i, 0 ); + iddr = new InetSocketAddress( i); } else { - sSocket=new ServerSocket( i, 0, inet ); + iddr=new InetSocketAddress( inet, i); } + sSocket = ssc.socket(); + sSocket.bind(iddr); port=i; break; } catch( IOException ex ) { if(log.isInfoEnabled()) log.info("Port busy " + i + " " + ex.toString()); - continue; + sSocket = null; } } @@ -373,6 +378,7 @@ log.info("JK: ajp13 listening on " + getAddress() + ":" + port ); selector = Selector.open(); + ssc.register(selector, SelectionKey.OP_ACCEPT); // If this is not the base port and we are the 'main' channleSocket and // SHM didn't already set the localId - we'll set the instance id if( "channelNioSocket".equals( name ) && @@ -380,8 +386,6 @@ (wEnv.getLocalId()==0) ) { wEnv.setLocalId( port - startPort ); } - if( serverTimeout > 0 ) - sSocket.setSoTimeout( serverTimeout ); // XXX Reverse it -> this is a notification generator !! if( next==null && wEnv!=null ) { @@ -415,9 +419,8 @@ } tp.start(); - SocketAcceptor acceptAjp=new SocketAcceptor( ); - tp.runIt( acceptAjp); - + Poller pollAjp = new Poller(); + tp.runIt(pollAjp); } ObjectName tpOName; @@ -428,8 +431,6 @@ public void start() throws IOException{ if( sSocket==null ) init(); - Poller pollAjp = new Poller(); - tp.runIt(pollAjp); resume(); } @@ -460,24 +461,10 @@ public void close(MsgContext ep) throws IOException { Socket s=(Socket)ep.getNote( socketNote ); - s.close(); - } - - private void unLockSocket() throws IOException { - // Need to create a connection to unlock the accept(); - Socket s; - InetAddress ladr = inet; - - if(port == 0) - return; - if (ladr == null || "0.0.0.0".equals(ladr.getHostAddress())) { - ladr = InetAddress.getLocalHost(); + SelectionKey key = s.getChannel().keyFor(selector); + if(key != null) { + key.cancel(); } - s=new Socket(ladr, port ); - // setting soLinger to a small value will help shutdown the - // connection quicker - s.setSoLinger(true, 0); - s.close(); } @@ -489,12 +476,8 @@ return; tp.shutdown(); - if(!paused) { - unLockSocket(); - } - + selector.wakeup().close(); sSocket.close(); // XXX? - selector.close(); if( tpOName != null ) { Registry.getRegistry(null, null).unregisterComponent(tpOName); @@ -518,19 +501,27 @@ if(log.isTraceEnabled() ) log.trace("send() " + len + " " + buf[4] ); + if(buf[4] == HandlerRequest.JK_AJP13_END_RESPONSE ) { + // After this goes out, the client may send a new request + // before the thread finishes, so tell the Poller that the + // next read is new + Socket s = (Socket)ep.getNote(socketNote); + SelectionKey key = s.getChannel().keyFor(selector); + if(key != null) { + SocketConnection sc = (SocketConnection)key.attachment(); + sc.setFinished(); + } + } OutputStream os=(OutputStream)ep.getNote( osNote ); os.write( buf, 0, len ); + os.flush(); return len; } public int flush( Msg msg, MsgContext ep) throws IOException { - if( BUFFER_WRITE ) { - OutputStream os=(OutputStream)ep.getNote( osNote ); - os.flush(); - } return 0; } @@ -613,7 +604,7 @@ while(pos < len) { try { got = is.read(b, pos + offset, len - pos); - } catch(SocketException sex) { + } catch(ClosedChannelException sex) { if(pos > 0) { log.info("Error reading data after "+pos+"bytes",sex); } else { @@ -647,20 +638,20 @@ void acceptConnections() { if( log.isDebugEnabled() ) log.debug("Accepting ajp connections on " + port); - while( running ) { + if( running ) { try{ MsgContext ep=new MsgContext(); ep.setSource(this); ep.setWorkerEnv( wEnv ); this.accept(ep); - if( !running ) break; + if( !running ) return; // Since this is a long-running connection, we don't care // about the small GC SocketConnection ajpConn= new SocketConnection( ep); - tp.runIt( ajpConn ); + ajpConn.register(ep); }catch(Exception ex) { if (running) log.warn("Exception executing accept" ,ex); @@ -788,21 +779,6 @@ return notifInfo; } - protected class SocketAcceptor implements ThreadPoolRunnable { - - - SocketAcceptor( ) { - } - - public Object[] getInitData() { - return null; - } - - public void runIt(Object thD[]) { - acceptConnections(); - } - } - protected class SocketConnection implements ThreadPoolRunnable { MsgContext ep; MsgAjp recv = new MsgAjp(); @@ -818,19 +794,20 @@ } public void runIt(Object perTh[]) { - inProgress = true; - if(processConnection(ep)) { - register(ep); - } else { + if(!processConnection(ep)) { unregister(ep); } - inProgress = false; + setFinished(); } public boolean isRunning() { return inProgress; } + public synchronized void setFinished() { + inProgress = false; + } + /** Process a single ajp connection. */ boolean processConnection(MsgContext ep) { @@ -868,7 +845,31 @@ return true; } - void unregister(MsgContext ep) { + synchronized void process(SelectionKey sk) { + if(!sk.isValid()) { + return; + } + if(sk.isReadable()) { + if(!inProgress) { + inProgress = true; + tp.runIt(this); + } else { + Object is = ep.getNote(isNote); + synchronized(is) { + is.notify(); + } + } + } + if(sk.isWritable()) { + Object os = ep.getNote(osNote); + synchronized(os) { + os.notify(); + } + } + } + + + synchronized void unregister(MsgContext ep) { try{ close(ep); } catch(Exception e) { @@ -912,6 +913,8 @@ public void runIt(Object perTh[]) { while(running) { try { + if(log.isTraceEnabled()) + log.trace("Attempting to select "+selector.keys().size()+" items"); int ns = selector.select(); if(log.isTraceEnabled()) log.trace("Selecting "+ns+" channels"); @@ -920,22 +923,170 @@ Iterator it = sels.iterator(); while(it.hasNext()) { SelectionKey sk = (SelectionKey)it.next(); - SocketConnection sc = (SocketConnection)sk.attachment(); if(sk.isValid()) { - sk.cancel(); // somebody else's problem now - tp.runIt(sc); + if(sk.isAcceptable()) { + acceptConnections(); + } else { + SocketConnection sc = (SocketConnection)sk.attachment(); + sc.process(sk); + } + } else { + sk.cancel(); } + it.remove(); } } } catch(ClosedSelectorException cse) { log.debug("Selector is closed"); return; + } catch(CancelledKeyException cke) { + log.debug("Key Cancelled", cke); } catch(IOException iex) { log.warn("IO Error in select",iex); + } catch(Exception ex) { + log.warn("Error processing select",ex); } } } } + protected class SocketInputStream extends InputStream { + final int BUFFER_SIZE = 8200; + ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); + SocketChannel channel; + + SocketInputStream(SocketChannel channel) { + this.channel = channel; + buffer.limit(0); + } + + public int available() { + return buffer.remaining(); + } + + public void mark(int readlimit) { + buffer.mark(); + } + + public boolean markSupported() { + return true; + } + + public void reset() { + buffer.reset(); + } + + public int read() throws IOException { + if(!checkAvailable(1)) { + if(fill(1) < 0) { + return -1; + } + } + return buffer.get(); + } + + private boolean checkAvailable(int nbyte) throws IOException { + return buffer.remaining() >= nbyte; + } + + private synchronized int fill(int nbyte) throws IOException { + int rem = nbyte; + int read = 0; + boolean eof = false; + buffer.clear(); + while(rem > 0) { + int count = channel.read(buffer); + if(count < 0) { + eof = true; + break; + } else if(count == 0) { + if(log.isTraceEnabled()) + log.trace("Blocking Read for "+rem+" bytes"); + try { + wait(); + }catch(InterruptedException iex) { + // ignore since can't happen + } + } + read += count; + rem -= count; + } + buffer.flip(); + return eof ? -1 : read; + } + + public int read(byte [] data) throws IOException { + return read(data, 0, data.length); + } + + public int read(byte [] data, int offset, int len) throws IOException { + int olen = len; + if(!checkAvailable(len)) { + int avail = buffer.remaining(); + if(avail > 0) { + buffer.get(data, offset, avail); + } + len -= avail; + offset += avail; + if(fill(len) < 0) { + return avail > 0 ? avail : -1; + } + } + buffer.get(data, offset, len); + return olen; + } + } + + protected class SocketOutputStream extends OutputStream { + final int BUFFER_SIZE = 8200; + ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); + SocketChannel channel; + + SocketOutputStream(SocketChannel channel) { + this.channel = channel; + } + + public void write(int b) throws IOException { + if(!checkAvailable(1)) { + flush(); + } + buffer.put((byte)b); + } + + public void write(byte [] data) throws IOException { + write(data, 0, data.length); + } + + public void write(byte [] data, int offset, int len) throws IOException { + if(!checkAvailable(len)) { + flush(); + } + buffer.put(data, offset, len); + } + + public synchronized void flush() throws IOException { + buffer.flip(); + while(buffer.hasRemaining()) { + int count = channel.write(buffer); + if(count == 0) { + SelectionKey key = channel.keyFor(selector); + key.interestOps(SelectionKey.OP_WRITE); + log.debug("Blocking for channel write: "+buffer.remaining()); + try { + wait(); + } catch(InterruptedException iex) { + // ignore, since can't happen + } + key.interestOps(SelectionKey.OP_READ); + } + } + buffer.clear(); + } + + private boolean checkAvailable(int len) { + return buffer.remaining() > len; + } + } + }
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]