[ https://issues.apache.org/jira/browse/BEAM-13541?focusedWorklogId=701750&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-701750 ]
ASF GitHub Bot logged work on BEAM-13541: ----------------------------------------- Author: ASF GitHub Bot Created on: 29/Dec/21 05:03 Start Date: 29/Dec/21 05:03 Worklog Time Spent: 10m Work Description: robertwb commented on a change in pull request #16354: URL: https://github.com/apache/beam/pull/16354#discussion_r776154114 ########## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java ########## @@ -361,62 +377,331 @@ private CoGbkResult(CoGbkResultSchema schema, List<Iterable<?>> valueMap) { } /** - * Lazily filters and recasts an {@code Iterator<RawUnionValue>} into an {@code Iterator<V>}, - * where V is the type of the raw union value's contents. + * A re-iterable that notifies an observer at every advance, and upon finishing, but only once + * across all copies. + * + * @param <T> The value type of the underlying iterable. */ - private static class UnionValueIterator<V> implements Iterator<V> { + private static class ObservingReiterator<T> implements Reiterator<T> { + + public interface Observer<T> { + /** + * Called exactly once, across all copies before advancing this iterator. + * + * <p>The iterator rather than the element is given so that the callee can perform a copy if + * desired. This class offers a peek method to get at the current element without disturbing + * the state of this iterator. + */ + void observeAt(ObservingReiterator<T> reiterator); + + /** Called exactly once, across all copies, once this iterator is exhausted. 
*/ + void done(); + } - private final int tag; - private final PeekingIterator<RawUnionValue> unions; - private final Boolean[] containsTag; + private PeekingReiterator<IndexingReiterator.Indexed<T>> underlying; + private Observer<T> observer; - private UnionValueIterator(int tag, Iterator<RawUnionValue> unions, Boolean[] containsTag) { - this.tag = tag; - this.unions = Iterators.peekingIterator(unions); - this.containsTag = containsTag; + // Used to keep track of what has been observed so far. + private final int[] lastObserved; Review comment: They are zero-length arrays because we want to share these values among all copies of the reiterator. Basically they're like pointers. Added a comment to clarify. ########## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java ########## @@ -361,62 +377,331 @@ private CoGbkResult(CoGbkResultSchema schema, List<Iterable<?>> valueMap) { } /** - * Lazily filters and recasts an {@code Iterator<RawUnionValue>} into an {@code Iterator<V>}, - * where V is the type of the raw union value's contents. + * A re-iterable that notifies an observer at every advance, and upon finishing, but only once + * across all copies. + * + * @param <T> The value type of the underlying iterable. */ - private static class UnionValueIterator<V> implements Iterator<V> { + private static class ObservingReiterator<T> implements Reiterator<T> { + + public interface Observer<T> { + /** + * Called exactly once, across all copies before advancing this iterator. + * + * <p>The iterator rather than the element is given so that the callee can perform a copy if + * desired. This class offers a peek method to get at the current element without disturbing + * the state of this iterator. + */ + void observeAt(ObservingReiterator<T> reiterator); + + /** Called exactly once, across all copies, once this iterator is exhausted. 
*/ + void done(); + } - private final int tag; - private final PeekingIterator<RawUnionValue> unions; - private final Boolean[] containsTag; + private PeekingReiterator<IndexingReiterator.Indexed<T>> underlying; + private Observer<T> observer; - private UnionValueIterator(int tag, Iterator<RawUnionValue> unions, Boolean[] containsTag) { - this.tag = tag; - this.unions = Iterators.peekingIterator(unions); - this.containsTag = containsTag; + // Used to keep track of what has been observed so far. + private final int[] lastObserved; + private final boolean[] doneHasRun; + private final PeekingReiterator[] mostAdvanced; + + public ObservingReiterator(Reiterator<T> underlying, Observer<T> observer) { + this(new PeekingReiterator<>(new IndexingReiterator<>(underlying)), observer); + } + + public ObservingReiterator( + PeekingReiterator<IndexingReiterator.Indexed<T>> underlying, Observer<T> observer) { + this( + underlying, + observer, + new int[] {-1}, + new boolean[] {false}, + new PeekingReiterator[] {underlying}); + } + + private ObservingReiterator( + PeekingReiterator<IndexingReiterator.Indexed<T>> underlying, + Observer<T> observer, + int[] lastObserved, + boolean[] doneHasRun, + PeekingReiterator[] mostAdvanced) { + this.underlying = underlying; + this.observer = observer; + this.lastObserved = lastObserved; + this.doneHasRun = doneHasRun; + this.mostAdvanced = mostAdvanced; + } + + @Override + public Reiterator<T> copy() { + return new ObservingReiterator<T>( + underlying.copy(), observer, lastObserved, doneHasRun, mostAdvanced); } @Override public boolean hasNext() { - if (Boolean.FALSE.equals(containsTag[tag])) { - return false; + boolean hasNext = underlying.hasNext(); + if (!hasNext && !doneHasRun[0]) { + mostAdvanced[0] = underlying; + observer.done(); + doneHasRun[0] = true; } - advance(); - if (unions.hasNext()) { - return true; - } else { - // Now that we've iterated over all the values, we can resolve all the "unknown" null - // values to false. 
- for (int i = 0; i < containsTag.length; i++) { - if (containsTag[i] == null) { - containsTag[i] = false; - } - } - return false; + return hasNext; + } + + @Override + public T next() { + peek(); // trigger observation *before* advancing + return underlying.next().value; + } + + public T peek() { + IndexingReiterator.Indexed<T> next = underlying.peek(); + if (next.index > lastObserved[0]) { + assert next.index == lastObserved[0] + 1; + mostAdvanced[0] = underlying; + lastObserved[0] = next.index; + observer.observeAt(this); } + return next.value; + } + + public void fastForward() { + if (underlying != mostAdvanced[0]) { + underlying = mostAdvanced[0].copy(); + } + } + } + + /** + * Assigns a monotonically increasing index to each item in teh underling Reiterator. Review comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 701750) Time Spent: 1h 10m (was: 1h) > Use runtime information to improve CoGroupByKey caching > ------------------------------------------------------- > > Key: BEAM-13541 > URL: https://issues.apache.org/jira/browse/BEAM-13541 > Project: Beam > Issue Type: Improvement > Components: sdk-ideas > Reporter: Sunil Pedapudi > Assignee: Robert Bradshaw > Priority: P0 > Fix For: 2.36.0 > > Time Spent: 1h 10m > Remaining Estimate: 0h > > Currently, CoGroupByKey creates UnionTables that are Flattened. The Flattened > output is processed by a GroupByKey to produce a CoGbkResult (via > ConstructCoGbkResultFn). 
> > Given that the performance of CoGBK is greatly impacted by which > elements are cached in the (finitely sized) in-memory results, it would be > useful if CoGbkResult could use runtime information to prioritize which > elements are stored in memory. -- This message was sent by Atlassian Jira (v8.20.1#820001)