fhanik      2004/06/04 13:22:27

  Modified:    catalina/src/share/org/apache/catalina/startup
                        ClusterRuleSet.java
               modules/cluster/src/share/org/apache/catalina/cluster
                        CatalinaCluster.java
               modules/cluster/src/share/org/apache/catalina/cluster/session
                        DeltaManager.java SimpleTcpReplicationManager.java
               modules/cluster/src/share/org/apache/catalina/cluster/tcp
                        SimpleTcpCluster.java
  Added:       modules/cluster/src/share/org/apache/catalina/cluster
                        ClusterDeployer.java
               modules/cluster/src/share/org/apache/catalina/cluster/deploy
                        FarmWarDeployer.java FileChangeListener.java
                        FileMessage.java FileMessageFactory.java
                        UndeployMessage.java WarWatcher.java
  Log:
  Added the ability to do farm deployment/undeployment of WAR files to all members 
in the cluster.
  Upon startup, no state is replicated.
  Also, I want to refactor some messaging before we move on; I will do that once we have a 
branch.
  
  Revision  Changes    Path
  1.3       +10 -1     
jakarta-tomcat-catalina/catalina/src/share/org/apache/catalina/startup/ClusterRuleSet.java
  
  Index: ClusterRuleSet.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/catalina/src/share/org/apache/catalina/startup/ClusterRuleSet.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- ClusterRuleSet.java       27 Feb 2004 14:58:48 -0000      1.2
  +++ ClusterRuleSet.java       4 Jun 2004 20:22:27 -0000       1.3
  @@ -115,6 +115,15 @@
           digester.addSetNext(prefix + "Valve",
                               "addValve",
                               "org.apache.catalina.Valve");
  +        
  +        digester.addObjectCreate(prefix + "Deployer",
  +                                 null, // MUST be specified in the element
  +                                 "className");
  +        digester.addSetProperties(prefix + "Deployer");
  +        digester.addSetNext(prefix + "Deployer",
  +                            "setClusterDeployer",
  +                            "org.apache.catalina.cluster.ClusterDeployer");
  +
   
   
           //Cluster configuration end
  
  
  
  1.6       +15 -1     
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/CatalinaCluster.java
  
  Index: CatalinaCluster.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/CatalinaCluster.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- CatalinaCluster.java      29 May 2004 02:36:12 -0000      1.5
  +++ CatalinaCluster.java      4 Jun 2004 20:22:27 -0000       1.6
  @@ -23,6 +23,7 @@
   import org.apache.catalina.Logger;
   import org.apache.catalina.Valve;
   import org.apache.commons.logging.Log;
  +import org.apache.catalina.Manager;
   
   /**
    * A <b>CatalinaCluster</b> interface allows to plug in and out the 
  @@ -78,6 +79,12 @@
        */
       public Member[] getMembers();
       
  +    /**
  +     * Return the member that represents this node.
  +     * @return Member
  +     */
  +    public Member getLocalMember();
  +    
       public void setClusterSender(ClusterSender sender);
       
       public ClusterSender getClusterSender();
  @@ -95,5 +102,12 @@
       public void addClusterListener(MessageListener listener);
       
       public void removeClusterListener(MessageListener listener);
  +    
  +    public void setClusterDeployer(ClusterDeployer deployer);
  +    
  +    public ClusterDeployer getClusterDeployer();
  +    
  +    public Manager getManager(String name);
  +    public void removeManager(String name);
       
   }
  
  
  
  1.1                  
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterDeployer.java
  
  Index: ClusterDeployer.java
  ===================================================================
  /*
   * Copyright 1999,2004 The Apache Software Foundation.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *      http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  
  package org.apache.catalina.cluster;
  
  /**
   * A <b>ClusterDeployer</b> interface allows to plug in and out the
   * different deployment implementations
   *
   * @author Filip Hanik
   * @version $Revision: 1.1 $, $Date: 2004/06/04 20:22:27 $
   */
  import org.apache.catalina.LifecycleException;
  import org.apache.catalina.Deployer;
  import java.io.IOException;
  import java.net.URL;
  
  /**
   * Contract for pluggable cluster deployment implementations. A cluster
   * deployer is also a {@link MessageListener}, so it can be registered with
   * the cluster to receive cluster messages.
   */
  public interface ClusterDeployer extends MessageListener {

      /**
       * Descriptive information about this component implementation.
       */
      String info = "ClusterDeployer/1.0";

      /**
       * Returns the cluster this deployer is associated with.
       * @return CatalinaCluster
       */
      CatalinaCluster getCluster();

      /**
       * Associates this deployer with a cluster.
       * @param cluster CatalinaCluster
       */
      void setCluster(CatalinaCluster cluster);

      /**
       * Sets the deployer this cluster deployer delegates to.
       * @param deployer Deployer
       */
      void setDeployer(Deployer deployer);

      /**
       * Starts the cluster deployer; invoked by the owning container.
       * @throws Exception - if the cluster deployer fails to start
       */
      void start() throws Exception;

      /**
       * Stops the cluster deployer; invoked by the owning container.
       * @throws LifecycleException - if the cluster deployer fails to stop
       */
      void stop() throws LifecycleException;

      /**
       * Installs a new web application, whose web application archive is at
       * the specified URL, into this container and into all the other members
       * of the cluster, using the specified context path.
       * A context path of "" (the empty string) should be used for the root
       * application for this container.  Otherwise, the context path must
       * start with a slash.
       * <p>
       * If the application is successfully installed locally, a
       * ContainerEvent of type <code>INSTALL_EVENT</code> will be sent to all
       * registered listeners, with the newly created <code>Context</code> as
       * an argument.
       *
       * @param contextPath The context path to which this application should
       *  be installed (must be unique)
       * @param war A URL of type "jar:" that points to a WAR file, or type
       *  "file:" that points to an unpacked directory structure containing
       *  the web application to be installed
       *
       * @exception IllegalArgumentException if the specified context path
       *  is malformed (it must be "" or start with a slash)
       * @exception IllegalStateException if the specified context path
       *  is already attached to an existing web application
       * @exception IOException if an input/output error was encountered
       *  during installation
       */
      void install(String contextPath, URL war) throws IOException;

      /**
       * Removes an existing web application, attached to the specified
       * context path.  If the application is successfully removed, a
       * ContainerEvent of type <code>REMOVE_EVENT</code> will be sent to all
       * registered listeners, with the removed <code>Context</code> as
       * an argument. Deletes the web application war file and/or directory
       * if they exist in the Host's appBase.
       *
       * @param contextPath The context path of the application to be removed
       * @param undeploy boolean flag to remove web application from server
       *
       * @exception IllegalArgumentException if the specified context path
       *  is malformed (it must be "" or start with a slash)
       * @exception IllegalArgumentException if the specified context path does
       *  not identify a currently installed web application
       * @exception IOException if an input/output error occurs during
       *  removal
       */
      void remove(String contextPath, boolean undeploy) throws IOException;
  }
  
  
  
  1.1                  
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/deploy/FarmWarDeployer.java
  
  Index: FarmWarDeployer.java
  ===================================================================
  /*
   * Copyright 1999,2004 The Apache Software Foundation.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *      http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  
  package org.apache.catalina.cluster.deploy;
  
  import org.apache.catalina.cluster.ClusterDeployer;
  import org.apache.catalina.cluster.ClusterMessage;
  import org.apache.catalina.cluster.CatalinaCluster;
  import org.apache.catalina.LifecycleException;
  import org.apache.catalina.Deployer;
  import java.io.File;
  import java.net.URL;
  import java.io.IOException;
  import org.apache.catalina.cluster.Member;
  import java.util.HashMap;
  
  /**
   * <p>
   * A farm war deployer is a class that is able to
   * deploy/undeploy web applications in WAR form
   * within the cluster.</p>
   * Any host can act as the admin, and will have three directories
   * <ul>
   * <li> deployDir - the directory where we watch for changes</li>
   * <li> applicationDir - the directory where we install applications</li>
   * <li> tempDir - a temporaryDirectory to store binary data when downloading a war
   *      from the cluster </li>
   * </ul>
   * Currently we only support deployment of WAR files since they are easier to send
   * across the wire.
   *
   * @author Filip Hanik
   * @version 1.0
   */
  public class FarmWarDeployer implements ClusterDeployer, FileChangeListener {
      /*--Static Variables----------------------------------------*/
      public static org.apache.commons.logging.Log log =
          org.apache.commons.logging.LogFactory.getLog(FarmWarDeployer.class);
      /*--Instance Variables--------------------------------------*/
      protected CatalinaCluster cluster = null;
      protected Deployer deployer = null;
      protected boolean started = false; //default 5 seconds
      protected HashMap fileFactories = new HashMap();
      protected String deployDir;
      protected String tempDir;
      protected String watchDir;
      protected boolean watchEnabled = false;
      protected WarWatcher watcher = null;
      /*--Constructor---------------------------------------------*/
      public FarmWarDeployer() {
      }
  
      /*--Logic---------------------------------------------------*/
      public void start() throws Exception {
          if (started)return;
          getCluster().addClusterListener(this);
          if (watchEnabled) {
              watcher = new WarWatcher(this, new File(getWatchDir()), (long) 5000);
              Thread t = new Thread(watcher);
              t.start();
              log.info("Cluster deployment is watching " + getWatchDir() +
                       " for changes.");
          } //end if
          started = true;
          log.info("Cluster FarmWarDeployer started.");
      }
  
      public void stop() throws LifecycleException {
          started = false;
          getCluster().removeClusterListener(this);
          if (watcher != null) watcher.stop();
          log.info("Cluster FarmWarDeployer stopped.");
      }
  
      public void cleanDeployDir() {
          throw new java.lang.UnsupportedOperationException(
              "Method cleanDeployDir() not yet implemented.");
      }
  
      /**
       * Callback from the cluster, when a message is received,
       * The cluster will broadcast it invoking the messageReceived
       * on the receiver.
       * @param msg ClusterMessage - the message received from the cluster
       */
      public void messageReceived(ClusterMessage msg) {
          try {
              if (msg instanceof FileMessage && msg != null) {
                  FileMessage fmsg = (FileMessage) msg;
                  FileMessageFactory factory = getFactory(fmsg);
                  if (factory.writeMessage(fmsg)) {
                      //last message received
                      String name = factory.getFile().getName();
                      if (!name.endsWith(".war"))
                          name = name + ".war";
                      File deployable = new File(getDeployDir(), name);
                      factory.getFile().renameTo(deployable);
                      try {
                          if (getDeployer().findDeployedApp(fmsg.getContextPath()) != 
null)
                              getDeployer().remove(fmsg.getContextPath(), true);
                      } catch (Exception x) {
                          log.info(
                              "Error removing existing context before installing a new 
one.",
                              x);
                      }
                      getDeployer().install(fmsg.getContextPath(),
                                            deployable.toURL());
                      removeFactory(fmsg);
                  } //end if
              } else if (msg instanceof UndeployMessage && msg != null) {
                  UndeployMessage umsg = (UndeployMessage) msg;
                  if (getDeployer().findDeployedApp(umsg.getContextPath()) != null)
                      getDeployer().remove(umsg.getContextPath(),
                                           umsg.getUndeploy());
              } //end if
          } catch (java.io.IOException x) {
              log.error("Unable to read farm deploy file message.", x);
          }
      }
  
      public synchronized FileMessageFactory getFactory(FileMessage msg) throws
          java.io.FileNotFoundException, java.io.IOException {
          File tmpFile = new File(msg.getFileName());
          File writeToFile = new File(getTempDir(), tmpFile.getName());
          FileMessageFactory factory = (FileMessageFactory) fileFactories.get(msg.
              getFileName());
          if (factory == null) {
              factory = FileMessageFactory.getInstance(writeToFile, true);
              fileFactories.put(msg.getFileName(), factory);
          }
          return factory;
      }
  
      public void removeFactory(FileMessage msg) {
          fileFactories.remove(msg.getFileName());
      }
  
      /**
       * Before the cluster invokes messageReceived the
       * cluster will ask the receiver to accept or decline the message,
       * In the future, when messages get big, the accept method will only take
       * a message header
       * @param msg ClusterMessage
       * @return boolean - returns true to indicate that messageReceived
       * should be invoked. If false is returned, the messageReceived method
       * will not be invoked.
       */
      public boolean accept(ClusterMessage msg) {
          return (msg instanceof FileMessage) ||
              (msg instanceof UndeployMessage);
      }
  
      /**
       * Install a new web application, whose web application archive is at the
       * specified URL, into this container and all the other
       * members of the cluster with the specified context path.
       * A context path of "" (the empty string) should be used for the root
       * application for this container.  Otherwise, the context path must
       * start with a slash.
       * <p>
       * If this application is successfully installed locally,
       * a ContainerEvent of type
       * <code>INSTALL_EVENT</code> will be sent to all registered listeners,
       * with the newly created <code>Context</code> as an argument.
       *
       * @param contextPath The context path to which this application should
       *  be installed (must be unique)
       * @param war A URL of type "jar:" that points to a WAR file, or type
       *  "file:" that points to an unpacked directory structure containing
       *  the web application to be installed
       *
       * @exception IllegalArgumentException if the specified context path
       *  is malformed (it must be "" or start with a slash)
       * @exception IllegalStateException if the specified context path
       *  is already attached to an existing web application
       * @exception IOException if an input/output error was encountered
       *  during installation
       */
      public void install(String contextPath, URL war) throws IOException {
          if (getDeployer().findDeployedApp(contextPath) != null)
              getDeployer().remove(contextPath, true);
              //step 1. Install it locally
          getDeployer().install(contextPath, war);
          //step 2. Send it to each member in the cluster
          Member[] members = getCluster().getMembers();
          Member localMember = getCluster().getLocalMember();
          FileMessageFactory factory = FileMessageFactory.getInstance(new File(
              war.getFile()), false);
          FileMessage msg = new FileMessage(localMember, war.getFile(),
                                            contextPath);
          msg = factory.readMessage(msg);
          while (msg != null) {
              for (int i = 0; i < members.length; i++) {
                  getCluster().send(msg, members[i]);
              } //for
              msg = factory.readMessage(msg);
          } //while
      }
  
      /**
       * Remove an existing web application, attached to the specified context
       * path.  If this application is successfully removed, a
       * ContainerEvent of type <code>REMOVE_EVENT</code> will be sent to all
       * registered listeners, with the removed <code>Context</code> as
       * an argument. Deletes the web application war file and/or directory
       * if they exist in the Host's appBase.
       *
       * @param contextPath The context path of the application to be removed
       * @param undeploy boolean flag to remove web application from server
       *
       * @exception IllegalArgumentException if the specified context path
       *  is malformed (it must be "" or start with a slash)
       * @exception IllegalArgumentException if the specified context path does
       *  not identify a currently installed web application
       * @exception IOException if an input/output error occurs during
       *  removal
       */
      public void remove(String contextPath, boolean undeploy) throws IOException {
          log.info("Cluster wide remove of web app " + contextPath);
          //step 1. Remove it locally
          if (getDeployer().findDeployedApp(contextPath) != null)
              getDeployer().remove(contextPath, undeploy);
              //step 2. Send it to each member in the cluster
          Member[] members = getCluster().getMembers();
          Member localMember = getCluster().getLocalMember();
          UndeployMessage msg = new UndeployMessage(localMember,
                                                    System.currentTimeMillis(),
                                                    "Undeploy:" + contextPath +
                                                    ":" +
                                                    System.currentTimeMillis(),
                                                    contextPath,
                                                    undeploy);
          cluster.send(msg);
      }
  
      public void fileModified(File newWar) {
          try {
              File deployWar = new File(getDeployDir(),newWar.getName());
              copy(newWar,deployWar);
              String contextName = "/" + deployWar.getName().substring(0,
                  deployWar.getName().lastIndexOf(".war"));
              log.info("Installing webapp[" + contextName + "] from " + 
deployWar.getAbsolutePath());
              try {
                  remove(contextName, true);
              } catch (Exception x) {
                  log.error("No removal", x);
              }
              install(contextName, deployWar.toURL());
          } catch (Exception x) {
              log.error("Unable to install WAR file", x);
          }
      }
  
      public void fileRemoved(File removeWar) {
          try {
              String contextName = "/" + removeWar.getName().substring(0,
                  removeWar.getName().lastIndexOf(".war"));
              log.info("Removing webapp[" + contextName + "]");
              remove(contextName, true);
          } catch (Exception x) {
              log.error("Unable to remove WAR file", x);
          }
      }
  
      /*--Instance Getters/Setters--------------------------------*/
      public CatalinaCluster getCluster() {
          return cluster;
      }
  
      public void setCluster(CatalinaCluster cluster) {
          this.cluster = cluster;
      }
  
      public void setDeployer(Deployer deployer) {
          this.deployer = deployer;
      }
  
      public boolean equals(Object listener) {
          return super.equals(listener);
      }
  
      public int hashCode() {
          return super.hashCode();
      }
  
      public String getDeployDir() {
          return deployDir;
      }
  
      public void setDeployDir(String deployDir) {
          this.deployDir = deployDir;
      }
  
      public String getTempDir() {
          return tempDir;
      }
  
      public void setTempDir(String tempDir) {
          this.tempDir = tempDir;
      }
  
      public Deployer getDeployer() {
          return deployer;
      }
  
      public String getWatchDir() {
          return watchDir;
      }
  
      public void setWatchDir(String watchDir) {
          this.watchDir = watchDir;
      }
  
      public boolean isWatchEnabled() {
          return watchEnabled;
      }
  
      public boolean getWatchEnabled() {
          return watchEnabled;
      }
  
      public void setWatchEnabled(boolean watchEnabled) {
          this.watchEnabled = watchEnabled;
      }
      
      /**
  
  
  
      /**
       * Copy a file to the specified temp directory. This is required only
       * because Jasper depends on it.
       */
      private boolean copy(File from, File to) {
          try {
              if ( !to.exists() ) to.createNewFile();
              java.io.FileInputStream is = new java.io.FileInputStream(from);
              java.io.FileOutputStream os = new java.io.FileOutputStream(to,false);
              byte[] buf = new byte[4096];
              while (true) {
                  int len = is.read(buf);
                  if (len < 0)
                      break;
                  os.write(buf, 0, len);
              }
              is.close();
              os.close();
          } catch (IOException e) {
              log.error("Unable to copy file from:"+from+" to:"+to,e);
              return false;
          }
          return true;
      }
  
  }
  
  
  
  1.1                  
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/deploy/FileChangeListener.java
  
  Index: FileChangeListener.java
  ===================================================================
  /*
   * Copyright 1999,2004 The Apache Software Foundation.
   * 
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *      http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  
  package org.apache.catalina.cluster.deploy;
  import java.io.File;
  
  public interface FileChangeListener {
      public void fileModified(File f);
      public void fileRemoved(File f);
  }
  
  
  
  1.1                  
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/deploy/FileMessage.java
  
  Index: FileMessage.java
  ===================================================================
  /*
   * Copyright 1999,2004 The Apache Software Foundation.
   * 
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *      http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  
  package org.apache.catalina.cluster.deploy;
  
  import org.apache.catalina.cluster.ClusterMessage;
  import org.apache.catalina.cluster.Member;
  import java.io.Externalizable;
  import java.io.Serializable;
  import java.io.IOException;
  import java.io.ObjectOutput;
  import java.io.ObjectInput;
  
  /**
   * One fragment of a file that is transferred between cluster members.
   * Fragments are read and written by the FileMessageFactory.
   * Default Java serialization is used (the class is Serializable, not
   * Externalizable).
   * @author Filip Hanik
   * @version 1.0
   */

  public class FileMessage implements ClusterMessage, Serializable {
      /** Sequence number of this fragment. */
      private int messageNumber;
      /** Buffer holding the fragment; only the first dataLength bytes count. */
      private byte[] data;
      private int dataLength;
      /** The member this message is associated with (set from the source). */
      private Member address;

      private long timestamp;
      /** Total length of the file being transferred. */
      private long totalLength;
      /** Number of fragments the file is split into. */
      private long totalNrOfMsgs;
      private String fileName;
      private String contextPath;

      /**
       * Creates a message for a file that is deployed to a context path.
       * @param source Member - stored as the address of this message
       * @param fileName String - name of the file being transferred
       * @param contextPath String - context path the file is deployed to
       */
      public FileMessage(Member source,
                         String fileName,
                         String contextPath) {
          this.address = source;
          this.fileName = fileName;
          this.contextPath = contextPath;
      }

      public int getMessageNumber() {
          return this.messageNumber;
      }

      public void setMessageNumber(int messageNumber) {
          this.messageNumber = messageNumber;
      }

      public long getTotalNrOfMsgs() {
          return this.totalNrOfMsgs;
      }

      public void setTotalNrOfMsgs(long totalNrOfMsgs) {
          this.totalNrOfMsgs = totalNrOfMsgs;
      }

      public byte[] getData() {
          return this.data;
      }

      /**
       * Sets the fragment buffer together with the number of valid bytes.
       * The array is stored as is, not copied.
       * @param data byte[] - the buffer
       * @param length int - number of valid bytes in the buffer
       */
      public void setData(byte[] data, int length) {
          this.dataLength = length;
          this.data = data;
      }

      public int getDataLength() {
          return this.dataLength;
      }

      public void setDataLength(int dataLength) {
          this.dataLength = dataLength;
      }

      public long getTotalLength() {
          return this.totalLength;
      }

      public void setTotalLength(long totalLength) {
          this.totalLength = totalLength;
      }

      public Member getAddress() {
          return this.address;
      }

      public void setAddress(Member address) {
          this.address = address;
      }

      /**
       * Builds an id of the form fileName#-#messageNumber#-#currentTimeMillis.
       * The current time is read on every call, so repeated calls may return
       * different values.
       * @return String - the generated id
       */
      public String getUniqueId() {
          return new StringBuffer(getFileName())
              .append("#-#")
              .append(getMessageNumber())
              .append("#-#")
              .append(System.currentTimeMillis())
              .toString();
      }

      public long getTimestamp() {
          return this.timestamp;
      }

      public void setTimestamp(long timestamp) {
          this.timestamp = timestamp;
      }

      public String getFileName() {
          return this.fileName;
      }

      public void setFileName(String fileName) {
          this.fileName = fileName;
      }

      public String getContextPath() {
          return this.contextPath;
      }

  }
  
  
  
  1.1                  
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/deploy/FileMessageFactory.java
  
  Index: FileMessageFactory.java
  ===================================================================
  /*
   * Copyright 1999,2004 The Apache Software Foundation.
   * 
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *      http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  
  package org.apache.catalina.cluster.deploy;
  import java.io.File;
  import java.io.IOException;
  import java.io.FileInputStream;
  import java.io.FileOutputStream;
  import java.io.FileNotFoundException;
  /**
   * This factory is used to read files and write files by splitting them up into 
smaller
   * messages. So that entire files don't have to be read into memory.<BR>
   * The factory can be used as a reader or writer but not both at the same time.
   * When done reading or writing the factory will close the input or output streams
   * and mark the factory as closed. It is not possible to use it after that.<BR>
   * To force a cleanup, call cleanup() from the calling object.<BR>
   * This class is not thread safe. 
   * @author Filip Hanik
   * @version 1.0
   */
  public class FileMessageFactory {
      
      /**
       * The number of bytes that we read from file
       */
      public static final int READ_SIZE = 1024*10; //10kb
      
      /**
       * The file that we are reading/writing
       */
      protected File file = null;
      
      /**
       * True means that we are writing with this factory.
       * False means that we are reading with this factory
       */
      protected boolean openForWrite;
      
      /**
       * Once the factory is used, it can not be reused.
       */
      protected boolean closed = false;
      
      /**
       * When openForWrite=false, the input stream 
       * is held by this variable
       */
      protected FileInputStream in;
      
      /**
       * When openForWrite=true, the output stream 
       * is held by this variable
       */
      protected FileOutputStream out;
      
      /**
       * The number of messages we have read or written
       */
      protected int nrOfMessagesProcessed = 0;
      
      /**
       * The total size of the file
       */
      protected long size = 0;
      
      /**
       * The total number of packets that we split this file into
       */
      protected long totalNrOfMessages = 0;
      
      /**
       * The bytes that we hold the data in, not thread safe.
       */
      protected byte[] data = new byte[READ_SIZE];
      
      /**
       * Private constructor, either instantiates a factory to read or write.<BR>
       * When openForWrite==true, then a the file, f, will be created and an output 
       * stream is opened to write to it.<BR>
       * When openForWrite==false, an input stream is opened, the file has to exist.
       * @param f File - the file to be read/written
       * @param openForWrite boolean - true means we are writing to the file, false 
       * means we are reading from the file
       * @throws FileNotFoundException - if the file to be read doesn't exist
       * @throws IOException - if the system fails to open input/output streams to the 
file
       * or if it fails to create the file to be written to.
       */
      private FileMessageFactory(File f, boolean openForWrite) 
          throws FileNotFoundException, IOException{
          this.file = f;
          this.openForWrite = openForWrite;
          if ( openForWrite ) {
              if (!file.exists()) file.createNewFile();
              out = new FileOutputStream(f);
          } else {
              size = file.length();
              totalNrOfMessages = (size / READ_SIZE) + 1;
              in = new FileInputStream(f);
          }//end if
              
      }
      
      /**
       * Creates a factory to read or write from a file.
       * When opening for read, the readMessage can be invoked, and when
       * opening for write the writeMessage can be invoked.
       * @param f File - the file to be read or written
       * @param openForWrite boolean - true, means we are writing to the file, false 
means we are 
       * reading from it 
       * @throws FileNotFoundException - if the file to be read doesn't exist
       * @throws IOException - if it fails to create the file that is to be written
       * @return FileMessageFactory 
       */
      public static FileMessageFactory getInstance(File f, boolean openForWrite) 
          throws FileNotFoundException, IOException {
          return new FileMessageFactory(f,openForWrite);
      }
      
      /**
       * Reads file data into the file message and sets the 
       * size, totalLength, totalNrOfMsgs and the message number<BR>
       * If EOF is reached, the factory returns null, and closes itself,
       * otherwise the same message is returned as was passed in.
       * This makes sure that not more memory is ever used. 
       * To remember, neither the file message or the factory are thread safe.
       * dont hand off the message to one thread and read the same with another. 
       * @param f FileMessage - the message to be populated with file data
       * @throws IllegalArgumentException - if the factory is for writing or is closed
       * @throws IOException - if a file read exception occurs
       * @return FileMessage - returns the same message passed in as a parameter, or 
null if EOF
       */
      public FileMessage readMessage(FileMessage f) throws IllegalArgumentException, 
IOException {
          checkState(false);
          int length = in.read(data);
          if ( length == -1 ) {
              cleanup();
              return null;
          } else {
              f.setData(data, length);
              f.setTotalLength(size);
              f.setTotalNrOfMsgs(totalNrOfMessages);
              f.setMessageNumber(++nrOfMessagesProcessed);
              return f;
          }//end if
      }
      
      /**
       * Writes a message to file. If (msg.getMessageNumber() == 
msg.getTotalNrOfMsgs())
       * the output stream will be closed after writing.  
       * @param msg FileMessage - message containing data to be written
       * @throws IllegalArgumentException - if the factory is opened for read or closed
       * @throws IOException - if a file write error occurs
       * @return returns true if the file is complete and outputstream is closed, 
false otherwise.
       */
      public boolean writeMessage(FileMessage msg) throws IllegalArgumentException, 
IOException {
          if ( !openForWrite ) throw new IllegalArgumentException("Can't write 
message, this factory is reading.");
          out.write(msg.getData(),0,msg.getDataLength());
          nrOfMessagesProcessed++;
          out.flush();
          if ( msg.getMessageNumber() == msg.getTotalNrOfMsgs() ) {
              out.close();
              cleanup();
              return true;
          }//end if
          return false;
      }//writeMessage
      
      /**
       * Closes the factory, its streams and sets all its references to null
       */
      public void cleanup() {
          if ( in != null ) try { in.close(); } catch ( Exception ignore ){}
          if ( out != null ) try { out.close(); } catch ( Exception ignore ){}
          in = null;
          out = null;
          size = 0;
          closed = true;
          data = null;
          nrOfMessagesProcessed = 0;
          totalNrOfMessages = 0;
      }
      
      /**
       * Check to make sure the factory is able to perform the 
       * function it is asked to do. Invoked by readMessage/writeMessage before
       * those methods proceed.
       * @param openForWrite boolean
       * @throws IllegalArgumentException
       */
      protected void checkState(boolean openForWrite) throws IllegalArgumentException {
          if ( this.openForWrite != openForWrite ) {
              cleanup();
              if ( openForWrite )  
                  throw new IllegalArgumentException("Can't write message, this 
factory is reading.");
              else 
                  throw new IllegalArgumentException("Can't read message, this factory 
is writing.");
          }
          if ( this.closed ) {
              cleanup();
              throw new IllegalArgumentException("Factory has been closed.");
          }
      }
          
      /**
       * Example usage.
       * @param args String[], args[0] - read from filename, args[1] write to filename
       * @throws Exception
       */
      public static void main(String[] args) throws Exception {
          
          
          System.out.println("Usage: FileMessageFactory fileToBeRead fileToBeWritten");
          System.out.println("Usage: This will make a copy of the file on the local 
file system");
          FileMessageFactory read = getInstance(new File(args[0]),false);
          FileMessageFactory write = getInstance(new File(args[1]),true);
          FileMessage msg = new FileMessage(null,args[0],args[0]);
          msg = read.readMessage(msg);
          System.out.println("Expecting to write " + msg.getTotalNrOfMsgs()+" 
messages.");
          int cnt = 0;
          while ( msg != null ) {
              write.writeMessage(msg);
              cnt++;
              msg = read.readMessage(msg);
          }//while
          System.out.println("Actually wrote " + cnt+" messages.");
      }///main
      
      public File getFile() {
          return file;
      }
      
      
      
  
  }
  
  
  
  1.1                  
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/deploy/UndeployMessage.java
  
  Index: UndeployMessage.java
  ===================================================================
  /*
   * Copyright 1999,2004 The Apache Software Foundation.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *      http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  
  package org.apache.catalina.cluster.deploy;
  
  import org.apache.catalina.cluster.ClusterMessage;
  import org.apache.catalina.cluster.Member;
  import java.io.Serializable;
  /**
   * Cluster message that carries a context path together with an undeploy flag.
   * Plain serializable bean: every property has a getter and a setter.
   */
  public class UndeployMessage implements ClusterMessage,Serializable {

      /** Member stored as the address of this message. */
      private Member address;

      /** Timestamp value supplied by the creator of the message. */
      private long timestamp;

      /** Identifier supplied by the creator of the message. */
      private String uniqueId;

      /** Context path this message refers to. */
      private String contextPath;

      /** Undeploy flag; presumably true asks the receiver to undeploy - confirm with the deployer. */
      private boolean undeploy;

      /**
       * No-argument constructor, needed for serialization.
       */
      public UndeployMessage() {
      }

      /**
       * Creates a fully populated message.
       * @param address Member - value for the address property
       * @param timestamp long - value for the timestamp property
       * @param uniqueId String - value for the uniqueId property
       * @param contextPath String - value for the contextPath property
       * @param undeploy boolean - value for the undeploy property
       */
      public UndeployMessage(Member address,
                             long timestamp,
                             String uniqueId,
                             String contextPath,
                             boolean undeploy) {
          this.address = address;
          this.timestamp = timestamp;
          this.uniqueId = uniqueId;
          this.contextPath = contextPath;
          this.undeploy = undeploy;
      }

      /** @return Member - the address property */
      public Member getAddress() {
          return this.address;
      }

      /** @param address Member - new value of the address property */
      public void setAddress(Member address) {
          this.address = address;
      }

      /** @return long - the timestamp property */
      public long getTimestamp() {
          return this.timestamp;
      }

      /** @param timestamp long - new value of the timestamp property */
      public void setTimestamp(long timestamp) {
          this.timestamp = timestamp;
      }

      /** @return String - the uniqueId property */
      public String getUniqueId() {
          return this.uniqueId;
      }

      /** @param uniqueId String - new value of the uniqueId property */
      public void setUniqueId(String uniqueId) {
          this.uniqueId = uniqueId;
      }

      /** @return String - the contextPath property */
      public String getContextPath() {
          return this.contextPath;
      }

      /** @param contextPath String - new value of the contextPath property */
      public void setContextPath(String contextPath) {
          this.contextPath = contextPath;
      }

      /** @return boolean - the undeploy property */
      public boolean getUndeploy() {
          return this.undeploy;
      }

      /** @param undeploy boolean - new value of the undeploy property */
      public void setUndeploy(boolean undeploy) {
          this.undeploy = undeploy;
      }
  }
  
  
  
  1.1                  
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/deploy/WarWatcher.java
  
  Index: WarWatcher.java
  ===================================================================
  /*
   * Copyright 1999,2004 The Apache Software Foundation.
   * 
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   * 
   *      http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  
  package org.apache.catalina.cluster.deploy;
  import java.io.File;
  import java.util.HashMap;
  import java.util.Map;
  import java.util.Iterator;
  /**
   * <p>The <b>WarWatcher</b> watches the deployDir for changes made to
   * the directory (adding new WAR files->deploy or remove WAR files->undeploy)
   * And notifies a listener of the changes made</p>    
   * 
   * @author Filip Hanik
   * @version 1.0
   */
  
  public class WarWatcher implements Runnable {

      /*--Static Variables----------------------------------------*/
      public static org.apache.commons.logging.Log log =
          org.apache.commons.logging.LogFactory.getLog( WarWatcher.class );

      /*--Instance Variables--------------------------------------*/
      /**
       * Directory to watch for war files
       */
      protected File deployDir = null;

      /**
       * Parent to be notified of changes
       */
      protected FileChangeListener listener = null;

      /**
       *
       * Check interval
       */
      protected long interval = 5000; //5 seconds

      /**
       * Run status. Volatile because stop() is expected to be called from a
       * different thread than the one executing run().
       */
      protected volatile boolean alive = true;

      /**
       * Currently known war files, keyed by absolute path (String),
       * the values are WarInfo objects
       */
      protected Map currentStatus = new HashMap();

      /*--Constructor---------------------------------------------*/

      /**
       * @param listener FileChangeListener - notified of added/modified/removed wars
       * @param deployDir File - the directory to watch
       * @param interval long - time in milliseconds to sleep between two checks
       */
      public WarWatcher(FileChangeListener listener,
                        File deployDir,
                        long interval) {
          this.listener = listener;
          this.deployDir = deployDir;
          this.interval = interval;
      }

      /*--Logic---------------------------------------------------*/

      /**
       * Checks the directory, sleeps for the configured interval and repeats
       * until stop() is invoked or the thread is interrupted.
       */
      public void run() {
          while ( alive ) {
              try {
                  checkDirectory();
              } catch ( Exception x ) {
                  //a failure caused by a concurrent stop() is not worth reporting
                  if ( alive ) {
                      log.error("Unable to watch for WAR deployments in cluster.",x);
                  }
              }
              // The sleep is outside of the try block above on purpose: a failing
              // check must still wait for the interval instead of spinning.
              try {
                  Thread.sleep(interval);
              } catch ( InterruptedException x ) {
                  //restore the interrupt flag and end the watcher
                  Thread.currentThread().interrupt();
                  break;
              }
          }//while
      }//run

      /**
       * Performs one check: registers every war file found in the directory
       * and reports added, modified and removed files to the listener.
       */
      private void checkDirectory() {
          //take a snapshot, stop() sets the field to null
          FileChangeListener target = listener;
          if ( target == null ) return;

          File[] list = deployDir.listFiles(new WarFilter());
          if ( list == null ) {
              // listFiles returns null when the path is not a directory or an
              // I/O error occurs; treat it as an empty directory so that known
              // files are still checked (and reported as removed if gone).
              log.warn("Unable to list WAR files in directory "+deployDir);
              list = new File[0];
          }
          //first make sure all the files are listed in our current status
          for ( int i=0; i<list.length; i++ ) {
              addWarInfo(list[i]);
          }//for

          //check all the status codes and update the listener
          for ( Iterator i=currentStatus.values().iterator(); i.hasNext();) {
              WarInfo info = (WarInfo)i.next();
              int check = info.check();
              if ( check == 1 ) {
                  target.fileModified(info.getWar());
              } else if ( check == -1 ) {
                  target.fileRemoved(info.getWar());
                  // No need to keep in memory. Removal has to go through the
                  // iterator: the map is keyed by path string, not by File,
                  // and must not be modified directly while iterating.
                  i.remove();
              }//end if
          }//for
      }

      /**
       * Registers the file in the current status map unless it is already known.
       * New entries start in the "non existent" state so that the next check
       * reports them as added.
       * @param f File - a war file found in the directory
       */
      protected void addWarInfo(File f) {
          WarInfo info = (WarInfo)currentStatus.get(f.getAbsolutePath());
          if ( info == null ) {
              info = new WarInfo(f);
              info.setLastState(-1); //assume file is non existent
              currentStatus.put(f.getAbsolutePath(),info);
          }
      }

      /**
       * Asks the run loop to end and drops the known files and the listener.
       */
      public void stop() {
          alive = false;
          currentStatus.clear();
          listener = null;
      }

      /*--Inner classes-------------------------------------------*/

      /**
       * File name filter for war files
       */
      protected class WarFilter implements java.io.FilenameFilter {
          public boolean accept(File path, String name) {
              if ( name == null ) return false;
              return name.endsWith(".war");
          }
      }

      /**
       * File information on existing WAR files
       */
      protected class WarInfo {
          protected File war = null;
          /**
           * Initially the last modified time of the file, afterwards the
           * system time of the most recent check()
           */
          protected long lastChecked = 0;
          /**
           * Result of the last check that detected a change,
           * -1 also means that the file is considered non existent
           */
          protected long lastState = 0;

          public WarInfo(File war) {
              this.war = war;
              this.lastChecked = war.lastModified();
              if ( !war.exists() ) lastState = -1;
          }

          /**
           * @return boolean - true if the file exists and its last modified
           * time is later than lastChecked
           */
          public boolean modified() {
              return war.exists() &&
                  war.lastModified() > lastChecked;
          }

          public boolean exists() {
              return war.exists();
          }

          /**
           * Returns
           * 1 if the file has been added/modified,
           * 0 if the file is unchanged and
           * -1 if the file has been removed
           * @return int 1=file added; 0=unchanged; -1=file removed
           */
          public int check() {
              //file unchanged by default
              int result = 0;

              if ( modified() ) {
                  //file has changed - timestamp
                  result = 1;
                  lastState = result;
              } else if ( (!exists()) && (!(lastState==-1)) ) {
                  //file was removed
                  result = -1;
                  lastState = result;
              } else if ( (lastState==-1) && exists() ) {
                  //file was added
                  result = 1;
                  lastState = result;
              }
              this.lastChecked = System.currentTimeMillis();
              return result;
          }

          public File getWar() {
              return war;
          }

          public int hashCode() {
              return war.getAbsolutePath().hashCode();
          }

          public boolean equals(Object other) {
              if ( other instanceof WarInfo ) {
                  WarInfo wo = (WarInfo)other;
                  return wo.getWar().equals(getWar());
              } else {
                  return false;
              }
          }

          protected void setLastState(int lastState) {
              this.lastState = lastState;
          }

      }

  }
  
  
  
  1.27      +2 -1      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java
  
  Index: DeltaManager.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java,v
  retrieving revision 1.26
  retrieving revision 1.27
  diff -u -r1.26 -r1.27
  --- DeltaManager.java 29 May 2004 02:43:58 -0000      1.26
  +++ DeltaManager.java 4 Jun 2004 20:22:27 -0000       1.27
  @@ -718,6 +718,7 @@
           if( initialized ) {
               destroy();
           }
  +        getCluster().removeManager(getName());
       }
   
   
  
  
  
  1.29      +1 -0      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java
  
  Index: SimpleTcpReplicationManager.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java,v
  retrieving revision 1.28
  retrieving revision 1.29
  diff -u -r1.28 -r1.29
  --- SimpleTcpReplicationManager.java  2 Jun 2004 14:10:01 -0000       1.28
  +++ SimpleTcpReplicationManager.java  4 Jun 2004 20:22:27 -0000       1.29
  @@ -484,6 +484,7 @@
           //stop the javagroup channel
           try
           {
  +            cluster.removeManager(getName());
   //            mReplicationListener.stopListening();
   //            mReplicationTransmitter.stop();
   //            service.stop();
  
  
  
  1.41      +75 -5     
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java
  
  Index: SimpleTcpCluster.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v
  retrieving revision 1.40
  retrieving revision 1.41
  diff -u -r1.40 -r1.41
  --- SimpleTcpCluster.java     29 May 2004 02:36:12 -0000      1.40
  +++ SimpleTcpCluster.java     4 Jun 2004 20:22:27 -0000       1.41
  @@ -22,6 +22,7 @@
   import java.net.UnknownHostException;
   import java.io.IOException;
   import java.util.HashMap;
  +import java.util.Vector;
   
   import org.apache.catalina.ServerFactory;
   import org.apache.catalina.core.StandardServer;
  @@ -56,6 +57,8 @@
   import org.apache.catalina.cluster.Constants;
   import org.apache.catalina.cluster.ClusterReceiver;
   import org.apache.catalina.cluster.ClusterSender;
  +import org.apache.catalina.cluster.ClusterDeployer;
  +
   
   
   import org.apache.commons.logging.Log;
  @@ -184,6 +187,12 @@
        */
       private org.apache.catalina.cluster.ClusterReceiver clusterReceiver;
       private org.apache.catalina.Valve valve;
  +    private org.apache.catalina.cluster.ClusterDeployer clusterDeployer;
  +    
  +    /**
  +     * Listeners of messages
  +     */
  +    protected Vector clusterListeners = new Vector();
   
       // ------------------------------------------------------------- Properties
   
  @@ -291,6 +300,15 @@
           return members;
       }
       
  +    /**
  +     * Return the member that represents this node.
  +     * @return Member
  +     */
  +    public Member getLocalMember() {
  +        return membershipService.getLocalMember();
  +    }
  +
  +    
   
   
   
  @@ -299,6 +317,7 @@
   
       public synchronized Manager createManager(String name) {
           log.debug("Creating ClusterManager for context "+name + " using class 
"+getManagerClassName());
  +        System.out.println("\n\n\n\nCreating ClusterManager for context "+name + " 
using class "+getManagerClassName()+"\n\n\n\n");
           ClusterManager manager = null;
           try {
               manager = 
(ClusterManager)getClass().getClassLoader().loadClass(getManagerClassName()).newInstance();
  @@ -312,8 +331,18 @@
           manager.setExpireSessionsOnShutdown(expireSessionsOnShutdown);
           manager.setUseDirtyFlag(useDirtyFlag);
           managers.put(name,manager);
  +        
           return manager;
       }
  +    
  +    public void removeManager(String name) {
  +        managers.remove(name);
  +    }
  +    
  +    public Manager getManager(String name) {
  +        return (Manager)managers.get(name);
  +    }
  +    
   
   
       // ------------------------------------------------------ Lifecycle Methods
  @@ -378,6 +407,22 @@
               
membershipService.setLocalMemberProperties(clusterReceiver.getHost(),clusterReceiver.getPort());
               membershipService.addMembershipListener(this);
               membershipService.start();
  +            //set the deployer.
  +            try {
  +                if ( clusterDeployer != null ) {
  +                    clusterDeployer.setCluster(this);
  +                    Object deployer = MethodUtils.invokeMethod(
  +                        getContainer(),
  +                        "getDeployer",
  +                        new Object[0],
  +                        new Class[0]);
  +                    clusterDeployer.setDeployer( (org.apache.catalina.Deployer)
  +                                                deployer);
  +                   clusterDeployer.start();
  +                }
  +            } catch (Throwable x) {
  +                log.fatal("Unable to retrieve the container deployer. Cluster 
deployment disabled.",x);
  +            } //catch
               this.started = true;
           } catch ( Exception x ) {
               log.error("Unable to start cluster.",x);
  @@ -455,6 +500,9 @@
           } catch (Exception x ) {
               log.error("Unable to stop cluster receiver.",x);
           }
  +        if ( clusterDeployer != null ) {
  +            clusterDeployer.stop();
  +        }
           started = false;
       }
   
  @@ -529,8 +577,19 @@
                       else
                           log.warn("Context manager doesn't exist:" + ctxname);
                   }//end if
  -            }  else
  -                log.warn("Received invalid message myobj="+myobj);
  +            }  else {
  +                //invoke all the listeners
  +                for ( int i=0; i<clusterListeners.size(); i++ ) {
  +                    MessageListener listener = 
(MessageListener)clusterListeners.elementAt(i);
  +                    if ( myobj!=null &&
  +                         myobj instanceof ClusterMessage && 
  +                         listener.accept((ClusterMessage)myobj) ) {
  +                        listener.messageReceived((ClusterMessage)myobj);
  +                    }//end if
  +                    
  +                }//for
  +            }//end if
  +                
           } catch ( Exception x ) {
               log.error("Unable to deserialize session message.",x);
           }
  @@ -657,12 +716,23 @@
       }
       
       public void addClusterListener(MessageListener listener) {
  -        //TO DO
  +        if ( !clusterListeners.contains(listener) ) {
  +            clusterListeners.addElement(listener);
  +        }
       }
   
       public void removeClusterListener(MessageListener listener) {
  -        //TO DO
  +        clusterListeners.removeElement(listener);
  +    }
  +    public org.apache.catalina.cluster.ClusterDeployer getClusterDeployer() {
  +        return clusterDeployer;
  +    }
  +    public void setClusterDeployer(org.apache.catalina.cluster.ClusterDeployer 
clusterDeployer) {
  +        this.clusterDeployer = clusterDeployer;
       }
  +    
  + 
  +
       
       
       private class MemberComparator implements java.util.Comparator {
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to