On 5/18/06, Nicolas <[email protected]> wrote:
Looks like it's coming from a deleted node in the repository, but the
workflow engine does not know about it.

Hi Nicolas,

very interesting : how are the two logs interrelated, are they taken
from the same log file ?

You're saying that this doesn't seem to affect the engine's operation,
aren't you ?

I'd love to reimplement the contentIterator of the JCRExpressionStore.
Under very heavy load (lots of processes dehydrated), it might break.

See the attached FileExpressionStore.java (sorry, sf.net's webSvn is
down), it contains a StoreIterator which doesn't fill up the memory
when iterating over the content of the pool. I think it's no big deal
adapting that to the JCRExpressionStore as this store concepts are
pretty much the same as the ones of the FileExpressionStore.

WDYT ?

John
/*
 * Copyright (c) 2005, John Mettraux, OpenWFE.org
 * All rights reserved.
 * 
 * Redistribution and use in source and binary forms, with or without 
 * modification, are permitted provided that the following conditions are met:
 * 
 * . Redistributions of source code must retain the above copyright notice, this
 *   list of conditions and the following disclaimer.  
 * 
 * . Redistributions in binary form must reproduce the above copyright notice, 
 *   this list of conditions and the following disclaimer in the documentation 
 *   and/or other materials provided with the distribution.
 * 
 * . Neither the name of the "OpenWFE" nor the names of its contributors may be
 *   used to endorse or promote products derived from this software without
 *   specific prior written permission.
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * $Id: FileExpressionStore.java 2510 2006-04-25 19:16:54Z jmettraux $
 */

//
// FileExpressionStore.java
//
// [EMAIL PROTECTED]
//
// generated with 
// jtmpl 1.0.04 20.11.2001 John Mettraux ([EMAIL PROTECTED])
//

package openwfe.org.engine.impl.expool;

import openwfe.org.Utils;
import openwfe.org.MapUtils;
import openwfe.org.ApplicationContext;
import openwfe.org.ServiceException;
import openwfe.org.xml.XmlUtils;
import openwfe.org.time.Time;
import openwfe.org.misc.DirectoryFilter;
import openwfe.org.engine.expool.PoolException;
import openwfe.org.engine.expressions.EngineEnvironmentId;
import openwfe.org.engine.expressions.ReplyException;
import openwfe.org.engine.expressions.FlowExpression;
import openwfe.org.engine.expressions.FlowExpressionId;
import openwfe.org.engine.expressions.ExpressionWithTimeOut;


/**
 * The FileExpressionStore focuses on storing expression in the filesystem.
 * It's an abstract class, as extending classes (XmlExpressionStore, 
 * SerialExpressionStore) will define how workitems are stored in their 
 * respective files. This abstract class simply tells where and in which
 * file [name].
 *
 * <p><font size=2>CVS Info :
 * <br>$Author: jmettraux $
 * <br>$Date: 2006-04-25 21:16:54 +0200 (Tue, 25 Apr 2006) $
 * <br>$Id: FileExpressionStore.java 2510 2006-04-25 19:16:54Z jmettraux $ </font>
 *
 * @author [EMAIL PROTECTED]
 */
public abstract class FileExpressionStore

    extends AbstractExpressionStore

{

    private final static org.apache.log4j.Logger log = org.apache.log4j.Logger
        .getLogger(FileExpressionStore.class.getName());

    //
    // CONSTANTS (definitions)

    /**
     * Use this parameter to indicate to a FileExpressionStore in which 
     * directory it should save its run data.
     */
    public static String P_WORK_DIRECTORY 
        = "workDirectory";

    /**
     * If there is not parameter "workDirectory" in this service configuration,
     * expressions will be stored in "work/engine/pool".
     */
    public final static String DEFAULT_WORK_DIRECTORY
        = "work/engine/pool";

    //
    // FIELDS

    private String workDirectory = null;
    private String engineEnvExpressionPath = null;

    //
    // CONSTRUCTORS

    public void init 
        (final String serviceName, 
         final ApplicationContext context, 
         final java.util.Map serviceParams)
    throws 
        ServiceException
    {
        super.init(serviceName, context, serviceParams);

        //
        // determine workDirectory

        //this.workDirectory = (String)serviceParams.get(P_WORK_DIRECTORY);

        this.workDirectory = MapUtils.getAsString
            (serviceParams, P_WORK_DIRECTORY, DEFAULT_WORK_DIRECTORY);

        this.workDirectory = Utils.getCanonicalPath
            (getContext().getApplicationDirectory(),
             this.workDirectory);

        this.workDirectory += java.io.File.separator;

        this.engineEnvExpressionPath = 
            this.workDirectory + "engine-env-expression.xml";


        log.info("init() workDirectory set to "+this.workDirectory);
    }

    //
    // ABSTRACT METHODS

    protected abstract void saveExpression (String fileName, FlowExpression fe)
        throws Exception;

    protected abstract FlowExpression loadExpression (String fileName)
        throws Exception;

    //
    // METHODS

    /**
     * Stores an expression to a file in the pool directory.
     */
    public void storeExpression (final FlowExpression fe)
        throws PoolException
    {
        log.debug("storeExpression() for "+fe.getId());

        final long start = System.currentTimeMillis();

        try
        {
            //
            // does WFD directory exist ?
            
            final java.io.File dir = 
                new java.io.File(determineDirName(fe.getId()));

            if ( ! dir.exists()) dir.mkdirs();

            //
            // store expression

            final String fileName = determineFileName(fe.getId());

            // some debug output (remove it when delete() problem is solved)
            //
            //if ((new java.io.File(fileName)).exists())
            //{
            //    log.debug
            //        ("storeExpression() "+fileName+" already exists.");
            //}

            saveExpression(fileName, fe);
        }
        catch (final Exception e)
        {
            throw new PoolException
                ("Failed to store expression", e);
        }

        final long duration = System.currentTimeMillis() - start;
        log.debug("storeExpression() took "+duration+" ms");
    }

    /**
     * Removes an expression (deletes its file) from the pool directory.
     */
    public void unstoreExpression (final FlowExpression fe)
        throws PoolException
    {
        //
        // remove expression

        final String fileName = determineFileName(fe.getId());

        log.debug("unstoreExpression() deleting file "+fileName);

        java.io.File f = new java.io.File(fileName);

        boolean result = f.delete();

        if (result == false) log.debug("unstoreExpression() delete failed");

        log.debug("unstoreExpression() unstored   "+fe.getId());
    }

    /**
     * Loads (deserializes) an expression from the pool directory.
     */
    public FlowExpression loadExpression (final FlowExpressionId fei)
        throws PoolException
    {
        log.debug("loadExpression() requested for "+fei);

        long start = System.currentTimeMillis();

        String fileName = determineFileName(fei);

        log.debug("loadExpression() fileName is "+fileName);

        FlowExpression result = null;
        try
        {
            result = loadExpression(fileName);

            //result.setApplicationContext(this.applicationContext);
        }
        catch (final Exception e)
        {
            //log.debug
            //    ("Failed to load expression", e);
            throw new PoolException
                ("Failed to load expression", e);
        }

        long duration = System.currentTimeMillis() - start;
        log.debug("loadExpression() took "+duration+" ms");

        return result;
    }

    //
    // file name determination

    private String getLastDigit (final FlowExpressionId fei)
    {
        final String wfid = fei.getParentWorkflowInstanceId();

        return wfid.substring(wfid.length()-1);
    }

    /**
     * Determines in which dir an expression should be stored.
     */
    protected String determineDirName (final FlowExpressionId fei)
    {
        //log.debug("determineDirName() fei is "+fei);

        if (EngineEnvironmentId.ID.equals(fei))
            return this.workDirectory;

        StringBuffer sb = new StringBuffer();

        sb.append(this.workDirectory);

        sb.append(Utils.ensureForFileName(fei.getWorkflowDefinitionName()));
        sb.append("--");
        sb.append(Utils.ensureForFileName(fei.getWorkflowDefinitionRevision()));

        sb.append(java.io.File.separator);

        sb.append(getLastDigit(fei));

        sb.append(java.io.File.separator);

        sb.append(fei.getParentWorkflowInstanceId());

        sb.append(java.io.File.separator);

        return sb.toString();
    }

    /**
     * Determines under which filename an expression should be stored.
     * It returns a full (absolute) path.
     */
    protected String determineFileName (final FlowExpressionId fei)
    {
        if (EngineEnvironmentId.ID.equals(fei))
            return this.engineEnvExpressionPath;

        StringBuffer sb = new StringBuffer();

        sb.append(determineDirName(fei));

        sb.append(Utils.ensureForFileName(fei.getInitialEngineId()));
        sb.append("--");
        sb.append(fei.getWorkflowInstanceId());
        sb.append("--");
        sb.append(fei.getExpressionName());
        sb.append("--");
        sb.append(fei.getExpressionId());
        sb.append(".xml");

        return sb.toString();
    }

    /**
     * This method will check if the dir is empty;
     * if yes, it will remove it.
     */
    protected boolean cleanedDir (final java.io.File dir)
    {
        log.debug("cleanedDir() called for  "+dir.getPath());

        if (dir.list().length > 0) return false;

        long lDir = -1;
        try
        {
            lDir = Long.parseLong(dir.getName());
        }
        catch (final Throwable t)
        {
            log.debug
                ("cleanedDir() not a wfid dir >"+dir.getName()+"< skipping");
            return true;
        }

        final long age = System.currentTimeMillis() - lDir;

        if (age > 60 * 1000) // 1 minute
        {
            final boolean b = dir.delete();

            if (b)
                log.debug("cleanedDir() removed dir : "+dir.getPath());
            else
                log.debug("cleanedDir() failed to remove dir : "+dir.getPath());

            return true;
        }

        log.debug("cleanedDir() did not remove young dir : "+dir.getPath());

        return false;
    }

    /**
     * This method returns an iterator on all the expression stored here
     * whose class is assignable from the given 'assignClass'.
     * This method is used by daemons checking the content of the store for
     * purge and timeout purposes.
     */
    public java.util.Iterator contentIterator
        (final Class assignClass)
    {
        return new StoreIterator(assignClass);
    }

    private boolean isNotDigitDir (final java.io.File f)
    {
        //if ( ! f.isDirectory()) return true;
            //
            // already checked by DirectoryFilter...

        if (f.getName().length() != 1) return true;
        return ( ! Character.isDigit(f.getName().charAt(0)));
    }
    
    /**
     * Returns the count of expressions stored here.
     */
    public int size ()
    {
        int count = 0;

        final java.io.File[] dirs = 
            new java.io.File(this.workDirectory)
                .listFiles(new DirectoryFilter());

        for (int i=0; i<dirs.length; i++)
        {
            //
            // roam through each expression (with timeout) file

            final java.io.File[] files = dirs[i]
                .listFiles(new FilenameFilter(".*\\.xml"));

            count += files.length;
        }

        return count;
    }

    //
    // METHODS from Service

    /**
     * Status is outputted as a JDOM element. The status is various
     * information about a service activities and state.
     */
    public org.jdom.Element getStatus ()
    {
        org.jdom.Element result = new org.jdom.Element(getName());

        result.addContent(XmlUtils.getClassElt(this));
        result.addContent(XmlUtils.getRevisionElt("$Id: FileExpressionStore.java 2510 2006-04-25 19:16:54Z jmettraux $"));

        int totalExpressionCount = 0;

        //
        // count expressions per flow instance
        
        java.util.Map expressionsPerFlow = new java.util.HashMap();
        java.util.Map expressionsPerInstance = new java.util.HashMap();

        final java.io.File workDir = 
            new java.io.File(this.workDirectory);
        final java.io.File[] flowDirs = 
            workDir.listFiles(new DirectoryFilter());

        for (int i=0; i<flowDirs.length; i++)
        {
            final java.io.File flowDir = flowDirs[i];

            log.debug("getStatus() considering dir "+flowDir);

            final java.io.File[] digitDirs = flowDir
                .listFiles(new DirectoryFilter());

            for (int j=0; j<digitDirs.length; j++)
            {
                if (isNotDigitDir(digitDirs[j])) continue;

                log.debug("getStatus() considering digitDir "+digitDirs[j]);

                final java.io.File[] wfidDirs = digitDirs[j]
                    .listFiles(new DirectoryFilter());

                for (int k=0; k<wfidDirs.length; k++)
                {
                    log.debug("getStatus() considering wfidDir "+wfidDirs[k]);

                    final java.io.File[] expFiles = 
                        wfidDirs[k].listFiles(new ExpressionFilter());

                    totalExpressionCount += expFiles.length;

                    openwfe.org.Utils.inc
                        (expressionsPerFlow, 
                         flowDir.getName(), 
                         expFiles.length);

                    for (int l=0; l<expFiles.length; l++)
                    {
                        final String fileName = expFiles[l].getName();

                        final String instanceId = wfidDirs[k].getName();

                        log.debug("getStatus() considering file "+fileName);

                        openwfe.org.Utils
                            .inc(expressionsPerInstance, instanceId);
                    }
                }
            }
        }

        //
        // output

        org.jdom.Element eFlows = new org.jdom.Element("flows");

        java.util.Iterator it = expressionsPerFlow.keySet().iterator();
        while (it.hasNext())
        {
            String flowDef = (String)it.next();
            Integer count = (Integer)expressionsPerFlow.get(flowDef);

            org.jdom.Element eFlow = new org.jdom.Element("flow");
            eFlow.setAttribute("def", flowDef);
            eFlow.setAttribute("expressions", count.toString());

            eFlows.addContent(eFlow);
        }

        it = expressionsPerInstance.keySet().iterator();
        while (it.hasNext())
        {
            String flowId = (String)it.next();
            Integer count = (Integer)expressionsPerInstance.get(flowId);

            org.jdom.Element eFlow = new org.jdom.Element("flow");
            eFlow.setAttribute("id", flowId);
            eFlow.setAttribute("expressions", count.toString());

            eFlows.addContent(eFlow);
        }

        result.addContent(eFlows);

        //
        // add total expression count

        org.jdom.Element eTotalExpressionCount = 
            new org.jdom.Element("expressionsStored");
        eTotalExpressionCount.addContent
            (new org.jdom.Text(""+totalExpressionCount));
        result.addContent(eTotalExpressionCount);

        //
        // the end

        return result;
    }

    /* *
     * A debug method : will dump the status of the store in 
     * $OPENWFE_HOME/logs/store_dump.xml
     * /
    protected void dumpStatus ()
    {
        final org.jdom.Document doc = new org.jdom.Document(getStatus());
        final String s = openwfe.org.Utils.toString(doc, null);
            // 'null' means : use default encoding

        final String dumpFileName = getContext()
            .getApplicationDirectory()+"logs/store_dump.xml";
        try
        {
            final java.io.BufferedWriter bw = new java.io.BufferedWriter
                (new java.io.FileWriter(dumpFileName));
            bw.write(s);
            bw.flush();
            bw.close();
        }
        catch (final Exception e)
        {
            log.warn
                ("Failed to dump to "+dumpFileName, e);
        }
    }
     */

    // 
    // STATIC METHODS

    private static long extractInstanceId (final String fileName)
    {
        final int i = fileName.indexOf("--");
        if (i < 0) return -1;
        try
        {
            return Long.parseLong(fileName.substring(0, i));
        }
        catch (NumberFormatException nfe)
        {
            log.warn("failed to parse '"+fileName+"' (til '--') into a long");
        }
        return -1;
    }

    //
    // INNER CLASSES

    private static class ExpressionFilter
        implements java.io.FilenameFilter
    {
        public boolean accept (final java.io.File dir, final String fileName)
        {
            return fileName.endsWith(".xml");
        }
    }

    private static class FilenameFilter
        implements java.io.FilenameFilter
    {
        private String[] shortNames = null;

        public FilenameFilter ()
        {
        }

        public FilenameFilter (final String expressionShortName)
        {
            this.shortNames = new String[] { expressionShortName };
        }

        public FilenameFilter (final String[] expressionShortNames)
        {
            this.shortNames = expressionShortNames;
        }

        public boolean accept (final java.io.File dir, final String fileName)
        {
            if ( ! fileName.endsWith(".xml")) return false;

            if (this.shortNames == null) return true;

            for (int i=0; i<this.shortNames.length; i++)
            {
                if (fileName.matches(".*--"+this.shortNames[i]+"--.*\\.xml"))
                {
                    return true;
                }
            }

            return false;
        }
    }

    protected class StoreIterator
        implements java.util.Iterator
    {
        private final org.apache.log4j.Logger log = 
            org.apache.log4j.Logger.getLogger(StoreIterator.class.getName());

        //
        // FIELDS

        private Class assignClass = null;

        private Level wfDirLevel = null;
        private Level digitDirLevel = null;
        private Level wfidDirLevel = null;
        private Level expressionLevel = null;

        private FlowExpression next = null;

        //
        // CONSTRUCTORS

        public StoreIterator (final Class assignClass)
        {
            //
            // determine short names

            this.assignClass = assignClass;

            final String[] shortNames = 
                determineExpressionShortNames(assignClass);

            //
            // set up levels

            this.wfDirLevel = 
                new Level("wf    (0)", null);
            this.digitDirLevel = 
                new Level("digit (1)", wfDirLevel);
            this.wfidDirLevel = 
                new Level("wfid  (2)", digitDirLevel);

            this.expressionLevel = 
                new Level("exp   (3)", wfidDirLevel, new FilenameFilter(shortNames));

            this.wfDirLevel.init
                (new java.io.File(FileExpressionStore.this.workDirectory));

            this.next = fetchNext();
        }

        //
        // PUBLIC METHODS

        public boolean hasNext ()
        {
            return (this.next != null);
        }

        private FlowExpression fetchNext ()
        {
            final java.io.File f = this.expressionLevel.next();

            if (f == null) return null;

            FlowExpression fe = null;
            try
            {
                fe = loadExpression(f.getPath());
            }
            catch (final Throwable t)
            {
                //log.debug("next() problem iterating, skipping... ", t);
                log.warn("next() problem iterating, skipping... "+t);

                return fetchNext();
            }

            if ( ! ExpoolUtils.isAssignableFromClass(fe, assignClass))
            {
                log.warn
                    ("next() expression of class '"+fe.getClass().getName()+
                     "' not assignable from class '"+assignClass.getName()+"'");

                return fetchNext();
            }

            return fe;
        }

        public Object next ()
            throws java.util.NoSuchElementException
        {
            final FlowExpression current = this.next;

            if (current == null) 
                throw new java.util.NoSuchElementException();

            this.next = fetchNext();

            return current;
        }

        public void remove ()
        {
            // not necessary.
        }

        //
        // METHODS

        //
        // INNER CLASSES

        protected class Level
        {
            public String name = null;

            public Level previous = null;
            public int index = -1;
            public java.io.File[] files = null;

            public java.io.FileFilter fFilter = null;
            public java.io.FilenameFilter fnFilter = null;

            public Level 
                (final String name, final Level previous)
            {
                this.name = name;
                this.previous = previous;
                this.fFilter = new DirectoryFilter();
            }

            public Level 
                (final String name, 
                 final Level previous, 
                 final java.io.FileFilter filter)
            {
                this.name = name;
                this.previous = previous;
                this.fFilter = filter;
            }

            public Level 
                (final String name, 
                 final Level previous, 
                 final java.io.FilenameFilter filter)
            {
                this.name = name;
                this.previous = previous;
                this.fnFilter = filter;
            }

            public void init (final java.io.File dir)
            {
                log.debug
                    ("init() '"+this.name+"' to "+dir.getPath());

                this.index = 0;

                if (this.fFilter != null)
                {
                    this.files = dir.listFiles(this.fFilter);
                }
                else if (this.fnFilter != null)
                {
                    this.files = dir.listFiles(this.fnFilter);
                }
                else
                {
                    this.files = dir.listFiles();
                }

                if (this.files == null)
                {
                    log.debug
                        ("init() '"+this.name+"' empty dir : "+dir.getPath());
                }
                else
                {
                    log.debug
                        ("init() '"+this.name+
                         "' this.files.length is "+this.files.length);
                }
            }

            /**
             * Returns the next element, or null if the iteration is over.
             */
            public java.io.File next ()
            {
                if (this.files == null)
                {
                    if (this.previous == null) 
                        //
                        // this is the 'top level', no previous,
                        // iteration is over
                        //
                        return null;

                    final java.io.File dir = this.previous.next();

                    if (dir == null) 
                        //
                        // iteration over (at next step)
                        //
                        return null;

                    if (cleanedDir(dir))
                        //
                        // dir was empty and out of date,
                        // it has been removed, let's iterate to next dir
                        return next();

                    init(dir);
                        //
                        // set dir as current item
                }

                return pnext();
            }
            
            /*
             * pnext = private next
             */
            private java.io.File pnext ()
            {
                if (this.index < this.files.length)
                {
                    java.io.File f = this.files[this.index];
                    this.index++;

                    log.debug
                        ("pnext() '"+this.name+"' f : "+f.getPath());
                    log.debug
                        ("pnext() '"+this.name+"' f.exists() ? "+f.exists());

                    if ( ! f.exists()) return pnext();

                    log.debug
                        ("pnext() '"+this.name+"' returning "+f.getPath());

                    return f;
                }

                this.files = null;
                return next();
            }
        }

    }

}
----------------------------------------------------------------
for list details see
http://www.magnolia.info/en/magnolia/developer.html
----------------------------------------------------------------

Reply via email to