[ https://issues.apache.org/jira/browse/BEAM-13541?focusedWorklogId=702423&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-702423 ]
ASF GitHub Bot logged work on BEAM-13541: ----------------------------------------- Author: ASF GitHub Bot Created on: 30/Dec/21 19:44 Start Date: 30/Dec/21 19:44 Worklog Time Spent: 10m Work Description: robertwb commented on a change in pull request #16354: URL: https://github.com/apache/beam/pull/16354#discussion_r776847987 ########## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java ########## @@ -361,62 +377,332 @@ private CoGbkResult(CoGbkResultSchema schema, List<Iterable<?>> valueMap) { } /** - * Lazily filters and recasts an {@code Iterator<RawUnionValue>} into an {@code Iterator<V>}, - * where V is the type of the raw union value's contents. + * A re-iterable that notifies an observer at every advance, and upon finishing, but only once + * across all copies. + * + * @param <T> The value type of the underlying iterable. */ - private static class UnionValueIterator<V> implements Iterator<V> { + private static class ObservingReiterator<T> implements Reiterator<T> { + + public interface Observer<T> { + /** + * Called exactly once, across all copies before advancing this iterator. + * + * <p>The iterator rather than the element is given so that the callee can perform a copy if + * desired. This class offers a peek method to get at the current element without disturbing + * the state of this iterator. + */ + void observeAt(ObservingReiterator<T> reiterator); + + /** Called exactly once, across all copies, once this iterator is exhausted. 
*/ + void done(); + } - private final int tag; - private final PeekingIterator<RawUnionValue> unions; - private final Boolean[] containsTag; + private PeekingReiterator<IndexingReiterator.Indexed<T>> underlying; + private Observer<T> observer; - private UnionValueIterator(int tag, Iterator<RawUnionValue> unions, Boolean[] containsTag) { - this.tag = tag; - this.unions = Iterators.peekingIterator(unions); - this.containsTag = containsTag; + // Used to keep track of what has been observed so far. + // These are arrays to facilitate sharing values among all copies of the same root Reiterator. + private final int[] lastObserved; + private final boolean[] doneHasRun; + private final PeekingReiterator[] mostAdvanced; + + public ObservingReiterator(Reiterator<T> underlying, Observer<T> observer) { + this(new PeekingReiterator<>(new IndexingReiterator<>(underlying)), observer); + } + + public ObservingReiterator( + PeekingReiterator<IndexingReiterator.Indexed<T>> underlying, Observer<T> observer) { + this( + underlying, + observer, + new int[] {-1}, + new boolean[] {false}, + new PeekingReiterator[] {underlying}); + } + + private ObservingReiterator( + PeekingReiterator<IndexingReiterator.Indexed<T>> underlying, + Observer<T> observer, + int[] lastObserved, + boolean[] doneHasRun, + PeekingReiterator[] mostAdvanced) { + this.underlying = underlying; + this.observer = observer; + this.lastObserved = lastObserved; + this.doneHasRun = doneHasRun; + this.mostAdvanced = mostAdvanced; + } + + @Override + public Reiterator<T> copy() { + return new ObservingReiterator<T>( + underlying.copy(), observer, lastObserved, doneHasRun, mostAdvanced); } @Override public boolean hasNext() { - if (Boolean.FALSE.equals(containsTag[tag])) { - return false; + boolean hasNext = underlying.hasNext(); + if (!hasNext && !doneHasRun[0]) { + mostAdvanced[0] = underlying; + observer.done(); + doneHasRun[0] = true; } - advance(); - if (unions.hasNext()) { - return true; - } else { - // Now that we've 
iterated over all the values, we can resolve all the "unknown" null - // values to false. - for (int i = 0; i < containsTag.length; i++) { - if (containsTag[i] == null) { - containsTag[i] = false; - } - } - return false; + return hasNext; + } + + @Override + public T next() { + peek(); // trigger observation *before* advancing + return underlying.next().value; + } + + public T peek() { + IndexingReiterator.Indexed<T> next = underlying.peek(); + if (next.index > lastObserved[0]) { + assert next.index == lastObserved[0] + 1; + mostAdvanced[0] = underlying; + lastObserved[0] = next.index; + observer.observeAt(this); } + return next.value; + } + + public void fastForward() { + if (underlying != mostAdvanced[0]) { + underlying = mostAdvanced[0].copy(); + } + } + } + + /** + * Assigns a monotonically increasing index to each item in the underling Reiterator. + * + * @param <T> The value type of the underlying iterable. + */ + private static class IndexingReiterator<T> implements Reiterator<IndexingReiterator.Indexed<T>> { + + private Reiterator<T> underlying; + private int index; + + public IndexingReiterator(Reiterator<T> underlying) { + this(underlying, 0); + } + + public IndexingReiterator(Reiterator<T> underlying, int start) { + this.underlying = underlying; + this.index = start; } @Override - @SuppressWarnings("unchecked") - public V next() { - advance(); - return (V) unions.next().getValue(); + public IndexingReiterator<T> copy() { + return new IndexingReiterator(underlying.copy(), index); } - private void advance() { - while (unions.hasNext()) { - int curTag = unions.peek().getUnionTag(); - containsTag[curTag] = true; - if (curTag == tag) { - break; - } - unions.next(); + @Override + public boolean hasNext() { + return underlying.hasNext(); + } + + @Override + public Indexed<T> next() { + return new Indexed<T>(index++, underlying.next()); + } + + public static class Indexed<T> { + public final int index; + public final T value; + + public Indexed(int index, T 
value) { + this.index = index; + this.value = value; + } + } + } + + /** + * Adapts an Reiterator, giving it a peek() method that can be used to observe the next element + * without consuming it. + * + * @param <T> The value type of the underlying iterable. + */ + private static class PeekingReiterator<T> implements Reiterator<T> { + private Reiterator<T> underlying; + private T next; + private boolean nextIsValid; + + public PeekingReiterator(Reiterator<T> underlying) { + this(underlying, null, false); + } + + private PeekingReiterator(Reiterator<T> underlying, T next, boolean nextIsValid) { + this.underlying = underlying; + this.next = next; + this.nextIsValid = nextIsValid; + } + + @Override + public PeekingReiterator<T> copy() { + return new PeekingReiterator(underlying.copy(), next, nextIsValid); + } + + @Override + public boolean hasNext() { + return nextIsValid || underlying.hasNext(); + } + + @Override + public T next() { + if (nextIsValid) { + nextIsValid = false; + return next; + } else { + return underlying.next(); + } + } + + public T peek() { + if (!nextIsValid) { + next = underlying.next(); + nextIsValid = true; + } + return next; + } + } + + /** + * An Iterable corresponding to a single tag. + * + * <p>The values in this iterable are populated lazily via the offer method as tip advances for + * any tag. + * + * @param <T> The value type of the corresponging tag. + */ + private static class TagIterable<T> implements Iterable<T> { + int tag; + int cacheSize; + Supplier<Boolean> forceCache; Review comment: Leftover from a refactor, removing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 702423) Time Spent: 2h 20m (was: 2h 10m) > Use runtime information to improve CoGroupByKey caching > ------------------------------------------------------- > > Key: BEAM-13541 > URL: https://issues.apache.org/jira/browse/BEAM-13541 > Project: Beam > Issue Type: Improvement > Components: sdk-ideas > Reporter: Sunil Pedapudi > Assignee: Robert Bradshaw > Priority: P0 > Fix For: 2.36.0 > > Time Spent: 2h 20m > Remaining Estimate: 0h > > Currently, CoGroupByKey creates UnionTables that are Flattened. The Flattened > output is processed by a GroupByKey to produce a CoGbkResult (via > ConstructCoGbkResultFn). > > Given that the performance of CoGBK depends heavily on which > elements are cached in the (finitely sized) in-memory results, it would be > useful if CoGbkResult could use runtime information to prioritize which > elements are stored in memory. -- This message was sent by Atlassian Jira (v8.20.1#820001)