jfclere 02/01/05 02:03:43 Modified: jk/java/org/apache/jk/server JkMain.java jk/native2 build.xml Added: jk/java/org/apache/jk/common ChannelUnixSocket.java jk/native2/tomcat UnixSocket.c Log: Add code for AF_UNIX sockets. Revision Changes Path 1.1 jakarta-tomcat-connectors/jk/java/org/apache/jk/common/ChannelUnixSocket.java Index: ChannelUnixSocket.java =================================================================== /* * ==================================================================== * * The Apache Software License, Version 1.1 * * Copyright (c) 1999 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, if * any, must include the following acknowlegement: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowlegement may appear in the software itself, * if and wherever such third-party acknowlegements normally appear. * * 4. The names "The Jakarta Project", "Tomcat", and "Apache Software * Foundation" must not be used to endorse or promote products derived * from this software without prior written permission. For written * permission, please contact [EMAIL PROTECTED] * * 5. Products derived from this software may not be called "Apache" * nor may "Apache" appear in their names without prior written * permission of the Apache Group. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. * * [Additional notices, if required by prior licensing conditions] * */ package org.apache.jk.common; import java.io.*; import java.net.*; import java.util.*; import org.apache.tomcat.util.buf.*; import org.apache.tomcat.util.http.*; import org.apache.tomcat.util.threads.*; import org.apache.jk.core.*; /* XXX Make the 'message type' pluggable */ /* A lot of the 'original' behavior is hardcoded - this uses Ajp13 wire protocol, TCP, Ajp14 API etc. As we add other protocols/transports/APIs this will change, the current goal is to get the same level of functionality as in the original jk connector. */ /** * Jk2 can use multiple protocols/transports. * Various container adapters should load this object ( as a bean ), * set configurations and use it. Note that the connector will handle * all incoming protocols - it's not specific to ajp1x. The protocol * is abstracted by Endpoint/Message/Channel. */ /** Accept ( and send ) messages via Unix Socket (AF_UNIX). * The AF_UNIX is not supported by JAVA so we need a piece of native code. * * @author Costin Manolache * @author Jean-Frederic Clere (Well, I have copied Costin's code and ideas). */ public class ChannelUnixSocket extends Channel { /* XXX do not have port/Address */ // int port; // InetAddress inet; int serverTimeout; boolean tcpNoDelay; int linger=100; int socketTimeout; Worker worker; ThreadPool tp=new ThreadPool(); /* ==================== socket options ==================== */ public ThreadPool getThreadPool() { return tp; } public void setPort( int port ) { // this.port=port; } public void setWorker( Worker w ) { worker=w; } public Worker getWorker() { return worker; } public void setAddress(InetAddress inet) { // this.inet=inet; // XXX we have to set the filename for the AF_UNIX socket. } /** * Sets the timeout in ms of the server sockets created by this * server. This method allows the developer to make servers * more or less responsive to having their server sockets * shut down. * * <p>By default this value is 1000ms. */ public void setServerTimeout(int timeout) { this.serverTimeout = timeout; } public void setTcpNoDelay( boolean b ) { tcpNoDelay=b; } public void setSoLinger( int i ) { linger=i; } public void setSoTimeout( int i ) { socketTimeout=i; } UnixSocketServer sSocket; // File descriptor of the AF_UNIX server socket. /* ==================== ==================== */ int socketNote=1; // File descriptor. int isNote=2; // Input Stream. int osNote=3; // Output Stream. public void accept( Endpoint ep ) throws IOException { UnixSocket s =sSocket.accept(); ep.setNote( socketNote, s ); if(dL>0 ) d("Accepted socket " + s ); if( linger > 0 ) s.setSoLinger( true, linger); InputStream is=new UnixSocketIn(s.fd); OutputStream os=new UnixSocketOut(s.fd); ep.setNote( isNote, is ); ep.setNote( osNote, os ); } public void init() throws IOException { // I have put it here... // It load libunixsocket.so in jre/lib/i386 in my Linux using a Sun JVM. System.loadLibrary("unixsocket"); sSocket=new UnixSocketServer(); // port ); if( serverTimeout > 0 ) sSocket.setSoTimeout( serverTimeout ); // Run a thread that will accept connections. tp.start(); SocketAcceptor acceptAjp=new SocketAcceptor( this ); tp.runIt( acceptAjp); } public void open(Endpoint ep) throws IOException { } public void close(Endpoint ep) throws IOException { Socket s=(Socket)ep.getNote( socketNote ); s.close(); } public void destroy() throws IOException { try { tp.shutdown(); // Need to create a connection to unlock the accept(); // I do not if this is needed but the file should be removed. /* Socket s; if (inet == null) { s=new Socket("127.0.0.1", port ); }else{ s=new Socket(inet, port ); // setting soLinger to a small value will help shutdown the // connection quicker s.setSoLinger(true, 0); } s.close(); */ sSocket.close(); // XXX? } catch(Exception e) { e.printStackTrace(); } } public void write( Endpoint ep, byte[] b, int offset, int len) throws IOException { OutputStream os=(OutputStream)ep.getNote( osNote ); os.write( b, offset, len ); } /** * Read N bytes from the InputStream, and ensure we got them all * Under heavy load we could experience many fragmented packets * just read Unix Network Programming to recall that a call to * read didn't ensure you got all the data you want * * from read() Linux manual * * On success, the number of bytes read is returned (zero indicates end of file), * and the file position is advanced by this number. * It is not an error if this number is smaller than the number of bytes requested; * this may happen for example because fewer bytes * are actually available right now (maybe because we were close to end-of-file, * or because we are reading from a pipe, or from a * terminal), or because read() was interrupted by a signal. * On error, -1 is returned, and errno is set appropriately. In this * case it is left unspecified whether the file position (if any) changes. * **/ public int read( Endpoint ep, byte[] b, int offset, int len) throws IOException { InputStream is=(InputStream)ep.getNote( isNote ); int pos = 0; int got; if (dL > 5) { d("reading # " + b + " " + (b==null ? 0: b.length) + " " + offset + " " + len); } while(pos < len) { got = is.read(b, pos + offset, len - pos); if (dL > 5) { d("read got # " + got); } // connection just closed by remote. if (got <= 0) { // This happens periodically, as apache restarts // periodically. // It should be more gracefull ! - another feature for Ajp14 return -3; } pos += got; } return pos; } public Endpoint createEndpoint() { return new Endpoint(); } boolean running=true; /** Accept incoming connections, dispatch to the thread pool */ void acceptConnections() { if( dL>0 ) d("Accepting ajp connections"); while( running ) { try { Endpoint ep=this.createEndpoint(); this.accept(ep); SocketConnection ajpConn= new SocketConnection(this, ep); tp.runIt( ajpConn ); } catch( Exception ex ) { ex.printStackTrace(); } } } /** Process a single ajp connection. */ void processConnection(Endpoint ep) { if( dL > 0 ) d( "New ajp connection "); try { MsgAjp recv=new MsgAjp(); while( running ) { recv.receive( this, ep ); int status=we.processCallbacks( this, ep, recv ); } this.close( ep ); } catch( Exception ex ) { ex.printStackTrace(); } } private static final int dL=10; private static void d(String s ) { System.err.println( "ChannelUnixSocket: " + s ); } } class SocketAcceptor implements ThreadPoolRunnable { ChannelUnixSocket wajp; SocketAcceptor(ChannelUnixSocket wajp ) { this.wajp=wajp; } public Object[] getInitData() { return null; } public void runIt(Object thD[]) { wajp.acceptConnections(); } } class SocketConnection implements ThreadPoolRunnable { ChannelUnixSocket wajp; Endpoint ep; SocketConnection(ChannelUnixSocket wajp, Endpoint ep) { this.wajp=wajp; this.ep=ep; } public Object[] getInitData() { return null; } public void runIt(Object perTh[]) { wajp.processConnection(ep); } } /** * Native AF_UNIX socket server */ class UnixSocketServer { private int fd; // From socket + bind. private static native int createSocketNative (String filename); private static native int acceptNative (int fd); private static native int closeNative (int fd); private static native int setSoTimeoutNative (int fd,int value); public UnixSocketServer() throws IOException { fd = createSocketNative("/usr/tmp/apache_socket"); if (fd<0) throw new IOException(); } public UnixSocket accept() throws IOException { UnixSocket socket = new UnixSocket(); socket.fd = acceptNative(this.fd); if (socket.fd<0) throw new IOException(); return socket; } public void setSoTimeout(int value) throws SocketException { if (setSoTimeoutNative(this.fd, value)<0) throw new SocketException(); } public void close() { closeNative(this.fd); } } /** * Native AF_UNIX socket */ class UnixSocket { public int fd; // From accept. private static native int setSoLingerNative (int fd, int l_onoff, int l_linger); public void setSoLinger (boolean bool ,int l_linger) throws IOException { int l_onoff=0; if (bool) l_onoff = 1; if (setSoLingerNative(fd,l_onoff,l_linger)<0) throw new IOException(); } } /** * Provide an InputStream for the native socket. */ class UnixSocketIn extends InputStream { private int fd; // From openNative. private static native int readNative (int fd, byte buf [], int len); public int read() throws IOException { byte buf [] = new byte[1]; if (readNative(fd,buf,1) < 0) throw new IOException(); return 0xff & buf[0]; } public UnixSocketIn(int fd) { this.fd = fd; } } /** * Provide an OutputStream for the native socket. */ class UnixSocketOut extends OutputStream { private int fd; // From openNative. private static native int writeNative (int fd, byte buf [], int len); public void write(int value) throws IOException { byte buf [] = new byte[] { (byte) value }; if (writeNative(fd,buf,1) < 0) throw new IOException(); } public UnixSocketOut(int fd) { this.fd = fd; } } 1.2 +3 -0 jakarta-tomcat-connectors/jk/java/org/apache/jk/server/JkMain.java Index: JkMain.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jk/java/org/apache/jk/server/JkMain.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- JkMain.java 31 Dec 2001 19:09:59 -0000 1.1 +++ JkMain.java 5 Jan 2002 10:03:43 -0000 1.2 @@ -87,7 +87,10 @@ public void start() throws IOException { ChannelSocket csocket=new ChannelSocket(); csocket.setPort( 8009 ); + /* + ChannelUnixSocket csocket=new ChannelUnixSocket(); // JFC tests wEnv.addChannel( csocket ); + */ WorkerDummy wdummy=new WorkerDummy(); csocket.setWorker( wdummy ); 1.9 +30 -1 jakarta-tomcat-connectors/jk/native2/build.xml Index: build.xml =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native2/build.xml,v retrieving revision 1.8 retrieving revision 1.9 diff -u -r1.8 -r1.9 --- build.xml 31 Dec 2001 22:25:03 -0000 1.8 +++ build.xml 5 Jan 2002 10:03:43 -0000 1.9 @@ -43,7 +43,7 @@ <!-- ==================== Targets ==================== --> - <target name="main" depends="init,apache20,jni,apache13"> + <target name="main" depends="init,apache20,jni,apache13,unixsocket"> </target> <target name="init" > @@ -406,4 +406,33 @@ </delete> </target> + <target name="unixsocket" depends="init" if="useunixsocket"> + <mkdir dir="${jk.build}/WEB-INF/jk2/unixsocket" /> + <so sofile="libunixsocket" + buildDir="${jk.build}/WEB-INF/jk2/unixsocket" + optimize="${so.optimize}" + debug="${so.debug}" + profile="${so.profile}"> + + <src dir="."> + <include name="tomcat/UnixSocket.c" /> + <!-- When APR will provide the AF_UNIX socket support --> + <!-- + <include name="common/apr/*.c" if="HAVE_APR" /> + --> + </src> + <includes> + <include name="${java.home}/../include" /> + + <!-- Platform specific includes --> + <include name="${java.home}/../include/netware" if="netware" /> + <include name="${java.home}/../include/win32" if="win32" /> + <include name="${java.home}/../include/linux" if="linux" /> + </includes> + <depends> + <fileset dir="${native.dir}/common" includes="*.h" /> + </depends> + + </so> + </target> </project> 1.1 jakarta-tomcat-connectors/jk/native2/tomcat/UnixSocket.c Index: UnixSocket.c =================================================================== #include <sys/types.h> #include <sys/stat.h> #include <sys/socket.h> #include <fcntl.h> #include <errno.h> #include <sys/un.h> /* for sockaddr_un */ #include <unistd.h> #include <jni.h> /* * Native routines for * package org.apache.jk.common */ /** * createSocketNative: Creates a AF_UNIX socketserver. */ JNIEXPORT jint JNICALL Java_org_apache_jk_common_UnixSocketServer_createSocketNative ( JNIEnv *env, jobject ignored, jstring filename ) { int sd; const char *real_filename; jboolean flag; struct sockaddr_un unix_addr; mode_t omask; int rc; // Convert java encoding in machine one. real_filename = (*env)->GetStringUTFChars (env, filename, &flag); if (real_filename == 0) return -EINVAL; // remove the exist socket. if (unlink(real_filename) < 0 && errno != ENOENT) { // The socket cannot be remove... Well I hope that no problems ;-) } // create the socket. sd = socket(AF_UNIX, SOCK_STREAM, 0); if (sd<0) return -errno; memset(&unix_addr, 0, sizeof(unix_addr)); unix_addr.sun_family = AF_UNIX; strcpy(unix_addr.sun_path, real_filename); omask = umask(0117); /* so that only Apache can use socket */ rc = bind(sd, (struct sockaddr *)&unix_addr, sizeof(unix_addr)); umask(omask); /* can't fail, so can't clobber errno */ if (rc<0) return -errno; if (listen(sd, 100) < 0) { return -errno; } (*env)->ReleaseStringUTFChars (env, filename, real_filename); return sd; } /** * accept(). Accepts incomming calls on socket. */ JNIEXPORT jint JNICALL Java_org_apache_jk_common_UnixSocketServer_acceptNative ( JNIEnv *env, jobject ignored, jint sd ) { struct sockaddr_un unix_addr; int sd2; int len; len = sizeof(unix_addr); sd2 = accept(sd, (struct sockaddr *)&unix_addr, &len); if (sd2<0) return - errno; return(sd2); } /** * set the socket timeout (read/write) or accept() ? */ JNIEXPORT jint JNICALL Java_org_apache_jk_common_UnixSocketServer_setSoTimeoutNative ( JNIEnv *env, jobject ignored, jint sd ) { // setsockopt or setitimer? } /** * close the socket (remove the file?) */ JNIEXPORT jint JNICALL Java_org_apache_jk_common_UnixSocketServer_closeNative ( JNIEnv *env, jobject ignored, jint sd ) { close(sd); } /** * setSoLinger */ JNIEXPORT jint JNICALL Java_org_apache_jk_common_UnixSocket_setSoLingerNative ( JNIEnv *env, jobject ignored, jint sd, jint l_onoff, jint l_linger ) { struct linger { int l_onoff; /* linger active */ int l_linger; /* how many seconds to linger for */ } lin; lin.l_onoff = l_onoff; lin.l_linger = l_linger; if (setsockopt(sd, SOL_SOCKET, SO_LINGER, &lin, sizeof(lin))<0) return -errno; return 0; } /** * read from the socket. */ JNIEXPORT jint JNICALL Java_org_apache_jk_common_UnixSocketIn_readNative ( JNIEnv *env, jobject ignored, jint fd, jbyteArray buf, jint len ) { int retval; jbyte *buffer; jboolean isCopy; buffer = (*env)->GetByteArrayElements (env, buf, &isCopy); retval = read(fd,buffer,len); (*env)->ReleaseByteArrayElements (env, buf, buffer, 0); return retval; } /** * write to the socket. */ JNIEXPORT jint JNICALL Java_org_apache_jk_common_UnixSocketOut_writeNative ( JNIEnv *env, jobject ignored, jint fd, jbyteArray buf, jint len ) { int retval; jbyte *buffer; jboolean isCopy; buffer = (*env)->GetByteArrayElements (env, buf, &isCopy); retval = write(fd,buffer,len); (*env)->ReleaseByteArrayElements (env, buf, buffer, 0); return retval; }
-- To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]> For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>