scwhittle commented on code in PR #37304:
URL: https://github.com/apache/beam/pull/37304#discussion_r2774345061
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/FinalizeBundleHandler.java:
##########
@@ -64,55 +65,53 @@ public static CallbackRegistration create(
}
private final ConcurrentMap<String, Collection<CallbackRegistration>>
bundleFinalizationCallbacks;
+ private final ReentrantLock cleanupLock = new ReentrantLock();
+ private final Condition queueMinChanged = cleanupLock.newCondition();
+
+ @GuardedBy("cleanupLock")
private final PriorityQueue<TimestampedValue<String>> cleanUpQueue;
@SuppressWarnings("unused")
- private final Future<Void> cleanUpResult;
+ private final Future<?> cleanUpResult;
+ @SuppressWarnings("methodref.receiver.bound")
public FinalizeBundleHandler(ExecutorService executorService) {
this.bundleFinalizationCallbacks = new ConcurrentHashMap<>();
this.cleanUpQueue =
new PriorityQueue<>(11,
Comparator.comparing(TimestampedValue::getTimestamp));
- // Wait until we have at least one element. We are notified on each element
- // being added.
- // Wait until the current time has past the expiry time for the head of the
- // queue.
- // We are notified on each element being added.
- // Wait until we have at least one element. We are notified on each element
- // being added.
- // Wait until the current time has past the expiry time for the head of the
- // queue.
- // We are notified on each element being added.
- cleanUpResult =
- executorService.submit(
- (Callable<Void>)
- () -> {
- while (true) {
- synchronized (cleanUpQueue) {
- TimestampedValue<String> expiryTime =
cleanUpQueue.peek();
-
- // Wait until we have at least one element. We are
notified on each element
- // being added.
- while (expiryTime == null) {
- cleanUpQueue.wait();
- expiryTime = cleanUpQueue.peek();
- }
-
- // Wait until the current time has past the expiry time
for the head of the
- // queue.
- // We are notified on each element being added.
- Instant now = Instant.now();
- while (expiryTime.getTimestamp().isAfter(now)) {
- Duration timeDifference = new Duration(now,
expiryTime.getTimestamp());
- cleanUpQueue.wait(timeDifference.getMillis());
- expiryTime = cleanUpQueue.peek();
- now = Instant.now();
- }
-
-
bundleFinalizationCallbacks.remove(cleanUpQueue.poll().getValue());
- }
- }
- });
+
+ cleanUpResult = executorService.submit(this::cleanupThreadBody);
+ }
+
+ private void cleanupThreadBody() {
+ cleanupLock.lock();
+ try {
+ while (true) {
+ final @Nullable TimestampedValue<String> minValue =
cleanUpQueue.peek();
+ if (minValue == null) {
+ // Wait for an element to be added and loop to re-examine the min.
+ queueMinChanged.await();
+ continue;
+ }
+
+ Instant now = Instant.now();
+ Duration timeDifference = new Duration(now, minValue.getTimestamp());
+ if (timeDifference.getMillis() > 0
+ && queueMinChanged.await(timeDifference.getMillis(),
TimeUnit.MILLISECONDS)) {
+ // If the time didn't elapse, loop to re-examine the min.
+ continue;
+ }
+
+ // The minimum element has an expiry time before now.
+ // It may or may not actually be present in the map if the
finalization has already been
+ // completed.
+ bundleFinalizationCallbacks.remove(minValue.getValue());
Review Comment:
Fixed the problem and added a better test. I also changed to start removing
things from the cleanup queue as if we have arbitrary timeouts they could be
large and we would be wasting memory on things that completed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]