This is an automated email from the ASF dual-hosted git repository.

rzo1 pushed a commit to branch concurency
in repository https://gitbox.apache.org/repos/asf/tomee.git

commit d05eb5dd8a73e742fc891ca154423a76ecaa7809
Author: Richard Zowalla <[email protected]>
AuthorDate: Thu Apr 2 19:34:16 2026 +0200

    Rewrite scheduled async with manual trigger loop and context preservation
    
    Two fixes for the last Web-profile TCK failures:
    
    1. Context propagation: when executor is a plain MES (not MSES),
       extract the MES's ContextServiceImpl and compose a temporary MSES
       that uses the MES's context service with the default MSES's thread
       pool. This preserves third-party context propagation
       (e.g. StringContext).
    
    2. Thread pool starvation: replace mses.schedule(Callable, Trigger)
       with a manual trigger loop that schedules directly on the delegate
       ScheduledExecutorService. This avoids TriggerTask's double context
       wrapping and thread consumption between trigger fires.
---
 .../cdi/concurrency/AsynchronousInterceptor.java   | 182 ++++++++++++++++-----
 1 file changed, 144 insertions(+), 38 deletions(-)

diff --git a/container/openejb-core/src/main/java/org/apache/openejb/cdi/concurrency/AsynchronousInterceptor.java b/container/openejb-core/src/main/java/org/apache/openejb/cdi/concurrency/AsynchronousInterceptor.java
index 01569d9b5d..263e34aaa5 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/cdi/concurrency/AsynchronousInterceptor.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/cdi/concurrency/AsynchronousInterceptor.java
@@ -18,8 +18,8 @@ package org.apache.openejb.cdi.concurrency;
 
 import jakarta.annotation.Priority;
 import jakarta.enterprise.concurrent.Asynchronous;
+import jakarta.enterprise.concurrent.LastExecution;
 import jakarta.enterprise.concurrent.ManagedExecutorService;
-import jakarta.enterprise.concurrent.ManagedScheduledExecutorService;
 import jakarta.enterprise.concurrent.Schedule;
 import jakarta.enterprise.concurrent.ZonedTrigger;
 import jakarta.interceptor.AroundInvoke;
@@ -28,19 +28,27 @@ import jakarta.interceptor.InvocationContext;
 import org.apache.openejb.core.ivm.naming.NamingException;
 import org.apache.openejb.resource.thread.ManagedExecutorServiceImplFactory;
 import 
org.apache.openejb.resource.thread.ManagedScheduledExecutorServiceImplFactory;
+import org.apache.openejb.threads.impl.ContextServiceImpl;
+import org.apache.openejb.threads.impl.ManagedExecutorServiceImpl;
+import org.apache.openejb.threads.impl.ManagedScheduledExecutorServiceImpl;
 import org.apache.openejb.util.LogCategory;
 import org.apache.openejb.util.Logger;
 
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
+import java.time.Duration;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.util.Arrays;
+import java.util.Date;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 @Interceptor
@@ -113,25 +121,14 @@ public class AsynchronousInterceptor {
                                           final Schedule[] schedules) throws 
Exception {
         // Per spec, the executor attribute may reference either a 
ManagedScheduledExecutorService
         // or a plain ManagedExecutorService. When a plain MES is referenced, 
fall back to the
-        // default MSES for scheduling capability (the trigger mechanism 
requires MSES).
-        ManagedScheduledExecutorService mses;
-        try {
-            mses = 
ManagedScheduledExecutorServiceImplFactory.lookup(asynchronous.executor());
-        } catch (final IllegalArgumentException e) {
-            // The executor might be a plain ManagedExecutorService — verify 
it exists,
-            // then use the default MSES for scheduling
-            try {
-                
ManagedExecutorServiceImplFactory.lookup(asynchronous.executor());
-                // MES exists — use default MSES for scheduling
-                mses = ManagedScheduledExecutorServiceImplFactory.lookup(
-                        "java:comp/DefaultManagedScheduledExecutorService");
-            } catch (final Exception fallbackEx) {
-                throw new RejectedExecutionException("Cannot lookup executor 
for scheduled async method", e);
-            }
-        }
+        // default MSES for scheduling capability but preserve the MES's 
context service.
+        final ManagedScheduledExecutorServiceImpl mses = 
resolveMses(asynchronous.executor());
 
         final ZonedTrigger trigger = ScheduleHelper.toTrigger(schedules);
         final boolean isVoid = ctx.getMethod().getReturnType() == Void.TYPE;
+        final ContextServiceImpl ctxService = (ContextServiceImpl) 
mses.getContextService();
+        final ContextServiceImpl.Snapshot snapshot = ctxService.snapshot(null);
+        final ScheduledExecutorService delegate = mses.getDelegate();
 
         // A single CompletableFuture represents ALL executions in the 
schedule.
         // Per spec: "A single future represents the completion of all 
executions in the schedule."
@@ -141,51 +138,160 @@ public class AsynchronousInterceptor {
         //   - the future is completed (via Asynchronous.Result.complete()) or 
cancelled
         final CompletableFuture<Object> outerFuture = 
mses.newIncompleteFuture();
         final AtomicReference<ScheduledFuture<?>> scheduledRef = new 
AtomicReference<>();
+        final AtomicReference<LastExecution> lastExecutionRef = new 
AtomicReference<>();
+
+        // Schedule the first execution via the manual trigger loop
+        scheduleNextExecution(delegate, snapshot, ctxService, trigger, 
outerFuture,
+                ctx, isVoid, scheduledRef, lastExecutionRef);
+
+        // Cancel the underlying scheduled task when the future completes 
externally
+        // (e.g. Asynchronous.Result.complete() or cancel())
+        outerFuture.whenComplete((final Object val, final Throwable err) -> {
+            final ScheduledFuture<?> sf = scheduledRef.get();
+            if (sf != null) {
+                sf.cancel(false);
+            }
+        });
+
+        return isVoid ? null : outerFuture;
+    }
+
+    private ManagedScheduledExecutorServiceImpl resolveMses(final String 
executorName) {
+        try {
+            return 
ManagedScheduledExecutorServiceImplFactory.lookup(executorName);
+        } catch (final IllegalArgumentException e) {
+            // The executor might be a plain ManagedExecutorService — verify 
it exists,
+            // then use the default MSES for scheduling with the MES's context 
service
+            try {
+                final ManagedExecutorServiceImpl plainMes = 
ManagedExecutorServiceImplFactory.lookup(executorName);
+                final ContextServiceImpl mesContextService = 
(ContextServiceImpl) plainMes.getContextService();
+                final ManagedScheduledExecutorServiceImpl defaultMses =
+                        
ManagedScheduledExecutorServiceImplFactory.lookup("java:comp/DefaultManagedScheduledExecutorService");
+                return new 
ManagedScheduledExecutorServiceImpl(defaultMses.getDelegate(), 
mesContextService);
+            } catch (final Exception fallbackEx) {
+                throw new RejectedExecutionException("Cannot lookup executor 
for scheduled async method", e);
+            }
+        }
+    }
+
+    private void scheduleNextExecution(final ScheduledExecutorService 
delegate, final ContextServiceImpl.Snapshot snapshot,
+                                       final ContextServiceImpl ctxService, 
final ZonedTrigger trigger,
+                                       final CompletableFuture<Object> future, 
final InvocationContext ctx,
+                                       final boolean isVoid, final 
AtomicReference<ScheduledFuture<?>> scheduledRef,
+                                       final AtomicReference<LastExecution> 
lastExecutionRef) {
+        final ZonedDateTime taskScheduledTime = ZonedDateTime.now();
+        final ZonedDateTime nextRun = 
trigger.getNextRunTime(lastExecutionRef.get(), taskScheduledTime);
+        if (nextRun == null || future.isDone()) {
+            return;
+        }
+
+        final long delayMs = Duration.between(ZonedDateTime.now(), 
nextRun).toMillis();
+
+        final ScheduledFuture<?> sf = delegate.schedule(() -> {
+            if (future.isDone()) {
+                return;
+            }
 
-        final ScheduledFuture<?> scheduledFuture = 
mses.schedule((Callable<Object>) () -> {
+            final ContextServiceImpl.State state = ctxService.enter(snapshot);
             try {
-                Asynchronous.Result.setFuture(outerFuture);
+                if (trigger.skipRun(lastExecutionRef.get(), nextRun)) {
+                    // Skipped — reschedule for the next run
+                    scheduleNextExecution(delegate, snapshot, ctxService, 
trigger, future,
+                            ctx, isVoid, scheduledRef, lastExecutionRef);
+                    return;
+                }
+
+                final ZonedDateTime runStart = ZonedDateTime.now();
+                Asynchronous.Result.setFuture(future);
                 final Object result = ctx.proceed();
+                final ZonedDateTime runEnd = ZonedDateTime.now();
+
+                // Track last execution for trigger computation
+                lastExecutionRef.set(new 
SimpleLastExecution(taskScheduledTime, runStart, runEnd, result));
 
                 if (isVoid) {
                     Asynchronous.Result.setFuture(null);
-                    return null;
+                    scheduleNextExecution(delegate, snapshot, ctxService, 
trigger, future,
+                            ctx, isVoid, scheduledRef, lastExecutionRef);
+                    return;
                 }
 
                 // Per spec: non-null return value stops the schedule
                 if (result != null) {
-                    if (result instanceof CompletionStage<?> cs && result != 
outerFuture) {
-                        cs.whenComplete((val, err) -> {
+                    if (result instanceof CompletionStage<?> cs && result != 
future) {
+                        cs.whenComplete((final Object val, final Throwable 
err) -> {
                             if (err != null) {
-                                outerFuture.completeExceptionally(err);
+                                future.completeExceptionally(err);
                             } else {
-                                outerFuture.complete(val);
+                                future.complete(val);
                             }
-                            Asynchronous.Result.setFuture(null);
                         });
                     }
                     Asynchronous.Result.setFuture(null);
-
-                    // Cancel the trigger loop — method returned non-null
-                    final ScheduledFuture<?> sf = scheduledRef.get();
-                    if (sf != null) {
-                        sf.cancel(false);
-                    }
+                    // Don't reschedule — method returned non-null
+                    return;
                 }
+
+                Asynchronous.Result.setFuture(null);
                 // null return: schedule continues
+                scheduleNextExecution(delegate, snapshot, ctxService, trigger, 
future,
+                        ctx, isVoid, scheduledRef, lastExecutionRef);
             } catch (final Exception e) {
-                outerFuture.completeExceptionally(e);
+                future.completeExceptionally(e);
                 Asynchronous.Result.setFuture(null);
+            } finally {
+                ctxService.exit(state);
             }
+        }, Math.max(0, delayMs), TimeUnit.MILLISECONDS);
+
+        scheduledRef.set(sf);
+    }
+
+    /**
+     * Simple {@link LastExecution} implementation for tracking execution 
history
+     * within the manual trigger loop.
+     */
+    private record SimpleLastExecution(ZonedDateTime scheduledStart, 
ZonedDateTime runStart,
+                                       ZonedDateTime runEnd, Object result) 
implements LastExecution {
+        @Override
+        public String getIdentityName() {
             return null;
-        }, trigger);
+        }
 
-        scheduledRef.set(scheduledFuture);
+        @Override
+        public Object getResult() {
+            return result;
+        }
 
-        // Also cancel when the future completes externally (e.g. 
Asynchronous.Result.complete())
-        outerFuture.whenComplete((val, err) -> scheduledFuture.cancel(false));
+        @Override
+        public Date getScheduledStart() {
+            return Date.from(scheduledStart.toInstant());
+        }
 
-        return isVoid ? null : outerFuture;
+        @Override
+        public ZonedDateTime getScheduledStart(final ZoneId zone) {
+            return scheduledStart.withZoneSameInstant(zone);
+        }
+
+        @Override
+        public Date getRunStart() {
+            return Date.from(runStart.toInstant());
+        }
+
+        @Override
+        public ZonedDateTime getRunStart(final ZoneId zone) {
+            return runStart.withZoneSameInstant(zone);
+        }
+
+        @Override
+        public Date getRunEnd() {
+            return Date.from(runEnd.toInstant());
+        }
+
+        @Override
+        public ZonedDateTime getRunEnd(final ZoneId zone) {
+            return runEnd.withZoneSameInstant(zone);
+        }
     }
 
     private Exception validate(final Method method) {

Reply via email to