costin 2003/02/16 18:17:15 Modified: jk/java/org/apache/jk/common ChannelSocket.java Log: More JMX code. The channel will act as a standalone MBean, receiving messages and dispatching them via either a direct call (JkHandler) or JMX notifications. Also a few cosmetic changes (better name for the thread pool MBean, less verbosity). Revision Changes Path 1.34 +80 -6 jakarta-tomcat-connectors/jk/java/org/apache/jk/common/ChannelSocket.java Index: ChannelSocket.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jk/java/org/apache/jk/common/ChannelSocket.java,v retrieving revision 1.33 retrieving revision 1.34 diff -u -r1.33 -r1.34 --- ChannelSocket.java 28 Jan 2003 05:42:24 -0000 1.33 +++ ChannelSocket.java 17 Feb 2003 02:17:15 -0000 1.34 @@ -66,6 +66,13 @@ import org.apache.jk.core.*; import org.apache.commons.modeler.Registry; +import javax.management.NotificationBroadcaster; +import javax.management.NotificationBroadcasterSupport; +import javax.management.NotificationListener; +import javax.management.NotificationFilter; +import javax.management.ListenerNotFoundException; +import javax.management.MBeanNotificationInfo; +import javax.management.Notification; /* XXX Make the 'message type' pluggable @@ -96,7 +103,7 @@ * @jmx:notification-handler name="org.apache.jk.JK_RECEIVE_PACKET * @jmx:notification-handler name="org.apache.jk.JK_FLUSH */ -public class ChannelSocket extends JkHandler { +public class ChannelSocket extends JkHandler implements NotificationBroadcaster { private static org.apache.commons.logging.Log log= org.apache.commons.logging.LogFactory.getLog( ChannelSocket.class ); @@ -263,6 +270,7 @@ int socketNote=1; int isNote=2; int osNote=3; + int notifNote=4; public void accept( MsgContext ep ) throws IOException { if( sSocket==null ) return; @@ -301,7 +309,7 @@ destroy(); init(); } - + /** * @jmx:managed-operation */ @@ -344,7 +352,8 @@ if( serverTimeout > 0 ) sSocket.setSoTimeout( serverTimeout 
); - if( next==null ) { + // XXX Reverse it -> this is a notification generator !! + if( next==null && wEnv!=null ) { if( nextName!=null ) setNext( wEnv.getHandler( nextName ) ); if( next==null ) @@ -360,17 +369,27 @@ if( this.domain != null ) { try { Registry.getRegistry().registerComponent(tp, domain,"ThreadPool", - "type=ThreadPool,worker=jk,name=Jk" + port); + "type=ThreadPool,name=jk" + port); } catch (Exception e) { log.error("Can't register threadpool" ); } } + // XXX Move to start, make sure the caller calls start tp.start(); SocketAcceptor acceptAjp=new SocketAcceptor( this ); tp.runIt( acceptAjp); } + public void start() throws IOException{ + if( sSocket==null ) + init(); + } + + public void stop() throws IOException { + destroy(); + } + public void open(MsgContext ep) throws IOException { } @@ -563,7 +582,7 @@ int status= this.receive( recv, ep ); if( status <= 0 ) { if( status==-3) - log.info( "server has been restarted or reset this connection" ); + log.debug( "server has been restarted or reset this connection" ); else log.warn("Closing ajp connection " + status ); break; @@ -571,6 +590,7 @@ ep.setLong( MsgContext.TIMER_RECEIVED, System.currentTimeMillis()); ep.setType( 0 ); + // Will call next status= this.invoke( recv, ep ); if( status!= JkHandler.OK ) { log.warn("processCallbacks status " + status ); @@ -601,11 +621,13 @@ } } + // XXX This should become handleNotification public int invoke( Msg msg, MsgContext ep ) throws IOException { int type=ep.getType(); switch( type ) { case JkHandler.HANDLE_RECEIVE_PACKET: + if( log.isDebugEnabled()) log.debug("RECEIVE_PACKET ?? 
"); return receive( msg, ep ); case JkHandler.HANDLE_SEND_PACKET: return send( msg, ep ); @@ -613,7 +635,26 @@ return flush( msg, ep ); } - return next.invoke( msg, ep ); + if( log.isDebugEnabled() ) + log.debug("Call next " + type + " " + next); + + // Send notification + if( nSupport!=null ) { + Notification notif=(Notification)ep.getNote(notifNote); + if( notif==null ) { + notif=new Notification("channelSocket.message", ep, requestCount ); + ep.setNote( notifNote, notif); + } + nSupport.sendNotification(notif); + } + + if( next != null ) { + return next.invoke( msg, ep ); + } else { + log.info("No next "); + } + + return OK; } public boolean isSameAddress(MsgContext ep) { @@ -654,6 +695,39 @@ return (false); } return (true); + } + + public void sendNewMessageNotification(Notification notification) { + if( nSupport!= null ) + nSupport.sendNotification(notification); + } + + private NotificationBroadcasterSupport nSupport= null; + + public void addNotificationListener(NotificationListener listener, + NotificationFilter filter, + Object handback) + throws IllegalArgumentException + { + if( nSupport==null ) nSupport=new NotificationBroadcasterSupport(); + nSupport.addNotificationListener(listener, filter, handback); + } + + public void removeNotificationListener(NotificationListener listener) + throws ListenerNotFoundException + { + if( nSupport!=null) + nSupport.removeNotificationListener(listener); + } + + MBeanNotificationInfo notifInfo[]=new MBeanNotificationInfo[0]; + + public void setNotificationInfo( MBeanNotificationInfo info[]) { + this.notifInfo=info; + } + + public MBeanNotificationInfo[] getNotificationInfo() { + return notifInfo; } }
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]