rdsr commented on a change in pull request #45: Lazily submit tasks in 
ParallelIterable and add cancellation.
URL: https://github.com/apache/incubator-iceberg/pull/45#discussion_r240885973
 
 

 ##########
 File path: core/src/main/java/com/netflix/iceberg/util/ParallelIterable.java
 ##########
 @@ -19,73 +19,124 @@
 
 package com.netflix.iceberg.util;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.netflix.iceberg.io.CloseableGroup;
+import java.io.Closeable;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
-public class ParallelIterable<T> implements Iterable<T> {
+public class ParallelIterable<T> extends CloseableGroup implements Iterable<T> 
{
   private final Iterable<Iterable<T>> iterables;
-  private final ExecutorService trackingPool;
   private final ExecutorService workerPool;
 
   public ParallelIterable(Iterable<Iterable<T>> iterables,
-                          ExecutorService trackingPool,
                           ExecutorService workerPool) {
     this.iterables = iterables;
-    this.trackingPool = trackingPool;
     this.workerPool = workerPool;
   }
 
   @Override
   public Iterator<T> iterator() {
-    return new ParallelIterator<>(iterables, trackingPool, workerPool);
+    ParallelIterator<T> iter = new ParallelIterator<>(iterables, workerPool);
+    addCloseable(iter);
+    return iter;
   }
 
-  private static class ParallelIterator<T> implements Iterator<T> {
+  private static class ParallelIterator<T> implements Iterator<T>, Closeable {
+    private final Iterator<Runnable> tasks;
+    private final ExecutorService workerPool;
+    private final Future<?>[] taskFutures;
     private final ConcurrentLinkedQueue<T> queue = new 
ConcurrentLinkedQueue<>();
-    private final Future<?> taskFuture;
-
-    public ParallelIterator(Iterable<Iterable<T>> iterables,
-                            ExecutorService trackingPool,
-                            ExecutorService workerPool) {
-      this.taskFuture = trackingPool.submit(() -> {
-        Tasks.foreach(iterables)
-            .noRetry().stopOnFailure().throwFailureWhenFinished()
-            .executeWith(workerPool)
-            .run(iterable -> {
-              for (T item : iterable) {
-                queue.add(item);
-              }
-            });
-        return true;
-      });
+    private boolean closed = false;
+
+    private ParallelIterator(Iterable<Iterable<T>> iterables,
+                             ExecutorService workerPool) {
+      this.tasks = Iterables.transform(iterables, iterable ->
+          (Runnable) () -> {
+            for (T item : iterable) {
+              queue.add(item);
+            }
+          }).iterator();
+      this.workerPool = workerPool;
+      // submit 2 tasks per worker at a time
+      this.taskFutures = new Future[2 * ThreadPools.WORKER_THREAD_POOL_SIZE];
+    }
+
+    @Override
+    public void close() {
+      // cancel background tasks
+      for (int i = 0; i < taskFutures.length; i += 1) {
+        if (taskFutures[i] != null && !taskFutures[i].isDone()) {
+          taskFutures[i].cancel(true);
+        }
+      }
+      this.closed = true;
+    }
+
+    /**
+     * Checks on running tasks and submits new tasks if needed.
+     * <p>
+     * This should not be called after {@link #close()}.
+     *
+     * @return true if there are pending tasks, false otherwise
+     */
+    private boolean checkTasks() {
+      boolean hasRunningTask = false;
+
+      for (int i = 0; i < taskFutures.length; i += 1) {
+        if (taskFutures[i] == null || taskFutures[i].isDone()) {
+          taskFutures[i] = submitNextTask();
+        }
+
+        if (taskFutures[i] != null) {
+          hasRunningTask = true;
+        }
+      }
+
+      return tasks.hasNext() || hasRunningTask;
+    }
+
+    private Future<?> submitNextTask() {
+      if (tasks.hasNext()) {
+        return workerPool.submit(tasks.next());
+      }
+      return null;
     }
 
     @Override
     public synchronized boolean hasNext() {
+      Preconditions.checkState(!closed, "Already closed");
+
+      // if the consumer is processing records more slowly than the producers, 
then this check will
+      // prevent tasks from being submitted. while the producers are running, 
this will always
+      // return here before running checkTasks. when enough of the tasks are 
finished that the
+      // consumer catches up, then lots of new tasks will be submitted at 
once. this behavior is
+      // okay because it ensures that records are not stacking up waiting to 
be consumed and taking
+      // up memory.
+      //
+      // consumers that process results quickly will periodically exhaust the 
queue and submit new
+      // tasks when checkTasks runs. fast consumers should not be delayed.
+      if (!queue.isEmpty()) {
+        return true;
+      }
+
       // this cannot conclude that there are no more records until tasks have 
finished. while some
       // are running, return true when there is at least one item to return.
-      while (!taskFuture.isDone()) {
+      while (checkTasks()) {
 
 Review comment:
   Not sure I completely follow the logic here.
   
   When `checkTasks()` returns true, we know for sure that some datafiles will 
be added to the queue (since either we submitted new tasks or there are already 
tasks running). 
   
   I wonder if a slightly different approach, which could be slightly lazier, 
would help here. Can we block on the queue until its size > 0 instead of 
the while loop on `checkTasks()`? In this way, we do not keep scheduling 
more tasks until our queue gets some datafiles. This seems more in line with 
the general idea of the patch [lazily parsing manifests and computing 
datafiles], as it would only submit the necessary number of tasks.
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to