This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch tx-idle-to
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit e5736c3332b7086b1285552073d349961a9edffa
Author: Ken Hu <[email protected]>
AuthorDate: Wed Jun 24 19:39:11 2026 -0700

    Suspend the HTTP transaction idle timer while an operation is running
    
    The idle timeout was armed on request arrival rather than when the
    transaction went idle, so a single operation running longer than the timeout
    tripped it mid-execution, contradicting the documented promise that active
    transactions are unaffected. A long operation should be bounded by
    evaluationTimeout; the idle timer should only reclaim abandoned
    transactions.
    
    The per-transaction executor is now a ThreadPoolExecutor(1,1) whose
    before/afterExecute hooks suspend the idle timer while work runs and re-arm
    it only once the worker parks with an empty queue. This gives a reliable
    running-vs-idle signal without wrapping submitted tasks, which would break
    the evaluation-timeout interrupt that relies on cancelling the real
    FutureTask.
    
    transactionTimeout is renamed to idleTransactionTimeout to reflect its
    actual meaning (renamed outright as the feature is unreleased), and now
    honors 0 as "disabled" to match its documentation.
    
    Assisted-by: Claude Code:claude-opus-4-8
---
 .../apache/tinkerpop/gremlin/server/Settings.java  |   9 +-
 .../server/transaction/TransactionManager.java     |  14 +-
 .../server/transaction/UnmanagedTransaction.java   | 136 +++++++---
 .../gremlin/server/util/ServerGremlinExecutor.java |   2 +-
 .../GremlinDriverTransactionIntegrateTest.java     |   4 +-
 .../GremlinServerHttpTransactionIntegrateTest.java |  53 +++-
 .../transaction/UnmanagedTransactionTest.java      | 291 +++++++++++++++++++++
 .../util/ManualScheduledExecutorService.java       | 279 ++++++++++++++++++++
 8 files changed, 728 insertions(+), 60 deletions(-)

diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
index 8d6111ef27..236a4b4431 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
@@ -185,10 +185,13 @@ public class Settings {
     public boolean strictTransactionManagement = false;
 
     /**
-     * Time in milliseconds that a transaction can remain idle before it is 
automatically rolled back.
-     * This prevents resource leaks from abandoned transactions. Default is 
600000 (10 minutes).
+     * Time in milliseconds that a transaction can remain idle (no operation 
running or queued) before it is
+     * automatically rolled back. This prevents resource leaks from abandoned 
transactions. The idle timer is suspended
+     * while an operation is in progress, so a long-running operation does not 
trip it (its duration is instead bounded
+     * by {@link #evaluationTimeout}). Set to {@code 0} to disable idle 
reclamation entirely. Default is 600000
+     * (10 minutes).
      */
-    public long transactionTimeout = 600000L;
+    public long idleTransactionTimeout = 600000L;
 
     /**
      * Time in milliseconds to wait for a transaction commit or rollback 
operation to complete.
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java
index 6901664c15..ec203e820e 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java
@@ -43,7 +43,7 @@ public class TransactionManager {
     private final ConcurrentMap<String, UnmanagedTransaction> transactions = 
new ConcurrentHashMap<>();
     private final ScheduledExecutorService scheduledExecutorService;
     private final GraphManager graphManager;
-    private final long transactionTimeoutMs;
+    private final long idleTransactionTimeoutMs;
     private final int maxConcurrentTransactions;
     private final long perGraphCloseMs;
 
@@ -52,23 +52,23 @@ public class TransactionManager {
      *
      * @param scheduledExecutorService Scheduler for timeout management
      * @param graphManager The graph manager for accessing traversal sources
-     * @param transactionTimeoutMs Timeout in milliseconds before auto-rollback
+     * @param idleTransactionTimeoutMs Inactivity timeout in milliseconds 
before auto-rollback; {@code 0} disables it
      * @param maxConcurrentTransactions Maximum number of concurrent 
transactions allowed
      */
     public TransactionManager(final ScheduledExecutorService 
scheduledExecutorService,
                               final GraphManager graphManager,
-                              final long transactionTimeoutMs,
+                              final long idleTransactionTimeoutMs,
                               final int maxConcurrentTransactions,
                               final long perGraphCloseMs) {
         this.scheduledExecutorService = scheduledExecutorService;
         this.graphManager = graphManager;
-        this.transactionTimeoutMs = transactionTimeoutMs;
+        this.idleTransactionTimeoutMs = idleTransactionTimeoutMs;
         this.maxConcurrentTransactions = maxConcurrentTransactions;
         this.perGraphCloseMs = perGraphCloseMs;
 
         MetricManager.INSTANCE.getGauge(transactions::size, 
name(GremlinServer.class, "transactions"));
-        logger.info("TransactionManager initialized with timeout={}ms, 
maxTransactions={}",
-                transactionTimeoutMs, maxConcurrentTransactions);
+        logger.info("TransactionManager initialized with 
idleTransactionTimeout={}ms, maxTransactions={}",
+                idleTransactionTimeoutMs, maxConcurrentTransactions);
     }
 
     /**
@@ -123,7 +123,7 @@ public class TransactionManager {
                     traversalSourceName,
                     graph,
                     scheduledExecutorService,
-                    transactionTimeoutMs,
+                    idleTransactionTimeoutMs,
                     perGraphCloseMs
             );
         } while (transactions.putIfAbsent(txId, ctx) != null);
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java
index 3dfc394204..367e8f963b 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java
@@ -23,12 +23,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -42,6 +43,12 @@ import java.util.concurrent.atomic.AtomicReference;
  * the complete request lifecycle (graph operation, error handling, response 
writing),
  * following the same pattern as the non-transactional HTTP path and the legacy
  * {@code SessionOpProcessor}.
+ * <p>
+ * The single-threaded executor is a {@link SingleThreadTransactionExecutor} 
(a {@code ThreadPoolExecutor} with one
+ * core/max thread) rather than {@link 
java.util.concurrent.Executors#newSingleThreadExecutor}. It is behaviorally
+ * identical for task execution but exposes the {@code beforeExecute}/{@code 
afterExecute} lifecycle hooks and the task
+ * queue, which the idle-timer management relies on to tell "an operation is 
running" apart from "the worker is idle
+ * with an empty queue". The {@code Executors} factory hides those behind a 
sealed wrapper.
  */
 public class UnmanagedTransaction {
     private static final Logger logger = 
LoggerFactory.getLogger(UnmanagedTransaction.class);
@@ -51,16 +58,16 @@ public class UnmanagedTransaction {
     private final TransactionManager manager;
     private final Graph graph;
     private final ScheduledExecutorService scheduledExecutorService;
-    private final long timeout;
+    private final long idleTimeout;
     private final long perGraphClose;
-    private final AtomicReference<ScheduledFuture<?>> timeoutFuture = new 
AtomicReference<>();
+    private final AtomicReference<ScheduledFuture<?>> idleFuture = new 
AtomicReference<>();
     // Controls whether the executor is still accepting tasks.
     private final AtomicBoolean accepting = new AtomicBoolean(true);
     /**
      * Single-threaded executor ensures all operations for this transaction 
run on
      * the same thread, preserving the ThreadLocal nature of Graph 
transactions.
      */
-    private final ExecutorService executor;
+    private final SingleThreadTransactionExecutor executor;
 
     /**
      * Creates a new {@code UnmanagedTransaction} for managing an HTTP 
transaction.
@@ -70,14 +77,14 @@ public class UnmanagedTransaction {
      * @param traversalSourceName The traversal source name bound at begin time
      * @param graph The graph instance for this transaction
      * @param scheduledExecutorService Scheduler for timeout management
-     * @param transactionTimeout Timeout in milliseconds before auto-rollback
+     * @param idleTransactionTimeout Inactivity timeout in milliseconds before 
auto-rollback; {@code 0} disables it
      */
     public UnmanagedTransaction(final String transactionId,
                                 final TransactionManager transactionManager,
                                 final String traversalSourceName,
                                 final Graph graph,
                                 final ScheduledExecutorService 
scheduledExecutorService,
-                                final long transactionTimeout,
+                                final long idleTransactionTimeout,
                                 final long perGraphClose) {
         logger.debug("New transaction context established for {}", 
transactionId);
         this.transactionId = transactionId;
@@ -85,12 +92,15 @@ public class UnmanagedTransaction {
         this.manager = transactionManager;
         this.graph = graph;
         this.scheduledExecutorService = scheduledExecutorService;
-        this.timeout = transactionTimeout;
+        this.idleTimeout = idleTransactionTimeout;
         this.perGraphClose = perGraphClose;
 
-        // Create single-threaded executor with named thread for debugging
-        this.executor = Executors.newSingleThreadExecutor(
-            r -> new Thread(r, "tx-" + transactionId.substring(0, Math.min(8, 
transactionId.length()))));
+        // Create single-threaded executor with named thread for debugging. A 
ThreadPoolExecutor(1,1) is used (rather
+        // than Executors.newSingleThreadExecutor) so the before/afterExecute 
hooks and the task queue are accessible
+        // for idle-timer management; see SingleThreadTransactionExecutor.
+        final ThreadFactory threadFactory =
+            r -> new Thread(r, "tx-" + transactionId.substring(0, Math.min(8, 
transactionId.length())));
+        this.executor = new SingleThreadTransactionExecutor(threadFactory);
     }
 
     /**
@@ -107,36 +117,6 @@ public class UnmanagedTransaction {
         return traversalSourceName;
     }
 
-    /**
-     * Resets the timeout for this transaction. Called on each request.
-     */
-    public void touch() {
-        timeoutFuture.updateAndGet(future -> {
-            if (future != null) future.cancel(false);
-            return scheduledExecutorService.schedule(() -> {
-                logger.info("Transaction {} timed out after {} ms of 
inactivity", transactionId, timeout);
-                close(false);
-            }, timeout, TimeUnit.MILLISECONDS);
-        });
-    }
-
-    /**
-     * Opens the underlying graph transaction and starts the inactivity 
timeout.
-     * Should be called on the transaction's single-threaded executor to 
preserve
-     * ThreadLocal affinity. On failure the exception is re-thrown and the 
caller
-     * is responsible for cleanup (e.g. via {@link #close(boolean)}).
-     */
-    public void open() {
-        try {
-            graph.tx().open();
-            touch();
-            logger.debug("Transaction {} opened", transactionId);
-        } catch (Exception e) {
-            logger.warn("Failed to begin transaction {}: {}", transactionId, 
e.getMessage());
-            throw e;
-        }
-    }
-
     /**
      * Closes this transaction and releases its resources. When {@code force} 
is {@code false},
      * any open graph transaction is rolled back before shutdown. When {@code 
force} is {@code true},
@@ -183,7 +163,7 @@ public class UnmanagedTransaction {
         // reorder these two statements.
         manager.destroy(transactionId);
         executor.shutdown();
-        Optional.ofNullable(timeoutFuture.get()).ifPresent(f -> 
f.cancel(true));
+        Optional.ofNullable(idleFuture.get()).ifPresent(f -> f.cancel(true));
         logger.debug("Transaction {} closed", transactionId);
     }
 
@@ -199,7 +179,77 @@ public class UnmanagedTransaction {
     public Future<?> submit(final FutureTask<Void> task) {
         if (!accepting.get()) throw new IllegalStateException("Transaction " + 
transactionId + " is closed");
 
-        touch();
+        // Insurance backstop: cancel (do NOT arm) the idle timer on submit. 
Arming is the executor's job, done in
+        // afterExecute once the worker parks with an empty queue. 
beforeExecute will also cancel when the task starts;
+        // cancelling here too closes the small window between accepting a 
task and the worker picking it up.
+        cancelIdleTimer();
         return executor.submit(task);
     }
+
+    /**
+     * Suspends the inactivity timer because an operation is running (or about 
to run) on the transaction thread.
+     * Invoked from {@link SingleThreadTransactionExecutor#beforeExecute} and, 
as a backstop, from {@link #submit}.
+     * <p>
+     * A long-running operation must not trip the idle timeout: while an 
operation is in progress the idle timer is
+     * simply not armed (the operation's own duration is bounded by the 
per-request {@code evaluationTimeout} instead).
+     */
+    private void cancelIdleTimer() {
+        idleFuture.updateAndGet(future -> {
+            if (future != null) future.cancel(false);
+            return null;
+        });
+    }
+
+    /**
+     * (Re)arms the inactivity timer, but only when the transaction is 
genuinely idle. Invoked from
+     * {@link SingleThreadTransactionExecutor#afterExecute} once an operation 
has finished and the worker is about to
+     * look for more work.
+     * <p>
+     * "Idle" means: still {@link #accepting} new work (not closing), the 
executor queue is empty (no sibling request is
+     * already waiting — on a single thread there is a brief instant between 
one task finishing and the next starting),
+     * and the idle timeout is enabled ({@code idleTimeout > 0}; {@code 0} 
disables idle reclamation entirely). When all
+     * hold, a fresh {@code close(false)} is scheduled {@code idleTimeout} ms 
out, replacing any previously scheduled one.
+     */
+    private void maybeScheduleIdleTimer() {
+        if (!accepting.get()) return;            // closing/closed: never 
re-arm a dying transaction
+        if (idleTimeout <= 0) return;            // 0 (or negative) disables 
idle reclamation
+        if (!executor.getQueue().isEmpty()) return; // a sibling task is 
already queued -> not idle yet
+
+        idleFuture.updateAndGet(future -> {
+            if (future != null) future.cancel(false);
+            return scheduledExecutorService.schedule(() -> {
+                logger.info("Transaction {} timed out after {} ms of 
inactivity", transactionId, idleTimeout);
+                close(false);
+            }, idleTimeout, TimeUnit.MILLISECONDS);
+        });
+    }
+
+    /**
+     * A single-threaded {@link ThreadPoolExecutor} (one core and max thread) 
that runs all operations for a single
+     * transaction on the same worker thread, preserving the ThreadLocal 
nature of graph transactions.
+     * <p>
+     * It is used in place of {@link 
java.util.concurrent.Executors#newSingleThreadExecutor} solely to expose the
+     * {@link #beforeExecute}/{@link #afterExecute} lifecycle hooks (and, via 
{@link #getQueue()}, the pending-task
+     * queue), which the enclosing {@link UnmanagedTransaction} needs to 
distinguish "an operation is running" from
+     * "the worker is idle with nothing queued". Task-execution semantics are 
otherwise identical to a single-thread
+     * executor: one worker, FIFO ordering. Submitted {@link FutureTask}s are 
returned unwrapped so callers can
+     * {@code cancel(true)} the real work (e.g. the per-request evaluation 
timeout interrupting a running operation).
+     */
+    private final class SingleThreadTransactionExecutor extends 
ThreadPoolExecutor {
+        private SingleThreadTransactionExecutor(final ThreadFactory 
threadFactory) {
+            super(1, 1, 0L, TimeUnit.MILLISECONDS, new 
LinkedBlockingQueue<>(), threadFactory);
+        }
+
+        @Override
+        protected void beforeExecute(final Thread t, final Runnable r) {
+            super.beforeExecute(t, r);
+            cancelIdleTimer();
+        }
+
+        @Override
+        protected void afterExecute(final Runnable r, final Throwable t) {
+            super.afterExecute(r, t);
+            maybeScheduleIdleTimer();
+        }
+    }
 }
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
index 3995337566..ab6be093e2 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
@@ -306,7 +306,7 @@ public class ServerGremlinExecutor {
         transactionManager = new TransactionManager(
                 scheduledExecutorService,
                 graphManager,
-                settings.transactionTimeout,
+                settings.idleTransactionTimeout,
                 settings.maxConcurrentTransactions,
                 settings.perGraphCloseTimeout
         );
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java
index 69f387b17e..8a6c4700cd 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java
@@ -85,10 +85,10 @@ public class GremlinDriverTransactionIntegrateTest extends 
AbstractGremlinServer
             case "shouldTimeoutIdleTransaction":
             case "shouldTimeoutIdleTransactionWithNoOperations":
             case "shouldRejectLateCommitAfterTimeout":
-                settings.transactionTimeout = 1000;
+                settings.idleTransactionTimeout = 1000;
                 break;
             case "shouldTimeoutOnlyIdleTransactionNotActiveOne":
-                settings.transactionTimeout = 2000;
+                settings.idleTransactionTimeout = 2000;
                 break;
         }
         return settings;
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java
index 95895e66b4..c9a05b82ab 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java
@@ -93,17 +93,21 @@ public class GremlinServerHttpTransactionIntegrateTest 
extends AbstractGremlinSe
                 break;
             case "shouldTimeoutFreeSlotUnderMaxConcurrentTransactions":
                 settings.maxConcurrentTransactions = 1;
-                settings.transactionTimeout = 1000;
+                settings.idleTransactionTimeout = 1000;
                 break;
             case "shouldTimeoutIdleTransactionWithNoOperations":
-                settings.transactionTimeout = 500;
+                settings.idleTransactionTimeout = 500;
                 break;
             case "shouldTimeoutAndRejectLateCommit":
             case "shouldTrackTransactionCountAccurately":
-                settings.transactionTimeout = 1000;
+                settings.idleTransactionTimeout = 1000;
                 break;
             case "shouldRollbackAbandonedTransaction":
-                settings.transactionTimeout = 300;
+                settings.idleTransactionTimeout = 300;
+                break;
+            case "shouldNotIdleTimeoutLongRunningOperation":
+                // Short idle timeout, but a single long operation must NOT 
trip it (idle suspended while busy).
+                settings.idleTransactionTimeout = 500;
                 break;
             case "shouldRejectMismatchedGraphAliasInTransaction": {
                 final Settings.GraphSettings gs = new Settings.GraphSettings();
@@ -528,6 +532,47 @@ public class GremlinServerHttpTransactionIntegrateTest 
extends AbstractGremlinSe
         }
     }
 
+    @Test
+    public void shouldNotIdleTimeoutLongRunningOperation() throws Exception {
+        // With a short idle timeout (500ms), a single operation that runs 
LONGER than the idle timeout must still
+        // succeed -- the idle timer is suspended while an operation is in 
progress, so a long-running op is not
+        // reclaimed mid-execution (it is instead bounded by 
evaluationTimeout, left at its default here).
+        final String txId = beginTx(client, GTX);
+
+        // Seed two vertices and an edge so repeat(both()) has something to 
traverse and keeps the executor busy. Each
+        // response body is fully consumed before the next request is sent: 
the chunked stream is only complete once the
+        // server has finished processing, so consuming guarantees these 
requests are strictly ordered.
+        try (final CloseableHttpResponse r = submitInTx(client, txId, 
"g.addV().property(T.id, 1)", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+            EntityUtils.consume(r.getEntity());
+        }
+        try (final CloseableHttpResponse r = submitInTx(client, txId, 
"g.addV().property(T.id, 2)", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+            EntityUtils.consume(r.getEntity());
+        }
+        try (final CloseableHttpResponse r = submitInTx(client, txId, 
"g.V(1).addE('self').to(__.V(2))", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+            EntityUtils.consume(r.getEntity());
+        }
+
+        // A traversal that runs well past the 500ms idle timeout. Under the 
old arm-on-arrival behavior the idle timer
+        // would have fired mid-execution; under suspend-while-busy it does 
not.
+        try (final CloseableHttpResponse r = submitInTx(client, txId, 
"g.V().repeat(both()).times(2000)", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+            EntityUtils.consume(r.getEntity());
+        }
+
+        // The transaction is still alive and usable after the long op (it was 
not reclaimed mid-flight).
+        try (final CloseableHttpResponse r = submitInTx(client, txId, 
"g.V().count()", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+            assertEquals(2, extractCount(r)); // extractCount fully reads the 
body
+        }
+        try (final CloseableHttpResponse r = commitTx(client, txId, GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+            EntityUtils.consume(r.getEntity());
+        }
+    }
+
     @Test
     public void shouldRejectMismatchedGraphAliasInTransaction() throws 
Exception {
         final String txId = beginTx(client, GTX);
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransactionTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransactionTest.java
new file mode 100644
index 0000000000..81a8a65949
--- /dev/null
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransactionTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.transaction;
+
+import org.apache.tinkerpop.gremlin.server.util.ManualScheduledExecutorService;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link UnmanagedTransaction}, driven by a deterministic 
{@link ManualScheduledExecutorService} so the
+ * inactivity-timeout behaviour can be asserted without real wall-clock waits.
+ * <p>
+ * These are <em>specification</em> tests for the reworked idle timer 
(suspend-while-busy): the idle timer is armed only
+ * when the transaction goes idle (no operation running, empty queue) and is 
suspended while an operation runs. The idle
+ * timer is (re)armed from the executor's {@code afterExecute} hook, which 
runs on the transaction worker thread, so
+ * timer assertions poll the scheduler with a bounded wait via {@link 
#awaitPendingTimer(boolean)}.
+ */
+public class UnmanagedTransactionTest {
+
+    private static final String TX_ID = "test-tx-0001";
+    private static final long TIMEOUT_MS = 600000L;
+    private static final long PER_GRAPH_CLOSE_MS = 10000L;
+    private static final long AWAIT_MS = 5000L;
+
+    private TransactionManager manager;
+    private Graph graph;
+    private ManualScheduledExecutorService scheduler;
+    private UnmanagedTransaction tx;
+
+    @Before
+    public void setUp() {
+        manager = mock(TransactionManager.class);
+        graph = mock(Graph.class);
+        final Transaction graphTx = mock(Transaction.class);
+        when(graph.tx()).thenReturn(graphTx);
+        when(graphTx.isOpen()).thenReturn(false); // rollback path is a no-op 
during close(false)
+
+        scheduler = new ManualScheduledExecutorService();
+        tx = new UnmanagedTransaction(TX_ID, manager, "g", graph, scheduler, 
TIMEOUT_MS, PER_GRAPH_CLOSE_MS);
+
+        // close() short-circuits unless the manager still knows about the 
transaction.
+        when(manager.get(TX_ID)).thenReturn(Optional.of(tx));
+    }
+
+    /**
+     * Submits a no-op task and blocks until it has finished running on the 
worker thread.
+     */
+    private void runOp() throws Exception {
+        tx.submit(new FutureTask<>(() -> null)).get(AWAIT_MS, 
TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Waits (bounded) for the idle timer to reach the expected 
armed/not-armed state, since it is (re)armed on the
+     * worker thread from afterExecute slightly after the submitted task's 
Future completes. Returns once the condition
+     * holds or the wait elapses; the caller asserts on the final state.
+     */
+    private void awaitPendingTimer(final boolean expectArmed) throws 
InterruptedException {
+        final long deadline = System.nanoTime() + 
TimeUnit.MILLISECONDS.toNanos(AWAIT_MS);
+        while (System.nanoTime() < deadline) {
+            if ((scheduler.getPendingTaskCount() == 1) == expectArmed) return;
+            Thread.sleep(5);
+        }
+    }
+
+    @Test
+    public void shouldNotScheduleAnyCloseAtConstruction() {
+        assertEquals(0, scheduler.getScheduledTaskCount());
+        assertEquals(0, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldArmIdleTimerWhenWorkerGoesIdleAfterAnOperation() throws 
Exception {
+        runOp();
+
+        awaitPendingTimer(true);
+        assertEquals("idle timer should be armed once the worker parks with an 
empty queue",
+                1, scheduler.getPendingTaskCount());
+        assertEquals(TIMEOUT_MS, scheduler.nextPendingDelayMillis());
+    }
+
+    @Test
+    public void shouldNotArmIdleTimerWhileAnOperationIsRunning() throws 
Exception {
+        // Hold an operation "running" and assert no idle timer is armed 
during that window.
+        final CountDownLatch started = new CountDownLatch(1);
+        final CountDownLatch release = new CountDownLatch(1);
+        final Future<?> running = tx.submit(new FutureTask<>(() -> {
+            started.countDown();
+            release.await();
+            return null;
+        }));
+
+        assertTrue(started.await(AWAIT_MS, MILLISECONDS));
+        // While the op runs, the idle timer must not be armed (a long op must 
not trip the idle timeout).
+        assertEquals(0, scheduler.getPendingTaskCount());
+
+        release.countDown();
+        running.get(AWAIT_MS, MILLISECONDS);
+
+        // Once the worker goes idle, the timer arms.
+        awaitPendingTimer(true);
+        assertEquals(1, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldNotFireIdleCloseForALongRunningOperation() throws 
Exception {
+        // A single operation that runs longer than the idle timeout must not 
be reclaimed mid-execution: no timer is
+        // armed while it runs, so advancing the clock far past the timeout 
fires nothing.
+        final CountDownLatch started = new CountDownLatch(1);
+        final CountDownLatch release = new CountDownLatch(1);
+        final Future<?> running = tx.submit(new FutureTask<>(() -> {
+            started.countDown();
+            release.await();
+            return null;
+        }));
+        assertTrue(started.await(AWAIT_MS, MILLISECONDS));
+
+        scheduler.advanceTimeBy(TIMEOUT_MS * 2, MILLISECONDS);
+
+        verify(manager, never()).destroy(TX_ID);
+        release.countDown();
+        running.get(AWAIT_MS, MILLISECONDS);
+    }
+
+    @Test
+    public void shouldCloseTransactionWhenIdleTimeoutFires() throws Exception {
+        runOp();
+        awaitPendingTimer(true);
+
+        scheduler.advanceTimeBy(TIMEOUT_MS, MILLISECONDS);
+
+        // The scheduled close(false) removes the transaction from the manager.
+        verify(manager).destroy(TX_ID);
+        assertEquals(0, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldNotCloseBeforeIdleTimeoutElapses() throws Exception {
+        runOp();
+        awaitPendingTimer(true);
+
+        scheduler.advanceTimeBy(TIMEOUT_MS - 1, MILLISECONDS);
+
+        verify(manager, never()).destroy(TX_ID);
+        assertEquals(1, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldReArmIdleTimerAfterEachOperation() throws Exception {
+        runOp();
+        awaitPendingTimer(true);
+        assertEquals(1, scheduler.getScheduledTaskCount());
+
+        runOp();
+        awaitPendingTimer(true);
+
+        // A second operation cancels the prior idle timer and arms a fresh 
one.
+        assertEquals(2, scheduler.getScheduledTaskCount());
+        assertEquals(1, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldNotArmIdleTimerWhenIdleTimeoutDisabled() throws 
Exception {
+        // idleTransactionTimeout == 0 disables idle reclamation entirely: the 
timer is never armed.
+        final UnmanagedTransaction disabledTx =
+                new UnmanagedTransaction(TX_ID, manager, "g", graph, 
scheduler, 0L, PER_GRAPH_CLOSE_MS);
+
+        disabledTx.submit(new FutureTask<>(() -> null)).get(AWAIT_MS, 
TimeUnit.MILLISECONDS);
+
+        awaitPendingTimer(false);
+        assertEquals(0, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldCancelScheduledCloseOnExplicitClose() throws Exception {
+        runOp();
+        awaitPendingTimer(true);
+
+        tx.close(true);
+
+        verify(manager).destroy(TX_ID);
+        // The pending inactivity close must be cancelled so it cannot fire 
after the transaction is gone.
+        assertEquals(0, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldNotReArmIdleTimerAfterClose() throws Exception {
+        runOp();
+        awaitPendingTimer(true);
+
+        tx.close(false);
+
+        verify(manager).destroy(TX_ID);
+        // Advancing the clock must not resurrect a close on a transaction 
that is already gone.
+        scheduler.advanceTimeBy(TIMEOUT_MS * 2, MILLISECONDS);
+        assertEquals(0, scheduler.getPendingTaskCount());
+    }
+
+    // ---- Step 2: SingleThreadTransactionExecutor invariants (executor swap) 
----
+
+    @Test
+    public void 
shouldRunSubmittedTasksOnASingleNamedTransactionThreadInOrder() throws 
Exception {
+        final List<String> executionOrder = new CopyOnWriteArrayList<>();
+        final List<String> threadNames = new CopyOnWriteArrayList<>();
+
+        Future<?> last = null;
+        for (int i = 0; i < 5; i++) {
+            final int n = i;
+            last = tx.submit(new FutureTask<>(() -> {
+                threadNames.add(Thread.currentThread().getName());
+                executionOrder.add("task-" + n);
+                return null;
+            }));
+        }
+        last.get(5, TimeUnit.SECONDS); // FIFO single thread: the last task 
completing means all ran
+
+        assertEquals(List.of("task-0", "task-1", "task-2", "task-3", 
"task-4"), executionOrder);
+        // All ran on one thread, and that thread is the named transaction 
worker.
+        assertEquals(1, threadNames.stream().distinct().count());
+        assertTrue("expected tx-* thread but was " + threadNames.get(0),
+                threadNames.get(0).startsWith("tx-"));
+    }
+
+    @Test
+    public void shouldInterruptRunningTaskWhenReturnedFutureIsCancelled() 
throws Exception {
+        // Guards the "do NOT wrap submitted tasks" invariant: cancel(true) on 
the Future returned by submit() must
+        // interrupt the real work, exactly as the per-request evaluation 
timeout relies on in the handler.
+        final CountDownLatch started = new CountDownLatch(1);
+        final AtomicBoolean interrupted = new AtomicBoolean(false);
+        final AtomicReference<Throwable> unexpected = new AtomicReference<>();
+
+        final Future<?> running = tx.submit(new FutureTask<>(() -> {
+            started.countDown();
+            try {
+                Thread.sleep(30000); // block until interrupted by cancel(true)
+            } catch (InterruptedException e) {
+                interrupted.set(true);
+                throw e;
+            } catch (Throwable t) {
+                unexpected.set(t);
+            }
+            return null;
+        }));
+
+        assertTrue("task did not start", started.await(5, TimeUnit.SECONDS));
+        running.cancel(true);
+
+        // Give the worker a moment to observe the interrupt and record it.
+        final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+        while (!interrupted.get() && System.nanoTime() < deadline) {
+            Thread.sleep(10);
+        }
+        assertTrue("cancel(true) did not interrupt the running task", 
interrupted.get());
+        assertEquals(null, unexpected.get());
+    }
+}
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/util/ManualScheduledExecutorService.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/util/ManualScheduledExecutorService.java
new file mode 100644
index 0000000000..21d93a2e08
--- /dev/null
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/util/ManualScheduledExecutorService.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A deterministic, single-threaded test double for {@link 
ScheduledExecutorService} backed by a virtual clock.
+ * <p>
+ * It exists so timer-driven behavior (such as the {@code gremlin-server} 
transaction idle / lifetime timeouts) can be
+ * exercised without {@link Thread#sleep(long)} and without real wall-clock 
waits, which are slow and flaky. Time only
+ * advances when the test calls {@link #advanceTimeBy(long, TimeUnit)} (or 
{@link #runDueTasks()}), at which point any
+ * scheduled task whose trigger time has been reached is run synchronously on 
the calling thread, in trigger-time order.
+ * <p>
+ * Only the one-shot {@link #schedule(Runnable, long, TimeUnit)} overload is 
implemented, because that is all the
+ * transaction code under test uses. Every other {@link 
ScheduledExecutorService} method throws
+ * {@link UnsupportedOperationException} with an explanatory message so that 
an unsupported use is loud rather than
+ * silently wrong.
+ * <p>
+ * It is thread-safe: the transaction idle timer is armed/cancelled on the 
transaction's worker thread (via the
+ * executor's before/afterExecute hooks) while a test thread advances the 
clock and reads counts. All shared state is
+ * guarded by {@code lock}. Fired task commands are run <em>outside</em> the 
lock — a fired idle close calls
+ * {@code close(false)}, which submits a rollback to the transaction executor 
and blocks on it; running it under the
+ * lock could deadlock against the worker thread re-entering {@link #schedule}.
+ */
+public class ManualScheduledExecutorService implements 
ScheduledExecutorService {
+
+    private final Object lock = new Object();
+    private final List<ScheduledTask> tasks = new ArrayList<>();
+    private long nowMillis = 0L;
+    private int scheduledCount = 0;
+
+    /**
+     * Schedules a one-shot task to run when the virtual clock advances by at 
least {@code delay}. Returns a
+     * {@link ScheduledFuture} whose {@link Future#cancel(boolean)} prevents 
the task from running on a later advance.
+     */
+    @Override
+    public ScheduledFuture<?> schedule(final Runnable command, final long 
delay, final TimeUnit unit) {
+        synchronized (lock) {
+            final ScheduledTask task = new ScheduledTask(command, nowMillis + 
unit.toMillis(delay));
+            tasks.add(task);
+            scheduledCount++;
+            return task;
+        }
+    }
+
+    /**
+     * Advances the virtual clock by the given amount and runs every task that 
is now due (trigger time {@code <=} the
+     * new current time) and not cancelled, in ascending trigger-time order.
+     */
+    public void advanceTimeBy(final long amount, final TimeUnit unit) {
+        synchronized (lock) {
+            nowMillis += unit.toMillis(amount);
+        }
+        runDueTasks();
+    }
+
+    /**
+     * Runs every currently-due, non-cancelled task without advancing the 
clock. Useful for firing a zero-delay task.
+     * Each due task is selected under the lock but executed outside it (see 
class Javadoc).
+     */
+    public void runDueTasks() {
+        while (true) {
+            final ScheduledTask next;
+            synchronized (lock) {
+                ScheduledTask soonest = null;
+                // Loop because a fired task may schedule another task that is 
itself immediately due.
+                for (final ScheduledTask t : tasks) {
+                    if (!t.cancelled && !t.done && t.triggerAtMillis <= 
nowMillis) {
+                        if (soonest == null || t.triggerAtMillis < 
soonest.triggerAtMillis) soonest = t;
+                    }
+                }
+                if (soonest == null) return;
+                soonest.done = true; // mark done under lock so it is not 
re-selected
+                next = soonest;
+            }
+            next.command.run(); // run OUTSIDE the lock
+        }
+    }
+
+    /**
+     * The number of tasks that are still scheduled to run (not cancelled, not 
yet fired).
+     */
+    public int getPendingTaskCount() {
+        synchronized (lock) {
+            int count = 0;
+            for (final ScheduledTask t : tasks) {
+                if (!t.cancelled && !t.done) count++;
+            }
+            return count;
+        }
+    }
+
+    /**
+     * The total number of tasks ever scheduled, including ones later 
cancelled or already fired. Lets a test assert
+     * that a reschedule actually issued a fresh {@code schedule(...)} call.
+     */
+    public int getScheduledTaskCount() {
+        synchronized (lock) {
+            return scheduledCount;
+        }
+    }
+
+    /**
+     * The remaining delay (ms) until the soonest still-pending task fires, or 
{@code -1} if none is pending.
+     */
+    public long nextPendingDelayMillis() {
+        synchronized (lock) {
+            long soonest = Long.MAX_VALUE;
+            for (final ScheduledTask t : tasks) {
+                if (!t.cancelled && !t.done) soonest = Math.min(soonest, 
t.triggerAtMillis - nowMillis);
+            }
+            return soonest == Long.MAX_VALUE ? -1L : soonest;
+        }
+    }
+
+    private final class ScheduledTask implements ScheduledFuture<Object> {
+        private final Runnable command;
+        private final long triggerAtMillis;
+        private volatile boolean cancelled = false;
+        private volatile boolean done = false;
+
+        private ScheduledTask(final Runnable command, final long 
triggerAtMillis) {
+            this.command = command;
+            this.triggerAtMillis = triggerAtMillis;
+        }
+
+        @Override
+        public long getDelay(final TimeUnit unit) {
+            synchronized (lock) {
+                return unit.convert(triggerAtMillis - nowMillis, 
TimeUnit.MILLISECONDS);
+            }
+        }
+
+        @Override
+        public int compareTo(final Delayed o) {
+            return Long.compare(getDelay(TimeUnit.MILLISECONDS), 
o.getDelay(TimeUnit.MILLISECONDS));
+        }
+
+        @Override
+        public boolean cancel(final boolean mayInterruptIfRunning) {
+            if (done || cancelled) return false;
+            cancelled = true;
+            return true;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return cancelled;
+        }
+
+        @Override
+        public boolean isDone() {
+            return done || cancelled;
+        }
+
+        @Override
+        public Object get() {
+            return null;
+        }
+
+        @Override
+        public Object get(final long timeout, final TimeUnit unit) {
+            return null;
+        }
+    }
+
+    // ---- Unsupported ScheduledExecutorService surface: fail loudly rather 
than behave unexpectedly. ----
+
+    private static UnsupportedOperationException unsupported(final String 
method) {
+        return new UnsupportedOperationException(
+                ManualScheduledExecutorService.class.getSimpleName() + " does 
not support " + method
+                        + "; only schedule(Runnable, long, TimeUnit) is 
implemented for tests.");
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final 
long delay, final TimeUnit unit) {
+        throw unsupported("schedule(Callable, long, TimeUnit)");
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, 
final long initialDelay, final long period, final TimeUnit unit) {
+        throw unsupported("scheduleAtFixedRate");
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, 
final long initialDelay, final long delay, final TimeUnit unit) {
+        throw unsupported("scheduleWithFixedDelay");
+    }
+
+    @Override
+    public void execute(final Runnable command) {
+        throw unsupported("execute");
+    }
+
+    @Override
+    public void shutdown() {
+        throw unsupported("shutdown");
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        throw unsupported("shutdownNow");
+    }
+
+    @Override
+    public boolean isShutdown() {
+        throw unsupported("isShutdown");
+    }
+
+    @Override
+    public boolean isTerminated() {
+        throw unsupported("isTerminated");
+    }
+
+    @Override
+    public boolean awaitTermination(final long timeout, final TimeUnit unit) {
+        throw unsupported("awaitTermination");
+    }
+
+    @Override
+    public <T> Future<T> submit(final Callable<T> task) {
+        throw unsupported("submit(Callable)");
+    }
+
+    @Override
+    public <T> Future<T> submit(final Runnable task, final T result) {
+        throw unsupported("submit(Runnable, T)");
+    }
+
+    @Override
+    public Future<?> submit(final Runnable task) {
+        throw unsupported("submit(Runnable)");
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(final java.util.Collection<? extends 
Callable<T>> tasks) {
+        throw unsupported("invokeAll");
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(final java.util.Collection<? extends 
Callable<T>> tasks, final long timeout, final TimeUnit unit) {
+        throw unsupported("invokeAll");
+    }
+
+    @Override
+    public <T> T invokeAny(final java.util.Collection<? extends Callable<T>> 
tasks) throws ExecutionException {
+        throw unsupported("invokeAny");
+    }
+
+    @Override
+    public <T> T invokeAny(final java.util.Collection<? extends Callable<T>> 
tasks, final long timeout, final TimeUnit unit) throws ExecutionException {
+        throw unsupported("invokeAny");
+    }
+}


Reply via email to