This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch tx-idle-to in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 986ab97a1b88d849e09d158e095726b8c92fe575
Author: Ken Hu <[email protected]>
AuthorDate: Wed Jun 24 19:39:11 2026 -0700

Suspend the HTTP transaction idle timer while an operation is running

The idle timeout was armed on request arrival rather than when the transaction went idle, so a single operation running longer than the timeout tripped it mid-execution, contradicting the documented promise that active transactions are unaffected. A long operation should be bounded by evaluationTimeout; the idle timer should only reclaim abandoned transactions.

The per-transaction executor is now a ThreadPoolExecutor(1,1) whose before/afterExecute hooks suspend the idle timer while work runs and re-arm it only once the worker parks with an empty queue. This gives a reliable running-vs-idle signal without wrapping submitted tasks, which would break the evaluation-timeout interrupt that relies on cancelling the real FutureTask.

transactionTimeout is renamed to idleTransactionTimeout to reflect its actual meaning (renamed outright as the feature is unreleased), and now honors 0 as "disabled" to match its documentation.
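The suspend-while-busy mechanism described above can be illustrated with a minimal, self-contained sketch. It is not the committed `UnmanagedTransaction` code: the class `IdleTimerSketch`, its `onIdle` callback and the `idleTimerArmed()` probe are hypothetical names chosen for illustration, and the sketch leaves out the `accepting`/close handling of the real class. It assumes a `ThreadPoolExecutor` with one core and one maximum thread, cancels the idle timer in `beforeExecute`, re-arms it in `afterExecute` only when the work queue is empty, and treats a timeout of `0` as disabled.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a suspend-while-busy idle timer; not the Gremlin Server implementation.
final class IdleTimerSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicReference<ScheduledFuture<?>> idleFuture = new AtomicReference<>();
    private final long idleTimeoutMs; // 0 (or negative) disables idle reclamation
    private final Runnable onIdle;    // what to do on idle expiry (a real server would roll back and close)
    private final ThreadPoolExecutor worker;

    IdleTimerSketch(final long idleTimeoutMs, final Runnable onIdle) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.onIdle = onIdle;
        // One core and one max thread: every task runs on the same worker thread, in FIFO order.
        this.worker = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void beforeExecute(final Thread t, final Runnable r) {
                super.beforeExecute(t, r);
                cancelIdleTimer(); // an operation is starting: suspend the idle timer
            }

            @Override
            protected void afterExecute(final Runnable r, final Throwable t) {
                super.afterExecute(r, t);
                maybeArmIdleTimer(); // the operation finished: arm only if nothing else is queued
            }
        };
    }

    // Cancels (never arms) the timer, then hands the caller's own FutureTask to the worker unwrapped.
    FutureTask<Void> submit(final FutureTask<Void> task) {
        cancelIdleTimer();
        worker.execute(task);
        return task;
    }

    // True while an idle-expiry callback is scheduled and has neither run nor been cancelled.
    boolean idleTimerArmed() {
        final ScheduledFuture<?> f = idleFuture.get();
        return f != null && !f.isDone();
    }

    void shutdown() {
        cancelIdleTimer();
        worker.shutdownNow();
        scheduler.shutdownNow();
    }

    private void cancelIdleTimer() {
        final ScheduledFuture<?> previous = idleFuture.getAndSet(null);
        if (previous != null) previous.cancel(false);
    }

    private void maybeArmIdleTimer() {
        if (idleTimeoutMs <= 0) return;           // timer disabled
        if (!worker.getQueue().isEmpty()) return; // a sibling task is waiting, so the worker is not idle
        final ScheduledFuture<?> next = scheduler.schedule(onIdle, idleTimeoutMs, TimeUnit.MILLISECONDS);
        final ScheduledFuture<?> previous = idleFuture.getAndSet(next);
        if (previous != null) previous.cancel(false);
    }
}
```

One design choice in this sketch: tasks are handed to the pool with `execute` rather than `submit`, because `AbstractExecutorService.submit(Runnable)` wraps its argument in a new `FutureTask`. With `execute`, the `FutureTask` the caller built is itself the unit of work the pool runs, so the future the caller holds (and might `cancel(true)` on an evaluation timeout) and the task running on the worker are the same object.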
Assisted-by: Claude Code:claude-opus-4-8
---
 docs/src/dev/provider/index.asciidoc               |  10 +-
 docs/src/reference/gremlin-applications.asciidoc   |  10 +-
 .../apache/tinkerpop/gremlin/server/Settings.java  |   9 +-
 .../server/transaction/TransactionManager.java     |  14 +-
 .../server/transaction/UnmanagedTransaction.java   | 142 +++++++---
 .../gremlin/server/util/ServerGremlinExecutor.java |   2 +-
 .../GremlinDriverTransactionIntegrateTest.java     |   4 +-
 .../GremlinServerHttpTransactionIntegrateTest.java |  53 +++-
 .../transaction/UnmanagedTransactionTest.java      | 291 +++++++++++++++++++++
 .../util/ManualScheduledExecutorService.java       | 279 ++++++++++++++++++++
 10 files changed, 744 insertions(+), 70 deletions(-)

diff --git a/docs/src/dev/provider/index.asciidoc b/docs/src/dev/provider/index.asciidoc
index a09c67b915..1f6ed71945 100644
--- a/docs/src/dev/provider/index.asciidoc
+++ b/docs/src/dev/provider/index.asciidoc
@@ -1430,14 +1430,14 @@ rejects it with HTTP 400. This prevents cross-graph operations within a single t
 
 ==== Transaction Timeout and Idle Reclamation
 
-Servers implement a configurable transaction timeout (`transactionTimeout`, default 600000ms). The timeout represents
-how long a transaction can sit idle with no requests before the server forcibly rolls it back and removes it. The
-timeout resets on each request received for that transaction, so active transactions are not affected. After a timeout
-fires, any subsequent request with that transaction ID receives a 404 response.
+Servers implement a configurable transaction timeout (`idleTransactionTimeout`, default 600000ms). The timeout
+represents how long a transaction can sit idle with no requests before the server forcibly rolls it back and removes it.
+The timeout resets on each request received for that transaction, so active transactions are not affected. After a
+timeout fires, any subsequent request with that transaction ID receives a 404 response.
==== Transaction Capacity Limits -Servers enforce a configurable maximum number of concurrent open transactions (`maxConcurrentTransactions`, default +Servers may enforce a configurable maximum number of concurrent open transactions (`maxConcurrentTransactions`, default 1000). When the limit is reached, new begin requests are rejected with HTTP 503. Slots are freed when transactions close via commit, rollback, or timeout. diff --git a/docs/src/reference/gremlin-applications.asciidoc b/docs/src/reference/gremlin-applications.asciidoc index 6494972cdf..3b1989ab2f 100644 --- a/docs/src/reference/gremlin-applications.asciidoc +++ b/docs/src/reference/gremlin-applications.asciidoc @@ -930,6 +930,7 @@ The following table describes the various YAML configuration options that Gremli |gremlinPool |The number of "Gremlin" threads available to execute actual scripts in a `ScriptEngine`. This pool represents the workers available to handle blocking operations in Gremlin Server. When set to `0`, Gremlin Server will use the value provided by `Runtime.availableProcessors()`. |0 |host |The name of the host to bind the server to. |localhost |idleConnectionTimeout |Time in milliseconds that the server will allow a channel to not receive requests from a client before it automatically closes. If enabled, the value provided should typically exceed the amount of time given to `keepAliveInterval`. Note that while this value is to be provided as milliseconds it will resolve to second precision. Set this value to `0` to disable this feature. |0 +|idleTransactionTimeout |Time in milliseconds that a transaction can sit idle (no requests) before the server forcibly rolls it back and removes it. The timeout resets on each request received for that transaction. Set to `0` to disable this feature. |600000 |keepAliveInterval |Time in milliseconds that the server will allow a channel to not send responses to a client before it sends a "ping" to see if it is still present. 
If it is present, the client should respond with a "pong" which will thus reset the `idleConnectionTimeout` and keep the channel open. If enabled, this number should be smaller than the value provided to the `idleConnectionTimeout`. Note that while this value is to be provided as milliseconds it will resolve to second prec [...] |maxAccumulationBufferComponents |Maximum number of request components that can be aggregated for a message. |1024 |maxChunkSize |The maximum length of the content or each chunk. If the content length exceeds this value, the transfer encoding of the decoded request will be converted to 'chunked' and the content will be split into multiple `HttpContent` objects. If the transfer encoding of the HTTP request is 'chunked' already, each chunk will be split into smaller chunks if the length of the chunk exceeds this value. |8192 @@ -983,7 +984,6 @@ The following table describes the various YAML configuration options that Gremli |strictTransactionManagement |Set to `true` to require `aliases` to be submitted on every requests, where the `aliases` become the scope of transaction management. |false |threadPoolBoss |The number of threads available to Gremlin Server for accepting connections. Should always be set to `1`. |1 |threadPoolWorker |The number of threads available to Gremlin Server for processing non-blocking reads and writes. |1 -|transactionTimeout |Time in milliseconds that a transaction can sit idle (no requests) before the server forcibly rolls it back and removes it. The timeout resets on each request received for that transaction. Set to `0` to disable this feature. |600000 |useEpollEventLoop |Try to use epoll event loops (works only on Linux os) instead of netty NIO. |false |writeBufferHighWaterMark | If the number of bytes in the network send buffer exceeds this value then the channel is no longer writeable, accepting no additional writes until buffer is drained and the `writeBufferLowWaterMark` is met. 
|65536 |writeBufferLowWaterMark | Once the number of bytes queued in the network send buffer exceeds the `writeBufferHighWaterMark`, the channel will not become writeable again until the buffer is drained and it drops below this value. |32768 @@ -2261,14 +2261,14 @@ an explicit transaction on a graph that does not support them will result in an Two settings in the Gremlin Server YAML control transaction resource usage: -* `transactionTimeout` (default: 600000ms) -- How long a transaction can sit idle before the server forcibly rolls it - back. The timeout resets on each request received for that transaction. +* `idleTransactionTimeout` (default: 600000ms) -- How long a transaction can sit idle before the server forcibly rolls + it back. The timeout resets on each request received for that transaction. * `maxConcurrentTransactions` (default: 1000) -- The maximum number of open transactions allowed. When the limit is reached, new begin requests are rejected with HTTP 503. Each open transaction consumes a dedicated thread on the server to maintain thread-local transaction state for the -underlying graph. Ensure that clients close transactions promptly and that the `transactionTimeout` is set to reclaim -abandoned ones. The `transactions` gauge metric can be used to monitor usage. +underlying graph. Ensure that clients close transactions promptly and that the `idleTransactionTimeout` is set to +reclaim abandoned ones. The `transactions` gauge metric can be used to monitor usage. In load-balanced deployments, all requests within a transaction must reach the same server instance because transaction state is local to the server that created it. 
The `X-Transaction-Id` header is available for load diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java index 8d6111ef27..236a4b4431 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java @@ -185,10 +185,13 @@ public class Settings { public boolean strictTransactionManagement = false; /** - * Time in milliseconds that a transaction can remain idle before it is automatically rolled back. - * This prevents resource leaks from abandoned transactions. Default is 600000 (10 minutes). + * Time in milliseconds that a transaction can remain idle (no operation running or queued) before it is + * automatically rolled back. This prevents resource leaks from abandoned transactions. The idle timer is suspended + * while an operation is in progress, so a long-running operation does not trip it (its duration is instead bounded + * by {@link #evaluationTimeout}). Set to {@code 0} to disable idle reclamation entirely. Default is 600000 + * (10 minutes). */ - public long transactionTimeout = 600000L; + public long idleTransactionTimeout = 600000L; /** * Time in milliseconds to wait for a transaction commit or rollback operation to complete. 
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java index 6901664c15..ec203e820e 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java @@ -43,7 +43,7 @@ public class TransactionManager { private final ConcurrentMap<String, UnmanagedTransaction> transactions = new ConcurrentHashMap<>(); private final ScheduledExecutorService scheduledExecutorService; private final GraphManager graphManager; - private final long transactionTimeoutMs; + private final long idleTransactionTimeoutMs; private final int maxConcurrentTransactions; private final long perGraphCloseMs; @@ -52,23 +52,23 @@ public class TransactionManager { * * @param scheduledExecutorService Scheduler for timeout management * @param graphManager The graph manager for accessing traversal sources - * @param transactionTimeoutMs Timeout in milliseconds before auto-rollback + * @param idleTransactionTimeoutMs Inactivity timeout in milliseconds before auto-rollback; {@code 0} disables it * @param maxConcurrentTransactions Maximum number of concurrent transactions allowed */ public TransactionManager(final ScheduledExecutorService scheduledExecutorService, final GraphManager graphManager, - final long transactionTimeoutMs, + final long idleTransactionTimeoutMs, final int maxConcurrentTransactions, final long perGraphCloseMs) { this.scheduledExecutorService = scheduledExecutorService; this.graphManager = graphManager; - this.transactionTimeoutMs = transactionTimeoutMs; + this.idleTransactionTimeoutMs = idleTransactionTimeoutMs; this.maxConcurrentTransactions = maxConcurrentTransactions; this.perGraphCloseMs = perGraphCloseMs; MetricManager.INSTANCE.getGauge(transactions::size, 
name(GremlinServer.class, "transactions")); - logger.info("TransactionManager initialized with timeout={}ms, maxTransactions={}", - transactionTimeoutMs, maxConcurrentTransactions); + logger.info("TransactionManager initialized with idleTransactionTimeout={}ms, maxTransactions={}", + idleTransactionTimeoutMs, maxConcurrentTransactions); } /** @@ -123,7 +123,7 @@ public class TransactionManager { traversalSourceName, graph, scheduledExecutorService, - transactionTimeoutMs, + idleTransactionTimeoutMs, perGraphCloseMs ); } while (transactions.putIfAbsent(txId, ctx) != null); diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java index 3dfc394204..c462dfa41b 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java @@ -23,12 +23,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Optional; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -42,6 +43,12 @@ import java.util.concurrent.atomic.AtomicReference; * the complete request lifecycle (graph operation, error handling, response writing), * following the same pattern as the non-transactional HTTP path and the legacy * {@code SessionOpProcessor}. 
+ * <p> + * The single-threaded executor is a {@link SingleThreadTransactionExecutor} (a {@code ThreadPoolExecutor} with one + * core/max thread) rather than {@link java.util.concurrent.Executors#newSingleThreadExecutor}. It is behaviorally + * identical for task execution but exposes the {@code beforeExecute}/{@code afterExecute} lifecycle hooks and the task + * queue, which the idle-timer management relies on to tell "an operation is running" apart from "the worker is idle + * with an empty queue". The {@code Executors} factory hides those behind a sealed wrapper. */ public class UnmanagedTransaction { private static final Logger logger = LoggerFactory.getLogger(UnmanagedTransaction.class); @@ -51,16 +58,16 @@ public class UnmanagedTransaction { private final TransactionManager manager; private final Graph graph; private final ScheduledExecutorService scheduledExecutorService; - private final long timeout; + private final long idleTimeout; private final long perGraphClose; - private final AtomicReference<ScheduledFuture<?>> timeoutFuture = new AtomicReference<>(); + private final AtomicReference<ScheduledFuture<?>> idleFuture = new AtomicReference<>(); // Controls whether the executor is still accepting tasks. private final AtomicBoolean accepting = new AtomicBoolean(true); /** * Single-threaded executor ensures all operations for this transaction run on * the same thread, preserving the ThreadLocal nature of Graph transactions. */ - private final ExecutorService executor; + private final SingleThreadTransactionExecutor executor; /** * Creates a new {@code UnmanagedTransaction} for managing an HTTP transaction. 
@@ -70,14 +77,14 @@ public class UnmanagedTransaction { * @param traversalSourceName The traversal source name bound at begin time * @param graph The graph instance for this transaction * @param scheduledExecutorService Scheduler for timeout management - * @param transactionTimeout Timeout in milliseconds before auto-rollback + * @param idleTransactionTimeout Inactivity timeout in milliseconds before auto-rollback; {@code 0} disables it */ public UnmanagedTransaction(final String transactionId, final TransactionManager transactionManager, final String traversalSourceName, final Graph graph, final ScheduledExecutorService scheduledExecutorService, - final long transactionTimeout, + final long idleTransactionTimeout, final long perGraphClose) { logger.debug("New transaction context established for {}", transactionId); this.transactionId = transactionId; @@ -85,12 +92,15 @@ public class UnmanagedTransaction { this.manager = transactionManager; this.graph = graph; this.scheduledExecutorService = scheduledExecutorService; - this.timeout = transactionTimeout; + this.idleTimeout = idleTransactionTimeout; this.perGraphClose = perGraphClose; - // Create single-threaded executor with named thread for debugging - this.executor = Executors.newSingleThreadExecutor( - r -> new Thread(r, "tx-" + transactionId.substring(0, Math.min(8, transactionId.length())))); + // Create single-threaded executor with named thread for debugging. A ThreadPoolExecutor(1,1) is used (rather + // than Executors.newSingleThreadExecutor) so the before/afterExecute hooks and the task queue are accessible + // for idle-timer management; see SingleThreadTransactionExecutor. 
+ final ThreadFactory threadFactory = + r -> new Thread(r, "tx-" + transactionId.substring(0, Math.min(8, transactionId.length()))); + this.executor = new SingleThreadTransactionExecutor(threadFactory); } /** @@ -107,36 +117,6 @@ public class UnmanagedTransaction { return traversalSourceName; } - /** - * Resets the timeout for this transaction. Called on each request. - */ - public void touch() { - timeoutFuture.updateAndGet(future -> { - if (future != null) future.cancel(false); - return scheduledExecutorService.schedule(() -> { - logger.info("Transaction {} timed out after {} ms of inactivity", transactionId, timeout); - close(false); - }, timeout, TimeUnit.MILLISECONDS); - }); - } - - /** - * Opens the underlying graph transaction and starts the inactivity timeout. - * Should be called on the transaction's single-threaded executor to preserve - * ThreadLocal affinity. On failure the exception is re-thrown and the caller - * is responsible for cleanup (e.g. via {@link #close(boolean)}). - */ - public void open() { - try { - graph.tx().open(); - touch(); - logger.debug("Transaction {} opened", transactionId); - } catch (Exception e) { - logger.warn("Failed to begin transaction {}: {}", transactionId, e.getMessage()); - throw e; - } - } - /** * Closes this transaction and releases its resources. When {@code force} is {@code false}, * any open graph transaction is rolled back before shutdown. When {@code force} is {@code true}, @@ -183,7 +163,7 @@ public class UnmanagedTransaction { // reorder these two statements. 
manager.destroy(transactionId); executor.shutdown(); - Optional.ofNullable(timeoutFuture.get()).ifPresent(f -> f.cancel(true)); + Optional.ofNullable(idleFuture.get()).ifPresent(f -> f.cancel(true)); logger.debug("Transaction {} closed", transactionId); } @@ -199,7 +179,83 @@ public class UnmanagedTransaction { public Future<?> submit(final FutureTask<Void> task) { if (!accepting.get()) throw new IllegalStateException("Transaction " + transactionId + " is closed"); - touch(); + // Insurance backstop: cancel (do NOT arm) the idle timer on submit. Arming is the executor's job, done in + // afterExecute once the worker parks with an empty queue. beforeExecute will also cancel when the task starts; + // cancelling here too closes the small window between accepting a task and the worker picking it up. + cancelIdleTimer(); return executor.submit(task); } + + /** + * Suspends the inactivity timer because an operation is running (or about to run) on the transaction thread. + * Invoked from {@link SingleThreadTransactionExecutor#beforeExecute} and, as a backstop, from {@link #submit}. + * <p> + * A long-running operation must not trip the idle timeout: while an operation is in progress the idle timer is + * simply not armed (the operation's own duration is bounded by the per-request {@code evaluationTimeout} instead). + */ + private void cancelIdleTimer() { + idleFuture.updateAndGet(future -> { + if (future != null) future.cancel(false); + return null; + }); + } + + /** + * (Re)arms the inactivity timer, but only when the transaction is genuinely idle. Invoked from + * {@link SingleThreadTransactionExecutor#afterExecute} once an operation has finished and the worker is about to + * look for more work. 
+ * <p> + * "Idle" means: still {@link #accepting} new work (not closing), the executor queue is empty (no sibling request is + * already waiting — on a single thread there is a brief instant between one task finishing and the next starting), + * and the idle timeout is enabled ({@code idleTimeout > 0}; {@code 0} disables idle reclamation entirely). When all + * hold, a fresh {@code close(false)} is scheduled {@code idleTimeout} ms out, replacing any previously scheduled one. + */ + private void maybeScheduleIdleTimer() { + if (!accepting.get()) return; // closing/closed: never re-arm a dying transaction + if (idleTimeout <= 0) return; // 0 (or negative) disables idle reclamation + if (!executor.getQueue().isEmpty()) return; // a sibling task is already queued -> not idle yet + + idleFuture.updateAndGet(future -> { + if (future != null) future.cancel(false); + return scheduledExecutorService.schedule(() -> { + logger.info("Transaction {} timed out after {} ms of inactivity", transactionId, idleTimeout); + close(false); + }, idleTimeout, TimeUnit.MILLISECONDS); + }); + + // The accepting check above and the arm below are not atomic: a concurrent close() could have flipped + // accepting=false and cancelled idleFuture in between, leaving the timer we just armed orphaned (it would fire + // ~idleTimeout later and call close() on an already-gone transaction). Re-check after arming and cancel if so, + // so the "never re-arm a dying transaction" invariant actually holds. + if (!accepting.get()) cancelIdleTimer(); + } + + /** + * A single-threaded {@link ThreadPoolExecutor} (one core and max thread) that runs all operations for a single + * transaction on the same worker thread, preserving the ThreadLocal nature of graph transactions. 
+ * <p> + * It is used in place of {@link java.util.concurrent.Executors#newSingleThreadExecutor} solely to expose the + * {@link #beforeExecute}/{@link #afterExecute} lifecycle hooks (and, via {@link #getQueue()}, the pending-task + * queue), which the enclosing {@link UnmanagedTransaction} needs to distinguish "an operation is running" from + * "the worker is idle with nothing queued". Task-execution semantics are otherwise identical to a single-thread + * executor: one worker, FIFO ordering. Submitted {@link FutureTask}s are returned unwrapped so callers can + * {@code cancel(true)} the real work (e.g. the per-request evaluation timeout interrupting a running operation). + */ + private final class SingleThreadTransactionExecutor extends ThreadPoolExecutor { + private SingleThreadTransactionExecutor(final ThreadFactory threadFactory) { + super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory); + } + + @Override + protected void beforeExecute(final Thread t, final Runnable r) { + super.beforeExecute(t, r); + cancelIdleTimer(); + } + + @Override + protected void afterExecute(final Runnable r, final Throwable t) { + super.afterExecute(r, t); + maybeScheduleIdleTimer(); + } + } } diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java index 3995337566..ab6be093e2 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java @@ -306,7 +306,7 @@ public class ServerGremlinExecutor { transactionManager = new TransactionManager( scheduledExecutorService, graphManager, - settings.transactionTimeout, + settings.idleTransactionTimeout, settings.maxConcurrentTransactions, settings.perGraphCloseTimeout ); diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java index 69f387b17e..8a6c4700cd 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java @@ -85,10 +85,10 @@ public class GremlinDriverTransactionIntegrateTest extends AbstractGremlinServer case "shouldTimeoutIdleTransaction": case "shouldTimeoutIdleTransactionWithNoOperations": case "shouldRejectLateCommitAfterTimeout": - settings.transactionTimeout = 1000; + settings.idleTransactionTimeout = 1000; break; case "shouldTimeoutOnlyIdleTransactionNotActiveOne": - settings.transactionTimeout = 2000; + settings.idleTransactionTimeout = 2000; break; } return settings; diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java index 95895e66b4..c9a05b82ab 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java @@ -93,17 +93,21 @@ public class GremlinServerHttpTransactionIntegrateTest extends AbstractGremlinSe break; case "shouldTimeoutFreeSlotUnderMaxConcurrentTransactions": settings.maxConcurrentTransactions = 1; - settings.transactionTimeout = 1000; + settings.idleTransactionTimeout = 1000; break; case "shouldTimeoutIdleTransactionWithNoOperations": - settings.transactionTimeout = 500; + settings.idleTransactionTimeout = 500; break; case "shouldTimeoutAndRejectLateCommit": case "shouldTrackTransactionCountAccurately": - 
settings.transactionTimeout = 1000; + settings.idleTransactionTimeout = 1000; break; case "shouldRollbackAbandonedTransaction": - settings.transactionTimeout = 300; + settings.idleTransactionTimeout = 300; + break; + case "shouldNotIdleTimeoutLongRunningOperation": + // Short idle timeout, but a single long operation must NOT trip it (idle suspended while busy). + settings.idleTransactionTimeout = 500; break; case "shouldRejectMismatchedGraphAliasInTransaction": { final Settings.GraphSettings gs = new Settings.GraphSettings(); @@ -528,6 +532,47 @@ public class GremlinServerHttpTransactionIntegrateTest extends AbstractGremlinSe } } + @Test + public void shouldNotIdleTimeoutLongRunningOperation() throws Exception { + // With a short idle timeout (500ms), a single operation that runs LONGER than the idle timeout must still + // succeed -- the idle timer is suspended while an operation is in progress, so a long-running op is not + // reclaimed mid-execution (it is instead bounded by evaluationTimeout, left at its default here). + final String txId = beginTx(client, GTX); + + // Seed two vertices and an edge so repeat(both()) has something to traverse and keeps the executor busy. Each + // response body is fully consumed before the next request is sent: the chunked stream is only complete once the + // server has finished processing, so consuming guarantees these requests are strictly ordered. 
+ try (final CloseableHttpResponse r = submitInTx(client, txId, "g.addV().property(T.id, 1)", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + EntityUtils.consume(r.getEntity()); + } + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.addV().property(T.id, 2)", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + EntityUtils.consume(r.getEntity()); + } + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.V(1).addE('self').to(__.V(2))", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + EntityUtils.consume(r.getEntity()); + } + + // A traversal that runs well past the 500ms idle timeout. Under the old arm-on-arrival behavior the idle timer + // would have fired mid-execution; under suspend-while-busy it does not. + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.V().repeat(both()).times(2000)", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + EntityUtils.consume(r.getEntity()); + } + + // The transaction is still alive and usable after the long op (it was not reclaimed mid-flight). 
+ try (final CloseableHttpResponse r = submitInTx(client, txId, "g.V().count()", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + assertEquals(2, extractCount(r)); // extractCount fully reads the body + } + try (final CloseableHttpResponse r = commitTx(client, txId, GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + EntityUtils.consume(r.getEntity()); + } + } + @Test public void shouldRejectMismatchedGraphAliasInTransaction() throws Exception { final String txId = beginTx(client, GTX); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransactionTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransactionTest.java new file mode 100644 index 0000000000..81a8a65949 --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransactionTest.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.tinkerpop.gremlin.server.transaction; + +import org.apache.tinkerpop.gremlin.server.util.ManualScheduledExecutorService; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.Transaction; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link UnmanagedTransaction}, driven by a deterministic {@link ManualScheduledExecutorService} so the + * inactivity-timeout behaviour can be asserted without real wall-clock waits. + * <p> + * These are <em>specification</em> tests for the reworked idle timer (suspend-while-busy): the idle timer is armed only + * when the transaction goes idle (no operation running, empty queue) and is suspended while an operation runs. The idle + * timer is (re)armed from the executor's {@code afterExecute} hook, which runs on the transaction worker thread, so + * timer assertions poll the scheduler with a bounded wait via {@link #awaitPendingTimer(boolean)}. 
+ */
+public class UnmanagedTransactionTest {
+
+    private static final String TX_ID = "test-tx-0001";
+    private static final long TIMEOUT_MS = 600000L;
+    private static final long PER_GRAPH_CLOSE_MS = 10000L;
+    private static final long AWAIT_MS = 5000L;
+
+    private TransactionManager manager;
+    private Graph graph;
+    private ManualScheduledExecutorService scheduler;
+    private UnmanagedTransaction tx;
+
+    @Before
+    public void setUp() {
+        manager = mock(TransactionManager.class);
+        graph = mock(Graph.class);
+        final Transaction graphTx = mock(Transaction.class);
+        when(graph.tx()).thenReturn(graphTx);
+        when(graphTx.isOpen()).thenReturn(false); // rollback path is a no-op during close(false)
+
+        scheduler = new ManualScheduledExecutorService();
+        tx = new UnmanagedTransaction(TX_ID, manager, "g", graph, scheduler, TIMEOUT_MS, PER_GRAPH_CLOSE_MS);
+
+        // close() short-circuits unless the manager still knows about the transaction.
+        when(manager.get(TX_ID)).thenReturn(Optional.of(tx));
+    }
+
+    /**
+     * Submits a no-op task and blocks until it has finished running on the worker thread.
+     */
+    private void runOp() throws Exception {
+        tx.submit(new FutureTask<>(() -> null)).get(AWAIT_MS, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Waits (bounded) for the idle timer to reach the expected armed/not-armed state, since it is (re)armed on the
+     * worker thread from afterExecute slightly after the submitted task's Future completes. Returns once the condition
+     * holds or the wait elapses; the caller asserts on the final state.
+     */
+    private void awaitPendingTimer(final boolean expectArmed) throws InterruptedException {
+        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(AWAIT_MS);
+        while (System.nanoTime() < deadline) {
+            if ((scheduler.getPendingTaskCount() == 1) == expectArmed) return;
+            Thread.sleep(5);
+        }
+    }
+
+    @Test
+    public void shouldNotScheduleAnyCloseAtConstruction() {
+        assertEquals(0, scheduler.getScheduledTaskCount());
+        assertEquals(0, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldArmIdleTimerWhenWorkerGoesIdleAfterAnOperation() throws Exception {
+        runOp();
+
+        awaitPendingTimer(true);
+        assertEquals("idle timer should be armed once the worker parks with an empty queue",
+                1, scheduler.getPendingTaskCount());
+        assertEquals(TIMEOUT_MS, scheduler.nextPendingDelayMillis());
+    }
+
+    @Test
+    public void shouldNotArmIdleTimerWhileAnOperationIsRunning() throws Exception {
+        // Hold an operation "running" and assert no idle timer is armed during that window.
+        final CountDownLatch started = new CountDownLatch(1);
+        final CountDownLatch release = new CountDownLatch(1);
+        final Future<?> running = tx.submit(new FutureTask<>(() -> {
+            started.countDown();
+            release.await();
+            return null;
+        }));
+
+        assertTrue(started.await(AWAIT_MS, MILLISECONDS));
+        // While the op runs, the idle timer must not be armed (a long op must not trip the idle timeout).
+        assertEquals(0, scheduler.getPendingTaskCount());
+
+        release.countDown();
+        running.get(AWAIT_MS, MILLISECONDS);
+
+        // Once the worker goes idle, the timer arms.
+        awaitPendingTimer(true);
+        assertEquals(1, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldNotFireIdleCloseForALongRunningOperation() throws Exception {
+        // A single operation that runs longer than the idle timeout must not be reclaimed mid-execution: no timer is
+        // armed while it runs, so advancing the clock far past the timeout fires nothing.
+        final CountDownLatch started = new CountDownLatch(1);
+        final CountDownLatch release = new CountDownLatch(1);
+        final Future<?> running = tx.submit(new FutureTask<>(() -> {
+            started.countDown();
+            release.await();
+            return null;
+        }));
+        assertTrue(started.await(AWAIT_MS, MILLISECONDS));
+
+        scheduler.advanceTimeBy(TIMEOUT_MS * 2, MILLISECONDS);
+
+        verify(manager, never()).destroy(TX_ID);
+        release.countDown();
+        running.get(AWAIT_MS, MILLISECONDS);
+    }
+
+    @Test
+    public void shouldCloseTransactionWhenIdleTimeoutFires() throws Exception {
+        runOp();
+        awaitPendingTimer(true);
+
+        scheduler.advanceTimeBy(TIMEOUT_MS, MILLISECONDS);
+
+        // The scheduled close(false) removes the transaction from the manager.
+        verify(manager).destroy(TX_ID);
+        assertEquals(0, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldNotCloseBeforeIdleTimeoutElapses() throws Exception {
+        runOp();
+        awaitPendingTimer(true);
+
+        scheduler.advanceTimeBy(TIMEOUT_MS - 1, MILLISECONDS);
+
+        verify(manager, never()).destroy(TX_ID);
+        assertEquals(1, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldReArmIdleTimerAfterEachOperation() throws Exception {
+        runOp();
+        awaitPendingTimer(true);
+        assertEquals(1, scheduler.getScheduledTaskCount());
+
+        runOp();
+        awaitPendingTimer(true);
+
+        // A second operation cancels the prior idle timer and arms a fresh one.
+        assertEquals(2, scheduler.getScheduledTaskCount());
+        assertEquals(1, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldNotArmIdleTimerWhenIdleTimeoutDisabled() throws Exception {
+        // idleTransactionTimeout == 0 disables idle reclamation entirely: the timer is never armed.
+        final UnmanagedTransaction disabledTx =
+                new UnmanagedTransaction(TX_ID, manager, "g", graph, scheduler, 0L, PER_GRAPH_CLOSE_MS);
+
+        disabledTx.submit(new FutureTask<>(() -> null)).get(AWAIT_MS, TimeUnit.MILLISECONDS);
+
+        awaitPendingTimer(false);
+        assertEquals(0, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldCancelScheduledCloseOnExplicitClose() throws Exception {
+        runOp();
+        awaitPendingTimer(true);
+
+        tx.close(true);
+
+        verify(manager).destroy(TX_ID);
+        // The pending inactivity close must be cancelled so it cannot fire after the transaction is gone.
+        assertEquals(0, scheduler.getPendingTaskCount());
+    }
+
+    @Test
+    public void shouldNotReArmIdleTimerAfterClose() throws Exception {
+        runOp();
+        awaitPendingTimer(true);
+
+        tx.close(false);
+
+        verify(manager).destroy(TX_ID);
+        // Advancing the clock must not resurrect a close on a transaction that is already gone.
+        scheduler.advanceTimeBy(TIMEOUT_MS * 2, MILLISECONDS);
+        assertEquals(0, scheduler.getPendingTaskCount());
+    }
+
+    // ---- Step 2: SingleThreadTransactionExecutor invariants (executor swap) ----
+
+    @Test
+    public void shouldRunSubmittedTasksOnASingleNamedTransactionThreadInOrder() throws Exception {
+        final List<String> executionOrder = new CopyOnWriteArrayList<>();
+        final List<String> threadNames = new CopyOnWriteArrayList<>();
+
+        Future<?> last = null;
+        for (int i = 0; i < 5; i++) {
+            final int n = i;
+            last = tx.submit(new FutureTask<>(() -> {
+                threadNames.add(Thread.currentThread().getName());
+                executionOrder.add("task-" + n);
+                return null;
+            }));
+        }
+        last.get(5, TimeUnit.SECONDS); // FIFO single thread: the last task completing means all ran
+
+        assertEquals(List.of("task-0", "task-1", "task-2", "task-3", "task-4"), executionOrder);
+        // All ran on one thread, and that thread is the named transaction worker.
+        assertEquals(1, threadNames.stream().distinct().count());
+        assertTrue("expected tx-* thread but was " + threadNames.get(0),
+                threadNames.get(0).startsWith("tx-"));
+    }
+
+    @Test
+    public void shouldInterruptRunningTaskWhenReturnedFutureIsCancelled() throws Exception {
+        // Guards the "do NOT wrap submitted tasks" invariant: cancel(true) on the Future returned by submit() must
+        // interrupt the real work, exactly as the per-request evaluation timeout relies on in the handler.
+        final CountDownLatch started = new CountDownLatch(1);
+        final AtomicBoolean interrupted = new AtomicBoolean(false);
+        final AtomicReference<Throwable> unexpected = new AtomicReference<>();
+
+        final Future<?> running = tx.submit(new FutureTask<>(() -> {
+            started.countDown();
+            try {
+                Thread.sleep(30000); // block until interrupted by cancel(true)
+            } catch (InterruptedException e) {
+                interrupted.set(true);
+                throw e;
+            } catch (Throwable t) {
+                unexpected.set(t);
+            }
+            return null;
+        }));
+
+        assertTrue("task did not start", started.await(5, TimeUnit.SECONDS));
+        running.cancel(true);
+
+        // Give the worker a moment to observe the interrupt and record it.
+        final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+        while (!interrupted.get() && System.nanoTime() < deadline) {
+            Thread.sleep(10);
+        }
+        assertTrue("cancel(true) did not interrupt the running task", interrupted.get());
+        assertEquals(null, unexpected.get());
+    }
+}
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/util/ManualScheduledExecutorService.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/util/ManualScheduledExecutorService.java
new file mode 100644
index 0000000000..21d93a2e08
--- /dev/null
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/util/ManualScheduledExecutorService.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A deterministic, single-threaded test double for {@link ScheduledExecutorService} backed by a virtual clock.
+ * <p>
+ * It exists so timer-driven behavior (such as the {@code gremlin-server} transaction idle / lifetime timeouts) can be
+ * exercised without {@link Thread#sleep(long)} and without real wall-clock waits, which are slow and flaky. Time only
+ * advances when the test calls {@link #advanceTimeBy(long, TimeUnit)} (or {@link #runDueTasks()}), at which point any
+ * scheduled task whose trigger time has been reached is run synchronously on the calling thread, in trigger-time order.
+ * <p>
+ * Only the one-shot {@link #schedule(Runnable, long, TimeUnit)} overload is implemented, because that is all the
+ * transaction code under test uses. Every other {@link ScheduledExecutorService} method throws
+ * {@link UnsupportedOperationException} with an explanatory message so that an unsupported use is loud rather than
+ * silently wrong.
+ * <p>
+ * It is thread-safe: the transaction idle timer is armed/cancelled on the transaction's worker thread (via the
+ * executor's before/afterExecute hooks) while a test thread advances the clock and reads counts. All shared state is
+ * guarded by {@code lock}. Fired task commands are run <em>outside</em> the lock — a fired idle close calls
+ * {@code close(false)}, which submits a rollback to the transaction executor and blocks on it; running it under the
+ * lock could deadlock against the worker thread re-entering {@link #schedule}.
+ */
+public class ManualScheduledExecutorService implements ScheduledExecutorService {
+
+    private final Object lock = new Object();
+    private final List<ScheduledTask> tasks = new ArrayList<>();
+    private long nowMillis = 0L;
+    private int scheduledCount = 0;
+
+    /**
+     * Schedules a one-shot task to run when the virtual clock advances by at least {@code delay}. Returns a
+     * {@link ScheduledFuture} whose {@link Future#cancel(boolean)} prevents the task from running on a later advance.
+     */
+    @Override
+    public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
+        synchronized (lock) {
+            final ScheduledTask task = new ScheduledTask(command, nowMillis + unit.toMillis(delay));
+            tasks.add(task);
+            scheduledCount++;
+            return task;
+        }
+    }
+
+    /**
+     * Advances the virtual clock by the given amount and runs every task that is now due (trigger time {@code <=} the
+     * new current time) and not cancelled, in ascending trigger-time order.
+     */
+    public void advanceTimeBy(final long amount, final TimeUnit unit) {
+        synchronized (lock) {
+            nowMillis += unit.toMillis(amount);
+        }
+        runDueTasks();
+    }
+
+    /**
+     * Runs every currently-due, non-cancelled task without advancing the clock. Useful for firing a zero-delay task.
+     * Each due task is selected under the lock but executed outside it (see class Javadoc).
+     */
+    public void runDueTasks() {
+        while (true) {
+            final ScheduledTask next;
+            synchronized (lock) {
+                ScheduledTask soonest = null;
+                // Loop because a fired task may schedule another task that is itself immediately due.
+                for (final ScheduledTask t : tasks) {
+                    if (!t.cancelled && !t.done && t.triggerAtMillis <= nowMillis) {
+                        if (soonest == null || t.triggerAtMillis < soonest.triggerAtMillis) soonest = t;
+                    }
+                }
+                if (soonest == null) return;
+                soonest.done = true; // mark done under lock so it is not re-selected
+                next = soonest;
+            }
+            next.command.run(); // run OUTSIDE the lock
+        }
+    }
+
+    /**
+     * The number of tasks that are still scheduled to run (not cancelled, not yet fired).
+     */
+    public int getPendingTaskCount() {
+        synchronized (lock) {
+            int count = 0;
+            for (final ScheduledTask t : tasks) {
+                if (!t.cancelled && !t.done) count++;
+            }
+            return count;
+        }
+    }
+
+    /**
+     * The total number of tasks ever scheduled, including ones later cancelled or already fired. Lets a test assert
+     * that a reschedule actually issued a fresh {@code schedule(...)} call.
+     */
+    public int getScheduledTaskCount() {
+        synchronized (lock) {
+            return scheduledCount;
+        }
+    }
+
+    /**
+     * The remaining delay (ms) until the soonest still-pending task fires, or {@code -1} if none is pending.
+     */
+    public long nextPendingDelayMillis() {
+        synchronized (lock) {
+            long soonest = Long.MAX_VALUE;
+            for (final ScheduledTask t : tasks) {
+                if (!t.cancelled && !t.done) soonest = Math.min(soonest, t.triggerAtMillis - nowMillis);
+            }
+            return soonest == Long.MAX_VALUE ? -1L : soonest;
+        }
+    }
+
+    private final class ScheduledTask implements ScheduledFuture<Object> {
+        private final Runnable command;
+        private final long triggerAtMillis;
+        private volatile boolean cancelled = false;
+        private volatile boolean done = false;
+
+        private ScheduledTask(final Runnable command, final long triggerAtMillis) {
+            this.command = command;
+            this.triggerAtMillis = triggerAtMillis;
+        }
+
+        @Override
+        public long getDelay(final TimeUnit unit) {
+            synchronized (lock) {
+                return unit.convert(triggerAtMillis - nowMillis, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        @Override
+        public int compareTo(final Delayed o) {
+            return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
+        }
+
+        @Override
+        public boolean cancel(final boolean mayInterruptIfRunning) {
+            if (done || cancelled) return false;
+            cancelled = true;
+            return true;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return cancelled;
+        }
+
+        @Override
+        public boolean isDone() {
+            return done || cancelled;
+        }
+
+        @Override
+        public Object get() {
+            return null;
+        }
+
+        @Override
+        public Object get(final long timeout, final TimeUnit unit) {
+            return null;
+        }
+    }
+
+    // ---- Unsupported ScheduledExecutorService surface: fail loudly rather than behave unexpectedly. ----
+
+    private static UnsupportedOperationException unsupported(final String method) {
+        return new UnsupportedOperationException(
+                ManualScheduledExecutorService.class.getSimpleName() + " does not support " + method +
+                "; only schedule(Runnable, long, TimeUnit) is implemented for tests.");
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) {
+        throw unsupported("schedule(Callable, long, TimeUnit)");
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) {
+        throw unsupported("scheduleAtFixedRate");
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) {
+        throw unsupported("scheduleWithFixedDelay");
+    }
+
+    @Override
+    public void execute(final Runnable command) {
+        throw unsupported("execute");
+    }
+
+    @Override
+    public void shutdown() {
+        throw unsupported("shutdown");
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        throw unsupported("shutdownNow");
+    }
+
+    @Override
+    public boolean isShutdown() {
+        throw unsupported("isShutdown");
+    }
+
+    @Override
+    public boolean isTerminated() {
+        throw unsupported("isTerminated");
+    }
+
+    @Override
+    public boolean awaitTermination(final long timeout, final TimeUnit unit) {
+        throw unsupported("awaitTermination");
+    }
+
+    @Override
+    public <T> Future<T> submit(final Callable<T> task) {
+        throw unsupported("submit(Callable)");
+    }
+
+    @Override
+    public <T> Future<T> submit(final Runnable task, final T result) {
+        throw unsupported("submit(Runnable, T)");
+    }
+
+    @Override
+    public Future<?> submit(final Runnable task) {
+        throw unsupported("submit(Runnable)");
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(final java.util.Collection<? extends Callable<T>> tasks) {
+        throw unsupported("invokeAll");
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(final java.util.Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) {
+        throw unsupported("invokeAll");
+    }
+
+    @Override
+    public <T> T invokeAny(final java.util.Collection<? extends Callable<T>> tasks) throws ExecutionException {
+        throw unsupported("invokeAny");
+    }
+
+    @Override
+    public <T> T invokeAny(final java.util.Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) throws ExecutionException {
+        throw unsupported("invokeAny");
+    }
+}

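The `UnmanagedTransactionTest` Javadoc describes the suspend-while-busy idle timer only in prose. The reworked `UnmanagedTransaction` itself falls outside this excerpt. Below is a minimal sketch of the idea, built on a `ThreadPoolExecutor(1, 1)` with `beforeExecute`/`afterExecute` hooks as the commit message describes. It rests on several assumptions:

- The class name `IdleAwareExecutor` and the `onIdle` callback are hypothetical.
- `idleTimeoutMs` stands in for `idleTransactionTimeout`, with 0 meaning disabled.
- The real implementation may differ in its details, for example thread naming and how close interacts with the timer.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not the actual UnmanagedTransaction code: a single-worker executor that suspends an idle
// timer while a task runs and re-arms it only when the worker finishes a task with nothing left in its queue.
class IdleAwareExecutor extends ThreadPoolExecutor {
    private final ScheduledExecutorService scheduler;
    private final long idleTimeoutMs;      // assumed stand-in for idleTransactionTimeout; 0 disables the idle timer
    private final Runnable onIdle;         // hypothetical callback, e.g. roll back and remove the transaction
    private final Object timerLock = new Object();
    private ScheduledFuture<?> idleTimer;  // guarded by timerLock

    IdleAwareExecutor(final ScheduledExecutorService scheduler, final long idleTimeoutMs, final Runnable onIdle) {
        // One core and one max thread: tasks run one at a time, in FIFO order from the unbounded queue.
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        this.scheduler = scheduler;
        this.idleTimeoutMs = idleTimeoutMs;
        this.onIdle = onIdle;
    }

    @Override
    protected void beforeExecute(final Thread t, final Runnable r) {
        super.beforeExecute(t, r);
        synchronized (timerLock) {
            // An operation is starting: suspend the idle timer so a long operation cannot trip it.
            if (idleTimer != null) {
                idleTimer.cancel(false);
                idleTimer = null;
            }
        }
    }

    @Override
    protected void afterExecute(final Runnable r, final Throwable t) {
        super.afterExecute(r, t);
        synchronized (timerLock) {
            // Re-arm only when no more work is queued; if a task slips in after this check, its
            // beforeExecute (which runs next on this same worker) cancels the timer again.
            if (idleTimeoutMs > 0 && getQueue().isEmpty() && !isShutdown()) {
                idleTimer = scheduler.schedule(onIdle, idleTimeoutMs, TimeUnit.MILLISECONDS);
            }
        }
    }
}
```

The hooks live on the executor rather than in a wrapper around each task. As a result, a `FutureTask` passed to `execute()` is exactly the `Runnable` the worker runs. Calling `cancel(true)` on that `FutureTask` therefore still interrupts the worker, which is the property `shouldInterruptRunningTaskWhenReturnedFutureIsCancelled` checks.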