fhanik 2004/06/04 13:22:27 Modified: catalina/src/share/org/apache/catalina/startup ClusterRuleSet.java modules/cluster/src/share/org/apache/catalina/cluster CatalinaCluster.java modules/cluster/src/share/org/apache/catalina/cluster/session DeltaManager.java SimpleTcpReplicationManager.java modules/cluster/src/share/org/apache/catalina/cluster/tcp SimpleTcpCluster.java Added: modules/cluster/src/share/org/apache/catalina/cluster ClusterDeployer.java modules/cluster/src/share/org/apache/catalina/cluster/deploy FarmWarDeployer.java FileChangeListener.java FileMessage.java FileMessageFactory.java UndeployMessage.java WarWatcher.java Log: Added in the ability to do farm deployment/undeployment of war files to all members in the cluster. Upon startup, no state is replicated. Also, I want to refactor some messaging before we move on; I'll do that once we have a branch Revision Changes Path 1.3 +10 -1 jakarta-tomcat-catalina/catalina/src/share/org/apache/catalina/startup/ClusterRuleSet.java Index: ClusterRuleSet.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/catalina/src/share/org/apache/catalina/startup/ClusterRuleSet.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- ClusterRuleSet.java 27 Feb 2004 14:58:48 -0000 1.2 +++ ClusterRuleSet.java 4 Jun 2004 20:22:27 -0000 1.3 @@ -115,6 +115,15 @@ digester.addSetNext(prefix + "Valve", "addValve", "org.apache.catalina.Valve"); + + digester.addObjectCreate(prefix + "Deployer", + null, // MUST be specified in the element + "className"); + digester.addSetProperties(prefix + "Deployer"); + digester.addSetNext(prefix + "Deployer", + "setClusterDeployer", + "org.apache.catalina.cluster.ClusterDeployer"); + //Cluster configuration end 1.6 +15 -1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/CatalinaCluster.java Index: CatalinaCluster.java =================================================================== RCS 
file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/CatalinaCluster.java,v retrieving revision 1.5 retrieving revision 1.6 diff -u -r1.5 -r1.6 --- CatalinaCluster.java 29 May 2004 02:36:12 -0000 1.5 +++ CatalinaCluster.java 4 Jun 2004 20:22:27 -0000 1.6 @@ -23,6 +23,7 @@ import org.apache.catalina.Logger; import org.apache.catalina.Valve; import org.apache.commons.logging.Log; +import org.apache.catalina.Manager; /** * A <b>CatalinaCluster</b> interface allows to plug in and out the @@ -78,6 +79,12 @@ */ public Member[] getMembers(); + /** + * Return the member that represents this node. + * @return Member + */ + public Member getLocalMember(); + public void setClusterSender(ClusterSender sender); public ClusterSender getClusterSender(); @@ -95,5 +102,12 @@ public void addClusterListener(MessageListener listener); public void removeClusterListener(MessageListener listener); + + public void setClusterDeployer(ClusterDeployer deployer); + + public ClusterDeployer getClusterDeployer(); + + public Manager getManager(String name); + public void removeManager(String name); } 1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterDeployer.java Index: ClusterDeployer.java =================================================================== /* * Copyright 1999,2004 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
*/
package org.apache.catalina.cluster;

import org.apache.catalina.LifecycleException;
import org.apache.catalina.Deployer;
import java.io.IOException;
import java.net.URL;

/**
 * A <b>ClusterDeployer</b> is the pluggable component that installs and
 * removes web applications on this container and on the other members of
 * the cluster. Different deployment implementations can be plugged in by
 * implementing this interface.
 * <p>
 * The interface extends {@link MessageListener}, so an implementation also
 * receives the messages that are delivered through the cluster.
 *
 * @author Filip Hanik
 * @version $Revision: 1.1 $, $Date: 2004/06/04 20:22:27 $
 */
public interface ClusterDeployer extends MessageListener {
    /**
     * Returns the cluster the cluster deployer is associated with.
     * @return CatalinaCluster
     */
    public CatalinaCluster getCluster();

    /**
     * Associates the cluster deployer with a cluster.
     * @param cluster CatalinaCluster
     */
    public void setCluster(CatalinaCluster cluster);

    /**
     * Descriptive information about this component implementation.
     * Being declared in an interface, this field is implicitly
     * <code>public static final</code>, i.e. a constant.
     */
    public String info = "ClusterDeployer/1.0";

    /**
     * Start the cluster deployer, the owning container will invoke this.
     * @throws Exception - if the cluster deployer fails to start
     */
    public void start() throws Exception;

    /**
     * Stops the cluster deployer, the owning container will invoke this.
     * @throws LifecycleException
     */
    public void stop() throws LifecycleException;

    /**
     * Sets the deployer for this cluster deployer to use.
     * @param deployer Deployer
     */
    public void setDeployer(Deployer deployer);

    /**
     * Install a new web application, whose web application archive is at the
     * specified URL, into this container and all the other
     * members of the cluster with the specified context path.
     * A context path of "" (the empty string) should be used for the root
     * application for this container. Otherwise, the context path must
     * start with a slash.
     * <p>
     * If this application is successfully installed locally,
     * a ContainerEvent of type
     * <code>INSTALL_EVENT</code> will be sent to all registered listeners,
     * with the newly created <code>Context</code> as an argument.
     *
     * @param contextPath The context path to which this application should
     *  be installed (must be unique)
     * @param war A URL of type "jar:" that points to a WAR file, or type
     *  "file:" that points to an unpacked directory structure containing
     *  the web application to be installed
     *
     * @exception IllegalArgumentException if the specified context path
     *  is malformed (it must be "" or start with a slash)
     * @exception IllegalStateException if the specified context path
     *  is already attached to an existing web application
     * @exception IOException if an input/output error was encountered
     *  during installation
     */
    public void install(String contextPath, URL war) throws IOException;

    /**
     * Remove an existing web application, attached to the specified context
     * path. If this application is successfully removed, a
     * ContainerEvent of type <code>REMOVE_EVENT</code> will be sent to all
     * registered listeners, with the removed <code>Context</code> as
     * an argument. Deletes the web application war file and/or directory
     * if they exist in the Host's appBase.
     *
     * @param contextPath The context path of the application to be removed
     * @param undeploy boolean flag to remove web application from server
     *
     * @exception IllegalArgumentException if the specified context path
     *  is malformed (it must be "" or start with a slash)
     * @exception IllegalArgumentException if the specified context path does
     *  not identify a currently installed web application
     * @exception IOException if an input/output error occurs during
     *  removal
     */
    public void remove(String contextPath, boolean undeploy) throws IOException;
}
1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/deploy/FarmWarDeployer.java Index: FarmWarDeployer.java =================================================================== /* * Copyright 1999,2004 The Apache Software Foundation. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.catalina.cluster.deploy; import org.apache.catalina.cluster.ClusterDeployer; import org.apache.catalina.cluster.ClusterMessage; import org.apache.catalina.cluster.CatalinaCluster; import org.apache.catalina.LifecycleException; import org.apache.catalina.Deployer; import java.io.File; import java.net.URL; import java.io.IOException; import org.apache.catalina.cluster.Member; import java.util.HashMap; /** * <p> * A farm war deployer is a class that is able to * deploy/undeploy web applications in WAR form * within the cluster.</p> * Any host can act as the admin, and will have three directories * <ul> * <li> deployDir - the directory where we watch for changes</li> * <li> applicationDir - the directory where we install applications</li> * <li> tempDir - a temporaryDirectory to store binary data when downloading a war * from the cluster </li> * </ul> * Currently we only support deployment of WAR files since they are easier to send * across the wire. 
* * @author Filip Hanik * @version 1.0 */ public class FarmWarDeployer implements ClusterDeployer, FileChangeListener { /*--Static Variables----------------------------------------*/ public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(FarmWarDeployer.class); /*--Instance Variables--------------------------------------*/ protected CatalinaCluster cluster = null; protected Deployer deployer = null; protected boolean started = false; //default 5 seconds protected HashMap fileFactories = new HashMap(); protected String deployDir; protected String tempDir; protected String watchDir; protected boolean watchEnabled = false; protected WarWatcher watcher = null; /*--Constructor---------------------------------------------*/ public FarmWarDeployer() { } /*--Logic---------------------------------------------------*/ public void start() throws Exception { if (started)return; getCluster().addClusterListener(this); if (watchEnabled) { watcher = new WarWatcher(this, new File(getWatchDir()), (long) 5000); Thread t = new Thread(watcher); t.start(); log.info("Cluster deployment is watching " + getWatchDir() + " for changes."); } //end if started = true; log.info("Cluster FarmWarDeployer started."); } public void stop() throws LifecycleException { started = false; getCluster().removeClusterListener(this); if (watcher != null) watcher.stop(); log.info("Cluster FarmWarDeployer stopped."); } public void cleanDeployDir() { throw new java.lang.UnsupportedOperationException( "Method cleanDeployDir() not yet implemented."); } /** * Callback from the cluster, when a message is received, * The cluster will broadcast it invoking the messageReceived * on the receiver. 
* @param msg ClusterMessage - the message received from the cluster */ public void messageReceived(ClusterMessage msg) { try { if (msg instanceof FileMessage && msg != null) { FileMessage fmsg = (FileMessage) msg; FileMessageFactory factory = getFactory(fmsg); if (factory.writeMessage(fmsg)) { //last message received String name = factory.getFile().getName(); if (!name.endsWith(".war")) name = name + ".war"; File deployable = new File(getDeployDir(), name); factory.getFile().renameTo(deployable); try { if (getDeployer().findDeployedApp(fmsg.getContextPath()) != null) getDeployer().remove(fmsg.getContextPath(), true); } catch (Exception x) { log.info( "Error removing existing context before installing a new one.", x); } getDeployer().install(fmsg.getContextPath(), deployable.toURL()); removeFactory(fmsg); } //end if } else if (msg instanceof UndeployMessage && msg != null) { UndeployMessage umsg = (UndeployMessage) msg; if (getDeployer().findDeployedApp(umsg.getContextPath()) != null) getDeployer().remove(umsg.getContextPath(), umsg.getUndeploy()); } //end if } catch (java.io.IOException x) { log.error("Unable to read farm deploy file message.", x); } } public synchronized FileMessageFactory getFactory(FileMessage msg) throws java.io.FileNotFoundException, java.io.IOException { File tmpFile = new File(msg.getFileName()); File writeToFile = new File(getTempDir(), tmpFile.getName()); FileMessageFactory factory = (FileMessageFactory) fileFactories.get(msg. 
getFileName()); if (factory == null) { factory = FileMessageFactory.getInstance(writeToFile, true); fileFactories.put(msg.getFileName(), factory); } return factory; } public void removeFactory(FileMessage msg) { fileFactories.remove(msg.getFileName()); } /** * Before the cluster invokes messageReceived the * cluster will ask the receiver to accept or decline the message, * In the future, when messages get big, the accept method will only take * a message header * @param msg ClusterMessage * @return boolean - returns true to indicate that messageReceived * should be invoked. If false is returned, the messageReceived method * will not be invoked. */ public boolean accept(ClusterMessage msg) { return (msg instanceof FileMessage) || (msg instanceof UndeployMessage); } /** * Install a new web application, whose web application archive is at the * specified URL, into this container and all the other * members of the cluster with the specified context path. * A context path of "" (the empty string) should be used for the root * application for this container. Otherwise, the context path must * start with a slash. * <p> * If this application is successfully installed locally, * a ContainerEvent of type * <code>INSTALL_EVENT</code> will be sent to all registered listeners, * with the newly created <code>Context</code> as an argument. 
* * @param contextPath The context path to which this application should * be installed (must be unique) * @param war A URL of type "jar:" that points to a WAR file, or type * "file:" that points to an unpacked directory structure containing * the web application to be installed * * @exception IllegalArgumentException if the specified context path * is malformed (it must be "" or start with a slash) * @exception IllegalStateException if the specified context path * is already attached to an existing web application * @exception IOException if an input/output error was encountered * during installation */ public void install(String contextPath, URL war) throws IOException { if (getDeployer().findDeployedApp(contextPath) != null) getDeployer().remove(contextPath, true); //step 1. Install it locally getDeployer().install(contextPath, war); //step 2. Send it to each member in the cluster Member[] members = getCluster().getMembers(); Member localMember = getCluster().getLocalMember(); FileMessageFactory factory = FileMessageFactory.getInstance(new File( war.getFile()), false); FileMessage msg = new FileMessage(localMember, war.getFile(), contextPath); msg = factory.readMessage(msg); while (msg != null) { for (int i = 0; i < members.length; i++) { getCluster().send(msg, members[i]); } //for msg = factory.readMessage(msg); } //while } /** * Remove an existing web application, attached to the specified context * path. If this application is successfully removed, a * ContainerEvent of type <code>REMOVE_EVENT</code> will be sent to all * registered listeners, with the removed <code>Context</code> as * an argument. Deletes the web application war file and/or directory * if they exist in the Host's appBase. 
* * @param contextPath The context path of the application to be removed * @param undeploy boolean flag to remove web application from server * * @exception IllegalArgumentException if the specified context path * is malformed (it must be "" or start with a slash) * @exception IllegalArgumentException if the specified context path does * not identify a currently installed web application * @exception IOException if an input/output error occurs during * removal */ public void remove(String contextPath, boolean undeploy) throws IOException { log.info("Cluster wide remove of web app " + contextPath); //step 1. Remove it locally if (getDeployer().findDeployedApp(contextPath) != null) getDeployer().remove(contextPath, undeploy); //step 2. Send it to each member in the cluster Member[] members = getCluster().getMembers(); Member localMember = getCluster().getLocalMember(); UndeployMessage msg = new UndeployMessage(localMember, System.currentTimeMillis(), "Undeploy:" + contextPath + ":" + System.currentTimeMillis(), contextPath, undeploy); cluster.send(msg); } public void fileModified(File newWar) { try { File deployWar = new File(getDeployDir(),newWar.getName()); copy(newWar,deployWar); String contextName = "/" + deployWar.getName().substring(0, deployWar.getName().lastIndexOf(".war")); log.info("Installing webapp[" + contextName + "] from " + deployWar.getAbsolutePath()); try { remove(contextName, true); } catch (Exception x) { log.error("No removal", x); } install(contextName, deployWar.toURL()); } catch (Exception x) { log.error("Unable to install WAR file", x); } } public void fileRemoved(File removeWar) { try { String contextName = "/" + removeWar.getName().substring(0, removeWar.getName().lastIndexOf(".war")); log.info("Removing webapp[" + contextName + "]"); remove(contextName, true); } catch (Exception x) { log.error("Unable to remove WAR file", x); } } /*--Instance Getters/Setters--------------------------------*/ public CatalinaCluster getCluster() { return 
cluster; } public void setCluster(CatalinaCluster cluster) { this.cluster = cluster; } public void setDeployer(Deployer deployer) { this.deployer = deployer; } public boolean equals(Object listener) { return super.equals(listener); } public int hashCode() { return super.hashCode(); } public String getDeployDir() { return deployDir; } public void setDeployDir(String deployDir) { this.deployDir = deployDir; } public String getTempDir() { return tempDir; } public void setTempDir(String tempDir) { this.tempDir = tempDir; } public Deployer getDeployer() { return deployer; } public String getWatchDir() { return watchDir; } public void setWatchDir(String watchDir) { this.watchDir = watchDir; } public boolean isWatchEnabled() { return watchEnabled; } public boolean getWatchEnabled() { return watchEnabled; } public void setWatchEnabled(boolean watchEnabled) { this.watchEnabled = watchEnabled; } /** /** * Copy a file to the specified temp directory. This is required only * because Jasper depends on it. */ private boolean copy(File from, File to) { try { if ( !to.exists() ) to.createNewFile(); java.io.FileInputStream is = new java.io.FileInputStream(from); java.io.FileOutputStream os = new java.io.FileOutputStream(to,false); byte[] buf = new byte[4096]; while (true) { int len = is.read(buf); if (len < 0) break; os.write(buf, 0, len); } is.close(); os.close(); } catch (IOException e) { log.error("Unable to copy file from:"+from+" to:"+to,e); return false; } return true; } } 1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/deploy/FileChangeListener.java Index: FileChangeListener.java =================================================================== /* * Copyright 1999,2004 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
* You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */
package org.apache.catalina.cluster.deploy;

import java.io.File;

/**
 * Callback interface for components that want to be told about changes to
 * files. In this package the WarWatcher is constructed with a
 * FileChangeListener and FarmWarDeployer implements it.
 * NOTE(review): the exact conditions under which the watcher fires each
 * callback are defined in WarWatcher.run() - confirm there.
 */
public interface FileChangeListener {
    /**
     * Invoked when a file is new or has been modified.
     * @param f File - the file that changed
     */
    public void fileModified(File f);

    /**
     * Invoked when a file has been removed.
     * @param f File - the file that was removed
     */
    public void fileRemoved(File f);
}
1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/deploy/FileMessage.java Index: FileMessage.java =================================================================== /* * Copyright 1999,2004 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
*/
package org.apache.catalina.cluster.deploy;

import org.apache.catalina.cluster.ClusterMessage;
import org.apache.catalina.cluster.Member;
import java.io.Externalizable;
import java.io.Serializable;
import java.io.IOException;
import java.io.ObjectOutput;
import java.io.ObjectInput;

/**
 * One fragment of a file that is transferred through the cluster. The
 * fragments are produced and consumed by the FileMessageFactory; besides
 * the payload each fragment carries its sequence number, the total number
 * of fragments, the total file length, the file name and the context path
 * the file belongs to.
 * @author Filip Hanik
 * @version 1.0
 */
public class FileMessage implements ClusterMessage, Serializable {
    // --- what is being transferred ---
    private String fileName;
    private String contextPath;
    private long totalLength;
    private long totalNrOfMsgs;

    // --- this fragment ---
    private int messageNumber;
    private byte[] data;
    private int dataLength;

    // --- cluster message envelope ---
    private Member address;
    private long timestamp;

    /**
     * Creates an empty fragment for the given file.
     * @param source the member the message originates from
     * @param fileName name of the file being transferred
     * @param contextPath context path the file is to be installed under
     */
    public FileMessage(Member source, String fileName, String contextPath) {
        this.address = source;
        this.fileName = fileName;
        this.contextPath = contextPath;
    }

    /*
    public void writeExternal(ObjectOutput out) throws IOException {

    }

    public void readExternal(ObjectInput in) throws IOException,
        ClassNotFoundException {

    }
    */

    /** @return the sequence number of this fragment */
    public int getMessageNumber() {
        return this.messageNumber;
    }

    public void setMessageNumber(int messageNumber) {
        this.messageNumber = messageNumber;
    }

    /** @return the number of fragments the file was split into */
    public long getTotalNrOfMsgs() {
        return this.totalNrOfMsgs;
    }

    public void setTotalNrOfMsgs(long totalNrOfMsgs) {
        this.totalNrOfMsgs = totalNrOfMsgs;
    }

    /** @return the payload buffer; only the first getDataLength() bytes are valid */
    public byte[] getData() {
        return this.data;
    }

    /**
     * Sets the payload buffer together with the number of valid bytes in it.
     * The array is stored as is, it is not copied.
     */
    public void setData(byte[] data, int length) {
        this.data = data;
        this.dataLength = length;
    }

    /** @return the number of valid bytes in the payload buffer */
    public int getDataLength() {
        return this.dataLength;
    }

    public void setDataLength(int dataLength) {
        this.dataLength = dataLength;
    }

    /** @return the total length of the file */
    public long getTotalLength() {
        return this.totalLength;
    }

    public void setTotalLength(long totalLength) {
        this.totalLength = totalLength;
    }

    public Member getAddress() {
        return this.address;
    }

    public void setAddress(Member address) {
        this.address = address;
    }

    /**
     * Builds an id of the form fileName#-#messageNumber#-#currentTimeMillis.
     * The time is read on every call, so two calls normally return
     * different ids.
     */
    public String getUniqueId() {
        return new StringBuffer(getFileName())
            .append("#-#")
            .append(getMessageNumber())
            .append("#-#")
            .append(System.currentTimeMillis())
            .toString();
    }

    public long getTimestamp() {
        return this.timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public String getFileName() {
        return this.fileName;
    }

    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    public String getContextPath() {
        return this.contextPath;
    }
}
1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/deploy/FileMessageFactory.java Index: FileMessageFactory.java =================================================================== /* * Copyright 1999,2004 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.catalina.cluster.deploy; import java.io.File; import java.io.IOException; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.FileNotFoundException; /** * This factory is used to read files and write files by splitting them up into smaller * messages. So that entire files don't have to be read into memory.<BR> * The factory can be used as a reader or writer but not both at the same time. * When done reading or writing the factory will close the input or output streams * and mark the factory as closed. 
It is not possible to use it after that.<BR> * To force a cleanup, call cleanup() from the calling object.<BR> * This class is not thread safe. * @author Filip Hanik * @version 1.0 */ public class FileMessageFactory { /** * The number of bytes that we read from file */ public static final int READ_SIZE = 1024*10; //10kb /** * The file that we are reading/writing */ protected File file = null; /** * True means that we are writing with this factory. * False means that we are reading with this factory */ protected boolean openForWrite; /** * Once the factory is used, it can not be reused. */ protected boolean closed = false; /** * When openForWrite=false, the input stream * is held by this variable */ protected FileInputStream in; /** * When openForWrite=true, the output stream * is held by this variable */ protected FileOutputStream out; /** * The number of messages we have read or written */ protected int nrOfMessagesProcessed = 0; /** * The total size of the file */ protected long size = 0; /** * The total number of packets that we split this file into */ protected long totalNrOfMessages = 0; /** * The bytes that we hold the data in, not thread safe. */ protected byte[] data = new byte[READ_SIZE]; /** * Private constructor, either instantiates a factory to read or write.<BR> * When openForWrite==true, then a the file, f, will be created and an output * stream is opened to write to it.<BR> * When openForWrite==false, an input stream is opened, the file has to exist. * @param f File - the file to be read/written * @param openForWrite boolean - true means we are writing to the file, false * means we are reading from the file * @throws FileNotFoundException - if the file to be read doesn't exist * @throws IOException - if the system fails to open input/output streams to the file * or if it fails to create the file to be written to. 
*/ private FileMessageFactory(File f, boolean openForWrite) throws FileNotFoundException, IOException{ this.file = f; this.openForWrite = openForWrite; if ( openForWrite ) { if (!file.exists()) file.createNewFile(); out = new FileOutputStream(f); } else { size = file.length(); totalNrOfMessages = (size / READ_SIZE) + 1; in = new FileInputStream(f); }//end if } /** * Creates a factory to read or write from a file. * When opening for read, the readMessage can be invoked, and when * opening for write the writeMessage can be invoked. * @param f File - the file to be read or written * @param openForWrite boolean - true, means we are writing to the file, false means we are * reading from it * @throws FileNotFoundException - if the file to be read doesn't exist * @throws IOException - if it fails to create the file that is to be written * @return FileMessageFactory */ public static FileMessageFactory getInstance(File f, boolean openForWrite) throws FileNotFoundException, IOException { return new FileMessageFactory(f,openForWrite); } /** * Reads file data into the file message and sets the * size, totalLength, totalNrOfMsgs and the message number<BR> * If EOF is reached, the factory returns null, and closes itself, * otherwise the same message is returned as was passed in. * This makes sure that not more memory is ever used. * To remember, neither the file message or the factory are thread safe. * dont hand off the message to one thread and read the same with another. 
* @param f FileMessage - the message to be populated with file data * @throws IllegalArgumentException - if the factory is for writing or is closed * @throws IOException - if a file read exception occurs * @return FileMessage - returns the same message passed in as a parameter, or null if EOF */ public FileMessage readMessage(FileMessage f) throws IllegalArgumentException, IOException { checkState(false); int length = in.read(data); if ( length == -1 ) { cleanup(); return null; } else { f.setData(data, length); f.setTotalLength(size); f.setTotalNrOfMsgs(totalNrOfMessages); f.setMessageNumber(++nrOfMessagesProcessed); return f; }//end if } /** * Writes a message to file. If (msg.getMessageNumber() == msg.getTotalNrOfMsgs()) * the output stream will be closed after writing. * @param msg FileMessage - message containing data to be written * @throws IllegalArgumentException - if the factory is opened for read or closed * @throws IOException - if a file write error occurs * @return returns true if the file is complete and outputstream is closed, false otherwise. */ public boolean writeMessage(FileMessage msg) throws IllegalArgumentException, IOException { if ( !openForWrite ) throw new IllegalArgumentException("Can't write message, this factory is reading."); out.write(msg.getData(),0,msg.getDataLength()); nrOfMessagesProcessed++; out.flush(); if ( msg.getMessageNumber() == msg.getTotalNrOfMsgs() ) { out.close(); cleanup(); return true; }//end if return false; }//writeMessage /** * Closes the factory, its streams and sets all its references to null */ public void cleanup() { if ( in != null ) try { in.close(); } catch ( Exception ignore ){} if ( out != null ) try { out.close(); } catch ( Exception ignore ){} in = null; out = null; size = 0; closed = true; data = null; nrOfMessagesProcessed = 0; totalNrOfMessages = 0; } /** * Check to make sure the factory is able to perform the * function it is asked to do. 
Invoked by readMessage/writeMessage before * those methods proceed. * @param openForWrite boolean * @throws IllegalArgumentException */ protected void checkState(boolean openForWrite) throws IllegalArgumentException { if ( this.openForWrite != openForWrite ) { cleanup(); if ( openForWrite ) throw new IllegalArgumentException("Can't write message, this factory is reading."); else throw new IllegalArgumentException("Can't read message, this factory is writing."); } if ( this.closed ) { cleanup(); throw new IllegalArgumentException("Factory has been closed."); } } /** * Example usage. * @param args String[], args[0] - read from filename, args[1] write to filename * @throws Exception */ public static void main(String[] args) throws Exception { System.out.println("Usage: FileMessageFactory fileToBeRead fileToBeWritten"); System.out.println("Usage: This will make a copy of the file on the local file system"); FileMessageFactory read = getInstance(new File(args[0]),false); FileMessageFactory write = getInstance(new File(args[1]),true); FileMessage msg = new FileMessage(null,args[0],args[0]); msg = read.readMessage(msg); System.out.println("Expecting to write " + msg.getTotalNrOfMsgs()+" messages."); int cnt = 0; while ( msg != null ) { write.writeMessage(msg); cnt++; msg = read.readMessage(msg); }//while System.out.println("Actually wrote " + cnt+" messages."); }///main public File getFile() { return file; } } 1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/deploy/UndeployMessage.java Index: UndeployMessage.java =================================================================== /* * Copyright 1999,2004 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
* You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.catalina.cluster.deploy; import org.apache.catalina.cluster.ClusterMessage; import org.apache.catalina.cluster.Member; import java.io.Serializable; public class UndeployMessage implements ClusterMessage,Serializable { private Member address; private long timestamp; private String uniqueId; private String contextPath; private boolean undeploy; public UndeployMessage() {} //for serialization public UndeployMessage(Member address, long timestamp, String uniqueId, String contextPath, boolean undeploy) { this.address = address; this.timestamp= timestamp; this.undeploy = undeploy; this.uniqueId = uniqueId; this.undeploy = undeploy; this.contextPath = contextPath; } public Member getAddress() { return address; } public void setAddress(Member address) { this.address = address; } public long getTimestamp() { return timestamp; } public void setTimestamp(long timestamp) { this.timestamp = timestamp; } public String getUniqueId() { return uniqueId; } public void setUniqueId(String uniqueId) { this.uniqueId = uniqueId; } public String getContextPath() { return contextPath; } public void setContextPath(String contextPath) { this.contextPath = contextPath; } public boolean getUndeploy() { return undeploy; } public void setUndeploy(boolean undeploy) { this.undeploy = undeploy; } } 1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/deploy/WarWatcher.java Index: WarWatcher.java =================================================================== /* * Copyright 1999,2004 The Apache Software Foundation. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.catalina.cluster.deploy; import java.io.File; import java.util.HashMap; import java.util.Map; import java.util.Iterator; /** * <p>The <b>WarWatcher</b> watches the deployDir for changes made to * the directory (adding new WAR files->deploy or remove WAR files->undeploy) * And notifies a listener of the changes made</p> * * @author Filip Hanik * @version 1.0 */ public class WarWatcher implements Runnable { /*--Static Variables----------------------------------------*/ public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( WarWatcher.class ); /*--Instance Variables--------------------------------------*/ /** * Directory to watch for war files */ protected File deployDir = null; /** * Parent to be notified of changes */ protected FileChangeListener listener = null; /** * * Check interval */ protected long interval = 5000; //5 seconds /** * Run status */ protected boolean alive = true; /** * Currently deployed files */ protected Map currentStatus = new HashMap(); /*--Constructor---------------------------------------------*/ public WarWatcher(FileChangeListener listener, File deployDir, long interval) { this.listener = listener; this.deployDir = deployDir; this.interval = interval; } /*--Logic---------------------------------------------------*/ public void run() { while ( alive ) { try { File[] list = deployDir.listFiles(new WarFilter()); //first make sure all the files are 
listed in our current status for ( int i=0; i<list.length; i++ ) { addWarInfo(list[i]); }//for //check all the status codes and update the FarmDeployer for ( Iterator i=currentStatus.entrySet().iterator(); i.hasNext();) { Map.Entry entry = (Map.Entry)i.next(); WarInfo info = (WarInfo)entry.getValue(); int check = info.check(); if ( check == 1 ) { listener.fileModified(info.getWar()); } else if ( check == -1 ) { listener.fileRemoved(info.getWar()); //no need to keep in memory currentStatus.remove(info.getWar()); }//end if }//for //sleep for the set interval Thread.sleep(interval); } catch ( Exception x ) { log.error("Unable to watch for WAR deployments in cluster.",x); } }//while }//run protected void addWarInfo(File f) { WarInfo info = (WarInfo)currentStatus.get(f.getAbsolutePath()); if ( info == null ) { info = new WarInfo(f); info.setLastState(-1); //assume file is non existent currentStatus.put(f.getAbsolutePath(),info); } } public void stop() { alive = false; currentStatus.clear(); listener = null; } /*--Inner classes-------------------------------------------*/ /** * File name filter for war files */ protected class WarFilter implements java.io.FilenameFilter { public boolean accept(File path, String name) { if ( name == null ) return false; return name.endsWith(".war"); } } /** * File information on existing WAR files */ protected class WarInfo { protected File war = null; protected long lastChecked = 0; protected long lastState = 0; public WarInfo(File war) { this.war = war; this.lastChecked = war.lastModified(); if ( !war.exists() ) lastState = -1; } public boolean modified() { return war.exists() && war.lastModified() > lastChecked; } public boolean exists() { return war.exists(); } /** * Returns * 1 if the file has been added/modified, * 0 if the file is unchanged and * -1 if the file has been removed * @return int 1=file added; 0=unchanged; -1=file removed */ public int check() { //file unchanged by default int result = 0; if ( modified() ) { //file has 
changed - timestamp result = 1; lastState = result; } else if ( (!exists()) && (!(lastState==-1)) ) { //file was removed result = -1; lastState = result; } else if ( (lastState==-1) && exists() ) { //file was added result = 1; lastState = result; } this.lastChecked = System.currentTimeMillis(); return result; } public File getWar() { return war; } public int hashCode() { return war.getAbsolutePath().hashCode(); } public boolean equals(Object other) { if ( other instanceof WarInfo ) { WarInfo wo = (WarInfo)other; return wo.getWar().equals(getWar()); } else { return false; } } protected void setLastState(int lastState) { this.lastState = lastState; } } } 1.27 +2 -1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java Index: DeltaManager.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java,v retrieving revision 1.26 retrieving revision 1.27 diff -u -r1.26 -r1.27 --- DeltaManager.java 29 May 2004 02:43:58 -0000 1.26 +++ DeltaManager.java 4 Jun 2004 20:22:27 -0000 1.27 @@ -718,6 +718,7 @@ if( initialized ) { destroy(); } + getCluster().removeManager(getName()); } 1.29 +1 -0 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java Index: SimpleTcpReplicationManager.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java,v retrieving revision 1.28 retrieving revision 1.29 diff -u -r1.28 -r1.29 --- SimpleTcpReplicationManager.java 2 Jun 2004 14:10:01 -0000 1.28 +++ SimpleTcpReplicationManager.java 4 Jun 2004 20:22:27 -0000 1.29 @@ -484,6 +484,7 @@ //stop the javagroup channel try { + cluster.removeManager(getName()); // mReplicationListener.stopListening(); // 
mReplicationTransmitter.stop(); // service.stop(); 1.41 +75 -5 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java Index: SimpleTcpCluster.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v retrieving revision 1.40 retrieving revision 1.41 diff -u -r1.40 -r1.41 --- SimpleTcpCluster.java 29 May 2004 02:36:12 -0000 1.40 +++ SimpleTcpCluster.java 4 Jun 2004 20:22:27 -0000 1.41 @@ -22,6 +22,7 @@ import java.net.UnknownHostException; import java.io.IOException; import java.util.HashMap; +import java.util.Vector; import org.apache.catalina.ServerFactory; import org.apache.catalina.core.StandardServer; @@ -56,6 +57,8 @@ import org.apache.catalina.cluster.Constants; import org.apache.catalina.cluster.ClusterReceiver; import org.apache.catalina.cluster.ClusterSender; +import org.apache.catalina.cluster.ClusterDeployer; + import org.apache.commons.logging.Log; @@ -184,6 +187,12 @@ */ private org.apache.catalina.cluster.ClusterReceiver clusterReceiver; private org.apache.catalina.Valve valve; + private org.apache.catalina.cluster.ClusterDeployer clusterDeployer; + + /** + * Listeners of messages + */ + protected Vector clusterListeners = new Vector(); // ------------------------------------------------------------- Properties @@ -291,6 +300,15 @@ return members; } + /** + * Return the member that represents this node. 
+ * @return Member + */ + public Member getLocalMember() { + return membershipService.getLocalMember(); + } + + @@ -299,6 +317,7 @@ public synchronized Manager createManager(String name) { log.debug("Creating ClusterManager for context "+name + " using class "+getManagerClassName()); + System.out.println("\n\n\n\nCreating ClusterManager for context "+name + " using class "+getManagerClassName()+"\n\n\n\n"); ClusterManager manager = null; try { manager = (ClusterManager)getClass().getClassLoader().loadClass(getManagerClassName()).newInstance(); @@ -312,8 +331,18 @@ manager.setExpireSessionsOnShutdown(expireSessionsOnShutdown); manager.setUseDirtyFlag(useDirtyFlag); managers.put(name,manager); + return manager; } + + public void removeManager(String name) { + managers.remove(name); + } + + public Manager getManager(String name) { + return (Manager)managers.get(name); + } + // ------------------------------------------------------ Lifecycle Methods @@ -378,6 +407,22 @@ membershipService.setLocalMemberProperties(clusterReceiver.getHost(),clusterReceiver.getPort()); membershipService.addMembershipListener(this); membershipService.start(); + //set the deployer. + try { + if ( clusterDeployer != null ) { + clusterDeployer.setCluster(this); + Object deployer = MethodUtils.invokeMethod( + getContainer(), + "getDeployer", + new Object[0], + new Class[0]); + clusterDeployer.setDeployer( (org.apache.catalina.Deployer) + deployer); + clusterDeployer.start(); + } + } catch (Throwable x) { + log.fatal("Unable to retrieve the container deployer. 
Cluster deployment disabled.",x); + } //catch this.started = true; } catch ( Exception x ) { log.error("Unable to start cluster.",x); @@ -455,6 +500,9 @@ } catch (Exception x ) { log.error("Unable to stop cluster receiver.",x); } + if ( clusterDeployer != null ) { + clusterDeployer.stop(); + } started = false; } @@ -529,8 +577,19 @@ else log.warn("Context manager doesn't exist:" + ctxname); }//end if - } else - log.warn("Received invalid message myobj="+myobj); + } else { + //invoke all the listeners + for ( int i=0; i<clusterListeners.size(); i++ ) { + MessageListener listener = (MessageListener)clusterListeners.elementAt(i); + if ( myobj!=null && + myobj instanceof ClusterMessage && + listener.accept((ClusterMessage)myobj) ) { + listener.messageReceived((ClusterMessage)myobj); + }//end if + + }//for + }//end if + } catch ( Exception x ) { log.error("Unable to deserialize session message.",x); } @@ -657,12 +716,23 @@ } public void addClusterListener(MessageListener listener) { - //TO DO + if ( !clusterListeners.contains(listener) ) { + clusterListeners.addElement(listener); + } } public void removeClusterListener(MessageListener listener) { - //TO DO + clusterListeners.removeElement(listener); + } + public org.apache.catalina.cluster.ClusterDeployer getClusterDeployer() { + return clusterDeployer; + } + public void setClusterDeployer(org.apache.catalina.cluster.ClusterDeployer clusterDeployer) { + this.clusterDeployer = clusterDeployer; } + + + private class MemberComparator implements java.util.Comparator {
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]