scwhittle commented on code in PR #37723:
URL: https://github.com/apache/beam/pull/37723#discussion_r2872051240
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java:
##########
@@ -17,69 +17,166 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
-import java.time.Duration;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.apache.commons.lang3.tuple.Pair;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ThreadSafe
@Internal
final class StreamingCommitFinalizer {
private static final Logger LOG =
LoggerFactory.getLogger(StreamingCommitFinalizer.class);
- private static final Duration DEFAULT_CACHE_ENTRY_EXPIRY =
Duration.ofMinutes(5L);
- private final Cache<Long, Runnable> commitFinalizerCache;
+
+ /** A {@link Runnable} and expiry time pair. */
+ @AutoValue
+ public abstract static class FinalizationInfo {
+ public abstract Long getId();
+
+ public abstract Instant getExpiryTime();
+
+ public abstract Runnable getCallback();
+
+ public static FinalizationInfo create(Long id, Instant expiryTime,
Runnable callback) {
+ return new AutoValue_StreamingCommitFinalizer_FinalizationInfo(id,
expiryTime, callback);
+ }
+ }
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private final Condition queueMinChanged = lock.newCondition();
+
+ @GuardedBy("lock")
+ private final HashMap<Long, FinalizationInfo> commitFinalizationCallbacks =
new HashMap<>();
+
+ @GuardedBy("lock")
+ private final PriorityQueue<FinalizationInfo> cleanUpQueue =
+ new PriorityQueue<>(11,
Comparator.comparing(FinalizationInfo::getExpiryTime));
+
private final BoundedQueueExecutor finalizationExecutor;
- private StreamingCommitFinalizer(
- Cache<Long, Runnable> commitFinalizerCache, BoundedQueueExecutor
finalizationExecutor) {
- this.commitFinalizerCache = commitFinalizerCache;
- this.finalizationExecutor = finalizationExecutor;
+ private StreamingCommitFinalizer(BoundedQueueExecutor
finalizationCleanupExecutor) {
+ finalizationExecutor = finalizationCleanupExecutor;
+ finalizationCleanupExecutor.execute(this::cleanupThreadBody, 0);
+ }
+
+ private void cleanupThreadBody() {
+ lock.lock();
+ try {
+ while (true) {
+ final @Nullable FinalizationInfo minValue = cleanUpQueue.peek();
+ if (minValue == null) {
+ // Wait for an element to be added and loop to re-examine the min.
+ queueMinChanged.await();
+ continue;
+ }
+
+ Instant now = Instant.now();
+ Duration timeDifference = new Duration(now, minValue.getExpiryTime());
+ if (timeDifference.getMillis() < 0
+ || (queueMinChanged.await(timeDifference.getMillis(),
TimeUnit.MILLISECONDS)
+ && cleanUpQueue.peek() == minValue)) {
+ // The minimum element has an expiry time before now, either because
it had elapsed when
+ // we pulled it or because we awaited it, and it is still the
minimum.
+ checkState(minValue == cleanUpQueue.poll());
+ checkState(commitFinalizationCallbacks.remove(minValue.getId()) ==
minValue);
+ }
+ }
+ } catch (InterruptedException e) {
+ // We're being shutdown.
+ } finally {
+ lock.unlock();
+ }
}
static StreamingCommitFinalizer create(BoundedQueueExecutor workExecutor) {
- return new StreamingCommitFinalizer(
-
CacheBuilder.newBuilder().expireAfterWrite(DEFAULT_CACHE_ENTRY_EXPIRY).build(),
- workExecutor);
+ return new StreamingCommitFinalizer(workExecutor);
}
/**
* Stores a map of user worker generated finalization ids and callbacks to
execute once a commit
* has been successfully committed to the backing state store.
*/
- void cacheCommitFinalizers(Map<Long, Runnable> commitCallbacks) {
- commitFinalizerCache.putAll(commitCallbacks);
+ public void cacheCommitFinalizers(Map<Long, Pair<Instant, Runnable>>
callbacks) {
+ for (Map.Entry<Long, Pair<Instant, Runnable>> entry :
callbacks.entrySet()) {
+ Long finalizeId = entry.getKey();
+ final FinalizationInfo info =
+ FinalizationInfo.create(
+ finalizeId, entry.getValue().getLeft(),
entry.getValue().getRight());
+
+ lock.lock();
+ try {
+ FinalizationInfo existingInfo =
commitFinalizationCallbacks.put(finalizeId, info);
+ if (existingInfo != null) {
+ throw new IllegalStateException(
+ "Expected to not have any past callbacks for bundle "
+ + finalizeId
+ + " but had "
+ + existingInfo);
+ }
+ cleanUpQueue.add(info);
+ @SuppressWarnings("ReferenceEquality")
+ boolean newMin = cleanUpQueue.peek() == info;
+ if (newMin) {
+ queueMinChanged.signal();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
}
/**
* When this method is called, the commits associated with the provided
finalizeIds have been
* successfully persisted in the backing state store. If the commitCallback
for the finalizationId
* is still cached it is invoked.
*/
- void finalizeCommits(Iterable<Long> finalizeIds) {
- for (long finalizeId : finalizeIds) {
- @Nullable Runnable finalizeCommit =
commitFinalizerCache.getIfPresent(finalizeId);
- // NOTE: It is possible the same callback id may be removed twice if
- // windmill restarts.
- // TODO: It is also possible for an earlier finalized id to be lost.
- // We should automatically discard all older callbacks for the same
computation and key.
- if (finalizeCommit != null) {
- commitFinalizerCache.invalidate(finalizeId);
- finalizationExecutor.forceExecute(
- () -> {
- try {
- finalizeCommit.run();
- } catch (Throwable t) {
- LOG.error("Source checkpoint finalization failed:", t);
- }
- },
- 0);
+ public void finalizeCommits(Iterable<Long> finalizeIds) {
+ List<Runnable> callbacksToExecute = new ArrayList<>();
+ lock.lock();
+ try {
+ for (long finalizeId : finalizeIds) {
+ @Nullable FinalizationInfo info =
commitFinalizationCallbacks.remove(finalizeId);
+ if (info != null) {
+ checkState(cleanUpQueue.remove(info));
+ callbacksToExecute.add(info.getCallback());
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ for (Runnable callback : callbacksToExecute) {
+ try {
+ finalizationExecutor.execute(callback, 0);
+ } catch (Throwable t) {
+ LOG.error("Commit finalization failed:", t);
}
}
}
+
+ // Only exposed for tests.
Review Comment:
Does package-private work here, i.e. can we drop `public`?
There is also the @VisibleForTesting annotation for things like this.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -246,6 +246,7 @@ private void processWork(ComputationState computationState,
Work work) {
// Before any processing starts, call any pending OnCommit callbacks.
Nothing that requires
// cleanup should be done before this, since we might exit early here.
commitFinalizer.finalizeCommits(workItem.getSourceState().getFinalizeIdsList());
Review Comment:
It would be good to double-check that the way we generate these IDs in the two
places can't produce colliding IDs.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.lang3.tuple.Pair;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class StreamingCommitFinalizerTest {
+
+ private StreamingCommitFinalizer finalizer;
+ private BoundedQueueExecutor executor;
+
+ @Before
+ public void setUp() {
+ executor =
+ new BoundedQueueExecutor(
+ 10,
+ 60,
+ TimeUnit.SECONDS,
+ 10,
+ 10000000,
+ new ThreadFactoryBuilder()
+ .setNameFormat("FinalizationCallback-%d")
+ .setDaemon(true)
+ .build(),
+ /*useFairMonitor=*/ false);
+ finalizer = StreamingCommitFinalizer.create(executor);
+ }
+
+ @Test
+ public void testCreateAndInit() {
+ assertEquals(0, finalizer.cleanupQueueSize());
+ }
+
+ @Test
+ public void testCacheCommitFinalizer() {
+ Runnable callback = mock(Runnable.class);
+ finalizer.cacheCommitFinalizers(
+ ImmutableMap.of(1L,
Pair.of(Instant.now().plus(Duration.standardHours(1)), callback)));
+ assertEquals(1, finalizer.cleanupQueueSize());
+ }
+
+ @Test
+ public void testThrowErrorOnDuplicateIds() {
+ Runnable callback1 = mock(Runnable.class);
+ Instant expiry = Instant.now().plus(Duration.standardHours(1));
+ finalizer.cacheCommitFinalizers(ImmutableMap.of(1L, Pair.of(expiry,
callback1)));
+
+ Runnable callback2 = mock(Runnable.class);
+ Map<Long, Pair<Instant, Runnable>> duplicateCallback =
+ ImmutableMap.of(1L, Pair.of(expiry, callback2));
+ assertThrows(
+ IllegalStateException.class, () ->
finalizer.cacheCommitFinalizers(duplicateCallback));
+ }
+
+ @Test
+ public void testFinalizeCommits() throws Exception {
+ Runnable callback = mock(Runnable.class);
+ finalizer.cacheCommitFinalizers(
+ ImmutableMap.of(1L,
Pair.of(Instant.now().plus(Duration.standardHours(1)), callback)));
+ finalizer.finalizeCommits(Collections.singletonList(1L));
+ // The executor always has the cleanup thread running. So
elementsOutstanding == 2 while we're
+ // waiting for the finalization callback to run.
+ while (executor.elementsOutstanding() > 1) {
+ Thread.sleep(500);
+ }
+ verify(callback).run();
+ assertEquals(0, finalizer.cleanupQueueSize());
+ }
+
+ @Test
+ public void testIgnoresUnknownIds() throws Exception {
+ Runnable callback = mock(Runnable.class);
+ finalizer.cacheCommitFinalizers(
+ ImmutableMap.of(1L,
Pair.of(Instant.now().plus(Duration.standardHours(1)), callback)));
+ finalizer.finalizeCommits(Collections.singletonList(2L));
+ while (executor.elementsOutstanding() > 1) {
+ Thread.sleep(500);
+ }
+ verify(callback, never()).run();
+ assertEquals(1, finalizer.cleanupQueueSize());
+ }
+
+ @Test
+ public void testCleanupOnExpiration() throws Exception {
+ Runnable callback = mock(Runnable.class);
+ finalizer.cacheCommitFinalizers(
+ ImmutableMap.of(1L,
Pair.of(Instant.now().plus(Duration.standardHours(1)), callback)));
+ assertEquals(1, finalizer.cleanupQueueSize());
+
+ Runnable callback2 = mock(Runnable.class);
+ finalizer.cacheCommitFinalizers(
+ ImmutableMap.of(2L, Pair.of(Instant.now().plus(Duration.millis(100)),
callback2)));
+
+ Runnable callback3 = mock(Runnable.class);
+ finalizer.cacheCommitFinalizers(
+ ImmutableMap.of(3L, Pair.of(Instant.now().plus(Duration.millis(100)),
callback3)));
+
+ while (finalizer.cleanupQueueSize() > 1) {
+ // Wait until it expires
Review Comment:
nit: reword the comment to "Wait until the two short timeouts expire."
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java:
##########
@@ -633,4 +640,117 @@ public <T> T get(PCollectionView<T> view, final
BoundedWindow window) {
throw new IllegalArgumentException("calling getSideInput() with unknown
view");
}
}
+
+ @Test
+ public void testBundleFinalizer() throws Exception {
+ bundleSuccessCount.set(0);
+ DoFnInfo<Long, String> fnInfo =
+ DoFnInfo.forFn(
+ new WithBundleFinalizerDoFn(),
+ WindowingStrategy.globalDefault(),
+ null /* side input views */,
+ VarLongCoder.of(),
+ MAIN_OUTPUT,
+ DoFnSchemaInformation.create(),
+ Collections.emptyMap());
+ DataflowExecutionContext.DataflowStepContext userStepContext =
+ Mockito.mock(
+ DataflowExecutionContext.DataflowStepContext.class,
+ invocation -> {
+ if (invocation.getMethod().getName().equals("bundleFinalizer")) {
+ return new BundleFinalizer() {
+ @Override
+ public void afterBundleCommit(Instant expiry, Callback
callback) {
+ try {
+ callback.onBundleSuccess();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+ return invocation.getMethod().invoke(stepContext,
invocation.getArguments());
+ });
+
+ DataflowStepContext stepContextWithBundleFinalizer =
+ Mockito.mock(
+ DataflowStepContext.class,
+ invocation -> {
+ if (invocation.getMethod().getName().equals("bundleFinalizer")) {
Review Comment:
Do we need this here? Why is it on both the DataflowStepContext and the
DataflowExecutionContext.DataflowStepContext? Maybe add some comments to help
explain.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java:
##########
@@ -633,4 +640,117 @@ public <T> T get(PCollectionView<T> view, final
BoundedWindow window) {
throw new IllegalArgumentException("calling getSideInput() with unknown
view");
}
}
+
+ @Test
+ public void testBundleFinalizer() throws Exception {
+ bundleSuccessCount.set(0);
+ DoFnInfo<Long, String> fnInfo =
+ DoFnInfo.forFn(
+ new WithBundleFinalizerDoFn(),
+ WindowingStrategy.globalDefault(),
+ null /* side input views */,
+ VarLongCoder.of(),
+ MAIN_OUTPUT,
+ DoFnSchemaInformation.create(),
+ Collections.emptyMap());
+ DataflowExecutionContext.DataflowStepContext userStepContext =
+ Mockito.mock(
+ DataflowExecutionContext.DataflowStepContext.class,
+ invocation -> {
+ if (invocation.getMethod().getName().equals("bundleFinalizer")) {
+ return new BundleFinalizer() {
+ @Override
+ public void afterBundleCommit(Instant expiry, Callback
callback) {
+ try {
+ callback.onBundleSuccess();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+ return invocation.getMethod().invoke(stepContext,
invocation.getArguments());
+ });
+
+ DataflowStepContext stepContextWithBundleFinalizer =
+ Mockito.mock(
+ DataflowStepContext.class,
+ invocation -> {
+ if (invocation.getMethod().getName().equals("bundleFinalizer")) {
+ return new BundleFinalizer() {
+ @Override
+ public void afterBundleCommit(Instant expiry, Callback
callback) {
+ try {
+ callback.onBundleSuccess();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+ if (invocation.getMethod().getName().equals("namespacedToUser"))
{
+ return userStepContext;
+ }
+ return invocation.getMethod().invoke(stepContext,
invocation.getArguments());
+ });
+
+ ParDoFn parDoFn =
+ new SimpleParDoFn<>(
+ options,
+ DoFnInstanceManagers.singleInstance(fnInfo),
+ new EmptySideInputReader(),
+ MAIN_OUTPUT,
+ ImmutableMap.of(MAIN_OUTPUT, 0),
+ stepContextWithBundleFinalizer,
+ operationContext,
+ DoFnSchemaInformation.create(),
+ Collections.emptyMap(),
+ SimpleDoFnRunnerFactory.INSTANCE);
+
+ parDoFn.startBundle(new TestReceiver());
+
+ // Process a few elements
+ for (int i = 0; i < 5; i++) {
+ parDoFn.processElement(WindowedValues.valueInGlobalWindow(1L));
+ }
+
+ parDoFn.finishBundle();
Review Comment:
What triggers the bundle being notified as successfully committed, and thus
ensures all the callbacks are called before the assert below?
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.lang3.tuple.Pair;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class StreamingCommitFinalizerTest {
+
+ private StreamingCommitFinalizer finalizer;
+ private BoundedQueueExecutor executor;
+
+ @Before
+ public void setUp() {
+ executor =
+ new BoundedQueueExecutor(
+ 10,
+ 60,
+ TimeUnit.SECONDS,
+ 10,
+ 10000000,
+ new ThreadFactoryBuilder()
+ .setNameFormat("FinalizationCallback-%d")
+ .setDaemon(true)
+ .build(),
+ /*useFairMonitor=*/ false);
+ finalizer = StreamingCommitFinalizer.create(executor);
+ }
+
+ @Test
+ public void testCreateAndInit() {
+ assertEquals(0, finalizer.cleanupQueueSize());
+ }
+
+ @Test
+ public void testCacheCommitFinalizer() {
+ Runnable callback = mock(Runnable.class);
+ finalizer.cacheCommitFinalizers(
+ ImmutableMap.of(1L,
Pair.of(Instant.now().plus(Duration.standardHours(1)), callback)));
+ assertEquals(1, finalizer.cleanupQueueSize());
+ }
+
+ @Test
+ public void testThrowErrorOnDuplicateIds() {
+ Runnable callback1 = mock(Runnable.class);
+ Instant expiry = Instant.now().plus(Duration.standardHours(1));
+ finalizer.cacheCommitFinalizers(ImmutableMap.of(1L, Pair.of(expiry,
callback1)));
+
+ Runnable callback2 = mock(Runnable.class);
+ Map<Long, Pair<Instant, Runnable>> duplicateCallback =
+ ImmutableMap.of(1L, Pair.of(expiry, callback2));
+ assertThrows(
+ IllegalStateException.class, () ->
finalizer.cacheCommitFinalizers(duplicateCallback));
+ }
+
+ @Test
+ public void testFinalizeCommits() throws Exception {
+ Runnable callback = mock(Runnable.class);
+ finalizer.cacheCommitFinalizers(
Review Comment:
It would be a better test if several different IDs were cached, and you
finalized them one at a time and verified that the right callback was run each time.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.lang3.tuple.Pair;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class StreamingCommitFinalizerTest {
+
+ private StreamingCommitFinalizer finalizer;
+ private BoundedQueueExecutor executor;
+
+ @Before
+ public void setUp() {
+ executor =
+ new BoundedQueueExecutor(
+ 10,
+ 60,
+ TimeUnit.SECONDS,
+ 10,
+ 10000000,
+ new ThreadFactoryBuilder()
+ .setNameFormat("FinalizationCallback-%d")
+ .setDaemon(true)
+ .build(),
+ /*useFairMonitor=*/ false);
+ finalizer = StreamingCommitFinalizer.create(executor);
+ }
+
+ @Test
+ public void testCreateAndInit() {
+ assertEquals(0, finalizer.cleanupQueueSize());
+ }
+
+ @Test
+ public void testCacheCommitFinalizer() {
+ Runnable callback = mock(Runnable.class);
+ finalizer.cacheCommitFinalizers(
+ ImmutableMap.of(1L,
Pair.of(Instant.now().plus(Duration.standardHours(1)), callback)));
+ assertEquals(1, finalizer.cleanupQueueSize());
+ }
+
+ @Test
+ public void testThrowErrorOnDuplicateIds() {
+ Runnable callback1 = mock(Runnable.class);
+ Instant expiry = Instant.now().plus(Duration.standardHours(1));
+ finalizer.cacheCommitFinalizers(ImmutableMap.of(1L, Pair.of(expiry,
callback1)));
+
+ Runnable callback2 = mock(Runnable.class);
+ Map<Long, Pair<Instant, Runnable>> duplicateCallback =
+ ImmutableMap.of(1L, Pair.of(expiry, callback2));
+ assertThrows(
+ IllegalStateException.class, () ->
finalizer.cacheCommitFinalizers(duplicateCallback));
+ }
+
+ @Test
+ public void testFinalizeCommits() throws Exception {
+ Runnable callback = mock(Runnable.class);
+ finalizer.cacheCommitFinalizers(
+ ImmutableMap.of(1L,
Pair.of(Instant.now().plus(Duration.standardHours(1)), callback)));
+ finalizer.finalizeCommits(Collections.singletonList(1L));
+ // The executor always has the cleanup thread running. So
elementsOutstanding == 2 while we're
+ // waiting for the finalization callback to run.
+ while (executor.elementsOutstanding() > 1) {
+ Thread.sleep(500);
+ }
+ verify(callback).run();
Review Comment:
Instead of a mock Runnable, how about a Runnable that counts down a
CountDownLatch? Then you can just block on that instead of using the less
direct elementsOutstanding.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java:
##########
@@ -633,4 +640,117 @@ public <T> T get(PCollectionView<T> view, final
BoundedWindow window) {
throw new IllegalArgumentException("calling getSideInput() with unknown
view");
}
}
+
+ @Test
+ public void testBundleFinalizer() throws Exception {
+ bundleSuccessCount.set(0);
+ DoFnInfo<Long, String> fnInfo =
+ DoFnInfo.forFn(
+ new WithBundleFinalizerDoFn(),
+ WindowingStrategy.globalDefault(),
+ null /* side input views */,
+ VarLongCoder.of(),
+ MAIN_OUTPUT,
+ DoFnSchemaInformation.create(),
+ Collections.emptyMap());
+ DataflowExecutionContext.DataflowStepContext userStepContext =
+ Mockito.mock(
+ DataflowExecutionContext.DataflowStepContext.class,
+ invocation -> {
+ if (invocation.getMethod().getName().equals("bundleFinalizer")) {
+ return new BundleFinalizer() {
+ @Override
+ public void afterBundleCommit(Instant expiry, Callback
callback) {
+ try {
+ callback.onBundleSuccess();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+ return invocation.getMethod().invoke(stepContext,
invocation.getArguments());
+ });
+
+ DataflowStepContext stepContextWithBundleFinalizer =
+ Mockito.mock(
+ DataflowStepContext.class,
+ invocation -> {
+ if (invocation.getMethod().getName().equals("bundleFinalizer")) {
+ return new BundleFinalizer() {
+ @Override
+ public void afterBundleCommit(Instant expiry, Callback
callback) {
+ try {
+ callback.onBundleSuccess();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+ if (invocation.getMethod().getName().equals("namespacedToUser"))
{
+ return userStepContext;
+ }
+ return invocation.getMethod().invoke(stepContext,
invocation.getArguments());
+ });
+
+ ParDoFn parDoFn =
+ new SimpleParDoFn<>(
+ options,
+ DoFnInstanceManagers.singleInstance(fnInfo),
+ new EmptySideInputReader(),
+ MAIN_OUTPUT,
+ ImmutableMap.of(MAIN_OUTPUT, 0),
+ stepContextWithBundleFinalizer,
+ operationContext,
+ DoFnSchemaInformation.create(),
+ Collections.emptyMap(),
+ SimpleDoFnRunnerFactory.INSTANCE);
+
+ parDoFn.startBundle(new TestReceiver());
+
+ // Process a few elements
+ for (int i = 0; i < 5; i++) {
+ parDoFn.processElement(WindowedValues.valueInGlobalWindow(1L));
+ }
+
+ parDoFn.finishBundle();
+
+ // The counter increases by 1 in StartBundle, 5 in ProcessElement, and 1
in FinishBundle.
+ // Total should be 7.
+ assertThat(getBundleSuccessCount(), equalTo(7));
Review Comment:
Maybe better to just duplicate the Atomics and methods so that you can
explicitly verify that start, the 5 process calls, and finish are each called.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.lang3.tuple.Pair;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class StreamingCommitFinalizerTest {
+
+ private StreamingCommitFinalizer finalizer;
+ private BoundedQueueExecutor executor;
+
+ @Before
+ public void setUp() {
+ executor =
+ new BoundedQueueExecutor(
+ 10,
+ 60,
+ TimeUnit.SECONDS,
+ 10,
+ 10000000,
+ new ThreadFactoryBuilder()
+ .setNameFormat("FinalizationCallback-%d")
+ .setDaemon(true)
+ .build(),
+ /*useFairMonitor=*/ false);
+ finalizer = StreamingCommitFinalizer.create(executor);
+ }
+
+ @Test
+ public void testCreateAndInit() {
+ assertEquals(0, finalizer.cleanupQueueSize());
+ }
+
+ @Test
+ public void testCacheCommitFinalizer() {
+ Runnable callback = mock(Runnable.class);
+ finalizer.cacheCommitFinalizers(
+ ImmutableMap.of(1L,
Pair.of(Instant.now().plus(Duration.standardHours(1)), callback)));
+ assertEquals(1, finalizer.cleanupQueueSize());
Review Comment:
Also assert that the callback had no invocations.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]