[ 
https://issues.apache.org/jira/browse/BEAM-13541?focusedWorklogId=702378&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-702378
 ]

ASF GitHub Bot logged work on BEAM-13541:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Dec/21 18:18
            Start Date: 30/Dec/21 18:18
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on a change in pull request #16354:
URL: https://github.com/apache/beam/pull/16354#discussion_r776826720



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGbkResultTest.java
##########
@@ -57,6 +74,175 @@ public void runLazyResult(int cacheSize) {
     assertThat(result.getAll(new TupleTag<>("tag0")), contains(0, 2, 4));
   }
 
+  @Test
+  public void testLazyResults() {
+    TestUnionValues values = new TestUnionValues(0, 0, 1, 1, 0, 1, 1);
+    CoGbkResult result = new CoGbkResult(createSchema(5), values, 0, 2);
+    // Nothing is read until we try to iterate.
+    assertThat(values.maxPos(), equalTo(0));
+    Iterable<?> tag0iterable = result.getAll("tag0");
+    assertThat(values.maxPos(), equalTo(0));
+    tag0iterable.iterator();
+    assertThat(values.maxPos(), equalTo(0));
+
+    // Iterating reads (nearly) the minimal number of values.
+    Iterator<?> tag0 = tag0iterable.iterator();
+    tag0.next();
+    assertThat(values.maxPos(), lessThanOrEqualTo(2));
+    tag0.next();
+    assertThat(values.maxPos(), equalTo(2));
+    // Note that we're skipping over tag 1.
+    tag0.next();
+    assertThat(values.maxPos(), equalTo(5));
+
+    // Iterating again does not cause more reads.
+    Iterator<?> tag0iterAgain = tag0iterable.iterator();
+    tag0iterAgain.next();
+    tag0iterAgain.next();
+    tag0iterAgain.next();
+    assertThat(values.maxPos(), equalTo(5));
+
+    // Iterating over other tags does not cause more reads for values we have seen.
+    Iterator<?> tag1 = result.getAll("tag1").iterator();
+    tag1.next();
+    tag1.next();
+    assertThat(values.maxPos(), equalTo(5));
+    // However, finding the next tag1 value does require more reads.
+    tag1.next();
+    assertThat(values.maxPos(), equalTo(6));
+  }
+
+  @Test
+  @SuppressWarnings("BoxedPrimitiveEquality")
+  public void testCachedResults() {
+    // Ensure we don't fail below due to odd VM settings.

Review comment:
       java.lang.Integer.IntegerCache.high. Clarified to be more explicit.
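
For context on the setting named in this comment, here is a minimal sketch of why a test comparing boxed integers by identity may depend on VM settings. Integer.valueOf is documented to always cache values from -128 to 127 and may cache more; on HotSpot the upper bound can be raised (for example with -XX:AutoBoxCacheMax), so reference equality for larger values is not portable. The class and method names below are hypothetical and do not come from the pull request.

```java
// Hypothetical demo class; not part of Beam or of the pull request.
class IntegerCacheDemo {
  /** Returns true when two separately boxed copies of value are the same object. */
  static boolean boxesToSameObject(int value) {
    Integer a = Integer.valueOf(value);
    Integer b = Integer.valueOf(value);
    // Reference comparison on purpose: this is what the cache affects.
    return a == b;
  }

  public static void main(String[] args) {
    // Always inside the guaranteed cache range of -128..127.
    System.out.println("127 shared: " + boxesToSameObject(127));
    // Outside the guaranteed range; the result depends on the VM's cache bound.
    System.out.println("100000 shared: " + boxesToSameObject(100_000));
  }
}
```

A test that relies on values above 127 being distinct objects would therefore need to guard against an enlarged cache, which appears to be what the "odd VM settings" comment in the diff refers to.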




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 702378)
    Time Spent: 1.5h  (was: 1h 20m)

> Use runtime information to improve CoGroupByKey caching
> -------------------------------------------------------
>
>                 Key: BEAM-13541
>                 URL: https://issues.apache.org/jira/browse/BEAM-13541
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-ideas
>            Reporter: Sunil Pedapudi
>            Assignee: Robert Bradshaw
>            Priority: P0
>             Fix For: 2.36.0
>
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Currently, CoGroupByKey creates UnionTables that are Flattened. The Flattened 
> output is processed by a GroupByKey to produce a CoGbkResult (via 
> ConstructCoGbkResultFn). 
>  
> Because the performance of CoGBK depends heavily on which elements are 
> cached in the (finitely sized) in-memory results, it would be useful if 
> CoGbkResult could use runtime information to prioritize which elements are 
> stored in memory.
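
The idea of reading a shared, tagged stream lazily and serving repeat iterations from memory can be pictured with the simplified sketch below. It is an illustration only and not the CoGbkResult implementation: LazyTaggedValues and its methods are hypothetical names, the buffer here is unbounded (the real in-memory result is finitely sized), and each element is represented only by its tag and position.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical sketch; names and structure are assumptions, not Beam APIs.
class LazyTaggedValues {
  private final Iterator<Integer> source;                   // yields the tag of each element
  private final List<Integer> buffered = new ArrayList<>(); // tags read so far

  LazyTaggedValues(List<Integer> tags) {
    this.source = tags.iterator();
  }

  /** Number of elements pulled from the underlying source so far. */
  int elementsRead() {
    return buffered.size();
  }

  /** Iterates over the positions of elements carrying the given tag, reading lazily. */
  Iterator<Integer> positionsFor(int tag) {
    return new Iterator<Integer>() {
      private int pos = 0; // next buffer position to examine

      @Override
      public boolean hasNext() {
        while (true) {
          // First scan what is already in memory.
          while (pos < buffered.size()) {
            if (buffered.get(pos) == tag) {
              return true;
            }
            pos++;
          }
          // Only touch the source when the buffer has no further match.
          if (!source.hasNext()) {
            return false;
          }
          buffered.add(source.next());
        }
      }

      @Override
      public Integer next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return pos++;
      }
    };
  }
}
```

In this sketch, creating an iterator reads nothing, advancing it reads only as far as the next matching element, and a second pass over the same tag is served from the buffer. Deciding which elements to keep once the buffer is bounded is the part the issue proposes to inform with runtime information, and it is deliberately left out here.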



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
