[ 
https://issues.apache.org/jira/browse/BEAM-13541?focusedWorklogId=701750&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-701750
 ]

ASF GitHub Bot logged work on BEAM-13541:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Dec/21 05:03
            Start Date: 29/Dec/21 05:03
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on a change in pull request #16354:
URL: https://github.com/apache/beam/pull/16354#discussion_r776154114



##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
##########
@@ -361,62 +377,331 @@ private CoGbkResult(CoGbkResultSchema schema, 
List<Iterable<?>> valueMap) {
   }
 
   /**
-   * Lazily filters and recasts an {@code Iterator<RawUnionValue>} into an 
{@code Iterator<V>},
-   * where V is the type of the raw union value's contents.
+   * A re-iterable that notifies an observer at every advance, and upon 
finishing, but only once
+   * across all copies.
+   *
+   * @param <T> The value type of the underlying iterable.
    */
-  private static class UnionValueIterator<V> implements Iterator<V> {
+  private static class ObservingReiterator<T> implements Reiterator<T> {
+
+    public interface Observer<T> {
+      /**
+       * Called exactly once, across all copies before advancing this iterator.
+       *
+       * <p>The iterator rather than the element is given so that the callee 
can perform a copy if
+       * desired. This class offers a peek method to get at the current 
element without disturbing
+       * the state of this iterator.
+       */
+      void observeAt(ObservingReiterator<T> reiterator);
+
+      /** Called exactly once, across all copies, once this iterator is 
exhausted. */
+      void done();
+    }
 
-    private final int tag;
-    private final PeekingIterator<RawUnionValue> unions;
-    private final Boolean[] containsTag;
+    private PeekingReiterator<IndexingReiterator.Indexed<T>> underlying;
+    private Observer<T> observer;
 
-    private UnionValueIterator(int tag, Iterator<RawUnionValue> unions, 
Boolean[] containsTag) {
-      this.tag = tag;
-      this.unions = Iterators.peekingIterator(unions);
-      this.containsTag = containsTag;
+    // Used to keep track of what has been observed so far.
+    private final int[] lastObserved;

Review comment:
       They are single-element arrays because we want to share these values among 
all copies of the reiterator. Basically they're like pointers. Added a comment 
to clarify. 

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
##########
@@ -361,62 +377,331 @@ private CoGbkResult(CoGbkResultSchema schema, 
List<Iterable<?>> valueMap) {
   }
 
   /**
-   * Lazily filters and recasts an {@code Iterator<RawUnionValue>} into an 
{@code Iterator<V>},
-   * where V is the type of the raw union value's contents.
+   * A re-iterable that notifies an observer at every advance, and upon 
finishing, but only once
+   * across all copies.
+   *
+   * @param <T> The value type of the underlying iterable.
    */
-  private static class UnionValueIterator<V> implements Iterator<V> {
+  private static class ObservingReiterator<T> implements Reiterator<T> {
+
+    public interface Observer<T> {
+      /**
+       * Called exactly once, across all copies before advancing this iterator.
+       *
+       * <p>The iterator rather than the element is given so that the callee 
can perform a copy if
+       * desired. This class offers a peek method to get at the current 
element without disturbing
+       * the state of this iterator.
+       */
+      void observeAt(ObservingReiterator<T> reiterator);
+
+      /** Called exactly once, across all copies, once this iterator is 
exhausted. */
+      void done();
+    }
 
-    private final int tag;
-    private final PeekingIterator<RawUnionValue> unions;
-    private final Boolean[] containsTag;
+    private PeekingReiterator<IndexingReiterator.Indexed<T>> underlying;
+    private Observer<T> observer;
 
-    private UnionValueIterator(int tag, Iterator<RawUnionValue> unions, 
Boolean[] containsTag) {
-      this.tag = tag;
-      this.unions = Iterators.peekingIterator(unions);
-      this.containsTag = containsTag;
+    // Used to keep track of what has been observed so far.
+    private final int[] lastObserved;
+    private final boolean[] doneHasRun;
+    private final PeekingReiterator[] mostAdvanced;
+
+    public ObservingReiterator(Reiterator<T> underlying, Observer<T> observer) 
{
+      this(new PeekingReiterator<>(new IndexingReiterator<>(underlying)), 
observer);
+    }
+
+    public ObservingReiterator(
+        PeekingReiterator<IndexingReiterator.Indexed<T>> underlying, 
Observer<T> observer) {
+      this(
+          underlying,
+          observer,
+          new int[] {-1},
+          new boolean[] {false},
+          new PeekingReiterator[] {underlying});
+    }
+
+    private ObservingReiterator(
+        PeekingReiterator<IndexingReiterator.Indexed<T>> underlying,
+        Observer<T> observer,
+        int[] lastObserved,
+        boolean[] doneHasRun,
+        PeekingReiterator[] mostAdvanced) {
+      this.underlying = underlying;
+      this.observer = observer;
+      this.lastObserved = lastObserved;
+      this.doneHasRun = doneHasRun;
+      this.mostAdvanced = mostAdvanced;
+    }
+
+    @Override
+    public Reiterator<T> copy() {
+      return new ObservingReiterator<T>(
+          underlying.copy(), observer, lastObserved, doneHasRun, mostAdvanced);
     }
 
     @Override
     public boolean hasNext() {
-      if (Boolean.FALSE.equals(containsTag[tag])) {
-        return false;
+      boolean hasNext = underlying.hasNext();
+      if (!hasNext && !doneHasRun[0]) {
+        mostAdvanced[0] = underlying;
+        observer.done();
+        doneHasRun[0] = true;
       }
-      advance();
-      if (unions.hasNext()) {
-        return true;
-      } else {
-        // Now that we've iterated over all the values, we can resolve all the 
"unknown" null
-        // values to false.
-        for (int i = 0; i < containsTag.length; i++) {
-          if (containsTag[i] == null) {
-            containsTag[i] = false;
-          }
-        }
-        return false;
+      return hasNext;
+    }
+
+    @Override
+    public T next() {
+      peek(); // trigger observation *before* advancing
+      return underlying.next().value;
+    }
+
+    public T peek() {
+      IndexingReiterator.Indexed<T> next = underlying.peek();
+      if (next.index > lastObserved[0]) {
+        assert next.index == lastObserved[0] + 1;
+        mostAdvanced[0] = underlying;
+        lastObserved[0] = next.index;
+        observer.observeAt(this);
       }
+      return next.value;
+    }
+
+    public void fastForward() {
+      if (underlying != mostAdvanced[0]) {
+        underlying = mostAdvanced[0].copy();
+      }
+    }
+  }
+
+  /**
+   * Assigns a monotonically increasing index to each item in the underlying 
Reiterator.

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 701750)
    Time Spent: 1h 10m  (was: 1h)

> Use runtime information to improve CoGroupByKey caching
> -------------------------------------------------------
>
>                 Key: BEAM-13541
>                 URL: https://issues.apache.org/jira/browse/BEAM-13541
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-ideas
>            Reporter: Sunil Pedapudi
>            Assignee: Robert Bradshaw
>            Priority: P0
>             Fix For: 2.36.0
>
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Currently, CoGroupByKey creates UnionTables that are Flattened. The Flattened 
> output is processed by a GroupByKey to produce a CoGbkResult (via 
> ConstructCoGbkResultFn). 
>  
> Given that the performance of CoGBK is greatly impacted by which elements 
> are cached in the (finitely sized) in-memory results, it would be useful if 
> CoGbkResult could use runtime information to prioritize which elements are 
> stored in-memory.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to