scwhittle commented on code in PR #37723:
URL: https://github.com/apache/beam/pull/37723#discussion_r2876903407
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java:
##########
@@ -117,21 +145,26 @@ public void testIgnoresUnknownIds() throws Exception {
@Test
public void testCleanupOnExpiration() throws Exception {
- Runnable callback = mock(Runnable.class);
+ CountDownLatch callback1Executed = new CountDownLatch(1);
finalizer.cacheCommitFinalizers(
- ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback)));
+ ImmutableMap.of(
+ 1L,
+ Pair.of(
+ Instant.now().plus(Duration.standardHours(1)),
+ () -> callback1Executed.countDown())));
assertEquals(1, finalizer.cleanupQueueSize());
Runnable callback2 = mock(Runnable.class);
- finalizer.cacheCommitFinalizers(
- ImmutableMap.of(2L, Pair.of(Instant.now().plus(Duration.millis(100)), callback2)));
-
Runnable callback3 = mock(Runnable.class);
+ Instant shortTimeout = Instant.now().plus(Duration.millis(100));
finalizer.cacheCommitFinalizers(
- ImmutableMap.of(3L, Pair.of(Instant.now().plus(Duration.millis(100)), callback3)));
+ ImmutableMap.<Long, Pair<Instant, Runnable>>builder()
+ .put(2L, Pair.of(shortTimeout, callback2))
+ .put(3L, Pair.of(shortTimeout, callback3))
+ .build());
while (finalizer.cleanupQueueSize() > 1) {
- // Wait until it expires
+ // Wait until the two 100ms timeouts expire.
Thread.sleep(500);
Review Comment:
nit: might as well make this 110 or so, which will likely be enough and is
shorter; Gradle doesn't have great parallelism.
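
A rough sketch of the shorter wait, assuming a hypothetical helper `awaitSizeAtMost` that is not part of this PR: it polls at a short interval and gives up at a deadline, so a stuck cleanup fails the test instead of hanging it. The class name, parameter names, and timeout values are illustrative only.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

public class PollUntil {
  /**
   * Hypothetical test helper (not from the PR). Polls {@code size} every
   * {@code pollMillis} until it reports at most {@code target}, or until
   * {@code timeoutMillis} has elapsed.
   *
   * @return true if the target size was reached, false on timeout or interrupt
   */
  static boolean awaitSizeAtMost(
      IntSupplier size, int target, long pollMillis, long timeoutMillis) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (size.getAsInt() > target) {
      // Overflow-safe comparison against the nanoTime deadline.
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and report failure to the caller.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }
}
```

In the test above this could read something like `assertTrue(PollUntil.awaitSizeAtMost(finalizer::cleanupQueueSize, 1, 110, 5_000))`, where the 5 second cap is an arbitrary choice.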
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java:
##########
@@ -716,41 +711,36 @@ public void afterBundleCommit(Instant expiry, Callback callback) {
parDoFn.finishBundle();
- // The counter increases by 1 in StartBundle, 5 in ProcessElement, and 1 in FinishBundle.
- // Total should be 7.
- assertThat(getBundleSuccessCount(), equalTo(7));
+ assertThat(startBundleCount.get(), equalTo(1));
+ assertThat(processElementCount.get(), equalTo(5));
+ assertThat(finishBundleCount.get(), equalTo(1));
}
- private static final AtomicInteger bundleSuccessCount = new AtomicInteger(0);
-
- static void increaseBundleSuccessCount() {
- bundleSuccessCount.incrementAndGet();
- }
-
- static int getBundleSuccessCount() {
- return bundleSuccessCount.get();
- }
+ private static final AtomicInteger startBundleCount = new AtomicInteger(0);
Review Comment:
nit: move these into WithBundleFinalizerDoFn to scope them better?
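
Roughly what that scoping could look like, as a sketch only: the class below is a plain stand-in for `WithBundleFinalizerDoFn` (the real one is a Beam `DoFn`, which is left out here so the snippet compiles on its own), and `resetCounts` plus the method bodies are assumptions rather than code from the PR.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ScopedCountersSketch {
  // Stand-in for WithBundleFinalizerDoFn; the real class extends Beam's DoFn.
  static class WithBundleFinalizerDoFn {
    // Counters live on the class that updates them instead of on the enclosing test.
    // They stay static, as in the diff, so all instances share them.
    static final AtomicInteger startBundleCount = new AtomicInteger(0);
    static final AtomicInteger processElementCount = new AtomicInteger(0);
    static final AtomicInteger finishBundleCount = new AtomicInteger(0);

    // Hypothetical helper so each test can start from zero.
    static void resetCounts() {
      startBundleCount.set(0);
      processElementCount.set(0);
      finishBundleCount.set(0);
    }

    void startBundle() {
      startBundleCount.incrementAndGet();
    }

    void processElement(String element) {
      processElementCount.incrementAndGet();
    }

    void finishBundle() {
      finishBundleCount.incrementAndGet();
    }
  }
}
```

The assertions in the test would then read `WithBundleFinalizerDoFn.startBundleCount.get()` and so on, which makes it clear which class owns each count.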
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java:
##########
@@ -170,8 +170,7 @@ public void finalizeCommits(Iterable<Long> finalizeIds) {
}
}
- // Only exposed for tests.
- public int cleanupQueueSize() {
+ int cleanupQueueSize() {
Review Comment:
I still think `@VisibleForTesting` is nice to show why it's not private.