conor       2003/07/24 07:24:07

  Modified:    src/main/org/apache/tools/ant/taskdefs Parallel.java
               docs/manual/CoreTasks parallel.html
  Log:
  Rework parallel
  Remove need for poll interval (only covered race condition between isAlive
  and notifyAll calls
  Add timeout capability
  Add flag to fail if any nested task fails without waiting for other tasks
  to complete.
  
  Revision  Changes    Path
  1.21      +116 -30   ant/src/main/org/apache/tools/ant/taskdefs/Parallel.java
  
  Index: Parallel.java
  ===================================================================
  RCS file: /home/cvs/ant/src/main/org/apache/tools/ant/taskdefs/Parallel.java,v
  retrieving revision 1.20
  retrieving revision 1.21
  diff -u -w -u -r1.20 -r1.21
  --- Parallel.java     19 Jul 2003 11:20:13 -0000      1.20
  +++ Parallel.java     24 Jul 2003 14:24:07 -0000      1.21
  @@ -98,8 +98,40 @@
       /** Total number of threads per processor to run.  */
       private int numThreadsPerProcessor = 0;
   
  -    /** Interval (in ms) to poll for finished threads. */
  -    private int pollInterval = 1000; // default is once a second
  +    private long timeout;
  +
  +    /** Indicates threads are still running and new threads can be issued */
  +    private volatile boolean stillRunning;
  +
  +    /** INdicates that the execution timedout */
  +    private boolean timedOut;
  +
  +    /**
  +     * Indicates whether failure of any of the nested tasks should end
  +     * execution
  +     */
  +    private boolean failOnAny;
  +
  +    /**
  +     * Interval to poll for completed threads when threadCount or
  +     * threadsPerProcessor is specified.  Integer in milliseconds.; optional
  +     *
  +     * @param pollInterval New value of property pollInterval.
  +     */
  +    public void setPollInterval(int pollInterval) {
  +    }
  +
  +    /**
  +     * Control whether a failure in a nested task halts execution. Note that
  +     * the task will complete but existing threads will continue to run - 
they
  +     * are not stopped
  +     *
  +     * @param failOnAny if true any nested task failure causes parallel to
  +     *        complete.
  +     */
  +    public void setFailOnAny(boolean failOnAny) {
  +        this.failOnAny = failOnAny;
  +    }
   
       /**
        * Add a nested task to execute in parallel.
  @@ -140,15 +172,20 @@
       }
   
       /**
  -     * Interval to poll for completed threads when threadCount or
  -     * threadsPerProcessor is specified.  Integer in milliseconds.; optional
  +     * Sets the timeout on this set of tasks. If the timeout is reached
  +     * before the other threads complete, the execution of this
  +     * task completes with an exception.
        *
  -     * @param pollInterval New value of property pollInterval.
  +     * Note that existing threads continue to run.
  +     *
  +     * @param timeout timeout in milliseconds.
        */
  -    public void setPollInterval(int pollInterval) {
  -        this.pollInterval = pollInterval;
  +    public void setTimeout(long timeout) {
  +        this.timeout = timeout;
       }
   
  +
  +
       /**
        * Execute the parallel tasks
        *
  @@ -181,56 +218,100 @@
        */
       private void spinThreads() throws BuildException {
           final int numTasks = nestedTasks.size();
  -        Thread[] threads = new Thread[numTasks];
           TaskRunnable[] runnables = new TaskRunnable[numTasks];
  +        stillRunning = true;
  +        timedOut = false;
  +
           int threadNumber = 0;
           for (Enumeration e = nestedTasks.elements(); e.hasMoreElements();
                threadNumber++) {
               Task nestedTask = (Task) e.nextElement();
  -            ThreadGroup group = new ThreadGroup("parallel");
  -            TaskRunnable taskRunnable
  +            runnables[threadNumber]
                   = new TaskRunnable(threadNumber, nestedTask);
  -            runnables[threadNumber] = taskRunnable;
  -            threads[threadNumber] = new Thread(group, taskRunnable);
           }
   
  -        final int maxRunning = numThreads;
  -        Thread[] running = new Thread[maxRunning];
  +        final int maxRunning = numTasks < numThreads ? numTasks : numThreads;
  +        TaskRunnable[] running = new TaskRunnable[maxRunning];
  +
           threadNumber = 0;
  +        ThreadGroup group = new ThreadGroup("parallel");
   
           // now run them in limited numbers...
  +        // start initial batch of threads
  +        for (int i = 0; i < maxRunning; ++i) {
  +            running[i] = runnables[threadNumber++];
  +            Thread thread =  new Thread(group, running[i]);
  +            thread.start();
  +        }
  +
  +        if (timeout != 0) {
  +            // start the timeout thread
  +            Thread timeoutThread = new Thread() {
  +                public synchronized void run() {
  +                    try {
  +                        wait(timeout);
  +                        synchronized(semaphore) {
  +                            stillRunning = false;
  +                            timedOut = true;
  +                            semaphore.notifyAll();
  +                        }
  +                    } catch (InterruptedException e) {
  +                        // ignore
  +                    }
  +                }
  +            };
  +            timeoutThread.start();
  +        }
  +
  +        // now find available running slots for the remaining threads
           outer:
  -        while (threadNumber < numTasks) {
  +        while (threadNumber < numTasks && stillRunning) {
               synchronized (semaphore) {
                   for (int i = 0; i < maxRunning; i++) {
  -                    if (running[i] == null || !running[i].isAlive()) {
  -                        running[i] = threads[threadNumber++];
  -                        running[i].start();
  -                        // countinue on outer while loop in case we
  -                        // used our last thread
  +                    if (running[i] == null || running[i].finished) {
  +                        running[i] = runnables[threadNumber++];
  +                        Thread thread =  new Thread(group, running[i]);
  +                        thread.start();
  +                        // countinue on outer while loop to get another
  +                        // available slot
                           continue outer;
                       }
                   }
  -                // if we got here all are running, so sleep a little
  +
  +                // if we got here all slots in use, so sleep until
  +                // something happens
                   try {
  -                    semaphore.wait(pollInterval);
  +                    semaphore.wait();
                   } catch (InterruptedException ie) {
                       // dosen't java know interruptions are rude?
  -                    // just pretend it didn't happen and go aobut out 
business.
  +                    // just pretend it didn't happen and go about out 
business.
                       // sheesh!
                   }
               }
           }
   
  -        // now join to all the threads
  +        synchronized(semaphore) {
  +            // are all threads finished
  +            outer2:
  +            while (stillRunning) {
           for (int i = 0; i < maxRunning; ++i) {
  +                    if (running[i] != null && !running[i].finished) {
  +                        //System.out.println("Thread " + i + " is still 
alive ");
  +                        // still running - wait for it
               try {
  -                if (running[i] != null) {
  -                    running[i].join();
  -                }
  +                            semaphore.wait();
               } catch (InterruptedException ie) {
                   // who would interrupt me at a time like this?
               }
  +                        continue outer2;
  +                    }
  +                }
  +                stillRunning = false;
  +            }
  +        }
  +
  +        if (timedOut) {
  +            throw new BuildException("Parallel execution timed out");
           }
   
           // now did any of the threads throw an exception
  @@ -293,6 +374,7 @@
           private Throwable exception;
           private Task task;
           private int taskNumber;
  +        boolean finished;
   
           /**
            * Construct a new TaskRunnable.<p>
  @@ -315,6 +397,10 @@
                   exception = t;
               } finally {
                   synchronized (semaphore) {
  +                    finished = true;
  +                    if (failOnAny) {
  +                        stillRunning = false;
  +                    }
                       semaphore.notifyAll();
                   }
               }
  
  
  
  1.11      +43 -30    ant/docs/manual/CoreTasks/parallel.html
  
  Index: parallel.html
  ===================================================================
  RCS file: /home/cvs/ant/docs/manual/CoreTasks/parallel.html,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -w -u -r1.10 -r1.11
  --- parallel.html     19 Feb 2003 07:34:48 -0000      1.10
  +++ parallel.html     24 Jul 2003 14:24:07 -0000      1.11
  @@ -31,10 +31,20 @@
     </tr>
     <tr>
       <td valign="top">pollInterval</td>
  -    <td valign="top">Maximum number of milliseconds to wait for before 
checking 
  -when waiting for available threads.</td>
  +    <td valign="top">Currently has no effect</td>
       <td align="center" valign="top">No, default is 1000</td>
     </tr>
  +  <tr>
  +    <td valign="top">timeout</td>
  +    <td valign="top">Number of milliseconds before execution is 
terminated</td>
  +    <td align="center" valign="top">No</td>
  +  </tr>
  +  <tr>
  +    <td valign="top">failonany</td>
  +    <td valign="top">If any of the nested tasks fails, execution of the task 
completes
  +                     at that point without waiting for any other tasks to 
complete.</td>
  +    <td align="center" valign="top">No</td>
  +  </tr>
   </table>
   
   <p>Parallel tasks have a number of uses in an Ant build file including:</p>
  @@ -51,16 +61,19 @@
   class file. Be sure to avoid these types of interactions within a 
   &lt;parallel&gt; task</p>
     
  -<p>The parallel task has no attributes and does not support any nested 
  -elements apart from Ant tasks. Any valid Ant task may be embedded within a 
  +<p>Any valid Ant task may be embedded within a
   parallel task, including other parallel tasks.</p>
   
   <p>Note that while the tasks within the parallel task are being run, the 
main 
  -thread will be blocked waiting for all the child threads to complete.</p>  
  -
  -<p>If any of the tasks within the &lt;parallel&gt; task fails, the remaining 
  -tasks in other threads will continue to run until all threads have 
completed. 
  -In this situation, the parallel task will also fail.</p>
  +thread will be blocked waiting for all the child threads to complete. If
  +execution is terminated by a timeout or a nested task failure when the 
failonany
  +flag is set, the parallel task will complete without waiting for other nested
  +tasks to complete in other threads.
  +</p>
  +
  +<p>If any of the tasks within the &lt;parallel&gt; task fails and failonany 
is
  +not set, the remaining tasks in other threads will continue to run until
  +all threads have completed. In this situation, the parallel task will also 
fail.</p>
   
   <p>The parallel task may be combined with the <a href="sequential.html">
   sequential</a> task to define sequences of tasks to be executed on each 
thread
  @@ -80,11 +93,11 @@
   override the value in threadCount.  If threadsPerProcessor is specified using
   any version prior to 1.4 then the value in threadCount will be used as 
is.</p>
   
  -<p>When using threadCount and threadsPerProcessor care should be taken to 
insure
  +<p>When using threadCount and threadsPerProcessor care should be taken to 
ensure
   that the build does not deadlock.  This can be caused by tasks such as 
waitFor
  -takeing up all available threads before the tasks that would unlock the 
waitfor
  +taking up all available threads before the tasks that would unlock the 
waitfor
   would occur.  This is not a repalcement for Java Language level thread
  -semantics and is best used for "embarasingly parallel" tasks.</p>
  +semantics and is best used for "embarassingly parallel" tasks.</p>
   
   <h3>Examples</h3>
   <pre>
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to