This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch tx-idle-to
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 53e5ba1a96e6ca5fc4b00c07f56d17963d4294b4
Author: Ken Hu <[email protected]>
AuthorDate: Thu Jun 25 19:00:24 2026 -0700

    Add maxTransactionLifetime absolute cap for HTTP transactions

    The idle timeout only reclaims transactions that go quiet; a client could
    still hold a transaction (and its dedicated worker thread and concurrency
    slot) open indefinitely with a single long operation or a keep-alive drip.
    maxTransactionLifetime bounds total transaction age regardless of activity:
    when it fires, it interrupts the running operation and rolls the transaction
    back, so the in-flight client gets a transaction-timeout (504) rather than a
    misleading evaluation-timeout error (500).

    Rather than validating the timeout configuration and failing begins (or
    silently overriding a client's timeoutMs) when the bounds are disabled, the
    server ships sane defaults: idle reclamation at 1 minute and a lifetime cap
    at 10 minutes. A transaction is therefore bounded out of the box, disabling
    the bounds is a deliberate operator choice, and a per-request timeoutMs is
    always honored as sent rather than second-guessed on the client's behalf.
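The ordering rules this change depends on are easier to see outside the server. The Java sketch below is a hypothetical, self-contained model of the pattern, not the Gremlin Server code: `CappedTransaction`, `RequestFlag`, `begin` and `errorFor` are illustrative names, `RequestFlag` stands in for the server's `Context`, and rollback is modeled by setting a boolean. It schedules the cap only after the transaction is built, tracks the running operation before dispatching it and compare-and-clears it in `afterExecute`, and flags the request before cancelling its future so the unwinding operation can be reported as a 504 rather than a 500.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not Gremlin Server code: a single-threaded "transaction" with an absolute lifetime cap.
final class CappedTransaction {

    // Stand-in (assumption) for the server's per-request Context and its closedByLifetimeCap flag.
    static final class RequestFlag {
        volatile boolean closedByLifetimeCap;
    }

    // The running operation and the request that drove it, tracked as one immutable pair.
    private static final class Running {
        final Future<?> future;
        final RequestFlag flag;

        Running(Future<?> future, RequestFlag flag) {
            this.future = future;
            this.flag = flag;
        }
    }

    private final AtomicReference<Running> current = new AtomicReference<>();
    private final AtomicBoolean open = new AtomicBoolean(true);
    private volatile ScheduledFuture<?> lifetimeTimer;
    volatile boolean rolledBack; // stand-in for rolling back the real graph transaction

    // One worker thread, so every operation of this transaction runs on the same thread.
    private final ThreadPoolExecutor worker = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) {
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            // Compare-and-clear: un-track r only if it is still the tracked operation.
            current.updateAndGet(run -> (run != null && run.future == r) ? null : run);
        }
    };

    private CappedTransaction() {
    }

    // Build first, then schedule the cap, so the cap always finds a fully constructed transaction.
    static CappedTransaction begin(ScheduledExecutorService scheduler, long maxLifetimeMs) {
        CappedTransaction tx = new CappedTransaction();
        if (maxLifetimeMs > 0) {
            tx.lifetimeTimer = scheduler.schedule(tx::onLifetimeCap, maxLifetimeMs, TimeUnit.MILLISECONDS);
        }
        return tx;
    }

    Future<?> submit(Runnable work, RequestFlag flag) {
        if (!open.get()) throw new IllegalStateException("transaction is closed");
        FutureTask<Void> task = new FutureTask<>(work, null);
        current.set(new Running(task, flag)); // track BEFORE dispatch so the cap cannot miss it
        worker.execute(task);
        return task;
    }

    void onLifetimeCap() {
        Running running = current.get(); // read the (future, flag) pair once, as a unit
        if (running != null) {
            if (running.flag != null) running.flag.closedByLifetimeCap = true; // flag BEFORE interrupting
            running.future.cancel(true); // interrupt only the running operation
        }
        close();
    }

    void close() {
        if (!open.compareAndSet(true, false)) return; // idempotent
        ScheduledFuture<?> timer = lifetimeTimer;
        if (timer != null) timer.cancel(false); // never interrupts, so harmless when the cap is the caller
        rolledBack = true;
        worker.shutdownNow();
    }

    // Mirrors the handler's choice: a cap-kill is reported as 504, any other interrupt as 500.
    static String errorFor(RequestFlag flag) {
        return (flag != null && flag.closedByLifetimeCap) ? "504 transaction timeout" : "500 evaluation timeout";
    }
}
```

For example, beginning with a short cap (a few hundred milliseconds) and submitting an operation that sleeps past it, the operation's future is cancelled, its flag is set, and `errorFor` returns the 504 label, while a request the cap never touched still maps to the 500 label. As with the real setting, cancelling only delivers an interrupt, so an operation that never reaches an interruptible point would keep its thread busy.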
    Assisted-by: Claude Code:claude-opus-4-8
---
 CHANGELOG.asciidoc                                 |   2 +
 docs/src/dev/provider/index.asciidoc               |  15 ++-
 docs/src/reference/gremlin-applications.asciidoc   |  25 ++--
 .../conf/gremlin-server-transaction.yaml           |   2 +
 .../apache/tinkerpop/gremlin/server/Context.java   |  20 +++
 .../apache/tinkerpop/gremlin/server/Settings.java  |  17 ++-
 .../server/handler/HttpGremlinEndpointHandler.java |  36 ++++--
 .../server/transaction/TransactionManager.java     |  49 ++++++--
 .../server/transaction/UnmanagedTransaction.java   |  80 +++++++++++-
 .../gremlin/server/util/ServerGremlinExecutor.java |   1 +
 .../GremlinServerHttpTransactionIntegrateTest.java |  62 ++++++++++
 .../tinkerpop/gremlin/server/SettingsTest.java     |  19 +++
 .../handler/HttpGremlinEndpointHandlerTest.java    |  38 ++++++
 .../server/transaction/TransactionManagerTest.java | 136 +++++++++++++++++++++
 .../transaction/UnmanagedTransactionTest.java      | 105 ++++++++++++++--
 .../gremlin/server/gremlin-server-integration.yaml |   3 +
 16 files changed, 570 insertions(+), 40 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 0fe949254e..0b74c229ac 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -25,6 +25,8 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 [[release-4-0-0]]
 === TinkerPop 4.0.0 (Release Date: NOT OFFICIALLY RELEASED YET)
 
+* Added `maxTransactionLifetime` setting to Gremlin Server, an absolute cap on the total age of an HTTP transaction that interrupts a running operation and rolls the transaction back when it fires (default 600000ms, set to `0` to disable).
+* Changed the Gremlin Server HTTP transaction idle timer to suspend while an operation is running (so a long-running operation is bounded by `evaluationTimeout` rather than the idle timeout) and to honor `0` as "disable idle reclamation"; the `idleTransactionTimeout` default is now 60000ms.
* Added configurable CORS `allowedOrigins` setting to Gremlin Server; warns when wildcard origin is used alongside authentication. * Fixed `ByteBuf` leak in `GraphBinaryMessageSerializerV4` when serialization throws an `IOException`. * Changed `Tree` to no longer extend `HashMap`; it is now a final class with a tree-shaped API (`childAt`, `hasChild`, `contains`, `findSubtree`, `getOrCreateChild`, `getNodesAtDepth`, `getLeafNodes`, `nodeCount`) and is no longer a `Map`. diff --git a/docs/src/dev/provider/index.asciidoc b/docs/src/dev/provider/index.asciidoc index 1f6ed71945..ec642cbfa1 100644 --- a/docs/src/dev/provider/index.asciidoc +++ b/docs/src/dev/provider/index.asciidoc @@ -1430,10 +1430,17 @@ rejects it with HTTP 400. This prevents cross-graph operations within a single t ==== Transaction Timeout and Idle Reclamation -Servers implement a configurable transaction timeout (`idleTransactionTimeout`, default 600000ms). The timeout -represents how long a transaction can sit idle with no requests before the server forcibly rolls it back and removes it. -The timeout resets on each request received for that transaction, so active transactions are not affected. After a -timeout fires, any subsequent request with that transaction ID receives a 404 response. +A transaction can be bounded at three independent scopes, each disabled with `0`. The `evaluationTimeout` bounds a +single operation. The `idleTransactionTimeout` (default 60000ms) bounds the idle gaps between operations: how long a +transaction may remain idle, with no operation running or queued, before it is rolled back and removed; once it is +removed, a subsequent request with that transaction ID receives a 404. The `maxTransactionLifetime` (default 600000ms) +bounds the total age of the transaction regardless of activity, and so may end a transaction while an operation is still +running; the in-flight request then receives a 504 (`TransactionException`) and subsequent requests receive a 404. 
+ +The defaults bound a transaction without any operator configuration; disabling them is a deliberate choice, and a +per-request `timeoutMs` is honored as sent rather than overridden by the server. How promptly a running operation is +actually interrupted when a bound fires is a property of the provider's execution and traversal machinery, not +guaranteed by these settings. ==== Transaction Capacity Limits diff --git a/docs/src/reference/gremlin-applications.asciidoc b/docs/src/reference/gremlin-applications.asciidoc index 3b1989ab2f..eaa9950b69 100644 --- a/docs/src/reference/gremlin-applications.asciidoc +++ b/docs/src/reference/gremlin-applications.asciidoc @@ -930,7 +930,7 @@ The following table describes the various YAML configuration options that Gremli |gremlinPool |The number of "Gremlin" threads available to execute actual scripts in a `ScriptEngine`. This pool represents the workers available to handle blocking operations in Gremlin Server. When set to `0`, Gremlin Server will use the value provided by `Runtime.availableProcessors()`. |0 |host |The name of the host to bind the server to. |localhost |idleConnectionTimeout |Time in milliseconds that the server will allow a channel to not receive requests from a client before it automatically closes. If enabled, the value provided should typically exceed the amount of time given to `keepAliveInterval`. Note that while this value is to be provided as milliseconds it will resolve to second precision. Set this value to `0` to disable this feature. |0 -|idleTransactionTimeout |Time in milliseconds that a transaction can sit idle (no requests) before the server forcibly rolls it back and removes it. The timeout resets on each request received for that transaction. Set to `0` to disable this feature. |600000 +|idleTransactionTimeout |Time in milliseconds that a transaction can remain idle (no operation running or queued) before the server forcibly rolls it back and removes it. 
The idle timer is suspended while an operation is running, so a long-running operation does not trip it (its duration is instead bounded by `evaluationTimeout`); the timer is armed only once the transaction returns to idle. Set to `0` to disable idle reclamation. |60000 |keepAliveInterval |Time in milliseconds that the server will allow a channel to not send responses to a client before it sends a "ping" to see if it is still present. If it is present, the client should respond with a "pong" which will thus reset the `idleConnectionTimeout` and keep the channel open. If enabled, this number should be smaller than the value provided to the `idleConnectionTimeout`. Note that while this value is to be provided as milliseconds it will resolve to second prec [...] |maxAccumulationBufferComponents |Maximum number of request components that can be aggregated for a message. |1024 |maxChunkSize |The maximum length of the content or each chunk. If the content length exceeds this value, the transfer encoding of the decoded request will be converted to 'chunked' and the content will be split into multiple `HttpContent` objects. If the transfer encoding of the HTTP request is 'chunked' already, each chunk will be split into smaller chunks if the length of the chunk exceeds this value. |8192 @@ -938,6 +938,7 @@ The following table describes the various YAML configuration options that Gremli |maxHeaderSize |The maximum length of all headers. |8192 |maxInitialLineLength |The maximum length of the initial line (e.g. "GET / HTTP/1.0") processed in a request, which essentially controls the maximum length of the submitted URI. |4096 |maxParameters |The maximum number of parameters that can be passed on a request. Larger numbers may impact performance for scripts. This configuration only applies to the `HttpChannelizer`. |16 +|maxTransactionLifetime |Absolute cap in milliseconds on the total age of a transaction regardless of activity. 
Unlike `idleTransactionTimeout`, it fires even while an operation is running, interrupting it and rolling the transaction back. Set to `0` to disable the cap. |600000 |maxRequestContentLength |The maximum length of the aggregated content for a request message. Works in concert with `maxChunkSize` where chunked requests are accumulated back into a single message. A request exceeding this size will return a `413 - Request Entity Too Large` status code. |10485760 |maxWorkQueueSize |The maximum size the general processing queue can grow before the `gremlinPool` starts to reject requests. |8192 |metrics.consoleReporter.enabled |Turns on console reporting of metrics. |false @@ -2259,12 +2260,22 @@ IMPORTANT: Not all graph implementations support explicit transactions (for exam Use `TinkerTransactionGraph` or another graph implementation that supports explicit transactions. Attempting to begin an explicit transaction on a graph that does not support them will result in an error. -Two settings in the Gremlin Server YAML control transaction resource usage: - -* `idleTransactionTimeout` (default: 600000ms) -- How long a transaction can sit idle before the server forcibly rolls - it back. The timeout resets on each request received for that transaction. -* `maxConcurrentTransactions` (default: 1000) -- The maximum number of open transactions allowed. When the limit is - reached, new begin requests are rejected with HTTP 503. +Several settings in the Gremlin Server YAML control transaction resource usage. The `idleTransactionTimeout` +(default 60000ms) governs how long a transaction can remain idle (no operation running or queued) before the +server forcibly rolls it back. The idle timer is suspended while an operation is running, so a long-running operation +does not trip it; it is armed only once the transaction returns to idle. Set it to `0` to disable idle reclamation. 
The +`maxTransactionLifetime` (default 600000ms) is an absolute cap on the total age of a transaction regardless of +activity. Unlike `idleTransactionTimeout`, it fires even while an operation is running, interrupting it and rolling the +transaction back. It bounds transaction lifetime and concurrency-slot occupancy absolutely; like `evaluationTimeout`, +its ability to free the underlying worker thread depends on the operation reaching an interruptible point. Set it to +`0` to disable the cap. Finally, `maxConcurrentTransactions` (default 1000) caps the number of open transactions +allowed; when the limit is reached, new begin requests are rejected with HTTP 503. + +These compose with the per-operation `evaluationTimeout` (and its per-request `timeoutMs` override) to bound a +transaction at three independent scopes: a single operation (`evaluationTimeout`), the gaps between operations +(`idleTransactionTimeout`), and the transaction as a whole (`maxTransactionLifetime`). The defaults keep a transaction +bounded out of the box; disabling all of them is a deliberate operator choice, and a per-request `timeoutMs` is always +honored as sent. Each open transaction consumes a dedicated thread on the server to maintain thread-local transaction state for the underlying graph. 
Ensure that clients close transactions promptly and that the `idleTransactionTimeout` is set to diff --git a/gremlin-server/conf/gremlin-server-transaction.yaml b/gremlin-server/conf/gremlin-server-transaction.yaml index 6b61f40a25..6db62912a9 100644 --- a/gremlin-server/conf/gremlin-server-transaction.yaml +++ b/gremlin-server/conf/gremlin-server-transaction.yaml @@ -31,6 +31,8 @@ metrics: { jmxReporter: {enabled: true}, slf4jReporter: {enabled: true, interval: 180000}} strictTransactionManagement: false +idleTransactionTimeout: 60000 +maxTransactionLifetime: 600000 idleConnectionTimeout: 0 keepAliveInterval: 0 maxInitialLineLength: 4096 diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java index 51373d7741..598514ed6c 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java @@ -57,6 +57,10 @@ public class Context { private final Object timeoutExecutorLock = new Object(); private String transactionId; // initially null for non-transactional requests and begin() calls; set after transaction creation. private Map<String, Object> parameters = new HashMap<>(); // only available after string parameters are parsed by grammar. + // Set by the transaction's lifetime cap (on the scheduler thread) before it interrupts this request's operation, and + // read as the interrupt unwinds the operation (on the transaction worker thread) to report an accurate + // transaction-timeout error rather than a generic evaluation timeout. volatile for cross-thread visibility. 
+ private volatile boolean closedByLifetimeCap = false; public Context(final RequestMessage requestMessage, final ChannelHandlerContext ctx, final Settings settings, final GraphManager graphManager, @@ -143,6 +147,22 @@ public class Context { this.transactionId = transactionId; } + /** + * Marks this request's operation as having been interrupted because its transaction hit its absolute lifetime cap. + * Set by the transaction's lifetime cap before it interrupts the operation. + */ + public void setClosedByLifetimeCap(final boolean closedByLifetimeCap) { + this.closedByLifetimeCap = closedByLifetimeCap; + } + + /** + * Returns {@code true} if this request's operation was interrupted by its transaction's absolute lifetime cap, in + * which case the resulting interrupt should be reported as a transaction timeout rather than an evaluation timeout. + */ + public boolean isClosedByLifetimeCap() { + return closedByLifetimeCap; + } + public Map<String, Object> getParameters() { return this.parameters; } diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java index 236a4b4431..c8f718a774 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java @@ -188,10 +188,10 @@ public class Settings { * Time in milliseconds that a transaction can remain idle (no operation running or queued) before it is * automatically rolled back. This prevents resource leaks from abandoned transactions. The idle timer is suspended * while an operation is in progress, so a long-running operation does not trip it (its duration is instead bounded - * by {@link #evaluationTimeout}). Set to {@code 0} to disable idle reclamation entirely. Default is 600000 - * (10 minutes). + * by {@link #evaluationTimeout}). Set to {@code 0} to disable idle reclamation entirely. 
Default is 60000 + * (1 minute). */ - public long idleTransactionTimeout = 600000L; + public long idleTransactionTimeout = 60000L; /** * Time in milliseconds to wait for a transaction commit or rollback operation to complete. @@ -205,6 +205,17 @@ public class Settings { */ public int maxConcurrentTransactions = 1000; + /** + * Absolute ceiling, in milliseconds, on the total age of a transaction regardless of activity. Unlike + * {@link #idleTransactionTimeout} (which only reclaims idle transactions), this cap fires even while an operation is + * running, interrupting it and rolling the transaction back, so it bounds how long a single transaction can hold its + * dedicated worker thread and concurrency slot. The bound on transaction lifetime and slot occupancy is absolute; + * the bound on thread occupancy is best-effort in the same way {@link #evaluationTimeout} is, since interrupting a + * running operation only takes effect when it reaches an interruptible point. Set to {@code 0} to disable the cap. + * Default is 600000 (10 minutes). + */ + public long maxTransactionLifetime = 600000L; + /** * The full class name of the {@link Channelizer} to use in Gremlin Server. 
*/ diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java index 11939008a8..eafae3a62a 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java @@ -184,6 +184,14 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ ctx.channel().attr(StateKey.RESPONSE_COORDINATOR).set(coordinator); final Timer.Context timerContext = evalOpTimer.time(); + + // Resolve the target transaction once for a transactional (non-begin) request and reuse it at submit below, so + // the work runs against exactly the transaction resolved here. Empty for begins / non-transactional requests, + // and also when the id is unknown (the submit path turns that into a 404). + final boolean isTransactionalOp = (requestCtx.getTransactionId() != null) && !requestCtx.isTransactionBegin(); + final Optional<UnmanagedTransaction> txForRequest = + isTransactionalOp ? transactionManager.get(requestCtx.getTransactionId()) : Optional.empty(); + // timeout override - handle both deprecated and newly named configuration. 
earlier logic should prevent // both configurations from being submitted at the same time final Long timeoutMs = requestMessage.getField(Tokens.TIMEOUT_MS); @@ -277,7 +285,7 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ coordinator.writeHeader(createResponseHeaders(ctx, serializer, requestCtx).toArray(CharSequence[]::new)); sendHttpContents(ctx, requestCtx, coordinator); } catch (Throwable t) { - coordinator.writeError(formErrorResponseMessage(t, requestMessage)); + coordinator.writeError(formErrorResponseMessage(t, requestMessage, requestCtx)); } finally { // Idempotent terminal backstop: if the data or error path already terminated the response, complete() // is a no-op via its COMPLETED short-circuit. It runs in finally — not at the end of the try — so the @@ -301,9 +309,11 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ }); try { - final boolean isBeginTransactionRequest = requestCtx.isTransactionBegin(); - final Future<?> executionFuture = ((requestCtx.getTransactionId() != null) && !isBeginTransactionRequest) ? - transactionManager.get(requestCtx.getTransactionId()).get().submit(evalFuture) : + // Reuse the transaction resolved above (txForRequest) rather than looking it up again. For a transactional + // op an empty Optional means the id is unknown/reclaimed: get() throws NoSuchElementException, caught below + // and reported as a 404, preserving the prior behavior. + final Future<?> executionFuture = isTransactionalOp ? + txForRequest.get().submit(evalFuture, requestCtx) : requestCtx.getGremlinExecutor().getExecutorService().submit(evalFuture); if (seto > 0) { // Schedule a timeout in the thread pool for future execution. The coordinator's monitor guarantees @@ -311,7 +321,12 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ // first wins, and the other's write becomes a no-op. 
requestCtx.setTimeoutExecutor(requestCtx.getScheduledExecutorService().schedule(() -> { executionFuture.cancel(true); - coordinator.writeError(GremlinError.timeout(requestMessage)); + // If the lifetime cap fired for this same operation (it flags the Context before interrupting), + // report the cap's 504 even when this eval-timeout task is the one that writes - so a cap-kill is + // never mislabeled as a generic "increase evaluationTimeout" 500 just because of writer ordering. + coordinator.writeError(requestCtx.isClosedByLifetimeCap() + ? GremlinError.transactionTimeout(requestCtx.getTransactionId(), "execute") + : GremlinError.timeout(requestMessage)); }, seto, TimeUnit.MILLISECONDS)); } } catch (RejectedExecutionException ree) { @@ -360,7 +375,7 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ } } - private GremlinError formErrorResponseMessage(Throwable t, RequestMessage requestMessage) { + GremlinError formErrorResponseMessage(Throwable t, RequestMessage requestMessage, final Context requestCtx) { if (t instanceof UndeclaredThrowableException) t = t.getCause(); // if any exception in the chain is TemporaryException or Failure then we should respond with the @@ -385,6 +400,13 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ return GremlinError.longFrame(t); } if (t instanceof InterruptedException || t instanceof TraversalInterruptedException) { + // An interrupt here is normally an evaluation timeout, but it is also how a transaction's absolute lifetime + // cap stops a running operation. In the cap case the transaction flagged this request's Context before + // interrupting, so report an accurate transaction-timeout (504) rather than the generic "increase + // evaluationTimeout" error (500), whose advice would be misleading for a lifetime-cap kill. 
+ if (requestCtx != null && requestCtx.isClosedByLifetimeCap()) { + return GremlinError.transactionTimeout(requestCtx.getTransactionId(), "execute"); + } return GremlinError.timeout(requestMessage); } if (t instanceof TimedInterruptTimeoutException) { @@ -509,7 +531,7 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ txCtx.submit(new FutureTask<>(() -> { graph.tx().open(); return null; - })).get(5000, TimeUnit.MILLISECONDS); // Not an option for now, but 5s should be plenty. + }), ctx).get(5000, TimeUnit.MILLISECONDS); // Not an option for now, but 5s should be plenty. } catch (IllegalStateException ise) { throw new ProcessingException(GremlinError.maxTransactionsExceeded(ise.getMessage())); } catch (IllegalArgumentException iae) { diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java index ec203e820e..6144b5fb52 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java @@ -31,6 +31,8 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import static com.codahale.metrics.MetricRegistry.name; @@ -41,9 +43,14 @@ public class TransactionManager { private static final Logger logger = LoggerFactory.getLogger(TransactionManager.class); private final ConcurrentMap<String, UnmanagedTransaction> transactions = new ConcurrentHashMap<>(); + // Absolute-lifetime timers, one per transaction with the cap enabled, keyed by transaction id. 
The manager owns the + // lifetime cap because it is scoped to the transaction's existence in the registry (a single fixed schedule), unlike + // the activity-driven idle timer that the transaction must own to see its executor's running/idle transitions. + private final ConcurrentMap<String, ScheduledFuture<?>> lifetimeTimers = new ConcurrentHashMap<>(); private final ScheduledExecutorService scheduledExecutorService; private final GraphManager graphManager; private final long idleTransactionTimeoutMs; + private final long maxTransactionLifetimeMs; private final int maxConcurrentTransactions; private final long perGraphCloseMs; @@ -53,22 +60,25 @@ public class TransactionManager { * @param scheduledExecutorService Scheduler for timeout management * @param graphManager The graph manager for accessing traversal sources * @param idleTransactionTimeoutMs Inactivity timeout in milliseconds before auto-rollback; {@code 0} disables it + * @param maxTransactionLifetimeMs Absolute cap in milliseconds on total transaction age; {@code 0} disables it * @param maxConcurrentTransactions Maximum number of concurrent transactions allowed */ public TransactionManager(final ScheduledExecutorService scheduledExecutorService, final GraphManager graphManager, final long idleTransactionTimeoutMs, + final long maxTransactionLifetimeMs, final int maxConcurrentTransactions, final long perGraphCloseMs) { this.scheduledExecutorService = scheduledExecutorService; this.graphManager = graphManager; this.idleTransactionTimeoutMs = idleTransactionTimeoutMs; + this.maxTransactionLifetimeMs = maxTransactionLifetimeMs; this.maxConcurrentTransactions = maxConcurrentTransactions; this.perGraphCloseMs = perGraphCloseMs; MetricManager.INSTANCE.getGauge(transactions::size, name(GremlinServer.class, "transactions")); - logger.info("TransactionManager initialized with idleTransactionTimeout={}ms, maxTransactions={}", - idleTransactionTimeoutMs, maxConcurrentTransactions); + logger.info("TransactionManager 
initialized with idleTransactionTimeout={}ms, maxTransactionLifetime={}ms, maxTransactions={}", + idleTransactionTimeoutMs, maxTransactionLifetimeMs, maxConcurrentTransactions); } /** @@ -105,6 +115,11 @@ public class TransactionManager { */ void destroy(final String id) { transactions.remove(id); + // Cancel this transaction's lifetime cap (if any) so the one-shot cannot fire after the transaction is gone. + // Covers every close path uniformly (commit, rollback, idle reclaim, and the cap firing itself, where + // cancelling the already-running one-shot is a harmless no-op). + final ScheduledFuture<?> lifetimeTimer = lifetimeTimers.remove(id); + if (lifetimeTimer != null) lifetimeTimer.cancel(false); } /** @@ -112,13 +127,13 @@ public class TransactionManager { * {@link UnmanagedTransaction} is inserted into the transactions map. */ private UnmanagedTransaction createTransactionContext(final String traversalSourceName, final Graph graph) { - String txId; - UnmanagedTransaction ctx; + String transactionId; + UnmanagedTransaction transaction; do { - txId = UUID.randomUUID().toString(); - ctx = new UnmanagedTransaction( - txId, + transactionId = UUID.randomUUID().toString(); + transaction = new UnmanagedTransaction( + transactionId, this, traversalSourceName, graph, @@ -126,9 +141,21 @@ public class TransactionManager { idleTransactionTimeoutMs, perGraphCloseMs ); - } while (transactions.putIfAbsent(txId, ctx) != null); + } while (transactions.putIfAbsent(transactionId, transaction) != null); + + // Schedule the absolute lifetime cap only AFTER the transaction is registered above. The cap's teardown + // (onLifetimeCap -> close(false)) early-returns if the manager does not yet know about the transaction, so + // scheduling it before registration could let a pathologically small cap fire into nothing and leave an + // unreclaimable transaction holding a worker thread and slot. 
Scheduling after registration guarantees the cap + // can always tear the transaction down; destroy() cancels it on every close path. The clock effectively starts + // at construction (registration follows within microseconds), so it bounds total transaction age including begin. + if (maxTransactionLifetimeMs > 0) { + final UnmanagedTransaction registered = transaction; // effectively-final copy for the scheduled method ref + lifetimeTimers.put(transactionId, scheduledExecutorService.schedule( + registered::onLifetimeCap, maxTransactionLifetimeMs, TimeUnit.MILLISECONDS)); + } - return ctx; + return transaction; } /** @@ -174,6 +201,10 @@ public class TransactionManager { }); transactions.clear(); + // Each close(false) above already cancelled its lifetime timer via destroy(); cancel any stragglers (e.g. a + // transaction whose close threw) so no scheduled cap outlives the manager. + lifetimeTimers.values().forEach(timer -> timer.cancel(false)); + lifetimeTimers.clear(); logger.info("TransactionManager shutdown complete"); } } diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java index c462dfa41b..25bd36ee5c 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java @@ -18,6 +18,7 @@ */ package org.apache.tinkerpop.gremlin.server.transaction; +import org.apache.tinkerpop.gremlin.server.Context; import org.apache.tinkerpop.gremlin.structure.Graph; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +62,15 @@ public class UnmanagedTransaction { private final long idleTimeout; private final long perGraphClose; private final AtomicReference<ScheduledFuture<?>> idleFuture = new AtomicReference<>(); + /** + * The operation 
currently executing on the worker thread (or most recently submitted) paired with its request + * {@link Context}, held as a single immutable {@link Running} so the lifetime cap reads a consistent pair — it can + * never flag one operation's {@code Context} while interrupting another's {@link Future}. The future is the exact + * same object the per-request evaluation timeout cancels in the handler. Set in {@link #submit} and compare-and- + * cleared in {@link SingleThreadTransactionExecutor#afterExecute} so a fast next operation is not un-tracked by the + * previous one's completion. + */ + private final AtomicReference<Running> current = new AtomicReference<>(); // Controls whether the executor is still accepting tasks. private final AtomicBoolean accepting = new AtomicBoolean(true); /** @@ -173,17 +183,28 @@ public class UnmanagedTransaction { * error handling, and response writing. * * @param task The FutureTask to execute on the transaction thread + * @param context The request context driving this task, recorded so the lifetime cap can flag it before + * interrupting; may be {@code null} for internal operations (such as the begin's tx open) where no + * user-facing error needs to be tailored * @return Future that can be used for timeout cancellation * @throws IllegalStateException if the transaction is closed */ - public Future<?> submit(final FutureTask<Void> task) { + public Future<?> submit(final FutureTask<Void> task, final Context context) { if (!accepting.get()) throw new IllegalStateException("Transaction " + transactionId + " is closed"); // Insurance backstop: cancel (do NOT arm) the idle timer on submit. Arming is the executor's job, done in // afterExecute once the worker parks with an empty queue. beforeExecute will also cancel when the task starts; // cancelling here too closes the small window between accepting a task and the worker picking it up. 
cancelIdleTimer(); - return executor.submit(task); + + // Track the running operation BEFORE dispatching it, so the lifetime cap can never miss a worker that starts + // running between dispatch and tracking. The submitted FutureTask is itself the Future we track and return: + // cancel(true) on it interrupts the real work (the same future the handler's evaluation timeout cancels), and + // afterExecute receives this same object to compare-and-clear. Pairing the future with its Context in one + // immutable Running means the cap always reads a consistent pair (never flags op1 while interrupting op2). + current.set(new Running(task, context)); + executor.execute(task); + return task; } /** @@ -230,6 +251,55 @@ public class UnmanagedTransaction { if (!accepting.get()) cancelIdleTimer(); } + /** + * Forcibly tears the transaction down because it has reached its absolute lifetime cap. Invoked by the + * {@link TransactionManager}'s lifetime timer (the manager owns scheduling and cancelling that timer; this method is + * the behavior it triggers). Unlike the idle timer, the cap fires regardless of activity, so it may interrupt an + * operation that is still running. + * <p> + * It flags the running operation's {@link Context} <em>before</em> interrupting it so that, as the interrupt unwinds + * the operation on the worker thread, the error it reports is an accurate transaction-timeout (504) rather than the + * generic evaluation timeout. It then interrupts only the currently-running operation via + * {@link Future#cancel(boolean) cancel(true)} (any siblings already queued behind it continue to fail fast with a + * 404 via the destroy-before-shutdown guard in {@link #close(boolean)}), and finally runs {@code close(false)} to + * roll back and tear the transaction down. Logged at {@code warn} because this is a forced teardown of active work. 
+ */ + void onLifetimeCap() { + // Read the running (future, context) pair once, as a unit, so the Context we flag always belongs to the same + // operation whose future we interrupt. + final Running running = current.get(); + if (running != null) { + if (running.context != null) running.context.setClosedByLifetimeCap(true); // flag BEFORE interrupting + running.future.cancel(true); // interrupt only the running op + } + + logger.warn("Transaction {} exceeded its maximum lifetime and is being closed", transactionId); + close(false); + } + + /** + * Compare-and-clears the tracked running operation once it completes. Only clears when the completed future is still + * the one tracked, so a fast next operation submitted between this one finishing and this clearing is not lost. + */ + private void clearCurrentExecution(final Future<?> completed) { + current.updateAndGet(running -> (running != null && running.future == completed) ? null : running); + } + + /** + * An in-flight operation paired with the request {@link Context} that drove it, tracked as one immutable unit so the + * lifetime cap reads a consistent pair. {@code context} may be {@code null} for internal operations (e.g. begin's tx + * open) that need no tailored client error. + */ + private static final class Running { + private final Future<?> future; + private final Context context; + + private Running(final Future<?> future, final Context context) { + this.future = future; + this.context = context; + } + } + /** * A single-threaded {@link ThreadPoolExecutor} (one core and max thread) that runs all operations for a single * transaction on the same worker thread, preserving the ThreadLocal nature of graph transactions. 
@@ -255,6 +325,12 @@ public class UnmanagedTransaction { @Override protected void afterExecute(final Runnable r, final Throwable t) { super.afterExecute(r, t); + // For operations submitted via submit(), r is the FutureTask that submit() executed and tracked in + // `current`, so compare-and-clear by identity un-tracks the operation that just finished without disturbing + // a faster sibling that may already have replaced it. Other tasks that complete here (e.g. close()'s + // rollback, which ThreadPoolExecutor also wraps in a Future) simply will not match the tracked future, so + // the compare-and-clear is a safe no-op for them. + if (r instanceof Future) clearCurrentExecution((Future<?>) r); maybeScheduleIdleTimer(); } } diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java index ab6be093e2..b8ff46ab59 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java @@ -307,6 +307,7 @@ public class ServerGremlinExecutor { scheduledExecutorService, graphManager, settings.idleTransactionTimeout, + settings.maxTransactionLifetime, settings.maxConcurrentTransactions, settings.perGraphCloseTimeout ); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java index c9a05b82ab..4d1ec785c3 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java @@ -109,6 +109,17 @@ public class 
GremlinServerHttpTransactionIntegrateTest extends AbstractGremlinSe // Short idle timeout, but a single long operation must NOT trip it (idle suspended while busy). settings.idleTransactionTimeout = 500; break; + case "shouldReclaimTransactionExceedingMaxLifetime": + // Short absolute cap with the idle timer disabled, so only the lifetime cap can reclaim the transaction. + settings.idleTransactionTimeout = 0; + settings.maxTransactionLifetime = 800; + break; + case "shouldHonorPerRequestTimeoutMsZeroInTransaction": + // Both transaction timers disabled: a per-request timeoutMs of 0 is honored (not silently overridden), + // so the operation is bounded only by its own request, exactly as on the non-transactional path. + settings.idleTransactionTimeout = 0; + settings.maxTransactionLifetime = 0; + break; case "shouldRejectMismatchedGraphAliasInTransaction": { final Settings.GraphSettings gs = new Settings.GraphSettings(); gs.configuration = "conf/tinkertransactiongraph-empty.properties"; @@ -573,6 +584,57 @@ public class GremlinServerHttpTransactionIntegrateTest extends AbstractGremlinSe } } + @Test + public void shouldReclaimTransactionExceedingMaxLifetime() throws Exception { + // With a short absolute cap (800ms) and the idle timer disabled, a transaction that simply stays open past the + // cap must be reclaimed: the lifetime cap rolls it back and removes it, so a later request gets a 404. The + // deterministic mid-operation interrupt and the 504 it yields are covered by the unit tests; here we assert the + // guarantee we actually make end-to-end -- the transaction (and its slot) is reclaimed on time. 
+ final String txId = beginTx(client, GTX); + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.addV()", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + EntityUtils.consume(r.getEntity()); + } + + // wait for the absolute cap to fire + Thread.sleep(1500); + + // The transaction is gone: a subsequent request on it is rejected as not found. + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.V().count()", GTX)) { + assertEquals(404, r.getStatusLine().getStatusCode()); + assertTrue(extractStatusMessage(r).contains("Transaction not found")); + } + + // and the addV was rolled back, not persisted. + try (final CloseableHttpResponse r = submitNonTx(client, "g.V().count()", GTX)) { + assertEquals(0, extractCount(r)); + } + } + + @Test + public void shouldHonorPerRequestTimeoutMsZeroInTransaction() throws Exception { + // With both transaction timers disabled, a per-request timeoutMs of 0 is honored rather than overridden: the + // server does not reject the begin or silently substitute another timeout - the operation runs as requested. + // (Server-side defaults keep transactions bounded out of the box; disabling them is a deliberate operator + // choice and must not turn into a client-facing failure.) + final String txId = beginTx(client, GTX); + + try (final CloseableHttpResponse r = postJson(client, + "{\"gremlin\":\"g.addV()\",\"g\":\"" + GTX + + "\",\"transactionId\":\"" + txId + "\",\"timeoutMs\":\"0\"}")) { + assertEquals(200, r.getStatusLine().getStatusCode()); + // The operation runs and returns a normal result body, with no timeout error: the request was honored, not + // bounded by a substituted timeout or rejected at begin. 
+ final String body = EntityUtils.toString(r.getEntity()); + assertFalse(body.toLowerCase().contains("timeout")); + } + + try (final CloseableHttpResponse r = commitTx(client, txId, GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + EntityUtils.consume(r.getEntity()); + } + } + @Test public void shouldRejectMismatchedGraphAliasInTransaction() throws Exception { final String txId = beginTx(client, GTX); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/SettingsTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/SettingsTest.java index 28b2cf50fd..98365f7c44 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/SettingsTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/SettingsTest.java @@ -144,4 +144,23 @@ public class SettingsTest { assertThat(settings.lifecycleHooks, is(notNullValue())); assertThat(settings.lifecycleHooks.isEmpty(), is(true)); } + + @Test + public void transactionTimeoutsDefaultToReasonableValuesWhenAbsentFromYaml() throws Exception { + final Settings settings = Settings.read(getMinimalConfigStream()); + + // Out of the box a transaction is bounded without any operator configuration: idle reclamation at 1 minute and + // an absolute lifetime cap at 10 minutes. + assertEquals(60000L, settings.idleTransactionTimeout); + assertEquals(600000L, settings.maxTransactionLifetime); + } + + @Test + public void maxTransactionLifetimeParsedFromYaml() throws Exception { + final InputStream stream = SettingsTest.class.getResourceAsStream("gremlin-server-integration.yaml"); + final Settings settings = Settings.read(stream); + + // Confirms a YAML-provided value overrides the code default (600000); the resource sets it to 480000. 
+ assertEquals(480000L, settings.maxTransactionLifetime); + } } diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandlerTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandlerTest.java index 3058289f7b..04db116362 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandlerTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandlerTest.java @@ -21,13 +21,20 @@ package org.apache.tinkerpop.gremlin.server.handler; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.util.ReferenceCountUtil; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException; +import org.apache.tinkerpop.gremlin.server.Context; +import org.apache.tinkerpop.gremlin.server.util.GremlinError; +import org.apache.tinkerpop.gremlin.util.message.RequestMessage; import org.junit.Test; import static io.netty.util.CharsetUtil.UTF_8; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Unit tests for {@link HttpGremlinEndpointHandler#exceptionCaught}. 
When no {@link HttpResponseCoordinator} has been @@ -59,4 +66,35 @@ public class HttpGremlinEndpointHandlerTest { ReferenceCountUtil.release(response); channel.finishAndReleaseAll(); } + + @Test + public void shouldMapInterruptToTransactionTimeoutWhenClosedByLifetimeCap() { + // When the lifetime cap interrupts an operation, it first flags the request Context, so the interrupt that + // unwinds the operation must be reported as a transaction timeout (504), not the generic evaluation timeout. + final RequestMessage message = RequestMessage.build("g.V()").create(); + final Context ctx = mock(Context.class); + when(ctx.isClosedByLifetimeCap()).thenReturn(true); + when(ctx.getTransactionId()).thenReturn("tx-1234"); + + final GremlinError error = newHandler().formErrorResponseMessage( + new TraversalInterruptedException(), message, ctx); + + assertEquals(HttpResponseStatus.GATEWAY_TIMEOUT, error.getCode()); + assertEquals("TransactionException", error.getException()); + assertTrue(error.getMessage().contains("tx-1234")); + } + + @Test + public void shouldMapInterruptToEvaluationTimeoutWhenNotClosedByLifetimeCap() { + // An ordinary evaluation-timeout interrupt (cap flag unset) must keep the existing 500 timeout behavior. 
+ final RequestMessage message = RequestMessage.build("g.V()").create(); + final Context ctx = mock(Context.class); + when(ctx.isClosedByLifetimeCap()).thenReturn(false); + + final GremlinError error = newHandler().formErrorResponseMessage( + new InterruptedException(), message, ctx); + + assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, error.getCode()); + assertEquals("ServerTimeoutExceededException", error.getException()); + } } diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManagerTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManagerTest.java new file mode 100644 index 0000000000..390bdf17a1 --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManagerTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.tinkerpop.gremlin.server.transaction; + +import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource; +import org.apache.tinkerpop.gremlin.server.GraphManager; +import org.apache.tinkerpop.gremlin.server.util.ManualScheduledExecutorService; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.Transaction; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link TransactionManager}'s ownership of the absolute lifetime cap: the manager schedules the cap + * timer after a transaction is registered and cancels it when the transaction is destroyed. The cap's <em>behavior</em> + * when it fires (interrupt + flag + close) is covered in {@code UnmanagedTransactionTest}; here a deterministic + * {@link ManualScheduledExecutorService} drives the scheduling/cancellation without wall-clock waits. + */ +public class TransactionManagerTest { + + private static final String SOURCE = "g"; + private static final long IDLE_DISABLED = 0L; + private static final long PER_GRAPH_CLOSE_MS = 10000L; + private static final long CAP_MS = 5000L; + private static final int MAX_CONCURRENT = 1000; + + private ManualScheduledExecutorService scheduler; + private GraphManager graphManager; + + @Before + public void setUp() { + scheduler = new ManualScheduledExecutorService(); + + // A traversal source whose graph supports transactions, so create() proceeds to build a transaction. 
+ final Graph graph = mock(Graph.class, RETURNS_DEEP_STUBS); + when(graph.features().graph().supportsTransactions()).thenReturn(true); + final Transaction graphTx = mock(Transaction.class); + when(graph.tx()).thenReturn(graphTx); + when(graphTx.isOpen()).thenReturn(false); // rollback during close(false) is a no-op + + final TraversalSource ts = mock(TraversalSource.class); + when(ts.getGraph()).thenReturn(graph); + + graphManager = mock(GraphManager.class); + when(graphManager.getTraversalSource(SOURCE)).thenReturn(ts); + } + + private TransactionManager newManager(final long maxLifetimeMs) { + return new TransactionManager(scheduler, graphManager, IDLE_DISABLED, maxLifetimeMs, MAX_CONCURRENT, PER_GRAPH_CLOSE_MS); + } + + @Test + public void shouldNotScheduleLifetimeCapWhenDisabled() { + final TransactionManager manager = newManager(0L); // cap disabled + manager.create(SOURCE); + + // No lifetime timer is scheduled when the cap is disabled (idle is also disabled here, so nothing is scheduled). + assertEquals(0, scheduler.getScheduledTaskCount()); + assertEquals(0, scheduler.getPendingTaskCount()); + } + + @Test + public void shouldScheduleLifetimeCapAfterRegistrationWhenEnabled() { + final TransactionManager manager = newManager(CAP_MS); + manager.create(SOURCE); + + // The cap is scheduled exactly once, for the configured delay, as soon as the transaction is created. + assertEquals(1, scheduler.getScheduledTaskCount()); + assertEquals(1, scheduler.getPendingTaskCount()); + assertEquals(CAP_MS, scheduler.nextPendingDelayMillis()); + } + + @Test + public void shouldReclaimTransactionWhenLifetimeCapFires() { + final TransactionManager manager = newManager(CAP_MS); + final UnmanagedTransaction tx = manager.create(SOURCE); + assertEquals(1, manager.getActiveTransactionCount()); + + scheduler.advanceTimeBy(CAP_MS, TimeUnit.MILLISECONDS); + + // The cap fired and tore the transaction down: it is no longer tracked and its timer is gone. 
+ assertEquals(0, manager.getActiveTransactionCount()); + assertEquals(0, scheduler.getPendingTaskCount()); + assertFalse(manager.get(tx.getTransactionId()).isPresent()); + } + + @Test + public void shouldCancelLifetimeCapWhenTransactionClosed() { + final TransactionManager manager = newManager(CAP_MS); + final UnmanagedTransaction tx = manager.create(SOURCE); + assertEquals(1, scheduler.getPendingTaskCount()); + + tx.close(true); // explicit close -> destroy() must cancel the pending cap + + assertEquals(0, scheduler.getPendingTaskCount()); + // Advancing past the cap must not resurrect a close on a transaction that is already gone. + scheduler.advanceTimeBy(CAP_MS * 2, TimeUnit.MILLISECONDS); + assertEquals(0, manager.getActiveTransactionCount()); + } + + @Test + public void shouldScheduleAnIndependentLifetimeCapPerTransaction() { + final TransactionManager manager = newManager(CAP_MS); + manager.create(SOURCE); + manager.create(SOURCE); + + // Each transaction gets its own one-shot cap timer. 
+ assertEquals(2, scheduler.getScheduledTaskCount()); + assertEquals(2, scheduler.getPendingTaskCount()); + assertEquals(2, manager.getActiveTransactionCount()); + } +} diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransactionTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransactionTest.java index 81a8a65949..a5031d81cf 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransactionTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransactionTest.java @@ -18,6 +18,7 @@ */ package org.apache.tinkerpop.gremlin.server.transaction; +import org.apache.tinkerpop.gremlin.server.Context; import org.apache.tinkerpop.gremlin.server.util.ManualScheduledExecutorService; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Transaction; @@ -82,7 +83,7 @@ public class UnmanagedTransactionTest { * Submits a no-op task and blocks until it has finished running on the worker thread. */ private void runOp() throws Exception { - tx.submit(new FutureTask<>(() -> null)).get(AWAIT_MS, TimeUnit.MILLISECONDS); + tx.submit(new FutureTask<>(() -> null), null).get(AWAIT_MS, TimeUnit.MILLISECONDS); } /** @@ -123,7 +124,7 @@ public class UnmanagedTransactionTest { started.countDown(); release.await(); return null; - })); + }), null); assertTrue(started.await(AWAIT_MS, MILLISECONDS)); // While the op runs, the idle timer must not be armed (a long op must not trip the idle timeout). 
@@ -147,7 +148,7 @@ public class UnmanagedTransactionTest { started.countDown(); release.await(); return null; - })); + }), null); assertTrue(started.await(AWAIT_MS, MILLISECONDS)); scheduler.advanceTimeBy(TIMEOUT_MS * 2, MILLISECONDS); @@ -200,7 +201,7 @@ public class UnmanagedTransactionTest { final UnmanagedTransaction disabledTx = new UnmanagedTransaction(TX_ID, manager, "g", graph, scheduler, 0L, PER_GRAPH_CLOSE_MS); - disabledTx.submit(new FutureTask<>(() -> null)).get(AWAIT_MS, TimeUnit.MILLISECONDS); + disabledTx.submit(new FutureTask<>(() -> null), null).get(AWAIT_MS, TimeUnit.MILLISECONDS); awaitPendingTimer(false); assertEquals(0, scheduler.getPendingTaskCount()); @@ -231,8 +232,6 @@ public class UnmanagedTransactionTest { assertEquals(0, scheduler.getPendingTaskCount()); } - // ---- Step 2: SingleThreadTransactionExecutor invariants (executor swap) ---- - @Test public void shouldRunSubmittedTasksOnASingleNamedTransactionThreadInOrder() throws Exception { final List<String> executionOrder = new CopyOnWriteArrayList<>(); @@ -245,7 +244,7 @@ public class UnmanagedTransactionTest { threadNames.add(Thread.currentThread().getName()); executionOrder.add("task-" + n); return null; - })); + }), null); } last.get(5, TimeUnit.SECONDS); // FIFO single thread: the last task completing means all ran @@ -275,7 +274,7 @@ public class UnmanagedTransactionTest { unexpected.set(t); } return null; - })); + }), null); assertTrue("task did not start", started.await(5, TimeUnit.SECONDS)); running.cancel(true); @@ -288,4 +287,94 @@ public class UnmanagedTransactionTest { assertTrue("cancel(true) did not interrupt the running task", interrupted.get()); assertEquals(null, unexpected.get()); } + + @Test + public void shouldCloseTransactionWhenLifetimeCapFiresWhileIdle() { + // The lifetime timer itself is scheduled/cancelled by the TransactionManager (see TransactionManagerTest); this + // and the other onLifetimeCap() tests cover what the cap does when it fires, by invoking it 
directly as the + // manager's timer would. Here: the cap tears the transaction down even when nothing is running. + tx.onLifetimeCap(); + + verify(manager).destroy(TX_ID); + } + + @Test + public void shouldInterruptRunningOperationAndFlagContextWhenLifetimeCapFires() throws Exception { + final Context ctx = mock(Context.class); + final CountDownLatch started = new CountDownLatch(1); + final AtomicBoolean interrupted = new AtomicBoolean(false); + tx.submit(new FutureTask<>(() -> { + started.countDown(); + try { + Thread.sleep(30000); // block until the cap interrupts it + } catch (InterruptedException e) { + interrupted.set(true); + } + return null; + }), ctx); + assertTrue("operation did not start", started.await(AWAIT_MS, MILLISECONDS)); + + tx.onLifetimeCap(); + + // The cap flagged the running request's Context (before interrupting) so the unwinding op reports a 504, + // interrupted the running operation, and tore the transaction down. + verify(ctx).setClosedByLifetimeCap(true); + final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(AWAIT_MS); + while (!interrupted.get() && System.nanoTime() < deadline) { + Thread.sleep(10); + } + assertTrue("lifetime cap did not interrupt the running operation", interrupted.get()); + verify(manager).destroy(TX_ID); + } + + @Test + public void shouldFlagAndInterruptTheSameOperationWhenLifetimeCapFires() throws Exception { + // Guards against a mismatched (future, context) pair: the cap must flag the Context of the very operation whose + // future it interrupts. op1 completes (clearing its tracking), then op2 runs; when the cap fires it must flag + // op2's Context and never op1's, and interrupt op2. 
+ final Context ctx1 = mock(Context.class); + final Context ctx2 = mock(Context.class); + + tx.submit(new FutureTask<>(() -> null), ctx1).get(AWAIT_MS, MILLISECONDS); // op1 runs to completion + + final CountDownLatch started = new CountDownLatch(1); + final AtomicBoolean interrupted = new AtomicBoolean(false); + tx.submit(new FutureTask<>(() -> { + started.countDown(); + try { + Thread.sleep(30000); + } catch (InterruptedException e) { + interrupted.set(true); + } + return null; + }), ctx2); // op2 is the running op when the cap fires + assertTrue("op2 did not start", started.await(AWAIT_MS, MILLISECONDS)); + + tx.onLifetimeCap(); + + verify(ctx2).setClosedByLifetimeCap(true); + verify(ctx1, never()).setClosedByLifetimeCap(true); + final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(AWAIT_MS); + while (!interrupted.get() && System.nanoTime() < deadline) { + Thread.sleep(10); + } + assertTrue("the running op (op2) was not interrupted", interrupted.get()); + } + + @Test + public void shouldClearTrackedExecutionAfterOperationCompletesSoLaterCapFlagsNothing() throws Exception { + // Compare-and-clear guard: once an operation completes its tracking is cleared, so a cap firing while the + // transaction is idle finds no running op to flag/interrupt. The tracking is cleared on the worker thread in + // afterExecute, which races a bare get() on the completed future; the executor is FIFO with one worker, so a + // second submitted-and-awaited op guarantees the first op's afterExecute (and thus its clear) has already run. + final Context ctx = mock(Context.class); + tx.submit(new FutureTask<>(() -> null), ctx); // op1 tracks ctx + tx.submit(new FutureTask<>(() -> null), null).get(AWAIT_MS, MILLISECONDS); // op2 awaited -> op1 cleared + + // The cap still closes the transaction, but op1's Context was cleared (and op2 carried none), so nothing is + // flagged: a quiet, completed transaction reaching its cap reports no in-flight cap-kill. 
+ tx.onLifetimeCap(); + verify(ctx, never()).setClosedByLifetimeCap(true); + verify(manager).destroy(TX_ID); + } } diff --git a/gremlin-server/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-server-integration.yaml b/gremlin-server/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-server-integration.yaml index be135f03c7..ff84e4a5e0 100644 --- a/gremlin-server/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-server-integration.yaml +++ b/gremlin-server/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-server-integration.yaml @@ -31,6 +31,9 @@ host: 0.0.0.0 port: 45940 evaluationTimeout: 30000 +# Set below the code default (600000) so SettingsTest can confirm a YAML-provided value is honored; at 480000ms (8 +# minutes) it is still far longer than any integration test runs, so it never fires during a test. +maxTransactionLifetime: 480000 graphs: { graph: { configuration: conf/tinkergraph-empty.properties,
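The flag-then-interrupt behavior of onLifetimeCap() relies on tracking exactly one in-flight (future, context) pair and compare-and-clearing it by identity in afterExecute. The following is a minimal, self-contained Java sketch of that pattern under simplifying assumptions, not the Gremlin Server code: LifetimeCapSketch, RequestFlag, demo() and completedOpFlaggedByCap() are hypothetical names, RequestFlag stands in for the request Context, a plain ScheduledExecutorService stands in for the TransactionManager's lifetime timer, and shutdownNow() stands in for close(false) (no graph rollback or destroy happens here).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified sketch of the tracking pattern; not the Gremlin Server classes.
public class LifetimeCapSketch {
    // Stand-in for the request Context: just the flag the cap sets before interrupting.
    static final class RequestFlag {
        final AtomicBoolean closedByLifetimeCap = new AtomicBoolean(false);
    }

    // Immutable (future, flag) pair so the cap always flags the op whose future it interrupts.
    static final class Running {
        final Future<?> future;
        final RequestFlag flag;
        Running(Future<?> future, RequestFlag flag) {
            this.future = future;
            this.flag = flag;
        }
    }

    private final AtomicReference<Running> current = new AtomicReference<>();

    // One worker thread; afterExecute compare-and-clears the tracked op by identity.
    private final ThreadPoolExecutor worker = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) {
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            if (r instanceof Future) {
                final Future<?> done = (Future<?>) r;
                current.updateAndGet(running -> running != null && running.future == done ? null : running);
            }
        }
    };

    Future<?> submit(FutureTask<Void> task, RequestFlag flag) {
        current.set(new Running(task, flag)); // track BEFORE dispatch so the cap cannot miss it
        worker.execute(task);                 // execute (not submit): afterExecute sees this same task
        return task;
    }

    // What the lifetime timer triggers: flag first, then interrupt, then tear down.
    void onLifetimeCap() {
        final Running running = current.get();
        if (running != null) {
            if (running.flag != null) running.flag.closedByLifetimeCap.set(true);
            running.future.cancel(true);
        }
        worker.shutdownNow(); // stands in for close(false); no rollback or destroy in this sketch
    }

    // Fires the cap while a long op is running; returns {flagged, interrupted}.
    static boolean[] demo() throws Exception {
        final LifetimeCapSketch tx = new LifetimeCapSketch();
        final RequestFlag flag = new RequestFlag();
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch interrupted = new CountDownLatch(1);
        tx.submit(new FutureTask<Void>(() -> {
            started.countDown();
            try {
                Thread.sleep(30_000); // only the cap ends this op
            } catch (InterruptedException e) {
                interrupted.countDown();
            }
            return null;
        }), flag);
        started.await(5, TimeUnit.SECONDS);
        final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.schedule(tx::onLifetimeCap, 100, TimeUnit.MILLISECONDS); // one-shot lifetime timer
        final boolean wasInterrupted = interrupted.await(5, TimeUnit.SECONDS);
        timer.shutdown();
        return new boolean[] {flag.closedByLifetimeCap.get(), wasInterrupted};
    }

    // A completed op is no longer tracked, so a later cap leaves its flag untouched.
    static boolean completedOpFlaggedByCap() throws Exception {
        final LifetimeCapSketch tx = new LifetimeCapSketch();
        final RequestFlag flag1 = new RequestFlag();
        tx.submit(new FutureTask<Void>(() -> null), flag1);
        tx.submit(new FutureTask<Void>(() -> null), null).get(5, TimeUnit.SECONDS); // FIFO: op1 is done
        tx.onLifetimeCap();
        return flag1.closedByLifetimeCap.get();
    }

    public static void main(String[] args) throws Exception {
        final boolean[] r = demo();
        System.out.println("flagged=" + r[0] + " interrupted=" + r[1]
                + " completedOpFlagged=" + completedOpFlaggedByCap());
    }
}
```

The sketch uses execute() rather than submit() for the same reason the patch does: ThreadPoolExecutor.submit() wraps its argument in a new FutureTask, so the Runnable handed to afterExecute would no longer be the tracked future and the identity compare-and-clear would never match.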
