costin      2003/02/16 18:17:15

  Modified:    jk/java/org/apache/jk/common ChannelSocket.java
  Log:
  More JMX code. The channel will act as a standalone MBean, receiving messages and
  dispatching them via either a direct call (JkHandler) or JMX notifications.
  
  Also a few cosmetic changes (better name for the thread pool MBean, less verbosity).
  
  Revision  Changes    Path
  1.34      +80 -6     jakarta-tomcat-connectors/jk/java/org/apache/jk/common/ChannelSocket.java
  
  Index: ChannelSocket.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-connectors/jk/java/org/apache/jk/common/ChannelSocket.java,v
  retrieving revision 1.33
  retrieving revision 1.34
  diff -u -r1.33 -r1.34
  --- ChannelSocket.java        28 Jan 2003 05:42:24 -0000      1.33
  +++ ChannelSocket.java        17 Feb 2003 02:17:15 -0000      1.34
  @@ -66,6 +66,13 @@
   
   import org.apache.jk.core.*;
   import org.apache.commons.modeler.Registry;
  +import javax.management.NotificationBroadcaster;
  +import javax.management.NotificationBroadcasterSupport;
  +import javax.management.NotificationListener;
  +import javax.management.NotificationFilter;
  +import javax.management.ListenerNotFoundException;
  +import javax.management.MBeanNotificationInfo;
  +import javax.management.Notification;
   
   
   /* XXX Make the 'message type' pluggable
  @@ -96,7 +103,7 @@
    * @jmx:notification-handler name="org.apache.jk.JK_RECEIVE_PACKET
    * @jmx:notification-handler name="org.apache.jk.JK_FLUSH
    */
  -public class ChannelSocket extends JkHandler {
  +public class ChannelSocket extends JkHandler implements NotificationBroadcaster {
       private static org.apache.commons.logging.Log log=
           org.apache.commons.logging.LogFactory.getLog( ChannelSocket.class );
   
  @@ -263,6 +270,7 @@
       int socketNote=1;
       int isNote=2;
       int osNote=3;
  +    int notifNote=4;
   
       public void accept( MsgContext ep ) throws IOException {
           if( sSocket==null ) return;
  @@ -301,7 +309,7 @@
           destroy();
           init();
       }
  -    
  +
       /**
        * @jmx:managed-operation
        */
  @@ -344,7 +352,8 @@
           if( serverTimeout > 0 )
               sSocket.setSoTimeout( serverTimeout );
   
  -        if( next==null ) {
  +        // XXX Reverse it -> this is a notification generator !!
  +        if( next==null && wEnv!=null ) {
               if( nextName!=null )
                   setNext( wEnv.getHandler( nextName ) );
               if( next==null )
  @@ -360,17 +369,27 @@
           if( this.domain != null ) {
               try {
                   Registry.getRegistry().registerComponent(tp, domain,"ThreadPool",
  -                        "type=ThreadPool,worker=jk,name=Jk" + port);
  +                        "type=ThreadPool,name=jk" + port);
               } catch (Exception e) {
                   log.error("Can't register threadpool" );
               }
           }
   
  +        // XXX Move to start, make sure the caller calls start
           tp.start();
           SocketAcceptor acceptAjp=new SocketAcceptor(  this );
           tp.runIt( acceptAjp);
       }
   
  +    public void start() throws IOException{
  +        if( sSocket==null )
  +            init();
  +    }
  +
  +    public void stop() throws IOException {
  +        destroy();
  +    }
  +
       public void open(MsgContext ep) throws IOException {
       }
   
  @@ -563,7 +582,7 @@
                   int status= this.receive( recv, ep );
                   if( status <= 0 ) {
                       if( status==-3)
  -                        log.info( "server has been restarted or reset this connection" );
  +                        log.debug( "server has been restarted or reset this connection" );
                       else 
                           log.warn("Closing ajp connection " + status );
                       break;
  @@ -571,6 +590,7 @@
                   ep.setLong( MsgContext.TIMER_RECEIVED, System.currentTimeMillis());
                   
                   ep.setType( 0 );
  +                // Will call next
                   status= this.invoke( recv, ep );
                   if( status!= JkHandler.OK ) {
                       log.warn("processCallbacks status " + status );
  @@ -601,11 +621,13 @@
           }
       }
   
  +    // XXX This should become handleNotification
       public int invoke( Msg msg, MsgContext ep ) throws IOException {
           int type=ep.getType();
   
           switch( type ) {
           case JkHandler.HANDLE_RECEIVE_PACKET:
  +            if( log.isDebugEnabled()) log.debug("RECEIVE_PACKET ?? ");
               return receive( msg, ep );
           case JkHandler.HANDLE_SEND_PACKET:
               return send( msg, ep );
  @@ -613,7 +635,26 @@
               return flush( msg, ep );
           }
   
  -        return next.invoke( msg, ep );
  +        if( log.isDebugEnabled() )
  +            log.debug("Call next " + type + " " + next);
  +
  +        // Send notification
  +        if( nSupport!=null ) {
  +            Notification notif=(Notification)ep.getNote(notifNote);
  +            if( notif==null ) {
  +                notif=new Notification("channelSocket.message", ep, requestCount );
  +                ep.setNote( notifNote, notif);
  +            }
  +            nSupport.sendNotification(notif);
  +        }
  +
  +        if( next != null ) {
  +            return next.invoke( msg, ep );
  +        } else {
  +            log.info("No next ");
  +        }
  +
  +        return OK;
       }
       
       public boolean isSameAddress(MsgContext ep) {
  @@ -654,6 +695,39 @@
                return (false);
        }
        return (true);
  +    }
  +
  +    public void sendNewMessageNotification(Notification notification) {
  +        if( nSupport!= null )
  +            nSupport.sendNotification(notification);
  +    }
  +
  +    private NotificationBroadcasterSupport nSupport= null;
  +
  +    public void addNotificationListener(NotificationListener listener,
  +                                        NotificationFilter filter,
  +                                        Object handback)
  +            throws IllegalArgumentException
  +    {
  +        if( nSupport==null ) nSupport=new NotificationBroadcasterSupport();
  +        nSupport.addNotificationListener(listener, filter, handback);
  +    }
  +
  +    public void removeNotificationListener(NotificationListener listener)
  +            throws ListenerNotFoundException
  +    {
  +        if( nSupport!=null)
  +            nSupport.removeNotificationListener(listener);
  +    }
  +
  +    MBeanNotificationInfo notifInfo[]=new MBeanNotificationInfo[0];
  +
  +    public void setNotificationInfo( MBeanNotificationInfo info[]) {
  +        this.notifInfo=info;
  +    }
  +
  +    public MBeanNotificationInfo[] getNotificationInfo() {
  +        return notifInfo;
       }
   }
   
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to