This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch closuretx in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit dc154d0a3a11f818e10afba46ff7ca9e991a5fa6 Author: Ken Hu <[email protected]> AuthorDate: Tue Jun 16 13:25:12 2026 -0700 Add transaction closure methods to GLVs Adds executeInTx/evaluateInTx to GraphTraversalSource across all GLVs to wrap the begin/commit/rollback lifecycle, so a lambda receives the transaction-bound g and is auto-committed on success or rolled back on error. The methods live on GraphTraversalSource rather than on the Transaction returned by g.tx() because the closure is a Traversal-API convenience; hosting it on the Transaction would hand Driver-API users (who submit strings) a traversal source and mix the two APIs. Keeping it on g also lets Java's existing g.tx() routing cover both embedded and remote with no interface changes. The value-returning form is named evaluateInTx rather than call because call already exists on GraphTraversalSource as the service step, and the InTx suffix keeps the transactional intent clear now that the methods no longer sit under tx(). The surface follows each language's idiom rather than forcing uniformity: dynamic languages expose a single method, while statically typed languages get a void/value pair. Go returns interface{} instead of using generics, to match the driver's existing untyped result API. Behavior is single-shot with no retry, since Gremlin Server has no standardized retriable-error signal across providers. On commit failure a rollback is still attempted for server-side resource hygiene, but the original error stays primary and secondary cleanup failures are only logged. gtx.tx() still returns the same transaction so the commit path keeps working; only opening a second transaction errors. Assisted-by: Claude Code:claude-opus-4-8 --- CHANGELOG.asciidoc | 1 + docs/src/reference/gremlin-variants.asciidoc | 121 +++++++++++++ docs/src/upgrade/release-4.x.x.asciidoc | 27 +++ .../traversal/dsl/graph/GraphTraversalSource.java | 93 ++++++++++ .../Process/Traversal/GraphTraversalSource.cs | 105 +++++++++++ .../Driver/TransactionTests.cs | 121 +++++++++++++ gremlin-go/driver/graphTraversalSource.go | 106 ++++++++++++ gremlin-go/driver/logger.go | 1 + .../driver/resources/logger-messages/en.json | 3 +- gremlin-go/driver/transaction.go | 13 ++ gremlin-go/driver/transaction_test.go | 191 ++++++++++++++++++++- .../lib/process/graph-traversal.ts | 68 ++++++++ .../test/integration/transaction-tests.js | 114 ++++++++++++ .../gremlin_python/process/graph_traversal.py | 53 ++++++ .../tests/integration/driver/test_transaction.py | 86 ++++++++++ .../GremlinDriverTransactionIntegrateTest.java | 125 ++++++++++++++ .../gremlin/structure/TransactionTest.java | 88 ++++++++++ 17 files changed, 1313 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 31fc242bf6..34d061fc13 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -31,6 +31,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima * Added `NextN(n)` to `Traversal` in `gremlin-go` for batched result iteration, providing API parity with `next(n)` in the Java, Python, and .NET GLVs, and updated the Go translators in `gremlin-core` and `gremlin-javascript` to emit `NextN(n)` for the batched form. * Added Gremlator, a single page web application, that translates Gremlin into various programming languages like Javascript and Python. * Added explicit transaction support to all non-Java GLVs (gremlin-python, gremlin-go, gremlin-javascript, gremlin-dotnet). +* Added transaction closure methods (`executeInTx`/`evaluateInTx`) on `GraphTraversalSource` in all GLVs that wrap the begin/commit/rollback lifecycle, so a lambda receives the transaction-bound `g` and is auto-committed on success or rolled back on error. * Changed default transaction close behavior from commit to rollback across all GLVs to align with embedded graph defaults. * Refactored Go driver connection to block until response headers arrive, enabling synchronous error returns and proper transaction ordering. * Removed `uuid` dependency from `gremlin-javascript` in favor of the built-in `globalThis.crypto.randomUUID()`. diff --git a/docs/src/reference/gremlin-variants.asciidoc b/docs/src/reference/gremlin-variants.asciidoc index 27924fa360..562f3b6fe3 100644 --- a/docs/src/reference/gremlin-variants.asciidoc +++ b/docs/src/reference/gremlin-variants.asciidoc @@ -311,6 +311,34 @@ err = tx.Commit() if err != nil { log.Fatal(err) } ---- +==== Transaction Closures + +To avoid the manual begin/commit/rollback boilerplate, `g` provides closure methods that manage the +lifecycle: the closure receives the transaction-bound `gtx` and the transaction is committed when it returns a `nil` +error or rolled back if it returns an error (or panics). `ExecuteInTx` is for the common no-result case; `EvaluateInTx` +returns the value the body produces as `interface{}`, which the caller type-asserts: + +[source,go] +---- +g := gremlingo.Traversal_().With(remote) + +// ExecuteInTx: no return value +err := g.ExecuteInTx(func(gtx *gremlingo.GraphTraversalSource) error { + _, e := gtx.AddV("person").Property("name", "alice").Iterate() + return <-e +}) + +// EvaluateInTx: returns the body's value as interface{} +v, err := g.EvaluateInTx(func(gtx *gremlingo.GraphTraversalSource) (interface{}, error) { + return gtx.V().Count().Next() +}) +count := v.(int64) +---- + +The transaction runs exactly once (no automatic retry). The body's error is returned after the rollback; if the commit +fails, a rollback is still attempted and the commit error is returned. A panic in the body rolls back and then +re-panics. + ==== Driver API [source,go] @@ -968,6 +996,31 @@ try { Traversals spawned from `gtx` are bound to the transaction. The driver handles host pinning and transaction ID propagation automatically. +==== Transaction Closures + +To avoid the manual begin/commit/rollback boilerplate, `g` also provides closure methods that manage the +transaction lifecycle. The closure receives the transaction-bound `gtx`, and the transaction is committed when the +closure completes normally or rolled back if it throws. Use `executeInTx` when the body returns nothing and +`evaluateInTx` when it returns a value: + +[source,java] +---- +GraphTraversalSource g = traversal().with(DriverRemoteConnection.using("localhost", 8182, "g")); + +// executeInTx: no return value +g.executeInTx(gtx -> { + gtx.addV("person").property("name", "jorge").iterate(); + gtx.addV("person").property("name", "josh").iterate(); +}); + +// evaluateInTx: returns the body's value +long count = g.evaluateInTx(gtx -> gtx.V().count().next()); +---- + +The transaction runs exactly once (no automatic retry). If the closure throws, the original exception is re-thrown +after the rollback; if the commit fails, a rollback is still attempted to release server-side resources and the commit +error propagates. These methods are also available on embedded graphs that support transactions. + ==== Driver API For script-based usage or when working with the `Client` API directly, transactions can be created from the `Cluster`: @@ -1813,6 +1866,29 @@ await gtx.addV("person").property("name", "josh").iterate(); await tx.commit(); ---- +==== Transaction Closures + +To avoid the manual begin/commit/rollback boilerplate, `g` provides an `executeInTx` method that manages the +lifecycle. The callback receives the transaction-bound `gtx` and may be `async`; the transaction is committed when the +callback resolves or rolled back if it throws. `executeInTx` resolves to whatever the callback returns: + +[source,javascript] +---- +const g = traversal().with_(new DriverRemoteConnection('http://localhost:8182/gremlin')); + +// no return value +await g.executeInTx(async (gtx) => { + await gtx.addV("person").property("name", "jorge").iterate(); + await gtx.addV("person").property("name", "josh").iterate(); +}); + +// returns the callback's value +const count = await g.executeInTx((gtx) => gtx.V().count().next()); +---- + +The transaction runs exactly once (no automatic retry). If the callback rejects, the original error is re-thrown after +the rollback; if the commit fails, a rollback is still attempted and the commit error propagates. + ==== Driver API [source,javascript] @@ -2371,6 +2447,31 @@ await gtx.AddV("person").Property("name", "josh").Promise(t => t.Iterate()); await tx.CommitAsync(); ---- +==== Transaction Closures + +To avoid the manual begin/commit/rollback boilerplate, `g` provides closure methods that manage the +lifecycle. The callback receives the transaction-bound `gtx` and the transaction is committed when it completes or +rolled back if it throws. Use `ExecuteInTxAsync` when the body returns nothing and `EvaluateInTxAsync` when it returns +a value; both accept an optional `CancellationToken`: + +[source,csharp] +---- +var g = AnonymousTraversalSource.Traversal().With(new DriverRemoteConnection("localhost", 8182)); + +// ExecuteInTxAsync: no return value +await g.ExecuteInTxAsync(async gtx => +{ + await gtx.AddV("person").Property("name", "jorge").Promise(t => t.Iterate()); + await gtx.AddV("person").Property("name", "josh").Promise(t => t.Iterate()); +}); + +// EvaluateInTxAsync: returns the body's value +var count = await g.EvaluateInTxAsync(gtx => gtx.V().Count().Promise(t => t.Next())); +---- + +The transaction runs exactly once (no automatic retry). If the callback throws, the original exception is re-thrown +after the rollback; if the commit fails, a rollback is still attempted and the commit error propagates. + ==== Driver API [source,csharp] @@ -2907,6 +3008,26 @@ gtx.addV('person').property('name', 'josh').iterate() tx.commit() ---- +==== Transaction Closures + +To avoid the manual begin/commit/rollback boilerplate, `g` provides an `execute_in_tx` method that manages the +lifecycle. The function receives the transaction-bound `gtx` and the transaction is committed when it returns or rolled +back if it raises. `execute_in_tx` returns whatever the function returns: + +[source,python] +---- +g = traversal().with_remote(DriverRemoteConnection('http://localhost:8182/gremlin', 'g')) + +# no return value +g.execute_in_tx(lambda gtx: gtx.addV('person').property('name', 'jorge').iterate()) + +# returns the function's value +count = g.execute_in_tx(lambda gtx: gtx.V().count().next()) +---- + +The transaction runs exactly once (no automatic retry). If the function raises, the original exception is re-raised +after the rollback; if the commit fails, a rollback is still attempted and the commit error propagates. + ==== Driver API [source,python] diff --git a/docs/src/upgrade/release-4.x.x.asciidoc b/docs/src/upgrade/release-4.x.x.asciidoc index 4706775c7f..6a2eb20e26 100644 --- a/docs/src/upgrade/release-4.x.x.asciidoc +++ b/docs/src/upgrade/release-4.x.x.asciidoc @@ -62,6 +62,33 @@ Key behaviors consistent across all GLVs: See the <<gremlin-drivers-variants,Gremlin Drivers and Variants>> reference documentation for language-specific syntax and examples. +==== Transaction Closures + +In addition to the manual `begin()`/`commit()`/`rollback()` lifecycle, every GLV (including the Java driver and embedded +Java graphs) now offers closure-based convenience methods directly on `GraphTraversalSource` (`g`). You hand `g` a +function that receives the transaction-bound `g` (`gtx`), and the transaction lifecycle is managed for you: the +transaction is begun, your function runs, and it is committed on normal completion or rolled back if your function +raises. This reduces the common begin/do-work/commit boilerplate and makes it harder to accidentally run a traversal +against the non-transactional `g`, since only the transactional source is in scope inside the closure. The methods are a +Traversal-API convenience (they live on `g`, not on the `Transaction` returned by `g.tx()`). + +The method naming follows each language's idiom (the value-returning form is `evaluateInTx` rather than `call`, because +`call` is already the `call()` service step on `GraphTraversalSource`): + +* **Java**: `g.executeInTx(Consumer)` (no return) and `g.evaluateInTx(Function)` (returns the body's value). +* **Python**: `g.execute_in_tx(fn)` — a single method that returns whatever the function returns. +* **JavaScript**: `await g.executeInTx(fn)` — the callback may be `async`; resolves to the callback's return value. +* **.NET**: `g.ExecuteInTxAsync(Func<..,Task>)` and `g.EvaluateInTxAsync<T>(Func<..,Task<T>>)`, both accepting an optional `CancellationToken`. +* **Go**: `g.ExecuteInTx(func(*GraphTraversalSource) error) error` and `g.EvaluateInTx(func(*GraphTraversalSource) (interface{}, error)) (interface{}, error)`. + +The closure runs the transaction exactly once (no automatic retry). If the function fails, the original error is +re-raised after rollback; if `commit()` fails, the commit error is raised and a rollback is still attempted to release +server-side resources. The manual `begin()`/`commit()`/`rollback()` API remains available and unchanged for advanced +use. See the <<gremlin-drivers-variants,Gremlin Drivers and Variants>> reference documentation for language-specific +examples. + +See: link:https://issues.apache.org/jira/browse/TINKERPOP-3253[TINKERPOP-3253] + ==== Transaction Default Close Behavior Changed The default behavior of `close()` on a remote transaction has been changed from `commit` to `rollback` across all diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java index 684d9e8609..002bdcd79a 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java @@ -50,12 +50,16 @@ import org.apache.tinkerpop.gremlin.structure.Transaction; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; import java.util.function.BinaryOperator; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; import java.util.function.UnaryOperator; @@ -68,6 +72,8 @@ import java.util.function.UnaryOperator; * @author Stephen Mallette (http://stephen.genoprime.com) */ public class GraphTraversalSource implements TraversalSource { + private static final Logger LOGGER = LoggerFactory.getLogger(GraphTraversalSource.class); + protected transient RemoteConnection connection; protected final Graph graph; protected TraversalStrategies strategies; @@ -718,6 +724,93 @@ public class GraphTraversalSource implements TraversalSource { return this.connection.tx(); } + /** + * Runs the supplied unit of work inside a single transaction, managing the transaction lifecycle automatically. + * <p> + * This is the no-return (action) form of {@link #evaluateInTx(Function)}. It is a thin convenience wrapper that + * obtains a {@link Transaction} via {@link #tx()}, {@link Transaction#begin() begins} it, invokes {@code txWork} + * with the transaction-bound {@link GraphTraversalSource} ({@code gtx}), and then {@link Transaction#commit() + * commits} on normal completion. If {@code txWork} throws, the transaction is {@link Transaction#rollback() rolled + * back} and the original error is re-thrown unchanged. Because the lifecycle is driven through {@link #tx()}, the + * underlying transaction semantics (embedded thread-bound vs. remote server session) are whatever the underlying + * {@code begin()}/{@code commit()}/{@code rollback()} provide. + * <p> + * This is a <strong>single-shot</strong> operation - exactly one attempt is made, with no automatic retry. The + * lambda receives the transactional {@code gtx} and should issue its traversals against that source only. + * + * @param txWork the unit of work to run against the transaction-bound {@link GraphTraversalSource} + * @see #evaluateInTx(Function) + */ + public void executeInTx(final Consumer<GraphTraversalSource> txWork) { + evaluateInTx(gtx -> { + txWork.accept(gtx); + return null; + }); + } + + /** + * Runs the supplied unit of work inside a single transaction, managing the transaction lifecycle automatically, + * and returns the value the work produces. + * <p> + * This wrapper obtains a {@link Transaction} via {@link #tx()}, {@link Transaction#begin() begins} it, invokes + * {@code txWork} with the transaction-bound {@link GraphTraversalSource} ({@code gtx}), and then + * {@link Transaction#commit() commits} on normal completion, returning the value computed by {@code txWork}. + * Error handling: + * <ul> + * <li>If {@code txWork} throws, the transaction is {@link Transaction#rollback() rolled back} and the exact + * original error is re-thrown to the caller. If that rollback itself fails, the rollback failure is attached + * to the original error via {@link Throwable#addSuppressed(Throwable)} and a warning is logged, but the + * original error still propagates as the primary error.</li> + * <li>If {@link Transaction#commit() commit} fails, a {@link Transaction#rollback() rollback} is attempted + * afterward (to avoid leaving transaction resources tied up on the server), and the commit error is re-thrown + * as the primary error. If the follow-up rollback also fails, the rollback failure is attached to the commit + * error via {@link Throwable#addSuppressed(Throwable)} and a warning is logged.</li> + * </ul> + * <p> + * This is a <strong>single-shot</strong> operation - exactly one attempt is made, with no automatic retry. The + * lambda receives the transactional {@code gtx} and should issue its traversals against that source only. Because + * the lifecycle is driven through {@link #tx()}, the underlying transaction semantics (embedded thread-bound vs. + * remote server session) are whatever the underlying {@code begin()}/{@code commit()}/{@code rollback()} provide. + * + * @param txWork the unit of work to run against the transaction-bound {@link GraphTraversalSource} + * @param <T> the type of value produced by {@code txWork} + * @return the value produced by {@code txWork} + * @see #executeInTx(Consumer) + */ + public <T> T evaluateInTx(final Function<GraphTraversalSource, T> txWork) { + final Transaction tx = this.tx(); + final GraphTraversalSource gtx = tx.begin(); + final T result; + // Phase 1: run the user's work. If it throws, roll back and rethrow the body error - the + // throw below exits the method, so a failed body never reaches the commit in phase 2. + try { + result = txWork.apply(gtx); + } catch (Throwable bodyError) { + try { + tx.rollback(); + } catch (Throwable rollbackError) { + bodyError.addSuppressed(rollbackError); + LOGGER.warn("Rollback failed after transaction body error", rollbackError); + } + throw bodyError; + } + // Phase 2: the body succeeded, so commit. A separate try because this failure mode is + // distinct (commit, not body): we still roll back for server-side hygiene, then rethrow + // the commit error as the primary error. + try { + tx.commit(); + } catch (Throwable commitError) { + try { + tx.rollback(); + } catch (Throwable rollbackError) { + commitError.addSuppressed(rollbackError); + LOGGER.warn("Rollback failed after commit failure", rollbackError); + } + throw commitError; + } + return result; + } + /** * If there is an underlying {@link RemoteConnection} it will be closed by this method. */ diff --git a/gremlin-dotnet/src/Gremlin.Net/Process/Traversal/GraphTraversalSource.cs b/gremlin-dotnet/src/Gremlin.Net/Process/Traversal/GraphTraversalSource.cs index fe1f6028a6..80573fbaae 100644 --- a/gremlin-dotnet/src/Gremlin.Net/Process/Traversal/GraphTraversalSource.cs +++ b/gremlin-dotnet/src/Gremlin.Net/Process/Traversal/GraphTraversalSource.cs @@ -24,6 +24,8 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; +using System.Threading.Tasks; using Gremlin.Net.Driver; using Gremlin.Net.Process.Remote; using Gremlin.Net.Process.Traversal.Strategy.Decoration; @@ -275,6 +277,109 @@ namespace Gremlin.Net.Process.Traversal return _connection!.Tx(this); } + /// <summary> + /// Runs a unit of work inside a transaction whose lifecycle is managed automatically. + /// A transaction is started via <c>this.Tx().BeginAsync()</c>, the supplied + /// <paramref name="txWork"/> is invoked with the transaction-bound + /// <see cref="GraphTraversalSource"/> (<c>gtx</c>), and the transaction is then committed + /// on normal completion or rolled back on any failure. + /// + /// This is a single-shot wrapper (no retry): exactly one + /// begin → run → commit/rollback sequence is performed. Only <c>gtx</c> is in scope inside + /// the body; the non-transactional source must not be used. If <paramref name="txWork"/> + /// throws, the transaction is rolled back and the original exception is re-thrown unchanged. + /// If the commit fails, a rollback is attempted for server-side hygiene and the commit error + /// is propagated. A failed rollback during cleanup is swallowed so it never replaces the + /// primary (body or commit) error. + /// </summary> + /// <param name="txWork"> + /// The unit of work to run against the transaction-bound <c>gtx</c>. + /// </param> + /// <param name="cancellationToken">The token to cancel the operation.</param> + /// <returns>A <see cref="Task"/> that completes when the transaction has been committed.</returns> + public async Task ExecuteInTxAsync(Func<GraphTraversalSource, Task> txWork, + CancellationToken cancellationToken = default) + => await EvaluateInTxAsync<object?>(async gtx => + { + await txWork(gtx).ConfigureAwait(false); + return null; + }, cancellationToken).ConfigureAwait(false); + + /// <summary> + /// Runs a value-returning unit of work inside a transaction whose lifecycle is managed + /// automatically. A transaction is started via <c>this.Tx().BeginAsync()</c>, the supplied + /// <paramref name="txWork"/> is invoked with the transaction-bound + /// <see cref="GraphTraversalSource"/> (<c>gtx</c>), and the transaction is then committed + /// on normal completion or rolled back on any failure. + /// + /// This is a single-shot wrapper (no retry): exactly one + /// begin → run → commit/rollback sequence is performed. Only <c>gtx</c> is in scope inside + /// the body; the non-transactional source must not be used. The value produced by + /// <paramref name="txWork"/> is returned to the caller after a successful commit. If + /// <paramref name="txWork"/> throws, the transaction is rolled back and the original + /// exception is re-thrown unchanged. If the commit fails, a rollback is attempted for + /// server-side hygiene and the commit error is propagated. A failed rollback during cleanup + /// is swallowed so it never replaces the primary (body or commit) error. + /// </summary> + /// <typeparam name="T">The type of value produced by <paramref name="txWork"/>.</typeparam> + /// <param name="txWork"> + /// The unit of work to run against the transaction-bound <c>gtx</c>. Its result is returned + /// to the caller once the transaction commits. + /// </param> + /// <param name="cancellationToken">The token to cancel the operation.</param> + /// <returns>The value produced by <paramref name="txWork"/>.</returns> + public async Task<T> EvaluateInTxAsync<T>(Func<GraphTraversalSource, Task<T>> txWork, + CancellationToken cancellationToken = default) + { + var tx = this.Tx(); + var gtx = await tx.BeginAsync(cancellationToken).ConfigureAwait(false); + T result; + // Phase 1: run the user's work. If it throws, roll back and rethrow the body error - the + // throw below exits the method, so a failed body never reaches the commit in phase 2. + try + { + result = await txWork(gtx).ConfigureAwait(false); + } + catch (Exception) + { + try + { + // No cancellation token: if the body failed because the token was cancelled, + // honoring it here would abort the cleanup rollback before it is sent. + await tx.RollbackAsync().ConfigureAwait(false); + } + catch (Exception) + { + // Rollback cleanup failure is swallowed so the original body error stays primary. + } + throw; + } + + // Phase 2: the body succeeded, so commit. A separate try because this failure mode is + // distinct (commit, not body): we still roll back for server-side hygiene, then rethrow + // the commit error as the primary error. + try + { + await tx.CommitAsync(cancellationToken).ConfigureAwait(false); + } + catch (Exception) + { + try + { + // No cancellation token: honoring a cancelled token here would abort the + // cleanup rollback before it is sent, leaving the transaction open server-side. + await tx.RollbackAsync().ConfigureAwait(false); + } + catch (Exception) + { + // Rollback cleanup failure is swallowed so the commit error stays primary. + } + throw; + } + + return result; + } + /// <summary> /// Add a GraphComputer class used to execute the traversal. /// This adds a <see cref="VertexProgramStrategy" /> to the strategies. diff --git a/gremlin-dotnet/test/Gremlin.Net.IntegrationTest/Driver/TransactionTests.cs b/gremlin-dotnet/test/Gremlin.Net.IntegrationTest/Driver/TransactionTests.cs index 57df672be9..9e62f09c38 100644 --- a/gremlin-dotnet/test/Gremlin.Net.IntegrationTest/Driver/TransactionTests.cs +++ b/gremlin-dotnet/test/Gremlin.Net.IntegrationTest/Driver/TransactionTests.cs @@ -516,6 +516,127 @@ namespace Gremlin.Net.IntegrationTest.Driver Assert.Equal(0L, await GetCount(client2, "drc_close_test")); } + // Sentinel exception used by the body-throws closure test to assert the exact + // original error (type + message) propagates out of the closure wrapper. + private sealed class SentinelTransactionException : Exception + { + public SentinelTransactionException(string message) : base(message) + { + } + } + + [Fact] + public async Task ShouldCommitOnSuccessWithExecuteInTxAsync() + { + using var client = CreateClient(); + await DropGraph(client); + var connection = new DriverRemoteConnection(client, "gtx"); + var g = AnonymousTraversalSource.Traversal().With(connection); + + await g.ExecuteInTxAsync(async gtx => await gtx.AddV("person").Promise(t => t.Iterate())); + + // The closure committed automatically on success, so the vertex is persisted. + Assert.Equal(1L, await GetCount(client, "person")); + } + + [Fact] + public async Task ShouldRollbackAndRethrowWhenExecuteInTxAsyncBodyThrows() + { + using var client = CreateClient(); + await DropGraph(client); + var connection = new DriverRemoteConnection(client, "gtx"); + var g = AnonymousTraversalSource.Traversal().With(connection); + + const string sentinelMessage = "sentinel-body-error-3f1c8e"; + + // (i) The exact original exception (type + message) propagates to the caller. + var ex = await Assert.ThrowsAsync<SentinelTransactionException>(() => + g.ExecuteInTxAsync(async gtx => + { + await gtx.AddV("person").Promise(t => t.Iterate()); + throw new SentinelTransactionException(sentinelMessage); + })); + Assert.Equal(sentinelMessage, ex.Message); + + // (ii) The closure rolled back automatically, so the vertex is NOT persisted. + Assert.Equal(0L, await GetCount(client, "person")); + } + + [Fact] + public async Task ShouldReturnBodyValueFromEvaluateInTxAsync() + { + using var client = CreateClient(); + await DropGraph(client); + var connection = new DriverRemoteConnection(client, "gtx"); + var g = AnonymousTraversalSource.Traversal().With(connection); + + var n = await g.EvaluateInTxAsync(async gtx => + { + await gtx.AddV("person").Promise(t => t.Iterate()); + await gtx.AddV("person").Promise(t => t.Iterate()); + return await gtx.V().Count().Promise(t => t.Next()); + }); + + // The body counted the two vertices it added within the transaction. + Assert.Equal(2L, n); + + // The closure committed, so the same count is visible afterwards. + Assert.Equal(2L, await GetCount(client, "person")); + } + + [Fact] + public async Task ShouldThrowWhenOpeningNestedTransactionInsideExecuteInTxAsync() + { + using var client = CreateClient(); + await DropGraph(client); + var connection = new DriverRemoteConnection(client, "gtx"); + var g = AnonymousTraversalSource.Traversal().With(connection); + + // Opening a SECOND transaction from inside the body must throw. The closure body's + // own commit will then fail because the body threw, surfacing the nesting error. + await Assert.ThrowsAsync<InvalidOperationException>(() => + g.ExecuteInTxAsync(async gtx => + { + // gtx.Tx() legitimately returns the SAME transaction (it must not throw); + // calling BeginAsync() on it opens a second transaction and must throw. + await gtx.Tx().BeginAsync(); + })); + } + + [Fact] + public async Task ShouldPropagateCommitErrorFromExecuteInTxAsync() + { + using var client = CreateClient(); + await DropGraph(client); + var connection = new DriverRemoteConnection(client, "gtx"); + var g = AnonymousTraversalSource.Traversal().With(connection); + + // Drive a commit failure without a mock: from inside the body, terminate the + // server-side transaction out-of-band (rollback by its transactionId via a second + // client). The wrapper's automatic CommitAsync then fails server-side with + // "Transaction not found", and that commit error must propagate out of ExecuteInTxAsync. + using var sideClient = CreateClient(); + + var ex = await Assert.ThrowsAsync<ResponseException>(() => + g.ExecuteInTxAsync(async gtx => + { + await gtx.AddV("person").Promise(t => t.Iterate()); + + // Roll back this very transaction out-of-band so the upcoming commit fails. + var txId = gtx.Tx().TransactionId!; + var rollbackMsg = RequestMessage.Build("g.tx().rollback()") + .AddG("gtx") + .AddField(Tokens.ArgsTransactionId, txId) + .Create(); + await sideClient.SubmitAsync<object>(rollbackMsg); + })); + + Assert.Contains("Transaction not found", ex.Message); + + // The out-of-band rollback already discarded the work, so nothing is persisted. + Assert.Equal(0L, await GetCount(client, "person")); + } + [Fact] public async Task ShouldSerializeUnawaitedSubmissions() { diff --git a/gremlin-go/driver/graphTraversalSource.go b/gremlin-go/driver/graphTraversalSource.go index c03fefa069..cfc0b61603 100644 --- a/gremlin-go/driver/graphTraversalSource.go +++ b/gremlin-go/driver/graphTraversalSource.go @@ -251,3 +251,109 @@ func (gts *GraphTraversalSource) Tx() *Transaction { } return &Transaction{client: drc.client} } + +// ExecuteInTx runs txWork inside a single, automatically managed transaction and +// returns any error that occurred. +// +// It owns the full lifecycle: it obtains a transaction via Tx, calls Begin to +// start it, passes the resulting transaction-bound GraphTraversalSource (gtx) to +// txWork, and then commits on success or rolls back on failure. The body must use +// only the gtx it receives; the non-transactional source is never in scope. +// +// This is single-shot: exactly one attempt is made (begin, run, commit/rollback) +// with no automatic retry. +// +// Error handling: +// - If Begin fails, that error is returned and txWork is never invoked. +// - If txWork returns a non-nil error, the transaction is rolled back and the +// original body error is returned unchanged. +// - If the commit fails, a rollback is attempted for server-side hygiene and +// the commit error is returned (it takes precedence over any rollback error). +// - If a rollback attempted during cleanup itself fails, a warning is logged +// but the primary (body or commit) error still propagates. +// - If txWork panics, the transaction is rolled back and the panic is +// re-raised (it is never swallowed). +// +// For a transaction body that needs to return a value, use EvaluateInTx instead. +func (gts *GraphTraversalSource) ExecuteInTx(txWork func(*GraphTraversalSource) error) error { + _, err := gts.EvaluateInTx(func(gtx *GraphTraversalSource) (interface{}, error) { + return nil, txWork(gtx) + }) + return err +} + +// EvaluateInTx runs txWork inside a single, automatically managed transaction and +// returns the value produced by txWork along with any error. +// +// It is the value-returning counterpart to ExecuteInTx. The value is returned as +// interface{}, matching the rest of the driver's untyped result API (e.g. +// Traversal.Next returns a *Result whose concrete value is obtained via the +// Result.Get* accessors); the caller type-asserts the returned value as needed. +// +// It owns the full lifecycle: it obtains a transaction via gts.Tx, calls Begin to +// start it, passes the resulting transaction-bound GraphTraversalSource (gtx) to +// txWork, and then commits on success or rolls back on failure. The body must use +// only the gtx it receives; the non-transactional source is never in scope. +// +// This is single-shot: exactly one attempt is made (begin, run, commit/rollback) +// with no automatic retry. +// +// Error handling: +// - If Begin fails, that error is returned (with a nil value) and txWork is +// never invoked. +// - If txWork returns a non-nil error, the transaction is rolled back and the +// original body error is returned unchanged, along with the value txWork +// returned. +// - If the commit fails, a rollback is attempted for server-side hygiene and +// the commit error is returned (it takes precedence over any rollback error). +// - If a rollback attempted during cleanup itself fails, a warning is logged +// but the primary (body or commit) error still propagates. +// - If txWork panics, the transaction is rolled back and the panic is +// re-raised (it is never swallowed). +func (gts *GraphTraversalSource) EvaluateInTx( + txWork func(*GraphTraversalSource) (interface{}, error)) (interface{}, error) { + + var result interface{} + tx := gts.Tx() + gtx, err := tx.Begin() + if err != nil { + return result, err + } + + // rollbackQuietly performs a best-effort cleanup rollback. It never propagates + // a failure - a returned error is logged, and a panic from Rollback itself is + // recovered and discarded - so it can never mask the primary (body, commit, or + // panic) error. A failed rollback is not fatal anyway: the server rolls the + // transaction back when it hits its transaction timeout. + rollbackQuietly := func() { + defer func() { _ = recover() }() + if rbErr := tx.Rollback(); rbErr != nil { + tx.logRollbackWarning(rbErr) + } + } + + // A panic in the body must roll back the transaction and then re-panic so the + // original panic is never swallowed. + defer func() { + if r := recover(); r != nil { + rollbackQuietly() + panic(r) + } + }() + + result, err = txWork(gtx) + if err != nil { + // Body returned an error: roll back and surface the exact original error. + rollbackQuietly() + return result, err + } + + if commitErr := tx.Commit(); commitErr != nil { + // Commit failed: attempt a rollback for server-side hygiene, but the + // commit error remains the primary error returned to the caller. + rollbackQuietly() + return result, commitErr + } + + return result, nil +} diff --git a/gremlin-go/driver/logger.go b/gremlin-go/driver/logger.go index 9ae624284a..5f294118c5 100644 --- a/gremlin-go/driver/logger.go +++ b/gremlin-go/driver/logger.go @@ -116,4 +116,5 @@ const ( closeClient errorKey = "CLOSE_CLIENT" failedToCloseResponseBody errorKey = "FAILED_TO_CLOSE_RESPONSE_BODY" failedToCloseDecompReader errorKey = "FAILED_TO_CLOSE_DECOMPRESSION_READER" + rollbackFailedDuringCleanup errorKey = "ROLLBACK_FAILED_DURING_CLEANUP" ) diff --git a/gremlin-go/driver/resources/logger-messages/en.json b/gremlin-go/driver/resources/logger-messages/en.json index f7025e837a..3787040a63 100644 --- a/gremlin-go/driver/resources/logger-messages/en.json +++ b/gremlin-go/driver/resources/logger-messages/en.json @@ -22,5 +22,6 @@ "FAILED_TO_RECEIVE_RESPONSE": "Failed to receive response: %s", "FAILED_TO_SEND_REQUEST": "Failed to send request: %s", "FAILED_TO_CLOSE_RESPONSE_BODY": "Error closing response body: %s", - "FAILED_TO_CLOSE_DECOMPRESSION_READER": "Error closing decompression reader: %s" + "FAILED_TO_CLOSE_DECOMPRESSION_READER": "Error closing decompression reader: %s", + "ROLLBACK_FAILED_DURING_CLEANUP": "Rollback failed during transaction cleanup; the primary error still propagates: %s" } diff --git a/gremlin-go/driver/transaction.go b/gremlin-go/driver/transaction.go index 9f7846031c..e57177a65c 100644 --- a/gremlin-go/driver/transaction.go +++ b/gremlin-go/driver/transaction.go @@ -172,6 +172,19 @@ func (t *Transaction) Submit(gremlin string, options ...RequestOptions) (ResultS return t.client.SubmitWithOptions(gremlin, opts) } +// logRollbackWarning logs a warning that a rollback attempted during transaction +// cleanup failed. The rollback failure is non-fatal: the primary body or commit +// error still propagates to the caller. +// +// It backs the rollback-cleanup handling in the GraphTraversalSource transaction +// closure helpers (ExecuteInTx / EvaluateInTx); those callers live in the same +// package and reach the driver's logHandler through this Transaction method. +func (t *Transaction) logRollbackWarning(rbErr error) { + if t.client != nil && t.client.logHandler != nil { + t.client.logHandler.logf(Warning, rollbackFailedDuringCleanup, rbErr.Error()) + } +} + func extractTransactionId(results []*Result) (string, error) { if len(results) == 0 { return "", fmt.Errorf("server did not return transaction ID") diff --git a/gremlin-go/driver/transaction_test.go b/gremlin-go/driver/transaction_test.go index a449913809..9c9cc17674 100644 --- a/gremlin-go/driver/transaction_test.go +++ b/gremlin-go/driver/transaction_test.go @@ -21,6 +21,7 @@ package gremlingo import ( "crypto/tls" + "errors" "testing" "github.com/stretchr/testify/assert" @@ -536,7 +537,6 @@ func TestTransactionBeginAfterRollback(t *testing.T) { assert.Contains(t, err.Error(), "E1101") } - func TestTransactionRollbackOnClientClose(t *testing.T) { client := newTxClient(t) dropTxGraph(t, client) @@ -579,7 +579,6 @@ func TestTransactionRollbackOnDrcClose(t *testing.T) { assert.Equal(t, int64(0), getTxCount(t, client2, "drc_close_test")) } - func TestTransactionMultiRollback(t *testing.T) { client := newTxClient(t) defer client.Close() @@ -631,3 +630,191 @@ func TestTransactionMultiCommitAndRollback(t *testing.T) { assert.Nil(t, err) assert.Equal(t, int64(0), getTxCount(t, client, "multi_cr2")) } + +func TestTransactionExecuteInTxCommitsOnSuccess(t *testing.T) { + client := newTxClient(t) + defer client.Close() + dropTxGraph(t, client) + + remote := newTxRemoteConnection(t) + defer remote.Close() + g := Traversal_().With(remote) + + err := g.ExecuteInTx(func(gtx *GraphTraversalSource) error { + promise := gtx.AddV("person").Iterate() + return <-promise + }) + assert.Nil(t, err) + + // Committed data is visible outside the transaction. + assert.Equal(t, int64(1), getTxCount(t, client, "person")) +} + +func TestTransactionExecuteInTxRollsBackAndRethrowsOnBodyError(t *testing.T) { + client := newTxClient(t) + defer client.Close() + dropTxGraph(t, client) + + remote := newTxRemoteConnection(t) + defer remote.Close() + g := Traversal_().With(remote) + + sentinel := errors.New("intentional body failure") + + err := g.ExecuteInTx(func(gtx *GraphTraversalSource) error { + // Add a vertex, then fail: the add must be rolled back. + promise := gtx.AddV("person").Iterate() + assert.Nil(t, <-promise) + return sentinel + }) + + // (i) the exact original error is returned unchanged. + assert.NotNil(t, err) + assert.True(t, errors.Is(err, sentinel)) + assert.Equal(t, sentinel, err) + + // (ii) the vertex was NOT persisted (rollback happened). + assert.Equal(t, int64(0), getTxCount(t, client, "person")) +} + +// EvaluateInTx returns the value computed by the body as interface{}, matching +// the driver's untyped result API; the caller type-asserts it. +func TestTransactionEvaluateInTxReturnsValue(t *testing.T) { + client := newTxClient(t) + defer client.Close() + dropTxGraph(t, client) + + remote := newTxRemoteConnection(t) + defer remote.Close() + g := Traversal_().With(remote) + + v, err := g.EvaluateInTx(func(gtx *GraphTraversalSource) (interface{}, error) { + // Add two vertices and return the in-transaction count. + promise := gtx.AddV("widget").Iterate() + if pErr := <-promise; pErr != nil { + return nil, pErr + } + promise = gtx.AddV("widget").Iterate() + if pErr := <-promise; pErr != nil { + return nil, pErr + } + counts, cErr := gtx.V().HasLabel("widget").Count().ToList() + if cErr != nil { + return nil, cErr + } + count, cErr := counts[0].GetInt64() + if cErr != nil { + return nil, cErr + } + return count, nil + }) + assert.Nil(t, err) + // the body returned an int64; the caller type-asserts the interface{} result + assert.Equal(t, int64(2), v) + + // The committed value matches what was returned. + assert.Equal(t, int64(2), getTxCount(t, client, "widget")) +} + +// Opening a SECOND transaction from inside the body must error. gtx.Tx() itself +// legitimately returns the SAME transaction (it must not error - that is the +// commit path), but a nested Begin() on it is rejected by the existing +// double-begin guard (E1101). +func TestTransactionExecuteInTxRejectsNestedBegin(t *testing.T) { + client := newTxClient(t) + defer client.Close() + dropTxGraph(t, client) + + remote := newTxRemoteConnection(t) + defer remote.Close() + g := Traversal_().With(remote) + + err := g.ExecuteInTx(func(gtx *GraphTraversalSource) error { + // gtx.Tx() returns the bound, already-open transaction (must NOT error). + tx := gtx.Tx() + assert.True(t, tx.IsOpen()) + + // gtx.Tx() called again returns that same transaction handle. + assert.Equal(t, tx, gtx.Tx()) + + // Opening a second transaction (a nested begin) IS rejected. + _, beginErr := tx.Begin() + assert.NotNil(t, beginErr) + assert.Contains(t, beginErr.Error(), "E1101") + + // Surface the nested-begin error from the body so the wrapper rolls back. + return beginErr + }) + + assert.NotNil(t, err) + assert.Contains(t, err.Error(), "E1101") +} + +// Verifies that a failure of the wrapper's automatic Commit() is surfaced from +// ExecuteInTx. To drive a deterministic, no-mock commit failure, the body +// succeeds but the transaction is rolled back from a SEPARATE connection (by its +// transactionId) before the body returns; the wrapper's Commit() is then a real +// commit RPC that the server rejects with "Transaction not found". +func TestTransactionExecuteInTxPropagatesCommitFailure(t *testing.T) { + client := newTxClient(t) + defer client.Close() + dropTxGraph(t, client) + + // a second connection used to kill the transaction out-of-band + sideClient := newTxClient(t) + defer sideClient.Close() + + remote := newTxRemoteConnection(t) + defer remote.Close() + g := Traversal_().With(remote) + + err := g.ExecuteInTx(func(gtx *GraphTraversalSource) error { + if pErr := <-gtx.AddV("commit_fail").Iterate(); pErr != nil { + return pErr + } + // Roll the transaction back from a separate connection, targeting it by its + // transactionId. The body still returns nil, so the wrapper proceeds to commit - + // which now fails server-side because the transaction no longer exists. + txId := gtx.Tx().TransactionId() + _, rbErr := sideClient.SubmitWithOptions("g.tx().rollback()", + new(RequestOptionsBuilder).SetTransactionId(txId).Create()) + return rbErr + }) + + // the commit failure is the error surfaced to the caller + assert.NotNil(t, err) + assert.Contains(t, err.Error(), "Transaction not found") + + // data was not persisted (the transaction was rolled back out-of-band) + assert.Equal(t, int64(0), getTxCount(t, client, "commit_fail")) +} + +func TestTransactionExecuteInTxRollsBackAndRepanicsOnPanic(t *testing.T) { + client := newTxClient(t) + defer client.Close() + dropTxGraph(t, client) + + remote := newTxRemoteConnection(t) + defer remote.Close() + g := Traversal_().With(remote) + + func() { + defer func() { + r := recover() + // The panic must propagate out of ExecuteInTx (not be swallowed). + assert.NotNil(t, r) + assert.Equal(t, "intentional body panic", r) + }() + + _ = g.ExecuteInTx(func(gtx *GraphTraversalSource) error { + promise := gtx.AddV("panic_vertex").Iterate() + assert.Nil(t, <-promise) + panic("intentional body panic") + }) + // Unreachable: ExecuteInTx must re-panic. + assert.Fail(t, "expected ExecuteInTx to re-panic") + }() + + // The transaction was rolled back during panic handling: nothing persisted. + assert.Equal(t, int64(0), getTxCount(t, client, "panic_vertex")) +} diff --git a/gremlin-js/gremlin-javascript/lib/process/graph-traversal.ts b/gremlin-js/gremlin-javascript/lib/process/graph-traversal.ts index acde7f4f72..b60d8d3f48 100644 --- a/gremlin-js/gremlin-javascript/lib/process/graph-traversal.ts +++ b/gremlin-js/gremlin-javascript/lib/process/graph-traversal.ts @@ -81,6 +81,74 @@ export class GraphTraversalSource { return new Transaction(client); } + /** + * Runs a unit of work inside a transaction, owning the full lifecycle. + * + * The transaction is begun automatically and the resulting transaction-bound + * <code>GraphTraversalSource</code> (<code>gtx</code>) is passed to the + * callback as its sole argument. Only <code>gtx</code> is in scope inside the + * callback, which avoids accidentally issuing traversals against the + * non-transactional <code>g</code>. + * + * On normal completion of the callback the transaction is committed; on any + * abnormal exit (a thrown error or a rejected promise) the transaction is + * rolled back. This is a single-shot operation: there is exactly one attempt + * (begin -> run -> commit/rollback) and no retry. A second transaction may not + * be opened from inside the callback (calling <code>gtx.tx().begin()</code> + * throws via the existing double-begin guard). + * + * The callback may be synchronous or asynchronous; its return value (or the + * value its promise resolves to) is awaited and returned from + * <code>executeInTx</code>. If the callback returns nothing the result is + * <code>undefined</code>. + * + * Error handling: if the callback throws/rejects, the original error is + * re-raised unchanged after rollback is attempted. If the commit itself fails, + * the commit error propagates and a rollback is still attempted afterward for + * server-side hygiene. In both cases a failed rollback during cleanup is logged + * as a warning and never replaces the primary error. + * + * @param txWork callback that receives the transaction-bound GraphTraversalSource + * @returns {Promise<T>} resolves to the callback's value (undefined if none) + */ + async executeInTx<T>(txWork: (gtx: GraphTraversalSource) => T | Promise<T>): Promise<T> { + const tx = this.tx(); + const gtx = await tx.begin(); + + let result: T; + // Phase 1: run the user's work. If it throws, roll back and rethrow the body error - the + // throw below exits the method, so a failed body never reaches the commit in phase 2. + try { + result = await txWork(gtx); + } catch (bodyError) { + try { + await tx.rollback(); + } catch (rollbackError) { + console.warn('Rollback failed after transaction body error', rollbackError); + } + // Re-raise the exact original error; rollback failures never replace it. + throw bodyError; + } + + // Phase 2: the body succeeded, so commit. A separate try because this failure mode is + // distinct (commit, not body): we still roll back for server hygiene, then rethrow the + // commit error as the primary error. + try { + await tx.commit(); + } catch (commitError) { + try { + // Attempt rollback so the server does not keep the transaction around. + await tx.rollback(); + } catch (rollbackError) { + console.warn('Rollback failed after commit failure', rollbackError); + } + // The commit error stays primary; rollback was only for server hygiene. + throw commitError; + } + + return result; + } + /** * @param graphComputer * @param workers diff --git a/gremlin-js/gremlin-javascript/test/integration/transaction-tests.js b/gremlin-js/gremlin-javascript/test/integration/transaction-tests.js index 5abfc8436c..01a95367df 100644 --- a/gremlin-js/gremlin-javascript/test/integration/transaction-tests.js +++ b/gremlin-js/gremlin-javascript/test/integration/transaction-tests.js @@ -421,4 +421,118 @@ describe('Transaction', function () { await nonTxClient.close(); }); }); + + describe('Closure API (g.executeInTx())', function () { + it('should commit on success', async function () { + const connection = getConnection('gtx'); + const g = anon.traversal().withRemote(connection); + + await g.executeInTx(async (gtx) => { + await gtx.addV('person').iterate(); + }); + + // Fresh non-transactional query confirms the vertex was committed. + const count = await g.V().hasLabel('person').count().next(); + assert.strictEqual(count.value, 1); + + await connection.close(); + }); + + it('should rollback and rethrow when the body throws', async function () { + const connection = getConnection('gtx'); + const g = anon.traversal().withRemote(connection); + + const sentinelMessage = 'sentinel body failure 0xC0FFEE'; + class SentinelError extends Error {} + + let caught; + try { + await g.executeInTx(async (gtx) => { + await gtx.addV('person').iterate(); + throw new SentinelError(sentinelMessage); + }); + assert.fail('Expected executeInTx() to rethrow the body error'); + } catch (e) { + caught = e; + } + + // (i) the exact original error (type + message) is rethrown to the caller + assert.ok(caught instanceof SentinelError); + assert.strictEqual(caught.message, sentinelMessage); + + // (ii) the vertex added in the body was NOT persisted (rollback happened) + const count = await g.V().hasLabel('person').count().next(); + assert.strictEqual(count.value, 0); + + await connection.close(); + }); + + it('should return the body value', async function () { + const connection = getConnection('gtx'); + const g = anon.traversal().withRemote(connection); + + // Seed a vertex outside the transaction so the body can count it. + await g.addV('person').iterate(); + + const n = await g.executeInTx((gtx) => gtx.V().count().next()); + assert.strictEqual(n.value, 1); + + await connection.close(); + }); + + it('should reject opening a nested transaction in the body', async function () { + const connection = getConnection('gtx'); + const g = anon.traversal().withRemote(connection); + + await assert.rejects( + () => + g.executeInTx(async (gtx) => { + // gtx.tx() legitimately returns the SAME transaction; calling begin() + // on it opens a second transaction and trips the double-begin guard. + await gtx.tx().begin(); + }), + /Transaction already started/ + ); + + await connection.close(); + }); + + // Forces a real, no-mock commit failure by finalizing the transaction from + // inside the body: gtx.tx() returns the same Transaction, and calling + // rollback() on it closes it. executeInTx's own trailing commit() then fails + // because the transaction is no longer open, and that error propagates. + it('should propagate a commit failure out of executeInTx', async function () { + const connection = getConnection('gtx'); + const g = anon.traversal().withRemote(connection); + // a second connection used to kill the transaction out-of-band + const sideClient = getClient('gtx'); + + // To drive a deterministic, no-mock commit failure, the body succeeds but the + // transaction is rolled back from a separate connection (by its transactionId) + // before the body returns; executeInTx's commit() is then a real commit RPC that + // the server rejects (HTTP 404, "Transaction not found"). + let caught; + try { + await g.executeInTx(async (gtx) => { + await gtx.addV('person').iterate(); + const transactionId = gtx.tx().transactionId; + await sideClient.submit('g.tx().rollback()', null, { transactionId }); + }); + assert.fail('Expected executeInTx() to propagate the commit failure'); + } catch (e) { + caught = e; + } + // The server maps a missing/closed transaction to HTTP 404; the body text is on + // statusMessage (the driver puts the HTTP status line in the Error message). + assert.strictEqual(caught.statusCode, 404); + assert.ok(caught.statusMessage.includes('Transaction not found')); + + // The body's work was not persisted (the transaction was rolled back out-of-band). + const count = await g.V().hasLabel('person').count().next(); + assert.strictEqual(count.value, 0); + + await sideClient.close(); + await connection.close(); + }); + }); }); diff --git a/gremlin-python/src/main/python/gremlin_python/process/graph_traversal.py b/gremlin-python/src/main/python/gremlin_python/process/graph_traversal.py index 4e83e857ba..86e846ea22 100644 --- a/gremlin-python/src/main/python/gremlin_python/process/graph_traversal.py +++ b/gremlin-python/src/main/python/gremlin_python/process/graph_traversal.py @@ -162,6 +162,59 @@ class GraphTraversalSource(object): from gremlin_python.driver.transaction import Transaction return Transaction(remote_connection._client) + def execute_in_tx(self, tx_work): + """Runs a unit of work inside a transaction, managing its lifecycle. + + The transaction is started automatically via tx().begin() and the + transaction-bound GraphTraversalSource (gtx) is passed to tx_work as its + sole argument. Only gtx should be used inside the body; the + non-transactional g is not in scope. On normal completion the + transaction is committed and the body's return value is returned + (None if the body returns nothing). If the body raises, the + transaction is rolled back and the exact original exception is + re-raised unchanged. + + This is single-shot: exactly one begin -> run -> commit/rollback + attempt is made; there is no automatic retry. If the body raises and + the follow-up rollback also fails, a warning is logged and the original + body exception still propagates. If commit() fails (e.g. the server + already rolled the transaction back), a rollback is attempted for + server-side hygiene and the commit error propagates as the primary + error. + + For example, g.execute_in_tx(lambda gtx: gtx.addV('person').iterate()) + runs the body and commits, while + count = g.execute_in_tx(lambda gtx: gtx.V().count().next()) returns the + body's value. + """ + tx = self.tx() + gtx = tx.begin() + # Phase 1: run the user's work. If it raises, roll back and re-raise the body error - the + # bare `raise` exits the method, so a failed body never reaches the commit in phase 2. + try: + result = tx_work(gtx) + except BaseException: + # Catch BaseException so any abnormal exit (including KeyboardInterrupt) rolls back. + try: + tx.rollback() + except BaseException as rollback_error: + # The cleanup rollback failure is only logged - the bare `raise` below re-raises + # the original body error unchanged so it always stays the primary error. + log.warning("Rollback failed after transaction body error: %s", rollback_error) + raise + # Phase 2: the body succeeded, so commit. A separate try because this failure mode is + # distinct (commit, not body): we still roll back for server-side hygiene, then re-raise + # the commit error as the primary error. + try: + tx.commit() + except Exception: + try: + tx.rollback() + except BaseException as rollback_error: + log.warning("Rollback failed after commit failure: %s", rollback_error) + raise + return result + def withComputer(self, graph_computer=None, workers=None, result=None, persist=None, vertices=None, edges=None, configuration=None): warnings.warn( diff --git a/gremlin-python/src/main/python/tests/integration/driver/test_transaction.py b/gremlin-python/src/main/python/tests/integration/driver/test_transaction.py index 77b5a04241..d7f2da1111 100644 --- a/gremlin-python/src/main/python/tests/integration/driver/test_transaction.py +++ b/gremlin-python/src/main/python/tests/integration/driver/test_transaction.py @@ -410,3 +410,89 @@ class TestTransaction(object): tx2.rollback() result = client.submit("g.V().hasLabel('multi_cr2').count()").all().result() assert result[0] == 0 + + def test_execute_in_tx_commits_on_success(self, remote_connection): + g = traversal().with_(remote_connection) + + g.execute_in_tx(lambda gtx: gtx.addV('person').property('name', 'exec_commit').iterate()) + + c = Client(test_no_auth_url, 'gtx') + result = c.submit("g.V().has('person','name','exec_commit').count()").all().result() + assert result[0] == 1 + c.close() + + def test_execute_in_tx_rolls_back_and_rethrows_when_body_throws(self, remote_connection): + # Asserts BOTH that the exact original exception (type + message) + # propagates and that the vertex added before the raise is NOT persisted. + g = traversal().with_(remote_connection) + + def body(gtx): + gtx.addV('person').property('name', 'exec_throw').iterate() + raise RuntimeError("simulated body failure 0xDEADBEEF") + + with pytest.raises(RuntimeError, match="simulated body failure 0xDEADBEEF"): + g.execute_in_tx(body) + + c = Client(test_no_auth_url, 'gtx') + result = c.submit("g.V().has('person','name','exec_throw').count()").all().result() + assert result[0] == 0 + c.close() + + def test_execute_in_tx_returns_body_value(self, remote_connection): + g = traversal().with_(remote_connection) + + # Seed two vertices in their own committed transaction so the + # value-returning body has something to count. + g.execute_in_tx(lambda gtx: gtx.addV('person').iterate()) + g.execute_in_tx(lambda gtx: gtx.addV('person').iterate()) + + count = g.execute_in_tx(lambda gtx: gtx.V().count().next()) + assert count == 2 + + def test_execute_in_tx_rejects_nested_transaction(self, remote_connection): + # Opening a SECOND transaction inside the body must raise. gtx.tx() + # itself legitimately returns the same transaction (so we do NOT assert + # it raises); calling begin() on it does raise. + g = traversal().with_(remote_connection) + + def body(gtx): + # gtx.tx() returns the same (already-open) transaction; begin() + # on an already-open transaction is the double-begin guard. + gtx.tx().begin() + + with pytest.raises(Exception, match="Transaction already started"): + g.execute_in_tx(body) + + def test_execute_in_tx_propagates_commit_failure(self, remote_connection): + # To drive a deterministic, no-mock commit failure, the body succeeds but + # the transaction is rolled back server-side from a separate connection + # (by its transactionId) before the body returns, leaving the id invalid. + # The commit() inside execute_in_tx() then fails with "Transaction not + # found", and that commit error propagates to the caller. + g = traversal().with_(remote_connection) + + invalidator = Client(test_no_auth_url, 'gtx') + + def body(gtx): + # Add work, then invalidate the transaction server-side by rolling + # it back through a separate plain client that targets this + # transaction's id. The body itself completes normally so execute_in_tx() + # proceeds to commit(), which will now fail. + gtx.addV('person').property('name', 'exec_commit_fail').iterate() + tx_id = gtx.tx().transaction_id + invalidator.submit("g.tx().rollback()", + request_options={'transactionId': tx_id}).all().result() + + try: + with pytest.raises(Exception) as exc_info: + g.execute_in_tx(body) + # The commit error is the primary error surfaced to the caller. + assert "Transaction not found" in str(exc_info.value) + finally: + invalidator.close() + + # Nothing was persisted (the server rolled the transaction back). + c = Client(test_no_auth_url, 'gtx') + result = c.submit("g.V().has('person','name','exec_commit_fail').count()").all().result() + assert result[0] == 0 + c.close() diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java index 69f387b17e..5f0e69d43c 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java @@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.hamcrest.core.StringContains.containsString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -703,4 +704,128 @@ public class GremlinDriverTransactionIntegrateTest extends AbstractGremlinServer cluster2.close(); } } + + /** + * A sentinel exception used by {@link #shouldRollbackAndRethrowOriginalErrorWhenTxClosureBodyThrows()} to verify + * that the exact original error thrown by an {@code executeInTx}/{@code evaluateInTx} body propagates to the + * caller unchanged. + */ + private static class TxClosureSentinelException extends RuntimeException { + TxClosureSentinelException(final String message) { + super(message); + } + } + + @Test + public void shouldCommitWhenTxClosureBodyCompletes() throws Exception { + final Client client = cluster.connect().alias(GTX); + final GraphTraversalSource g = traversal().with(DriverRemoteConnection.using(cluster, GTX)); + + g.executeInTx(gtx -> gtx.addV("person").property("name", "alice").iterate()); + + // committed data is visible to non-transactional reads + assertEquals(1L, client.submit("g.V().hasLabel('person').count()").one().getLong()); + + client.close(); + } + + @Test + public void shouldRollbackAndRethrowOriginalErrorWhenTxClosureBodyThrows() throws Exception { + final Client client = cluster.connect().alias(GTX); + final GraphTraversalSource g = traversal().with(DriverRemoteConnection.using(cluster, GTX)); + + try { + g.executeInTx(gtx -> { + gtx.addV("person").property("name", "bob").iterate(); + throw new TxClosureSentinelException("boom"); + }); + fail("The exact exception thrown by the closure body should propagate to the caller"); + } catch (Exception ex) { + // (i) the exact original error (type + message) propagates to the caller, unchanged + assertThat(ex, instanceOf(TxClosureSentinelException.class)); + assertEquals("boom", ex.getMessage()); + } + + // (ii) the vertex added before the error was rolled back and is not persisted + assertEquals(0L, client.submit("g.V().hasLabel('person').count()").one().getLong()); + + client.close(); + } + + @Test + public void shouldReturnBodyValueFromTxClosureCall() throws Exception { + final Client client = cluster.connect().alias(GTX); + final GraphTraversalSource g = traversal().with(DriverRemoteConnection.using(cluster, GTX)); + + final Long count = g.evaluateInTx(gtx -> { + gtx.addV("person").iterate(); + gtx.addV("person").iterate(); + return gtx.V().hasLabel("person").count().next(); + }); + + // the value computed in the body is returned to the caller + assertEquals(Long.valueOf(2L), count); + + // the work also committed, so the data is visible afterward + assertEquals(2L, client.submit("g.V().hasLabel('person').count()").one().getLong()); + + client.close(); + } + + @Test + public void shouldRejectOpeningSecondTransactionInsideTxClosureBody() throws Exception { + final GraphTraversalSource g = traversal().with(DriverRemoteConnection.using(cluster, GTX)); + + g.executeInTx(gtx -> { + // gtx.tx() itself is legitimate and returns a transaction handle - it must NOT throw, as it is the + // standard way to commit/rollback when holding a transactional source. + final Transaction nested = gtx.tx(); + assertNotNull(nested); + + // opening a SECOND transaction from within an already-open one must raise. The remote + // HttpRemoteTransaction.begin() guards against double-begin, so the remote nesting test asserts on begin(). + try { + nested.begin(); + fail("Opening a second transaction from within an already-open one should raise"); + } catch (Exception ex) { + // expected - the transaction is already started + assertThat(ex, instanceOf(IllegalStateException.class)); + assertThat(ex.getMessage(), containsString("Transaction already started")); + } + + gtx.addV("person").iterate(); + }); + } + + @Test + public void shouldPropagateCommitFailureFromTxClosure() throws Exception { + final Client client = cluster.connect().alias(GTX); + // a second connection used to kill the transaction out-of-band, so the closure's own commit fails + final Client sideClient = cluster.connect().alias(GTX); + final GraphTraversalSource g = traversal().with(DriverRemoteConnection.using(cluster, GTX)); + + // The closure body succeeds, but the transaction is rolled back server-side from a separate + // connection (by its transactionId) before the body returns. The wrapper's automatic commit() then + // fails with "Transaction not found", and that commit error must propagate out of executeInTx(). + try { + g.executeInTx(gtx -> { + gtx.addV("person").property("name", "doomed_commit").iterate(); + final String txId = ((RemoteTransaction) gtx.tx()).getTransactionId(); + sideClient.submit("g.tx().rollback()", + RequestOptions.build().addG(GTX).transactionId(txId).create()).all().join(); + }); + fail("The commit failure should propagate out of executeInTx()"); + } catch (Exception ex) { + // the commit failure is the error surfaced to the caller; the server reports it as + // "Transaction not found" (asserted on the root cause, as the other transaction-error tests do) + final Throwable root = ExceptionHelper.getRootCause(ex); + assertThat(root.getMessage(), containsString("Transaction not found")); + } + + // data was not persisted (the transaction was rolled back out-of-band) + assertEquals(0L, client.submit("g.V().hasLabel('person').count()").one().getLong()); + + client.close(); + sideClient.close(); + } } diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java index bd6fc31104..4efc21576b 100644 --- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java +++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java @@ -44,6 +44,7 @@ import static org.apache.tinkerpop.gremlin.structure.Graph.Features.VertexProper import static org.hamcrest.core.Is.is; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.fail; @@ -980,4 +981,91 @@ public class TransactionTest extends AbstractGremlinTest { IteratorUtils.count(graph.vertices()) ); } + + /** + * A sentinel exception used by {@link #shouldRollbackAndRethrowOriginalErrorWhenTxClosureBodyThrows()} to verify + * that the exact original error thrown by an {@code executeInTx}/{@code evaluateInTx} body propagates to the + * caller unchanged. + */ + private static class TxClosureSentinelException extends RuntimeException { + TxClosureSentinelException(final String message) { + super(message); + } + } + + @Test + @FeatureRequirement(featureClass = Graph.Features.VertexFeatures.class, feature = Graph.Features.VertexFeatures.FEATURE_ADD_VERTICES) + @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = FEATURE_TRANSACTIONS) + public void shouldCommitWhenTxClosureBodyCompletes() { + g.executeInTx(gtx -> gtx.addV("person").iterate()); + + // committed data is visible to reads outside the closure + assertEquals(1L, g.V().hasLabel("person").count().next().longValue()); + } + + @Test + @FeatureRequirement(featureClass = Graph.Features.VertexFeatures.class, feature = Graph.Features.VertexFeatures.FEATURE_ADD_VERTICES) + @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = FEATURE_TRANSACTIONS) + public void shouldRollbackAndRethrowOriginalErrorWhenTxClosureBodyThrows() { + try { + g.executeInTx(gtx -> { + gtx.addV("person").iterate(); + throw new TxClosureSentinelException("boom"); + }); + fail("The exact exception thrown by the closure body should propagate to the caller"); + } catch (Exception ex) { + // (i) the exact original error (type + message) propagates to the caller, unchanged + assertThat(ex, instanceOf(TxClosureSentinelException.class)); + assertEquals("boom", ex.getMessage()); + } + + // (ii) the vertex added before the error was rolled back and is not persisted + assertEquals(0L, g.V().hasLabel("person").count().next().longValue()); + } + + @Test + @FeatureRequirement(featureClass = Graph.Features.VertexFeatures.class, feature = Graph.Features.VertexFeatures.FEATURE_ADD_VERTICES) + @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = FEATURE_TRANSACTIONS) + public void shouldReturnBodyValueFromTxClosureCall() { + final long count = g.evaluateInTx(gtx -> { + gtx.addV("person").iterate(); + gtx.addV("person").iterate(); + return gtx.V().hasLabel("person").count().next(); + }); + + // the value computed in the body is returned to the caller + assertEquals(2L, count); + + // the work also committed, so the data is visible afterward + assertEquals(2L, g.V().hasLabel("person").count().next().longValue()); + } + + @Test + @FeatureRequirement(featureClass = Graph.Features.VertexFeatures.class, feature = Graph.Features.VertexFeatures.FEATURE_ADD_VERTICES) + @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = FEATURE_TRANSACTIONS) + public void shouldRejectOpeningSecondTransactionInsideTxClosureBody() { + g.executeInTx(gtx -> { + // gtx.tx() itself is legitimate and returns the (same) transaction - it must NOT throw, as it is the + // standard way to commit/rollback when holding a transactional source. + final Transaction nested = gtx.tx(); + assertNotNull(nested); + + // opening a SECOND transaction from within an already-open one must raise. On the embedded impl + // (TinkerTransaction) begin() calls doOpen() unconditionally with no double-open guard, so the guard lives + // in AbstractTransaction.open() (transactionAlreadyOpen()) - hence the embedded nesting test asserts on + // open(), not begin(). + try { + nested.open(); + fail("Opening a second transaction from within an already-open one should raise"); + } catch (Exception ex) { + // expected - a transaction is already open + assertThat(ex, instanceOf(IllegalStateException.class)); + } + + gtx.addV("person").iterate(); + }); + + // the outer transaction still committed normally + assertEquals(1L, g.V().hasLabel("person").count().next().longValue()); + } }
