On 5/18/06, Nicolas <[email protected]> wrote:
Looks like it's coming from a deleted node in the repository, but the
workflow engine does not know about it.
Hi Nicolas,
Very interesting: how are the two logs interrelated? Are they taken
from the same log file?
You're saying that this doesn't seem to affect the engine's operation,
aren't you ?
I'd love to reimplement the contentIterator of the JCRExpressionStore.
Under very heavy load (lots of processes dehydrated), it might break.
See the attached FileExpressionStore.java (sorry, sf.net's webSvn is
down), it contains a StoreIterator which doesn't fill up the memory
when iterating over the content of the pool. I think adapting that to
the JCRExpressionStore is no big deal, as that store's concepts are
pretty much the same as those of the FileExpressionStore.
WDYT ?
John
/*
* Copyright (c) 2005, John Mettraux, OpenWFE.org
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* . Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* . Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* . Neither the name of the "OpenWFE" nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* $Id: FileExpressionStore.java 2510 2006-04-25 19:16:54Z jmettraux $
*/
//
// FileExpressionStore.java
//
// [EMAIL PROTECTED]
//
// generated with
// jtmpl 1.0.04 20.11.2001 John Mettraux ([EMAIL PROTECTED])
//
package openwfe.org.engine.impl.expool;
import openwfe.org.Utils;
import openwfe.org.MapUtils;
import openwfe.org.ApplicationContext;
import openwfe.org.ServiceException;
import openwfe.org.xml.XmlUtils;
import openwfe.org.time.Time;
import openwfe.org.misc.DirectoryFilter;
import openwfe.org.engine.expool.PoolException;
import openwfe.org.engine.expressions.EngineEnvironmentId;
import openwfe.org.engine.expressions.ReplyException;
import openwfe.org.engine.expressions.FlowExpression;
import openwfe.org.engine.expressions.FlowExpressionId;
import openwfe.org.engine.expressions.ExpressionWithTimeOut;
/**
* The FileExpressionStore focuses on storing expressions in the filesystem.
* It's an abstract class, as extending classes (XmlExpressionStore,
* SerialExpressionStore) will define how workitems are stored in their
* respective files. This abstract class simply tells where and in which
* file [name].
*
* <p><font size=2>CVS Info :
* <br>$Author: jmettraux $
* <br>$Date: 2006-04-25 21:16:54 +0200 (Tue, 25 Apr 2006) $
* <br>$Id: FileExpressionStore.java 2510 2006-04-25 19:16:54Z jmettraux $ </font>
*
* @author [EMAIL PROTECTED]
*/
public abstract class FileExpressionStore
extends AbstractExpressionStore
{
// Class-wide log4j logger, named after this class.
private final static org.apache.log4j.Logger log = org.apache.log4j.Logger
.getLogger(FileExpressionStore.class.getName());
//
// CONSTANTS (definitions)
/**
* Use this parameter to indicate to a FileExpressionStore in which
* directory it should save its run data.
* NOTE(review): unlike DEFAULT_WORK_DIRECTORY this field is not declared
* final - presumably an oversight; confirm that nothing reassigns it.
*/
public static String P_WORK_DIRECTORY
= "workDirectory";
/**
* If there is no parameter "workDirectory" in this service configuration,
* expressions will be stored in "work/engine/pool".
*/
public final static String DEFAULT_WORK_DIRECTORY
= "work/engine/pool";
//
// FIELDS
// Directory under which expressions are stored; assigned by init(), where
// a trailing File.separator is appended to it.
private String workDirectory = null;
// Path of the file used for the engine environment expression
// (workDirectory + "engine-env-expression.xml"); assigned by init().
private String engineEnvExpressionPath = null;
//
// CONSTRUCTORS
/**
 * Initializes the service, then resolves the directory in which
 * expressions are kept.
 *
 * The directory is read from the service parameter "workDirectory"
 * (falling back to DEFAULT_WORK_DIRECTORY), resolved through
 * Utils.getCanonicalPath() against the application directory, and
 * suffixed with File.separator. The path of the engine environment
 * expression file is derived from it.
 *
 * @param serviceName the name under which this service is registered
 * @param context the application context this service lives in
 * @param serviceParams the configuration parameters of this service
 * @throws ServiceException as thrown by the parent initialization
 */
public void init
    (final String serviceName,
     final ApplicationContext context,
     final java.util.Map serviceParams)
throws
    ServiceException
{
    super.init(serviceName, context, serviceParams);

    // configured (or default) directory, possibly relative
    this.workDirectory = MapUtils.getAsString
        (serviceParams, P_WORK_DIRECTORY, DEFAULT_WORK_DIRECTORY);

    // canonical form, with a trailing separator so that names can
    // simply be appended to it
    this.workDirectory =
        Utils.getCanonicalPath
            (getContext().getApplicationDirectory(), this.workDirectory)
        + java.io.File.separator;

    this.engineEnvExpressionPath =
        this.workDirectory + "engine-env-expression.xml";

    log.info("init() workDirectory set to "+this.workDirectory);
}
//
// ABSTRACT METHODS
/**
* Writes the given expression to the file designated by 'fileName'.
* The file format is left to the extending class; storeExpression()
* calls this method with the path computed by determineFileName().
*/
protected abstract void saveExpression (String fileName, FlowExpression fe)
throws Exception;
/**
* Reads the expression stored in the file designated by 'fileName'.
* It is called with paths computed by determineFileName() and with the
* paths of the files found by the StoreIterator.
*/
protected abstract FlowExpression loadExpression (String fileName)
throws Exception;
//
// METHODS
/**
 * Stores an expression to a file in the pool directory.
 *
 * The target directory (see determineDirName()) is created when missing;
 * the expression is then handed to saveExpression() together with the
 * path computed by determineFileName().
 *
 * @param fe the expression to store
 * @throws PoolException if the target directory cannot be created or if
 * saveExpression() fails; the original problem is kept as the cause
 */
public void storeExpression (final FlowExpression fe)
    throws PoolException
{
    log.debug("storeExpression() for "+fe.getId());

    final long start = System.currentTimeMillis();

    try
    {
        //
        // does WFD directory exist ?

        final java.io.File dir =
            new java.io.File(determineDirName(fe.getId()));

        // mkdirs() returns false on failure, but also when another
        // thread created the directory in the meantime : hence the
        // second isDirectory() check before giving up.
        if ( ! dir.isDirectory() && ! dir.mkdirs() && ! dir.isDirectory())
        {
            throw new java.io.IOException
                ("Failed to create directory "+dir.getPath());
        }

        //
        // store expression

        saveExpression(determineFileName(fe.getId()), fe);
    }
    catch (final Exception e)
    {
        throw new PoolException
            ("Failed to store expression", e);
    }

    final long duration = System.currentTimeMillis() - start;
    log.debug("storeExpression() took "+duration+" ms");
}
/**
 * Removes an expression (deletes its file) from the pool directory.
 *
 * This is a best-effort operation : a file that cannot be deleted is
 * reported in the log (as a warning if the file is still there), no
 * exception is raised for it.
 *
 * @param fe the expression whose file has to be removed
 * @throws PoolException declared for the store contract; this
 * implementation does not throw it itself
 */
public void unstoreExpression (final FlowExpression fe)
    throws PoolException
{
    final String fileName = determineFileName(fe.getId());

    log.debug("unstoreExpression() deleting file "+fileName);

    final java.io.File f = new java.io.File(fileName);

    if (f.delete())
    {
        log.debug("unstoreExpression() unstored "+fe.getId());
        return;
    }

    // delete() also returns false when there was nothing to delete,
    // only a file that is still present is a real problem.
    if (f.exists())
    {
        log.warn("unstoreExpression() failed to delete file "+fileName);
    }
    else
    {
        log.debug("unstoreExpression() no file to delete for "+fe.getId());
    }
}
/**
 * Loads an expression from the pool directory.
 *
 * The file is located with determineFileName() and read through the
 * loadExpression(String) implementation of the extending class.
 *
 * @param fei the id of the expression to load
 * @return the expression as returned by loadExpression(String)
 * @throws PoolException wrapping any exception raised while loading
 */
public FlowExpression loadExpression (final FlowExpressionId fei)
    throws PoolException
{
    log.debug("loadExpression() requested for "+fei);

    final long startedAt = System.currentTimeMillis();

    final String path = determineFileName(fei);
    log.debug("loadExpression() fileName is "+path);

    final FlowExpression loaded;
    try
    {
        loaded = loadExpression(path);
    }
    catch (final Exception e)
    {
        throw new PoolException
            ("Failed to load expression", e);
    }

    log.debug
        ("loadExpression() took "+
         (System.currentTimeMillis() - startedAt)+" ms");

    return loaded;
}
//
// file name determination
/*
 * Returns the last character of the parent workflow instance id of the
 * given expression id, as a String. determineDirName() uses it as the
 * name of an intermediate directory.
 */
private String getLastDigit (final FlowExpressionId fei)
{
    final String parentWfid = fei.getParentWorkflowInstanceId();
    final int lastIndex = parentWfid.length() - 1;

    return parentWfid.substring(lastIndex);
}
/**
 * Determines in which dir an expression should be stored.
 *
 * For the engine environment expression this is the work directory
 * itself. For any other expression the result is
 * [workDirectory][definition name]--[definition revision]/[last
 * character of the parent wfid]/[parent wfid]/ (with the platform
 * separator, trailing separator included).
 *
 * @param fei the id of the expression
 * @return the path of the directory, ending with a separator
 */
protected String determineDirName (final FlowExpressionId fei)
{
    if (EngineEnvironmentId.ID.equals(fei))
    {
        return this.workDirectory;
    }

    final String sep = java.io.File.separator;

    return new StringBuffer()
        .append(this.workDirectory)
        .append(Utils.ensureForFileName(fei.getWorkflowDefinitionName()))
        .append("--")
        .append(Utils.ensureForFileName(fei.getWorkflowDefinitionRevision()))
        .append(sep)
        .append(getLastDigit(fei))
        .append(sep)
        .append(fei.getParentWorkflowInstanceId())
        .append(sep)
        .toString();
}
/**
 * Determines under which filename an expression should be stored.
 *
 * For the engine environment expression this is the dedicated path set
 * up by init(). For any other expression the name is built inside the
 * directory given by determineDirName() as
 * [initial engine id]--[wfid]--[expression name]--[expression id].xml
 *
 * @param fei the id of the expression
 * @return the path of the file holding the expression
 */
protected String determineFileName (final FlowExpressionId fei)
{
    if (EngineEnvironmentId.ID.equals(fei))
    {
        return this.engineEnvExpressionPath;
    }

    return new StringBuffer()
        .append(determineDirName(fei))
        .append(Utils.ensureForFileName(fei.getInitialEngineId()))
        .append("--")
        .append(fei.getWorkflowInstanceId())
        .append("--")
        .append(fei.getExpressionName())
        .append("--")
        .append(fei.getExpressionId())
        .append(".xml")
        .toString();
}
/**
 * Checks whether the given directory is empty and, if it is an old
 * enough wfid directory, removes it.
 *
 * The name of an empty directory is parsed as a long and compared, as a
 * millisecond timestamp, with the current time : the directory is only
 * deleted when it is more than one minute old.
 *
 * @param dir the directory to check
 * @return true if the caller has nothing to iterate in 'dir' (it could
 * not be listed, it is empty and its name is not a number, or it was
 * empty and old enough for a removal to be attempted); false if it has
 * content or if it is empty but too young to be removed
 */
protected boolean cleanedDir (final java.io.File dir)
{
    log.debug("cleanedDir() called for "+dir.getPath());

    // File.list() returns null when 'dir' is not a directory (anymore)
    // or when an I/O error occurs : nothing to iterate in that case.
    final String[] content = dir.list();

    if (content == null)
    {
        log.debug
            ("cleanedDir() could not list >"+dir.getPath()+"< skipping");
        return true;
    }

    if (content.length > 0) return false;

    final long lDir;
    try
    {
        lDir = Long.parseLong(dir.getName());
    }
    catch (final NumberFormatException nfe)
    {
        log.debug
            ("cleanedDir() not a wfid dir >"+dir.getName()+"< skipping");
        return true;
    }

    final long age = System.currentTimeMillis() - lDir;

    if (age > 60 * 1000) // 1 minute
    {
        if (dir.delete())
            log.debug("cleanedDir() removed dir : "+dir.getPath());
        else
            log.debug("cleanedDir() failed to remove dir : "+dir.getPath());

        // empty anyway, the caller may skip it
        return true;
    }

    log.debug("cleanedDir() did not remove young dir : "+dir.getPath());

    return false;
}
/**
 * Returns an iterator over the expressions stored here whose class is
 * assignable from the given 'assignClass'.
 * Daemons checking the content of the store (purge, timeouts) rely on
 * this method.
 *
 * @param assignClass the class the returned expressions have to match
 * @return a new StoreIterator walking the work directory
 */
public java.util.Iterator contentIterator
    (final Class assignClass)
{
    final java.util.Iterator storeIterator = new StoreIterator(assignClass);

    return storeIterator;
}
/*
 * Returns true unless the name of 'f' is made of exactly one digit.
 * Whether 'f' is a directory is not checked here, callers list their
 * candidates with a DirectoryFilter.
 */
private boolean isNotDigitDir (final java.io.File f)
{
    final String name = f.getName();

    return name.length() != 1 || ! Character.isDigit(name.charAt(0));
}
/**
 * Returns the count of expressions stored here.
 *
 * Expression files are laid out by determineDirName() as
 * [flow dir]/[digit dir]/[wfid dir]/[expression].xml : this method walks
 * those three directory levels and counts the ".xml" files found at the
 * bottom, the same way getStatus() does. The engine environment
 * expression, which lies directly in the work directory, is not counted.
 *
 * @return the number of expression files found, 0 if the work directory
 * does not exist (yet)
 */
public int size ()
{
    int count = 0;

    // listFiles() returns null for a directory that is missing or that
    // cannot be read, each level has to be guarded against that.

    final java.io.File[] flowDirs =
        new java.io.File(this.workDirectory)
            .listFiles(new DirectoryFilter());

    if (flowDirs == null) return 0;

    for (int i=0; i<flowDirs.length; i++)
    {
        final java.io.File[] digitDirs = flowDirs[i]
            .listFiles(new DirectoryFilter());

        if (digitDirs == null) continue;

        for (int j=0; j<digitDirs.length; j++)
        {
            if (isNotDigitDir(digitDirs[j])) continue;

            final java.io.File[] wfidDirs = digitDirs[j]
                .listFiles(new DirectoryFilter());

            if (wfidDirs == null) continue;

            for (int k=0; k<wfidDirs.length; k++)
            {
                final java.io.File[] expFiles = wfidDirs[k]
                    .listFiles(new ExpressionFilter());

                if (expFiles != null) count += expFiles.length;
            }
        }
    }

    return count;
}
//
// METHODS from Service
/**
 * Status is outputted as a JDOM element. The status is various
 * information about a service activities and state.
 *
 * The element is named after this service and contains : the class and
 * revision elements, a "flows" element holding one "flow" child per flow
 * directory (attribute "def") followed by one "flow" child per workflow
 * instance directory (attribute "id"), each with an "expressions" count,
 * and an "expressionsStored" element with the total count.
 *
 * Directories that cannot be listed (missing work directory, directory
 * removed while the status is being computed) are skipped.
 *
 * @return the status element
 */
public org.jdom.Element getStatus ()
{
    final org.jdom.Element result = new org.jdom.Element(getName());

    result.addContent(XmlUtils.getClassElt(this));
    result.addContent(XmlUtils.getRevisionElt("$Id: FileExpressionStore.java 2510 2006-04-25 19:16:54Z jmettraux $"));

    int totalExpressionCount = 0;

    //
    // count expressions per flow and per flow instance

    final java.util.Map expressionsPerFlow = new java.util.HashMap();
    final java.util.Map expressionsPerInstance = new java.util.HashMap();

    // listFiles() returns null for a directory that is missing or that
    // cannot be read, hence the null checks at each level.

    java.io.File[] flowDirs =
        new java.io.File(this.workDirectory)
            .listFiles(new DirectoryFilter());

    if (flowDirs == null) flowDirs = new java.io.File[0];

    for (int i=0; i<flowDirs.length; i++)
    {
        final java.io.File flowDir = flowDirs[i];

        log.debug("getStatus() considering dir "+flowDir);

        final java.io.File[] digitDirs = flowDir
            .listFiles(new DirectoryFilter());

        if (digitDirs == null) continue;

        for (int j=0; j<digitDirs.length; j++)
        {
            if (isNotDigitDir(digitDirs[j])) continue;

            log.debug("getStatus() considering digitDir "+digitDirs[j]);

            final java.io.File[] wfidDirs = digitDirs[j]
                .listFiles(new DirectoryFilter());

            if (wfidDirs == null) continue;

            for (int k=0; k<wfidDirs.length; k++)
            {
                log.debug("getStatus() considering wfidDir "+wfidDirs[k]);

                final java.io.File[] expFiles =
                    wfidDirs[k].listFiles(new ExpressionFilter());

                if (expFiles == null) continue;

                totalExpressionCount += expFiles.length;

                openwfe.org.Utils.inc
                    (expressionsPerFlow,
                     flowDir.getName(),
                     expFiles.length);

                final String instanceId = wfidDirs[k].getName();

                for (int l=0; l<expFiles.length; l++)
                {
                    log.debug
                        ("getStatus() considering file "+
                         expFiles[l].getName());

                    openwfe.org.Utils
                        .inc(expressionsPerInstance, instanceId);
                }
            }
        }
    }

    //
    // output

    final org.jdom.Element eFlows = new org.jdom.Element("flows");

    appendFlowCounts(eFlows, expressionsPerFlow, "def");
    appendFlowCounts(eFlows, expressionsPerInstance, "id");

    result.addContent(eFlows);

    //
    // add total expression count

    final org.jdom.Element eTotalExpressionCount =
        new org.jdom.Element("expressionsStored");
    eTotalExpressionCount.addContent
        (new org.jdom.Text(""+totalExpressionCount));

    result.addContent(eTotalExpressionCount);

    //
    // the end

    return result;
}

/*
 * Adds to 'parent' one "flow" element per entry of 'counts' : the key
 * goes into the attribute named 'keyAttribute', the (Integer) value
 * into the attribute "expressions".
 */
private static void appendFlowCounts
    (final org.jdom.Element parent,
     final java.util.Map counts,
     final String keyAttribute)
{
    final java.util.Iterator it = counts.keySet().iterator();
    while (it.hasNext())
    {
        final String key = (String)it.next();
        final Integer count = (Integer)counts.get(key);

        final org.jdom.Element eFlow = new org.jdom.Element("flow");
        eFlow.setAttribute(keyAttribute, key);
        eFlow.setAttribute("expressions", count.toString());

        parent.addContent(eFlow);
    }
}
/* *
* A debug method : will dump the status of the store in
* $OPENWFE_HOME/logs/store_dump.xml
* /
protected void dumpStatus ()
{
final org.jdom.Document doc = new org.jdom.Document(getStatus());
final String s = openwfe.org.Utils.toString(doc, null);
// 'null' means : use default encoding
final String dumpFileName = getContext()
.getApplicationDirectory()+"logs/store_dump.xml";
try
{
final java.io.BufferedWriter bw = new java.io.BufferedWriter
(new java.io.FileWriter(dumpFileName));
bw.write(s);
bw.flush();
bw.close();
}
catch (final Exception e)
{
log.warn
("Failed to dump to "+dumpFileName, e);
}
}
*/
//
// STATIC METHODS
/*
 * Parses the part of 'fileName' that precedes the first "--" into a long.
 * Returns -1 when there is no "--" in the name or when that leading part
 * is not a number (a warning is logged in the latter case).
 */
private static long extractInstanceId (final String fileName)
{
    final int separatorIndex = fileName.indexOf("--");

    if (separatorIndex >= 0)
    {
        final String head = fileName.substring(0, separatorIndex);
        try
        {
            return Long.parseLong(head);
        }
        catch (NumberFormatException nfe)
        {
            log.warn("failed to parse '"+fileName+"' (til '--') into a long");
        }
    }

    return -1;
}
//
// INNER CLASSES
/*
 * A filter accepting any file whose name ends with ".xml",
 * whatever the directory it is found in.
 */
private static class ExpressionFilter
    implements java.io.FilenameFilter
{
    public boolean accept (final java.io.File dir, final String fileName)
    {
        final boolean isXmlFile = fileName.endsWith(".xml");

        return isXmlFile;
    }
}
/*
 * A filter for ".xml" files, optionally restricted to the files whose
 * name contains one of the given expression short names between two
 * "--" separators.
 *
 * NOTE(review): the short names are spliced into a regular expression
 * without any escaping - presumably they never contain regex
 * metacharacters; verify against determineExpressionShortNames().
 */
private static class FilenameFilter
    implements java.io.FilenameFilter
{
    // null means : no restriction on the expression name
    private String[] shortNames;

    /* Accepts every ".xml" file. */
    public FilenameFilter ()
    {
    }

    /* Accepts the ".xml" files matching the given short name. */
    public FilenameFilter (final String expressionShortName)
    {
        this(new String[] { expressionShortName });
    }

    /* Accepts the ".xml" files matching any of the given short names. */
    public FilenameFilter (final String[] expressionShortNames)
    {
        this.shortNames = expressionShortNames;
    }

    public boolean accept (final java.io.File dir, final String fileName)
    {
        if (fileName.endsWith(".xml"))
        {
            if (this.shortNames == null) return true;

            int i = 0;
            while (i < this.shortNames.length)
            {
                final String regex = ".*--"+this.shortNames[i]+"--.*\\.xml";

                if (fileName.matches(regex)) return true;

                i++;
            }
        }

        return false;
    }
}
/**
 * An iterator over the expressions stored under the work directory.
 *
 * It walks the directory tree lazily, through four chained levels
 * (flow dirs, digit dirs, wfid dirs, expression files) : only one
 * directory listing per level and one loaded expression (the next one
 * to return) are held at any time.
 *
 * Files that cannot be loaded and expressions that do not match the
 * requested class are logged and skipped. Skipping is done with loops,
 * not recursion, so that the stack depth does not depend on the number
 * of files or directories skipped in a row.
 */
protected class StoreIterator
    implements java.util.Iterator
{
    private final org.apache.log4j.Logger log =
        org.apache.log4j.Logger.getLogger(StoreIterator.class.getName());

    //
    // FIELDS

    private Class assignClass = null;

    private Level wfDirLevel = null;
    private Level digitDirLevel = null;
    private Level wfidDirLevel = null;
    private Level expressionLevel = null;

    // the expression the next call to next() will return,
    // null when the iteration is over
    private FlowExpression next = null;

    //
    // CONSTRUCTORS

    /**
     * Sets up the four levels on the work directory and fetches the
     * first matching expression.
     *
     * @param assignClass the class the returned expressions have to match
     */
    public StoreIterator (final Class assignClass)
    {
        //
        // determine short names

        this.assignClass = assignClass;

        final String[] shortNames =
            determineExpressionShortNames(assignClass);

        //
        // set up levels

        this.wfDirLevel =
            new Level("wf (0)", null);
        this.digitDirLevel =
            new Level("digit (1)", wfDirLevel);
        this.wfidDirLevel =
            new Level("wfid (2)", digitDirLevel);
        this.expressionLevel =
            new Level("exp (3)", wfidDirLevel, new FilenameFilter(shortNames));

        this.wfDirLevel.init
            (new java.io.File(FileExpressionStore.this.workDirectory));

        this.next = fetchNext();
    }

    //
    // PUBLIC METHODS

    public boolean hasNext ()
    {
        return (this.next != null);
    }

    /*
     * Returns the next expression that can be loaded and that matches
     * 'assignClass', or null when there are no more files to consider.
     */
    private FlowExpression fetchNext ()
    {
        while (true)
        {
            final java.io.File f = this.expressionLevel.next();

            if (f == null) return null;

            final FlowExpression fe;
            try
            {
                fe = loadExpression(f.getPath());
            }
            catch (final Throwable t)
            {
                // best effort : a file that cannot be read must not
                // stop the iteration
                log.warn("next() problem iterating, skipping... "+t);
                continue;
            }

            if ( ! ExpoolUtils.isAssignableFromClass(fe, assignClass))
            {
                log.warn
                    ("next() expression of class '"+fe.getClass().getName()+
                     "' not assignable from class '"+assignClass.getName()+"'");
                continue;
            }

            return fe;
        }
    }

    /**
     * Returns the current expression and fetches the following one.
     *
     * @throws java.util.NoSuchElementException when the iteration is over
     */
    public Object next ()
        throws java.util.NoSuchElementException
    {
        final FlowExpression current = this.next;

        if (current == null)
            throw new java.util.NoSuchElementException();

        this.next = fetchNext();

        return current;
    }

    /**
     * Does nothing : removal is not supported by this iterator, and the
     * call is silently ignored (no exception is thrown).
     */
    public void remove ()
    {
        // not necessary.
    }

    //
    // INNER CLASSES

    /**
     * One level of the directory walk : the listing of the directory
     * currently visited at this level, and a cursor in it. When the
     * listing is exhausted, the next directory is asked to the previous
     * (upper) level.
     */
    protected class Level
    {
        public String name = null;
        public Level previous = null;
        public int index = -1;
        public java.io.File[] files = null;
        public java.io.FileFilter fFilter = null;
        public java.io.FilenameFilter fnFilter = null;

        /**
         * Builds a level listing its directories with a DirectoryFilter.
         */
        public Level
            (final String name, final Level previous)
        {
            this.name = name;
            this.previous = previous;
            this.fFilter = new DirectoryFilter();
        }

        /**
         * Builds a level listing its directories with the given
         * FileFilter.
         */
        public Level
            (final String name,
             final Level previous,
             final java.io.FileFilter filter)
        {
            this.name = name;
            this.previous = previous;
            this.fFilter = filter;
        }

        /**
         * Builds a level listing its directories with the given
         * FilenameFilter.
         */
        public Level
            (final String name,
             final Level previous,
             final java.io.FilenameFilter filter)
        {
            this.name = name;
            this.previous = previous;
            this.fnFilter = filter;
        }

        /**
         * Points this level to the given directory : lists it with the
         * filter of this level and resets the cursor. 'files' is left
         * to null if the directory cannot be listed.
         */
        public void init (final java.io.File dir)
        {
            log.debug
                ("init() '"+this.name+"' to "+dir.getPath());

            this.index = 0;

            if (this.fFilter != null)
            {
                this.files = dir.listFiles(this.fFilter);
            }
            else if (this.fnFilter != null)
            {
                this.files = dir.listFiles(this.fnFilter);
            }
            else
            {
                this.files = dir.listFiles();
            }

            if (this.files == null)
            {
                log.debug
                    ("init() '"+this.name+"' empty dir : "+dir.getPath());
            }
            else
            {
                log.debug
                    ("init() '"+this.name+
                     "' this.files.length is "+this.files.length);
            }
        }

        /**
         * Returns the next element, or null if the iteration is over.
         */
        public java.io.File next ()
        {
            while (true)
            {
                if (this.files == null)
                {
                    //
                    // no current listing : a new directory is needed

                    if (this.previous == null)
                        //
                        // this is the 'top level', no previous,
                        // iteration is over
                        //
                        return null;

                    final java.io.File dir = this.previous.next();

                    if (dir == null)
                        //
                        // iteration over (at next step)
                        //
                        return null;

                    if (cleanedDir(dir))
                        //
                        // nothing to iterate in this dir (empty,
                        // possibly removed), let's try the next one
                        continue;

                    init(dir);

                    if (this.files == null)
                        //
                        // dir could not be listed (it may have
                        // vanished in the meantime), on to the next one
                        continue;
                }

                final java.io.File f = pnext();

                if (f != null) return f;

                //
                // current listing exhausted, loop to get a new dir
            }
        }

        /*
         * pnext = private next
         *
         * Returns the next existing file of the current listing. When
         * the listing is exhausted, drops it ('files' is set to null)
         * and returns null.
         */
        private java.io.File pnext ()
        {
            while (this.index < this.files.length)
            {
                final java.io.File f = this.files[this.index];
                this.index++;

                log.debug
                    ("pnext() '"+this.name+"' f : "+f.getPath());
                log.debug
                    ("pnext() '"+this.name+"' f.exists() ? "+f.exists());

                // the file may have been removed since the listing
                if ( ! f.exists()) continue;

                log.debug
                    ("pnext() '"+this.name+"' returning "+f.getPath());

                return f;
            }

            this.files = null;

            return null;
        }
    }
}
}
----------------------------------------------------------------
for list details see
http://www.magnolia.info/en/magnolia/developer.html
----------------------------------------------------------------