This is an automated email from the ASF dual-hosted git repository.
kenhuuu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/master by this push:
new 899ab62e18 Add HTTP transaction support to gremlin-server (#3328)
899ab62e18 is described below
commit 899ab62e18b01e0cdd9946a90989f188a2df4797
Author: Ken Hu <[email protected]>
AuthorDate: Mon Mar 23 16:32:19 2026 -0700
Add HTTP transaction support to gremlin-server (#3328)
The HttpGremlinEndpointHandler is updated to handle transactional
workloads which it does in conjunction with the newly added
TransactionManager. The TransactionManager manages the transaction
lifecycle and returns an UnmanagedTransaction that is used to submit
ThreadLocal-based Graph transactions.
---
CHANGELOG.asciidoc | 1 +
.../apache/tinkerpop/gremlin/server/Context.java | 11 +
.../tinkerpop/gremlin/server/GremlinServer.java | 5 +
.../apache/tinkerpop/gremlin/server/Settings.java | 18 +
.../gremlin/server/channel/HttpChannelizer.java | 4 +-
.../server/handler/HttpGremlinEndpointHandler.java | 171 ++++-
.../gremlin/server/handler/HttpHandlerUtil.java | 35 +-
.../server/handler/HttpRequestMessageDecoder.java | 19 +-
.../gremlin/server/handler/TransactionManager.java | 179 +++++
.../server/handler/UnmanagedTransaction.java | 182 +++++
.../gremlin/server/util/GremlinError.java | 88 +++
.../gremlin/server/util/ServerGremlinExecutor.java | 14 +
.../GremlinServerHttpTransactionIntegrateTest.java | 753 +++++++++++++++++++++
.../org/apache/tinkerpop/gremlin/util/Tokens.java | 18 +
.../gremlin/util/message/RequestMessage.java | 14 +
.../ser/AbstractGraphSONMessageSerializerV4.java | 3 +
.../util/ser/binary/RequestMessageSerializer.java | 3 +
.../gremlin/util/message/RequestMessageTest.java | 6 -
18 files changed, 1485 insertions(+), 39 deletions(-)
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index c06db091e1..6846e4d9a3 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -27,6 +27,7 @@
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
* Added `__contains__` and `keys()` to `Element` in `gremlin-python`.
* Added `subgraph()` support for `gremlin-python` so that results are stored
in a detached `Graph` object.
+* Added support for remote transactions to the `gremlin-server` through
`TransactionManager` and `UnmanagedTransaction`.
* Modified grammar to make `discard()` usage more consistent as a filter step
where it can now be used to chain additional traversal steps and be used
anonymously.
* Removed `Meta` field from `ResponseResult` struct in `gremlin-go`.
* Removed deprecated elements of the Java-based process testing suite:
`ProcessStandardSuite`, `ProcessComputerSuite`, `ProcessLimitedSuite` and
associated tests.
diff --git
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
index 21b7c0b636..06e2507e75 100644
---
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
+++
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
@@ -56,6 +56,7 @@ public class Context {
private ScheduledFuture<?> timeoutExecutor = null;
private boolean timeoutExecutorGrabbed = false;
private final Object timeoutExecutorLock = new Object();
+ private String transactionId; // initially null for non-transactional
requests and begin() calls; set after transaction creation.
public Context(final RequestMessage requestMessage, final
ChannelHandlerContext ctx,
final Settings settings, final GraphManager graphManager,
@@ -80,6 +81,7 @@ public class Context {
this.requestState = requestState;
this.requestTimeout = determineTimeout();
this.materializeProperties = determineMaterializeProperties();
+ this.transactionId =
requestMessage.getField(Tokens.ARGS_TRANSACTION_ID);
}
public void setTimeoutExecutor(final ScheduledFuture<?> timeoutExecutor) {
@@ -119,6 +121,15 @@ public class Context {
return scheduledExecutorService;
}
+ public String getTransactionId() {
+ return transactionId;
+ }
+
+ public void setTransactionId(final String transactionId) {
+ this.transactionId = transactionId;
+ }
+
+
/**
* Gets the current request to Gremlin Server.
*/
diff --git
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
index 1314af6225..16d8a2d045 100644
---
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
+++
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServer.java
@@ -293,6 +293,11 @@ public class GremlinServer {
logger.warn("Timeout waiting for boss/worker thread pools to
shutdown - continuing with shutdown process.");
}
+ if (serverGremlinExecutor != null) {
+ logger.info("Shutting down TransactionManager");
+ serverGremlinExecutor.getTransactionManager().shutdown();
+ }
+
// close TraversalSource and Graph instances - there aren't
guarantees that closing Graph will close all
// spawned TraversalSource instances so both should be closed
directly and independently.
if (serverGremlinExecutor != null) {
diff --git
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
index 5e56c76ed9..3489bd511e 100644
---
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
+++
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java
@@ -184,6 +184,24 @@ public class Settings {
*/
public boolean strictTransactionManagement = false;
+ /**
+ * Time in milliseconds that a transaction can remain idle before it is
automatically rolled back.
+ * This prevents resource leaks from abandoned transactions. Default is
600000 (10 minutes).
+ */
+ public long transactionTimeout = 600000L;
+
+ /**
+ * Time in milliseconds to wait for a transaction commit or rollback
operation to complete.
+ * Default is 10000 (10 seconds).
+ */
+ public long perGraphCloseTimeout = 10000L;
+
+ /**
+ * Maximum number of concurrent transactions allowed on the server.
+ * Default is 1000.
+ */
+ public int maxConcurrentTransactions = 1000;
+
/**
* The full class name of the {@link Channelizer} to use in Gremlin Server.
*/
diff --git
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
index e90ae340bf..e5643f5154 100644
---
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
+++
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/HttpChannelizer.java
@@ -35,6 +35,7 @@ import
org.apache.tinkerpop.gremlin.server.handler.HttpRequestIdHandler;
import org.apache.tinkerpop.gremlin.server.handler.HttpRequestMessageDecoder;
import org.apache.tinkerpop.gremlin.server.handler.HttpUserAgentHandler;
import org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler;
+import org.apache.tinkerpop.gremlin.server.handler.TransactionManager;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
@@ -61,7 +62,8 @@ public class HttpChannelizer extends AbstractChannelizer {
@Override
public void init(final ServerGremlinExecutor serverGremlinExecutor) {
super.init(serverGremlinExecutor);
- httpGremlinEndpointHandler = new
HttpGremlinEndpointHandler(gremlinExecutor, graphManager, settings);
+ httpGremlinEndpointHandler = new HttpGremlinEndpointHandler(
+ gremlinExecutor, graphManager, settings,
serverGremlinExecutor.getTransactionManager());
}
@Override
diff --git
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
index cbc9f2d616..fa34d4d762 100644
---
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
+++
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
@@ -27,9 +27,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.DefaultHttpContent;
-import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
@@ -57,6 +55,7 @@ import
org.apache.tinkerpop.gremlin.server.util.TraverserIterator;
import org.apache.tinkerpop.gremlin.structure.Column;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
import org.apache.tinkerpop.gremlin.structure.util.TemporaryException;
import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
@@ -82,32 +81,33 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
import java.util.stream.Stream;
import static com.codahale.metrics.MetricRegistry.name;
import static io.netty.handler.codec.http.HttpHeaderNames.ACCEPT_ENCODING;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
-import static io.netty.handler.codec.http.HttpHeaderNames.TRANSFER_ENCODING;
-import static io.netty.handler.codec.http.HttpHeaderValues.CHUNKED;
import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
import static
io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
-import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static
org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.FINISHED;
import static
org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.FINISHING;
import static
org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.NOT_STARTED;
import static
org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.STREAMING;
-import static
org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.sendTrailingHeaders;
+import static
org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.sendHttpResponse;
+import static
org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.sendLastHttpContent;
import static
org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.writeError;
/**
@@ -168,13 +168,16 @@ public class HttpGremlinEndpointHandler extends
SimpleChannelInboundHandler<Requ
private final GremlinExecutor gremlinExecutor;
private final GraphManager graphManager;
private final Settings settings;
+ private final TransactionManager transactionManager;
public HttpGremlinEndpointHandler(final GremlinExecutor gremlinExecutor,
final GraphManager graphManager,
- final Settings settings) {
+ final Settings settings,
+ final TransactionManager
transactionManager) {
this.gremlinExecutor = gremlinExecutor;
this.graphManager = graphManager;
this.settings = settings;
+ this.transactionManager = transactionManager;
}
@Override
@@ -210,18 +213,36 @@ public class HttpGremlinEndpointHandler extends
SimpleChannelInboundHandler<Requ
requestMessage.getGremlin());
}
- // Send back the 200 OK response header here since the
response is always chunk transfer encoded. Any
- // failures that follow this will show up in the response body
instead.
- final HttpResponse responseHeader = new
DefaultHttpResponse(HTTP_1_1, OK);
- if
(acceptsDeflateEncoding(ctx.attr(StateKey.REQUEST_HEADERS).get().getAll(ACCEPT_ENCODING)))
{
- responseHeader.headers().add(CONTENT_ENCODING, DEFLATE);
+ // These guards prevent any obvious failures from returning
200 OK early by detecting them here and
+ // throwing before any other processing starts so the user
gets a better error code.
+ final String txId = requestCtx.getTransactionId();
+ final String gremlin = requestMessage.getGremlin();
+ if (isTransactionBegin(gremlin)) {
+ // If this is a begin transaction request then we need to
create the Transaction ID first since the
+ // dual-transmission expectation means the response header
below should contain it.
+
+ // This prevents accidentally re-opening the underlying
transaction.
+ if (txId != null) throw new
ProcessingException(GremlinError.beginHasTransactionId());
+
+ doBegin(requestCtx);
+ } else if (txId != null) {
+ // This check makes sure that the underlying Graph is
already open to stop a closed transaction
+ // from re-opening due to the default autostart nature of
transactions. This occurs in cases where a
+ // transactional traversal is submitted after a
commit/rollback.
+ final Graph g =
graphManager.getTraversalSource(requestMessage.getField(Tokens.ARGS_G)).getGraph();
+ if ((!g.tx().isOpen())) {
+ throw new
ProcessingException(GremlinError.transactionNotFound(txId));
+ }
+ } else if ((txId == null) && (isTransactionCommit(gremlin) ||
isTransactionRollback(gremlin))) {
+ // Logically, commit/rollback should only be allowed on a
transactional request.
+ throw new
ProcessingException(GremlinError.transactionalControlRequiresTransaction());
}
- responseHeader.headers().set(TRANSFER_ENCODING, CHUNKED);
- responseHeader.headers().set(HttpHeaderNames.CONTENT_TYPE,
serializer.getValue0());
- ctx.writeAndFlush(responseHeader);
- ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).set(true);
- iterateScriptEvalResult(requestCtx, serializer.getValue1(),
requestMessage);
+ // Send back the 200 OK response header here since the
response is always chunk transfer encoded. Any
+ // failures that follow this will show up in the response body
instead.
+ sendHttpResponse(ctx, OK, createResponseHeaders(ctx,
serializer, requestCtx).toArray(CharSequence[]::new));
+ sendHttpContents(ctx, requestCtx);
+ sendLastHttpContent(ctx, HttpResponseStatus.OK, "");
} catch (Throwable t) {
writeError(requestCtx, formErrorResponseMessage(t,
requestMessage), serializer.getValue1());
} finally {
@@ -240,7 +261,10 @@ public class HttpGremlinEndpointHandler extends
SimpleChannelInboundHandler<Requ
});
try {
- final Future<?> executionFuture =
requestCtx.getGremlinExecutor().getExecutorService().submit(evalFuture);
+ final boolean isBeginTransactionRequest =
isTransactionBegin(requestMessage.getGremlin());
+ final Future<?> executionFuture = ((requestCtx.getTransactionId()
!= null) && !isBeginTransactionRequest) ?
+
transactionManager.get(requestCtx.getTransactionId()).get().submit(evalFuture) :
+
requestCtx.getGremlinExecutor().getExecutorService().submit(evalFuture);
if (seto > 0) {
// Schedule a timeout in the thread pool for future execution
requestCtx.setTimeoutExecutor(requestCtx.getScheduledExecutorService().schedule(()
-> {
@@ -252,6 +276,46 @@ public class HttpGremlinEndpointHandler extends
SimpleChannelInboundHandler<Requ
}
} catch (RejectedExecutionException ree) {
writeError(requestCtx, GremlinError.rateLimiting(),
serializer.getValue1());
+ } catch (NoSuchElementException nsee) {
+ writeError(requestCtx,
GremlinError.transactionNotFound(requestCtx.getTransactionId()),
serializer.getValue1());
+ }
+ }
+
+ private List<CharSequence> createResponseHeaders(final
ChannelHandlerContext ctx,
+ final Pair<String,
MessageSerializer<?>> serializer,
+ final Context requestCtx)
{
+ final List<CharSequence> headers = new ArrayList<>();
+ headers.add(HttpHeaderNames.CONTENT_TYPE);
+ headers.add(serializer.getValue0());
+ if
(acceptsDeflateEncoding(ctx.attr(StateKey.REQUEST_HEADERS).get().getAll(ACCEPT_ENCODING)))
{
+ headers.add(CONTENT_ENCODING);
+ headers.add(DEFLATE);
+ }
+ if (requestCtx.getTransactionId() != null) {
+ headers.add(Tokens.Headers.TRANSACTION_ID);
+ headers.add(requestCtx.getTransactionId());
+ }
+ return headers;
+ }
+
+ private void sendHttpContents(final ChannelHandlerContext ctx, final
Context requestContext) throws Exception {
+ final Pair<String, MessageSerializer<?>> serializer =
ctx.channel().attr(StateKey.SERIALIZER).get();
+ final RequestMessage request = requestContext.getRequestMessage();
+ final String txId = requestContext.getTransactionId();
+ final Optional<UnmanagedTransaction> transaction =
transactionManager.get(txId);
+
+ // Early guard against fake or incorrect transaction IDs.
+ if ((txId != null) && transaction.isEmpty()) throw new
ProcessingException(GremlinError.transactionNotFound(txId));
+
+ if (isTransactionBegin(request.getGremlin())) {
+ runBegin(requestContext, transaction.get(), serializer);
+ } else if (isTransactionCommit(request.getGremlin())) {
+ handleGraphOp(requestContext, txId, Transaction::commit,
serializer);
+ } else if
(isTransactionRollback(requestContext.getRequestMessage().getGremlin())) {
+ handleGraphOp(requestContext, txId, Transaction::rollback,
serializer);
+ } else {
+ // Both transactional and non-transactional traversals follow this
path for response chunking.
+ iterateScriptEvalResult(requestContext, serializer.getValue1(),
request);
}
}
@@ -372,6 +436,72 @@ public class HttpGremlinEndpointHandler extends
SimpleChannelInboundHandler<Requ
}
}
+ /**
+ * Detects if the gremlin script is a transaction begin command.
+ */
+ private boolean isTransactionBegin(final String gremlin) {
+ if (gremlin == null) return false;
+ return gremlin.trim().equalsIgnoreCase("g.tx().begin()");
+ }
+
+ /**
+ * Detects if the gremlin script is a transaction commit command.
+ */
+ private boolean isTransactionCommit(final String gremlin) {
+ if (gremlin == null) return false;
+ return gremlin.trim().equalsIgnoreCase("g.tx().commit()");
+ }
+
+ /**
+ * Detects if the gremlin script is a transaction rollback command.
+ */
+ private boolean isTransactionRollback(final String gremlin) {
+ if (gremlin == null) return false;
+ return gremlin.trim().equalsIgnoreCase("g.tx().rollback()");
+ }
+
+ /**
+ * Handle begin by creating an {@link UnmanagedTransaction} and submitting
the open to its executor.
+ */
+ private void doBegin(final Context ctx) throws Exception {
+ final String traversalSourceName =
ctx.getRequestMessage().getField(Tokens.ARGS_G);
+
+ final UnmanagedTransaction txCtx;
+ try {
+ txCtx = transactionManager.create(traversalSourceName);
+ ctx.setTransactionId(txCtx.getTransactionId());
+ final Graph graph =
graphManager.getTraversalSource(traversalSourceName).getGraph();
+ txCtx.submit(new FutureTask<>(() -> {
+ graph.tx().open();
+ return null;
+ })).get(5000, TimeUnit.MILLISECONDS); // Not an option for now,
but 5s should be plenty.
+ } catch (IllegalStateException ise) {
+ throw new
ProcessingException(GremlinError.maxTransactionsExceeded(ise.getMessage()));
+ } catch (IllegalArgumentException iae) {
+ throw new
ProcessingException(GremlinError.binding(traversalSourceName));
+ } catch (UnsupportedOperationException uoe) {
+ throw new
ProcessingException(GremlinError.transactionNotSupported(uoe));
+ } catch (ExecutionException | TimeoutException e) {
+ throw new
ProcessingException(GremlinError.transactionUnableToStart(e.getMessage()));
+ }
+ }
+
+ private void runBegin(final Context ctx, UnmanagedTransaction tx, final
Pair<String, MessageSerializer<?>> serializer) throws Exception {
+ final ByteBuf chunk = makeChunk(ctx, serializer.getValue1(),
List.of(Map.of(Tokens.ARGS_TRANSACTION_ID, tx.getTransactionId())), false,
false);
+ ctx.getChannelHandlerContext().writeAndFlush(new
DefaultHttpContent(chunk));
+ }
+
+ private void handleGraphOp(final Context ctx,
+ final String transactionId,
+ final Consumer<Transaction> graphOp,
+ final Pair<String, MessageSerializer<?>>
serializer) throws Exception {
+ final Graph graph =
graphManager.getTraversalSource(ctx.getRequestMessage().getField(Tokens.ARGS_G)).getGraph();
+ graphOp.accept(graph.tx());
+ transactionManager.destroy(transactionId);
+ final ByteBuf chunk = makeChunk(ctx, serializer.getValue1(),
List.of(Map.of(Tokens.ARGS_TRANSACTION_ID, transactionId)), false, false);
+ ctx.getChannelHandlerContext().writeAndFlush(new
DefaultHttpContent(chunk));
+ }
+
private Bindings mergeBindingsFromRequest(final Context ctx, final
Bindings bindings) throws ProcessingException {
// alias any global bindings to a different variable.
final RequestMessage msg = ctx.getRequestMessage();
@@ -431,7 +561,6 @@ public class HttpGremlinEndpointHandler extends
SimpleChannelInboundHandler<Requ
// it needs to be released here
if (chunk != null) chunk.release();
}
- sendTrailingHeaders(nettyContext, HttpResponseStatus.OK, "");
return;
}
@@ -518,10 +647,6 @@ public class HttpGremlinEndpointHandler extends
SimpleChannelInboundHandler<Requ
}
nettyContext.writeAndFlush(new DefaultHttpContent(chunk));
-
- if (!hasMore) {
- sendTrailingHeaders(nettyContext,
HttpResponseStatus.OK, "");
- }
}
} else {
final long currentTime = System.currentTimeMillis();
diff --git
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
index 1e84e35dde..f0f6b652c0 100644
---
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
+++
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
@@ -106,16 +106,14 @@ public class HttpHandlerUtil {
context.setRequestState(HttpGremlinEndpointHandler.RequestState.ERROR);
if (!ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).get()) {
- final HttpResponse responseHeader = new
DefaultHttpResponse(HTTP_1_1, responseMessage.getStatus().getCode());
- responseHeader.headers().set(TRANSFER_ENCODING, CHUNKED); //
Set this to make it "keep alive" eligible.
- responseHeader.headers().set(HttpHeaderNames.CONTENT_TYPE,
ctx.channel().attr(StateKey.SERIALIZER).get().getValue0());
- ctx.writeAndFlush(responseHeader);
- ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).set(true);
+ sendHttpResponse(ctx,
+ responseMessage.getStatus().getCode(),
+ HttpHeaderNames.CONTENT_TYPE,
ctx.channel().attr(StateKey.SERIALIZER).get().getValue0());
}
ctx.writeAndFlush(new DefaultHttpContent(ByteBuf));
- sendTrailingHeaders(ctx, responseMessage.getStatus().getCode(),
responseMessage.getStatus().getException());
+ sendLastHttpContent(ctx, responseMessage.getStatus().getCode(),
responseMessage.getStatus().getException());
} catch (SerializationException se) {
logger.warn("Unable to serialize ResponseMessage: {} ",
responseMessage);
}
@@ -147,7 +145,8 @@ public class HttpHandlerUtil {
* @param statusCode The status code to include in the trailers.
* @param exceptionType The type of exception to include in the trailers.
Leave blank or null if no error occurred.
*/
- static void sendTrailingHeaders(final ChannelHandlerContext ctx, final
HttpResponseStatus statusCode, final String exceptionType) {
+ static void sendLastHttpContent(final ChannelHandlerContext ctx, final
HttpResponseStatus statusCode, final String exceptionType) {
+ // TODO: this might be not sent if exception occurs early so HTTP not
properly terminated
final DefaultLastHttpContent defaultLastHttpContent = new
DefaultLastHttpContent();
defaultLastHttpContent.trailingHeaders().add(SerTokens.TOKEN_CODE,
statusCode.code());
if (exceptionType != null && !exceptionType.isEmpty()) {
@@ -156,4 +155,26 @@ public class HttpHandlerUtil {
ctx.writeAndFlush(defaultLastHttpContent);
}
+
+ /**
+ * Sends the initial HTTP response header with the given status and
optional header pairs.
+ * Also marks the channel as having sent a response. Headers must be
provided as alternating
+ * name/value pairs (e.g. {@code CONTENT_TYPE, "application/json"}).
+ *
+ * @param ctx The netty channel context.
+ * @param status The HTTP status code for the response.
+ * @param headers Alternating header name/value pairs to set on the
response.
+ * @throws IllegalArgumentException if headers length is not even
+ */
+ static void sendHttpResponse(final ChannelHandlerContext ctx, final
HttpResponseStatus status, final CharSequence... headers) {
+ if ((headers.length%2) != 0) throw new
IllegalArgumentException("Headers should come in pairs.");
+
+ final HttpResponse responseHeader = new DefaultHttpResponse(HTTP_1_1,
status);
+ responseHeader.headers().set(TRANSFER_ENCODING, CHUNKED);
+ for (int i=0; i<headers.length; i+=2) {
+ responseHeader.headers().set(headers[i], headers[i+1]);
+ }
+ ctx.writeAndFlush(responseHeader);
+ ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).set(true);
+ }
}
diff --git
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestMessageDecoder.java
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestMessageDecoder.java
index a220b40106..4501697475 100644
---
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestMessageDecoder.java
+++
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpRequestMessageDecoder.java
@@ -160,12 +160,24 @@ public class HttpRequestMessageDecoder extends
MessageToMessageDecoder<FullHttpR
final ByteBuf buffer = request.content();
try {
- return serializer.deserializeBinaryRequest(buffer);
+ return
conditionallyInsertDefaultG(serializer.deserializeBinaryRequest(buffer));
} catch (Exception e) {
throw new SerializationException("Unable to deserialize
request using: " + serializer.getClass().getSimpleName(), e);
}
}
- return getRequestMessageFromHttpRequest(request);
+ return
conditionallyInsertDefaultG(getRequestMessageFromHttpRequest(request));
+ }
+
+ private RequestMessage conditionallyInsertDefaultG(final RequestMessage
requestMessage) {
+ // The RequestMessage should default ARGS_G to "g" according to the
HTTP API. However, this won't actually work
+ // if the language is gremlin-groovy since that allows any statement.
So, only add in a default if the language
+ // is gremlin-lang.
+ RequestMessage fixedRequestMessage = requestMessage;
+ if ((requestMessage.getFieldOrDefault(Tokens.ARGS_LANGUAGE,
"").equals("gremlin-lang")) &&
+ (requestMessage.getField(Tokens.ARGS_G) == null)) {
+ fixedRequestMessage =
RequestMessage.from(requestMessage).addG("g").create();
+ }
+ return fixedRequestMessage;
}
private RequestMessage getRequestMessageFromHttpRequest(final
FullHttpRequest request) {
@@ -205,6 +217,9 @@ public class HttpRequestMessageDecoder extends
MessageToMessageDecoder<FullHttpR
final JsonNode matPropsNode =
body.get(Tokens.ARGS_MATERIALIZE_PROPERTIES);
if (null != matPropsNode)
builder.addMaterializeProperties(matPropsNode.asText());
+ final JsonNode txIdNode = body.get(Tokens.ARGS_TRANSACTION_ID);
+ if (null != txIdNode) builder.addTransactionId(txIdNode.asText());
+
return builder.create();
}
diff --git
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/TransactionManager.java
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/TransactionManager.java
new file mode 100644
index 0000000000..3408a581b9
--- /dev/null
+++
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/TransactionManager.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
+import org.apache.tinkerpop.gremlin.server.GraphManager;
+import org.apache.tinkerpop.gremlin.server.GremlinServer;
+import org.apache.tinkerpop.gremlin.server.util.MetricManager;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+/**
+ * Tracks active transactions and returns references of them.
+ */
+public class TransactionManager {
+ private static final Logger logger =
LoggerFactory.getLogger(TransactionManager.class);
+
+ private final ConcurrentMap<String, UnmanagedTransaction> transactions =
new ConcurrentHashMap<>();
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final GraphManager graphManager;
+ private final long transactionTimeoutMs;
+ private final int maxConcurrentTransactions;
+ private final long perGraphCloseMs;
+
+ /**
+ * Creates a new TransactionManager with the specified configuration.
+ *
+ * @param scheduledExecutorService Scheduler for timeout management
+ * @param graphManager The graph manager for accessing traversal sources
+ * @param transactionTimeoutMs Timeout in milliseconds before auto-rollback
+ * @param maxConcurrentTransactions Maximum number of concurrent
transactions allowed
+ */
+ public TransactionManager(final ScheduledExecutorService
scheduledExecutorService,
+ final GraphManager graphManager,
+ final long transactionTimeoutMs,
+ final int maxConcurrentTransactions,
+ final long perGraphCloseMs) {
+ this.scheduledExecutorService = scheduledExecutorService;
+ this.graphManager = graphManager;
+ this.transactionTimeoutMs = transactionTimeoutMs;
+ this.maxConcurrentTransactions = maxConcurrentTransactions;
+ this.perGraphCloseMs = perGraphCloseMs;
+
+ MetricManager.INSTANCE.getGauge(transactions::size,
name(GremlinServer.class, "transactions"));
+ logger.info("TransactionManager initialized with timeout={}ms,
maxTransactions={}",
+ transactionTimeoutMs, maxConcurrentTransactions);
+ }
+
+ /**
+ * Creates a new {@link UnmanagedTransaction} for the specified traversal
source.
+ *
+ * @param traversalSourceName The traversal source alias (e.g., "g")
+ * @return The new {@link UnmanagedTransaction}, ready for task submission
+ * @throws IllegalStateException if max transactions exceeded
+ * @throws IllegalArgumentException if traversal source not found
+ * @throws UnsupportedOperationException if the graph does not support
transactions
+ */
+ public UnmanagedTransaction create(final String traversalSourceName) {
+ if (transactions.size() >= maxConcurrentTransactions) {
+ throw new IllegalStateException(
+ "Maximum concurrent transactions exceeded (" +
maxConcurrentTransactions + ")");
+ }
+
+ final TraversalSource ts =
graphManager.getTraversalSource(traversalSourceName);
+ if (ts == null) {
+ throw new IllegalArgumentException("Traversal source not found: "
+ traversalSourceName);
+ } else if (!ts.getGraph().features().graph().supportsTransactions()) {
+ throw Graph.Exceptions.transactionsNotSupported();
+ }
+
+ final UnmanagedTransaction txCtx =
createTransactionContext(ts.getGraph());
+ logger.debug("Transaction {} created for source {}",
txCtx.getTransactionId(), traversalSourceName);
+ return txCtx;
+ }
+
+ /**
+ * Removes a transaction from the active transactions map. Called when a
transaction is
+ * committed, rolled back, or otherwise closed.
+ *
+ * @param id The transaction ID to remove
+ */
+ public void destroy(final String id) {
+ transactions.remove(id);
+ }
+
+ /**
+ * Creates a unique transaction ID, retrying on the unlikely UUID
collision. The newly created
+ * {@link UnmanagedTransaction} is inserted into the transactions map.
+ */
+ private UnmanagedTransaction createTransactionContext(final Graph graph) {
+ String txId;
+ UnmanagedTransaction ctx;
+
+ do {
+ txId = UUID.randomUUID().toString();
+ ctx = new UnmanagedTransaction(
+ txId,
+ this,
+ graph,
+ scheduledExecutorService,
+ transactionTimeoutMs,
+ perGraphCloseMs
+ );
+ } while (transactions.putIfAbsent(txId, ctx) != null);
+
+ return ctx;
+ }
+
+    /**
+     * Looks up an active {@link UnmanagedTransaction} by its ID.
+     *
+     * @param transactionId The transaction ID to look up; may be {@code null}
+     * @return Optional holding the {@link UnmanagedTransaction} when one is registered under the ID,
+     *         empty when the ID is {@code null} or unknown
+     */
+    public Optional<UnmanagedTransaction> get(final String transactionId) {
+        // A null ID never reaches the map (ConcurrentHashMap.get(null) would throw an NPE); a miss in the map
+        // makes map() yield an empty Optional as well.
+        return Optional.ofNullable(transactionId).map(transactions::get);
+    }
+
+ /**
+ * Returns the number of currently active transactions, i.e. the current size of the transactions map.
+ *
+ * @return the count of active transactions
+ */
+ public int getActiveTransactionCount() {
+ return transactions.size();
+ }
+
+ /**
+ * Shuts down the transaction manager, rolling back all active
transactions.
+ * <p>
+ * This method should be called during server shutdown to ensure all
transactions
+ * are properly cleaned up. It blocks until all rollbacks complete.
+ */
+ public void shutdown() {
+ final int activeCount = transactions.size();
+ logger.info("Shutting down TransactionManager with {} active
transactions", activeCount);
+
+ if (activeCount == 0) return;
+
+ // Roll back all active transactions
+ transactions.values().forEach(transaction -> {
+ try {
+ transaction.close(false);
+ } catch (Exception e) {
+ logger.warn("Error rolling back transaction {} during
shutdown: {}",
+ transaction.getTransactionId(), e.getMessage());
+ }
+ });
+
+ transactions.clear();
+ logger.info("TransactionManager shutdown complete");
+ }
+}
diff --git
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnmanagedTransaction.java
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnmanagedTransaction.java
new file mode 100644
index 0000000000..ad8d0e3baf
--- /dev/null
+++
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/UnmanagedTransaction.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Maintains state for an active transaction over HTTP.
+ * <p>
+ * Key design principle: Graph transactions are ThreadLocal-bound, so all operations
+ * for a transaction must execute on the same thread. This is achieved via a
+ * single-threaded executor. Callers submit {@link FutureTask} instances that contain
+ * the complete request lifecycle (graph operation, error handling, response writing),
+ * following the same pattern as the non-transactional HTTP path and the legacy
+ * {@code SessionOpProcessor}.
+ */
+public class UnmanagedTransaction {
+    private static final Logger logger = LoggerFactory.getLogger(UnmanagedTransaction.class);
+
+    private final String transactionId;
+    private final TransactionManager manager;
+    private final Graph graph;
+    private final ScheduledExecutorService scheduledExecutorService;
+    // Inactivity timeout in milliseconds; see touch().
+    private final long timeout;
+    // Maximum time in milliseconds close(false) waits for the rollback task to finish.
+    private final long perGraphClose;
+    // Handle of the currently scheduled inactivity timeout, replaced on every touch().
+    private final AtomicReference<ScheduledFuture<?>> timeoutFuture = new AtomicReference<>();
+    // Controls whether the executor is still accepting tasks.
+    private final AtomicBoolean accepting = new AtomicBoolean(true);
+    /**
+     * Single-threaded executor ensures all operations for this transaction run on
+     * the same thread, preserving the ThreadLocal nature of Graph transactions.
+     */
+    private final ExecutorService executor;
+
+    /**
+     * Creates a new {@code UnmanagedTransaction} for managing an HTTP transaction.
+     *
+     * @param transactionId The unique identifier for this transaction
+     * @param transactionManager The manager that owns this transaction's lifecycle
+     * @param graph The graph instance for this transaction
+     * @param scheduledExecutorService Scheduler for timeout management
+     * @param transactionTimeout Timeout in milliseconds before auto-rollback
+     * @param perGraphClose Maximum time in milliseconds to wait for the rollback issued by
+     *                      {@link #close(boolean)} when it is not forced
+     */
+    public UnmanagedTransaction(final String transactionId,
+                                final TransactionManager transactionManager,
+                                final Graph graph,
+                                final ScheduledExecutorService scheduledExecutorService,
+                                final long transactionTimeout,
+                                final long perGraphClose) {
+        logger.debug("New transaction context established for {}", transactionId);
+        this.transactionId = transactionId;
+        this.manager = transactionManager;
+        this.graph = graph;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.timeout = transactionTimeout;
+        this.perGraphClose = perGraphClose;
+
+        // Create single-threaded executor with named thread for debugging
+        this.executor = Executors.newSingleThreadExecutor(
+                r -> new Thread(r, "tx-" + transactionId.substring(0, Math.min(8, transactionId.length()))));
+    }
+
+    /**
+     * Returns the transaction ID.
+     */
+    public String getTransactionId() {
+        return transactionId;
+    }
+
+    /**
+     * Resets the timeout for this transaction. Called on each request.
+     * <p>
+     * A new timeout task is scheduled, swapped in atomically, and the previously scheduled task (if any)
+     * is then cancelled.
+     */
+    public void touch() {
+        // Scheduling must not be done inside AtomicReference#updateAndGet: its update function may be re-applied
+        // when threads contend, which would schedule extra timeout tasks that nothing ever cancels.
+        final ScheduledFuture<?> next = scheduledExecutorService.schedule(() -> {
+            logger.info("Transaction {} timed out after {} ms of inactivity", transactionId, timeout);
+            close(false);
+        }, timeout, TimeUnit.MILLISECONDS);
+
+        final ScheduledFuture<?> previous = timeoutFuture.getAndSet(next);
+        if (previous != null) previous.cancel(false);
+    }
+
+    /**
+     * Opens the underlying graph transaction and starts the inactivity timeout.
+     * Should be called on the transaction's single-threaded executor to preserve
+     * ThreadLocal affinity. On failure the exception is re-thrown and the caller
+     * is responsible for cleanup (e.g. via {@link #close(boolean)}).
+     */
+    public void open() {
+        try {
+            graph.tx().open();
+            touch();
+            logger.debug("Transaction {} opened", transactionId);
+        } catch (Exception e) {
+            logger.warn("Failed to begin transaction {}: {}", transactionId, e.getMessage());
+            throw e;
+        }
+    }
+
+    /**
+     * Closes this transaction and releases its resources. When {@code force} is {@code false},
+     * any open graph transaction is rolled back before shutdown. When {@code force} is {@code true},
+     * the executor is shut down immediately without attempting a rollback.
+     * <p>
+     * Calling this for a transaction that is no longer registered with the manager is a no-op apart from
+     * marking the transaction as not accepting further tasks.
+     *
+     * @param force if {@code true}, skip the rollback attempt and shut down immediately
+     */
+    public synchronized void close(boolean force) {
+        accepting.set(false);
+
+        // if the transaction has already been removed then there's no need to do this process again. it's possible
+        // for this to be called at roughly the same time. this prevents close() from being called more than once.
+        if (manager.get(transactionId).isEmpty()) return;
+
+        if (!force) {
+            // when not "forced", an open transaction should be rolled back. the rollback has to run on the
+            // transaction's own executor thread since the graph transaction is bound to that thread.
+            try {
+                executor.submit(() -> {
+                    if (graph.tx().isOpen()) {
+                        logger.debug("Rolling back open transaction on {}", transactionId);
+                        graph.tx().rollback();
+                    }
+                }).get(perGraphClose, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException ie) {
+                // restore the interrupt flag so code further up the stack can still observe the interruption
+                Thread.currentThread().interrupt();
+                logger.warn(String.format("An error occurred while attempting rollback on %s ", transactionId), ie);
+            } catch (Exception ex) {
+                logger.warn(String.format("An error occurred while attempting rollback on %s ", transactionId), ex);
+            }
+        }
+
+        // prevent any additional requests from processing. if the kill was not "forced" then jobs were scheduled to
+        // try to rollback open transactions. those jobs either timed-out or completed successfully. either way, no
+        // additional jobs will be allowed, running jobs will be cancelled (if possible) and any scheduled jobs will
+        // be cancelled
+        executor.shutdownNow();
+        manager.destroy(transactionId);
+        Optional.ofNullable(timeoutFuture.get()).ifPresent(f -> f.cancel(true));
+        logger.debug("Transaction {} closed", transactionId);
+    }
+
+    /**
+     * Submits a task to be executed within this transaction's thread context.
+     * The task should contain the complete request lifecycle: graph operation,
+     * error handling, and response writing. Submitting also resets the inactivity timeout.
+     *
+     * @param task The FutureTask to execute on the transaction thread
+     * @return Future that can be used for timeout cancellation
+     * @throws IllegalStateException if the transaction is closed
+     */
+    public Future<?> submit(final FutureTask<Void> task) {
+        if (!accepting.get()) throw new IllegalStateException("Transaction " + transactionId + " is closed");
+
+        touch();
+        return executor.submit(task);
+    }
+}
diff --git
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/GremlinError.java
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/GremlinError.java
index c6bbef3ae9..3014438abd 100644
---
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/GremlinError.java
+++
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/GremlinError.java
@@ -144,4 +144,92 @@ public class GremlinError {
final String message = (t.getMessage() == null) ? t.toString() :
t.getMessage();
return new GremlinError(HttpResponseStatus.INTERNAL_SERVER_ERROR,
message, "ServerErrorException");
}
+
+    /**
+     * Builds the error returned when the server has no transaction registered under the given ID.
+     * Typical causes:
+     * <ul>
+     * <li>The transaction ID was never registered (client didn't call begin)</li>
+     * <li>The transaction timed out and was automatically rolled back</li>
+     * <li>The transaction was already committed or rolled back</li>
+     * </ul>
+     *
+     * @param transactionId The transaction ID that was not found
+     * @return A GremlinError carrying a 404 (NOT_FOUND) status and an explanatory message
+     */
+    public static GremlinError transactionNotFound(final String transactionId) {
+        final String template = "Transaction not found: %s. The transaction may have timed out, " +
+                "already been committed/rolled back, or was never started. " +
+                "Call g.tx().begin() to start a new transaction.";
+        return new GremlinError(HttpResponseStatus.NOT_FOUND,
+                String.format(template, transactionId), "TransactionException");
+    }
+
+    /**
+     * Builds the error returned when a commit or rollback arrives without a transaction ID.
+     *
+     * @return A GremlinError carrying a 400 (BAD_REQUEST) status and an explanatory message
+     */
+    public static GremlinError transactionalControlRequiresTransaction() {
+        return new GremlinError(
+                HttpResponseStatus.BAD_REQUEST,
+                "g.tx().commit() and g.tx().rollback() are only allowed in transactional requests.",
+                "TransactionException");
+    }
+
+    /**
+     * Builds the error returned when a begin request carries a user-supplied transaction ID.
+     * Transaction IDs are generated by the server, so clients should not send one on begin.
+     *
+     * @return A GremlinError carrying a 400 (BAD_REQUEST) status and an explanatory message
+     */
+    public static GremlinError beginHasTransactionId() {
+        return new GremlinError(
+                HttpResponseStatus.BAD_REQUEST,
+                "Begin transaction request cannot have a user-supplied transactionId",
+                "TransactionException");
+    }
+
+    /**
+     * Builds the error returned when the maximum number of concurrent transactions is exceeded.
+     * The supplied text is used as the start of the message and guidance for the caller is appended to it.
+     *
+     * @param exceededErrorMessage The error message containing a maximum number of concurrent transactions
+     * @return A GremlinError carrying a 503 (SERVICE_UNAVAILABLE) status and the combined message
+     */
+    public static GremlinError maxTransactionsExceeded(String exceededErrorMessage) {
+        final String guidance = " The server has reached its transaction limit. " +
+                "Please wait for existing transactions to complete or increase the server's " +
+                "maxConcurrentTransactions setting.";
+        return new GremlinError(HttpResponseStatus.SERVICE_UNAVAILABLE,
+                exceededErrorMessage + guidance, "TransactionException");
+    }
+
+    /**
+     * Builds the error returned when an operation inside a transaction times out.
+     *
+     * @param transactionId The transaction ID that timed out
+     * @param operation The operation that timed out (e.g., "commit", "rollback", "execute")
+     * @return A GremlinError carrying a 504 (GATEWAY_TIMEOUT) status and an explanatory message
+     */
+    public static GremlinError transactionTimeout(final String transactionId, final String operation) {
+        final String template = "Transaction %s timed out during %s operation. " +
+                "The transaction has been rolled back. " +
+                "Consider increasing the transaction timeout or breaking the operation into smaller parts.";
+        return new GremlinError(HttpResponseStatus.GATEWAY_TIMEOUT,
+                String.format(template, transactionId, operation), "TransactionException");
+    }
+
+ /**
+ * Creates an error for when the requested graph does not support
transactions.
+ *
+ * @param uoe The exception stating that transactions aren't supported
+ * @return A GremlinError with appropriate message and status code
+ */
+ public static GremlinError transactionNotSupported(final
UnsupportedOperationException uoe) {
+ return new GremlinError(HttpResponseStatus.BAD_REQUEST,
uoe.getMessage(), "TransactionException");
+ }
+
+ /**
+ * Creates an error for when the transaction couldn't begin. The supplied message is passed through
+ * unchanged and reported with a 500 (INTERNAL_SERVER_ERROR) status.
+ *
+ * @param message The error message (likely from a TransactionException)
+ * @return A GremlinError with appropriate message and status code
+ */
+ public static GremlinError transactionUnableToStart(final String message) {
+ return new GremlinError(HttpResponseStatus.INTERNAL_SERVER_ERROR,
+ message, "TransactionException");
+ }
}
diff --git
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
index 97a2c8ee4a..dce5fb931a 100644
---
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
+++
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java
@@ -29,6 +29,7 @@ import org.apache.tinkerpop.gremlin.server.Channelizer;
import org.apache.tinkerpop.gremlin.server.GraphManager;
import org.apache.tinkerpop.gremlin.server.GremlinServer;
import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.handler.TransactionManager;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,6 +66,7 @@ public class ServerGremlinExecutor {
private static final Logger logger =
LoggerFactory.getLogger(ServerGremlinExecutor.class);
private final GraphManager graphManager;
+ private final TransactionManager transactionManager;
private final Settings settings;
private final List<LifeCycleHook> hooks;
@@ -197,6 +199,14 @@ public class ServerGremlinExecutor {
.filter(kv -> kv.getValue() instanceof LifeCycleHook)
.map(kv -> (LifeCycleHook) kv.getValue())
.collect(Collectors.toList());
+
+ transactionManager = new TransactionManager(
+ scheduledExecutorService,
+ graphManager,
+ settings.transactionTimeout,
+ settings.maxConcurrentTransactions,
+ settings.perGraphCloseTimeout
+ );
}
private void registerMetrics(final String engineName) {
@@ -236,6 +246,10 @@ public class ServerGremlinExecutor {
return graphManager;
}
+ public TransactionManager getTransactionManager() {
+ return transactionManager;
+ }
+
public Settings getSettings() {
return settings;
}
diff --git
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java
new file mode 100644
index 0000000000..fef1241c44
--- /dev/null
+++
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java
@@ -0,0 +1,753 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import org.apache.http.Consts;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
+import org.apache.tinkerpop.gremlin.server.handler.TransactionManager;
+import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
+import org.apache.tinkerpop.gremlin.util.Tokens;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
+import org.apache.tinkerpop.gremlin.util.ser.GraphSONMessageSerializerV4;
+import org.apache.tinkerpop.gremlin.util.ser.Serializers;
+import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
+import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.tinkerpop.gremlin.util.ser.SerTokens.TOKEN_DATA;
+import static org.apache.tinkerpop.gremlin.util.ser.SerTokens.TOKEN_RESULT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Server-side integration tests for HTTP transaction protocol.
+ *
+ * These tests bypass the driver entirely and use a raw Apache HTTP client to
hit the server's HTTP endpoint directly.
+ * This validates that the server returns the correct status codes and error
messages independent of any client-side
+ * guards.
+ */
+public class GremlinServerHttpTransactionIntegrateTest extends
AbstractGremlinServerIntegrationTest {
+ private final String GTX = "gtx";
+ private final ObjectMapper mapper = new ObjectMapper();
+ private CloseableHttpClient client;
+
+ // Creates a fresh default HTTP client before each test; released again in closeHttpClient().
+ @Before
+ public void createHttpClient() {
+ client = HttpClients.createDefault();
+ }
+
+ // Closes the HTTP client created in createHttpClient() after each test.
+ @After
+ public void closeHttpClient() throws Exception {
+ client.close();
+ }
+
+    /**
+     * Switches the server to the HTTP channelizer and applies per-test transaction settings, selected by the
+     * name of the test method that is about to run.
+     */
+    @Override
+    public Settings overrideSettings(final Settings settings) {
+        settings.channelizer = HttpChannelizer.class.getName();
+
+        // tests not listed here keep the transaction settings they were handed
+        switch (name.getMethodName()) {
+            case "shouldRejectRequestWhenMaxConcurrentTransactionsExceeded":
+                settings.maxConcurrentTransactions = 1;
+                break;
+            case "shouldTimeoutFreeSlotUnderMaxConcurrentTransactions":
+                settings.maxConcurrentTransactions = 1;
+                settings.transactionTimeout = 1000;
+                break;
+            case "shouldTimeoutIdleTransactionWithNoOperations":
+                settings.transactionTimeout = 1;
+                break;
+            case "shouldTimeoutAndRejectLateCommit":
+            case "shouldTrackTransactionCountAccurately":
+                settings.transactionTimeout = 1000;
+                break;
+            case "shouldRollbackAbandonedTransaction":
+                settings.transactionTimeout = 300;
+                break;
+        }
+
+        return settings;
+    }
+
+    /**
+     * POSTs the given JSON document to the test server URL and hands back the raw response.
+     * The caller owns the returned response and must close it.
+     */
+    private CloseableHttpResponse postJson(final CloseableHttpClient client, final String json) throws Exception {
+        final HttpPost request = new HttpPost(TestClientFactory.createURLString());
+        request.addHeader("Content-Type", "application/json");
+
+        final StringEntity body = new StringEntity(json, Consts.UTF_8);
+        request.setEntity(body);
+
+        return client.execute(request);
+    }
+
+    /**
+     * Sends a begin transaction request for the given graph alias and returns the server-generated transaction ID.
+     * <p>
+     * Asserts that the request succeeds with a 200, that the transaction ID header is present, and that the ID in
+     * the header matches the ID carried in the response body.
+     */
+    private String beginTx(final CloseableHttpClient client, final String graphAlias) throws Exception {
+        try (final CloseableHttpResponse response = postJson(client,
+                "{\"gremlin\":\"g.tx().begin()\",\"g\":\"" + graphAlias + "\"}")) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+
+            // check for the header before dereferencing it so that a missing header fails as an assertion
+            // instead of a NullPointerException
+            assertNotNull(response.getFirstHeader(Tokens.Headers.TRANSACTION_ID));
+            final String txIdHeader = response.getFirstHeader(Tokens.Headers.TRANSACTION_ID).getValue();
+            assertNotNull(txIdHeader);
+
+            final String json = EntityUtils.toString(response.getEntity());
+            final JsonNode node = mapper.readTree(json);
+            final String txIdBody = node.get(TOKEN_RESULT).
+                    get(TOKEN_DATA).
+                    get(GraphSONTokens.VALUEPROP).get(0).
+                    get(GraphSONTokens.VALUEPROP).get(1).
+                    asText();
+            assertEquals(txIdHeader, txIdBody);
+            return txIdHeader;
+        }
+    }
+
+ /**
+ * Sends a traversal within an existing transaction.
+ */
+ private CloseableHttpResponse submitInTx(final CloseableHttpClient client,
+ final String txId,
+ final String gremlin,
+ final String graphAlias)
+ throws Exception {
+ return postJson(client,
+ "{\"gremlin\":\"" + gremlin + "\",\"g\":\"" + graphAlias +
"\",\"transactionId\":\"" + txId + "\"}");
+ }
+
+ /**
+ * Sends a commit for an existing transaction.
+ */
+ private CloseableHttpResponse commitTx(final CloseableHttpClient client,
+ final String txId, final String
graphAlias) throws Exception {
+ return postJson(client,
+ "{\"gremlin\":\"g.tx().commit()\",\"g\":\"" + graphAlias +
"\",\"transactionId\":\"" + txId + "\"}");
+ }
+
+ /**
+ * Sends a rollback for an existing transaction.
+ */
+ private CloseableHttpResponse rollbackTx(final CloseableHttpClient client,
+ final String txId, final String
graphAlias) throws Exception {
+ return postJson(client,
+ "{\"gremlin\":\"g.tx().rollback()\",\"g\":\"" + graphAlias +
"\",\"transactionId\":\"" + txId + "\"}");
+ }
+
+ /**
+ * Sends a non-transactional traversal (no transactionId).
+ */
+ private CloseableHttpResponse submitNonTx(final CloseableHttpClient client,
+ final String gremlin, final
String graphAlias) throws Exception {
+ return postJson(client,
+ "{\"gremlin\":\"" + gremlin + "\",\"g\":\"" + graphAlias +
"\"}");
+ }
+
+    /**
+     * Reads the response body and pulls the integer out of a typical count() result: the first element of the
+     * result data list, unwrapped from its value property.
+     */
+    private int extractCount(final CloseableHttpResponse response) throws Exception {
+        final JsonNode root = mapper.readTree(EntityUtils.toString(response.getEntity()));
+        final JsonNode dataList = root.get("result").get(TOKEN_DATA).get(GraphSONTokens.VALUEPROP);
+        final JsonNode firstResult = dataList.get(0);
+        return firstResult.get(GraphSONTokens.VALUEPROP).intValue();
+    }
+
+    /**
+     * Reads the response body and returns the text of the {@code message} entry inside its {@code status} field.
+     */
+    private String extractStatusMessage(final CloseableHttpResponse response) throws Exception {
+        final JsonNode root = mapper.readTree(EntityUtils.toString(response.getEntity()));
+        final JsonNode status = root.get("status");
+        return status.get("message").asText();
+    }
+
+    @Test
+    public void shouldNotBeginTransactionWithUserProvidedId() throws Exception {
+        final String txId = beginTx(client, GTX);
+        final String beginWithId =
+                "{\"gremlin\":\"g.tx().begin()\",\"g\":\"" + GTX + "\",\"transactionId\":\"" + txId + "\"}";
+
+        try (final CloseableHttpResponse response = postJson(client, beginWithId)) {
+            // Two different errors are acceptable here, depending on whether the transaction is still open on the
+            // server at the moment the second request arrives.
+            final int status = response.getStatusLine().getStatusCode();
+            assertTrue(status == 404 || status == 400);
+
+            final String message = extractStatusMessage(response);
+            assertTrue(message.contains("Begin transaction request cannot have a user-supplied transactionId") ||
+                    message.contains("Transaction not found"));
+        }
+    }
+
+    @Test
+    public void shouldReturn404ForInvalidCommit() throws Exception {
+        // Committing a transaction that never existed is rejected.
+        try (final CloseableHttpResponse response = commitTx(client, "fakeId", GTX)) {
+            assertEquals(404, response.getStatusLine().getStatusCode());
+            final String message = extractStatusMessage(response);
+            assertTrue(message.contains("Transaction not found"));
+        }
+
+        final String txId = beginTx(client, GTX);
+
+        // do some work in a real transaction and commit it
+        try (final CloseableHttpResponse response = submitInTx(client, txId, "g.addV('test')", GTX)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+        }
+        try (final CloseableHttpResponse response = commitTx(client, txId, GTX)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+        }
+
+        // #10: submit traversal on committed tx -> 404
+        try (final CloseableHttpResponse response = submitInTx(client, txId, "g.V().count()", GTX)) {
+            assertEquals(404, response.getStatusLine().getStatusCode());
+            final String message = extractStatusMessage(response);
+            assertTrue(message.contains("Transaction not found"));
+        }
+
+        // #13: commit again on committed tx -> 404
+        try (final CloseableHttpResponse response = commitTx(client, txId, GTX)) {
+            assertEquals(404, response.getStatusLine().getStatusCode());
+            final String message = extractStatusMessage(response);
+            assertTrue(message.contains("Transaction not found"));
+        }
+    }
+
+    @Test
+    public void shouldReturn404ForInvalidRollback() throws Exception {
+        // Rolling back a transaction that never existed is rejected.
+        try (final CloseableHttpResponse response = rollbackTx(client, "fakeId", GTX)) {
+            assertEquals(404, response.getStatusLine().getStatusCode());
+            final String message = extractStatusMessage(response);
+            assertTrue(message.contains("Transaction not found"));
+        }
+
+        final String txId = beginTx(client, GTX);
+
+        // add a vertex and rollback
+        try (final CloseableHttpResponse response = submitInTx(client, txId, "g.addV('test')", GTX)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+        }
+        try (final CloseableHttpResponse response = rollbackTx(client, txId, GTX)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+        }
+
+        // #11: submit traversal on rolled-back tx -> 404
+        try (final CloseableHttpResponse response = submitInTx(client, txId, "g.V().count()", GTX)) {
+            final String message = extractStatusMessage(response);
+            assertEquals(404, response.getStatusLine().getStatusCode());
+            assertTrue(message.contains("Transaction not found"));
+        }
+
+        // #14: rollback again on rolled-back tx -> 404
+        try (final CloseableHttpResponse response = rollbackTx(client, txId, GTX)) {
+            assertEquals(404, response.getStatusLine().getStatusCode());
+            final String message = extractStatusMessage(response);
+            assertTrue(message.contains("Transaction not found"));
+        }
+    }
+
+    @Test
+    public void shouldReturnValidTransactionId() throws Exception {
+        // #20: begin returns a valid transaction ID
+        final String firstId = beginTx(client, GTX);
+        assertNotNull(firstId);
+        assertFalse(firstId.isBlank());
+
+        // #27: second begin returns a different ID
+        final String secondId = beginTx(client, GTX);
+        assertNotNull(secondId);
+        assertFalse(secondId.isBlank());
+        assertNotEquals(firstId, secondId);
+    }
+
+    @Test
+    public void shouldReturn404ForInvalidTransactionId() throws Exception {
+        // a well-formed ID that the server never issued
+        final String unknownTxId = UUID.randomUUID().toString();
+
+        try (final CloseableHttpResponse response = submitInTx(client, unknownTxId, "g.V().count()", GTX)) {
+            assertEquals(404, response.getStatusLine().getStatusCode());
+            final String message = extractStatusMessage(response);
+            assertTrue(message.contains("Transaction not found"));
+        }
+    }
+
+    @Test
+    public void shouldRejectRequestWhenMaxConcurrentTransactionsExceeded() throws Exception {
+        // open one transaction (fills the limit)
+        beginTx(client, GTX);
+
+        // try to open another -- should fail with 503 (SERVICE_UNAVAILABLE)
+        final String secondBegin = "{\"gremlin\":\"g.tx().begin()\",\"g\":\"" + GTX + "\"}";
+        try (final CloseableHttpResponse response = postJson(client, secondBegin)) {
+            assertEquals(503, response.getStatusLine().getStatusCode());
+            final String message = extractStatusMessage(response);
+            assertTrue(message.contains("Maximum concurrent transactions exceeded"));
+        }
+    }
+
+ // Sends g.tx().begin() together with a transactionId and expects the 400 raised for a begin request that
+ // carries a user-supplied ID.
+ // NOTE(review): the method name says "empty" transaction ID, but the ID actually sent is the literal
+ // "invalid" -- confirm whether an empty-string ID was meant to be exercised here.
+ @Test
+ public void shouldRejectEmptyTransactionId() throws Exception {
+ try (final CloseableHttpResponse r = submitInTx(client, "invalid",
+ "g.tx().begin()", GTX)) {
+ assertEquals(400, r.getStatusLine().getStatusCode());
+ final String msg = extractStatusMessage(r);
+ assertTrue(msg.contains("Begin transaction request cannot have a
+ user-supplied transactionId"));
+ }
+ }
+
+    @Test
+    public void shouldTimeoutIdleTransactionWithNoOperations() throws Exception {
+        final String txId = beginTx(client, GTX);
+
+        // wait for the transaction to timeout (configured at 1ms)
+        Thread.sleep(1000);
+
+        // the transaction should be gone
+        try (final CloseableHttpResponse response = submitInTx(client, txId, "g.V().count()", GTX)) {
+            assertEquals(404, response.getStatusLine().getStatusCode());
+            final String message = extractStatusMessage(response);
+            assertTrue(message.contains("Transaction not found"));
+        }
+    }
+
+    @Test
+    public void shouldTimeoutAndRejectLateCommit() throws Exception {
+        final String txId = beginTx(client, GTX);
+
+        // add a vertex
+        try (final CloseableHttpResponse response = submitInTx(client, txId, "g.addV('timeout_test')", GTX)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+        }
+
+        // wait for timeout (configured at 1000ms)
+        Thread.sleep(2000);
+
+        // attempt commit -- should fail with 404
+        try (final CloseableHttpResponse response = commitTx(client, txId, GTX)) {
+            assertEquals(404, response.getStatusLine().getStatusCode());
+            final String message = extractStatusMessage(response);
+            assertTrue(message.contains("Transaction not found"));
+        }
+
+        // verify data was not persisted
+        try (final CloseableHttpResponse response =
+                     submitNonTx(client, "g.V().hasLabel('timeout_test').count()", GTX)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+            assertEquals(0, extractCount(response));
+        }
+    }
+
+    @Test
+    public void shouldTrackTransactionCountAccurately() throws Exception {
+        final TransactionManager txManager = server.getServerGremlinExecutor().getTransactionManager();
+        assertEquals(0, txManager.getActiveTransactionCount());
+
+        // open 3 transactions
+        final String txId1 = beginTx(client, GTX);
+        final String txId2 = beginTx(client, GTX);
+        final String txId3 = beginTx(client, GTX);
+
+        assertEquals(3, txManager.getActiveTransactionCount());
+
+        // commit one. the response is closed so its pooled connection is released instead of leaked.
+        try (final CloseableHttpResponse ignored = commitTx(client, txId1, GTX)) {
+            assertEquals(2, txManager.getActiveTransactionCount());
+        }
+
+        // rollback one, again closing the response
+        try (final CloseableHttpResponse ignored = rollbackTx(client, txId2, GTX)) {
+            assertEquals(1, txManager.getActiveTransactionCount());
+        }
+
+        // let the third one timeout (configured at 1000ms)
+        Thread.sleep(1500);
+        assertEquals(0, txManager.getActiveTransactionCount());
+    }
+
+    @Test
+    public void shouldTimeoutFreeSlotUnderMaxConcurrentTransactions() throws Exception {
+        final TransactionManager txManager = server.getServerGremlinExecutor().getTransactionManager();
+
+        // fill the single slot
+        beginTx(client, GTX);
+        assertEquals(1, txManager.getActiveTransactionCount());
+
+        // wait for timeout to reclaim the slot
+        Thread.sleep(2000);
+        assertEquals(0, txManager.getActiveTransactionCount());
+
+        // now a new transaction should succeed
+        final String replacementTxId = beginTx(client, GTX);
+        assertNotNull(replacementTxId);
+        assertFalse(replacementTxId.isBlank());
+    }
+
+    @Test
+    public void shouldReturn404ForAllOperationsOnClosedTransaction() throws Exception {
+        // first transaction: closed by a commit
+        final String committedTxId = beginTx(client, GTX);
+        try (final CloseableHttpResponse response = submitInTx(client, committedTxId, "g.addV('test38')", GTX)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+        }
+        try (final CloseableHttpResponse response = commitTx(client, committedTxId, GTX)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+        }
+
+        // rollback-after-commit
+        try (final CloseableHttpResponse response = rollbackTx(client, committedTxId, GTX)) {
+            assertEquals(404, response.getStatusLine().getStatusCode());
+        }
+        // traversal-after-commit
+        try (final CloseableHttpResponse response = submitInTx(client, committedTxId, "g.V().count()", GTX)) {
+            assertEquals(404, response.getStatusLine().getStatusCode());
+        }
+
+        // second transaction: closed by a rollback
+        final String rolledBackTxId = beginTx(client, GTX);
+        try (final CloseableHttpResponse response = submitInTx(client, rolledBackTxId, "g.addV('test38b')", GTX)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+        }
+        try (final CloseableHttpResponse response = rollbackTx(client, rolledBackTxId, GTX)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+        }
+
+        // commit-after-rollback
+        try (final CloseableHttpResponse response = commitTx(client, rolledBackTxId, GTX)) {
+            assertEquals(404, response.getStatusLine().getStatusCode());
+        }
+        // traversal-after-rollback
+        try (final CloseableHttpResponse response = submitInTx(client, rolledBackTxId, "g.V().count()", GTX)) {
+            assertEquals(404, response.getStatusLine().getStatusCode());
+        }
+    }
+
+ // Submits a traversal, a commit and a second traversal for the same transaction from three client
+ // threads, and expects the commit to succeed and the last traversal to be rejected with 404.
+ @Test
+ public void shouldNotLeakDataWhenTraversalQueuedBehindCommit() throws
Exception {
+ final String txId = beginTx(client, GTX);
+
+ // add two vertices in the transaction
+ // NOTE(review): no edge is created here, so the repeat(both()) traversal below has nothing to
+ // walk and may finish almost immediately -- confirm it really occupies the server executor.
+ try (final CloseableHttpResponse r = submitInTx(client, txId,
"g.addV().property(T.id, 1)", GTX)) {
+ assertEquals(200, r.getStatusLine().getStatusCode());
+ }
+ try (final CloseableHttpResponse r = submitInTx(client, txId,
"g.addV().property(T.id, 2)", GTX)) {
+ assertEquals(200, r.getStatusLine().getStatusCode());
+ }
+
+ // Fire three requests concurrently: a long traversal to occupy the server executor, then a
+ // commit that queues behind it, then a short query that queues behind the commit. The short
+ // query should fail with 404 because the commit closes the transaction first.
+ // The 50ms sleeps between submissions are what orders the three requests.
+ final ExecutorService executor = Executors.newFixedThreadPool(3);
+ try {
+ final Future<CloseableHttpResponse> longFuture =
executor.submit(() ->
+ submitInTx(client, txId,
"g.V().repeat(both()).times(1000)", GTX));
+ Thread.sleep(50);
+
+ final Future<CloseableHttpResponse> commitFuture =
executor.submit(() ->
+ commitTx(client, txId, GTX));
+ Thread.sleep(50);
+
+ final Future<CloseableHttpResponse> shortFuture =
executor.submit(() ->
+ submitInTx(client, txId, "g.V().count()", GTX));
+
+ // collect responses, waiting at most 30 seconds for each
+ try (final CloseableHttpResponse ignored = longFuture.get(30,
TimeUnit.SECONDS)) {
+ // it doesn't matter what the long traversal returns, only that it ran
+ }
+ try (final CloseableHttpResponse commitResp = commitFuture.get(30,
TimeUnit.SECONDS)) {
+ assertEquals(200, commitResp.getStatusLine().getStatusCode());
+ }
+ try (final CloseableHttpResponse shortResp = shortFuture.get(30,
TimeUnit.SECONDS)) {
+ assertEquals(404, shortResp.getStatusLine().getStatusCode());
+ final String msg = extractStatusMessage(shortResp);
+ assertTrue(msg.contains("Transaction not found"));
+ }
+ } finally {
+ // stop the client-side worker threads even when an assertion above fails
+ executor.shutdownNow();
+ }
+ }
+
+    /**
+     * A transaction that is begun, written to and then never committed or rolled back must not
+     * leave its writes visible once the server has timed it out.
+     */
+    @Test
+    public void shouldRollbackAbandonedTransaction() throws Exception {
+        final String txId1 = beginTx(client, GTX);
+        // the transaction is abandoned on purpose, but the HTTP response still has to be closed so
+        // the connection it holds is released
+        try (final CloseableHttpResponse r = submitInTx(client, txId1, "g.addV()", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+        }
+
+        // wait for server-side timeout
+        // NOTE(review): assumes the configured transaction timeout is below 1000ms -- not visible here
+        Thread.sleep(1000);
+
+        // verify with a non-transactional request that the data was not persisted
+        try (final CloseableHttpResponse r = submitNonTx(client, "g.V().count()", GTX)) {
+            assertEquals(200, r.getStatusLine().getStatusCode());
+            assertEquals(0, extractCount(r));
+        }
+    }
+
+ @Test
+ public void shouldRejectMismatchedGraphAliasInTransaction() throws
Exception {
+ final String txId = beginTx(client, GTX);
+
+ // send a request with the same txId but a different graph alias
+ try (final CloseableHttpResponse r = submitInTx(client, txId,
"g.V().count()", "gclassic")) {
+ final int status = r.getStatusLine().getStatusCode();
+ assertTrue("Expected error status for alias mismatch, got " +
status,
+ status == 400 || status == 404 || status == 500);
+ }
+ }
+
+    /**
+     * Begin requests are only accepted when they name a usable graph alias: a missing alias and
+     * an unknown alias are both answered with 400, a valid one yields a transaction id.
+     */
+    @Test
+    public void shouldRequireGraphAliasOnBeginRequest() throws Exception {
+        // no "g" field -- should default to "g" which doesn't support transactions
+        final String beginWithoutAlias = "{\"gremlin\":\"g.tx().begin()\"}";
+        try (final CloseableHttpResponse response = postJson(client, beginWithoutAlias)) {
+            assertEquals(400, response.getStatusLine().getStatusCode());
+            final String message = extractStatusMessage(response);
+            assertTrue(message.contains("Graph does not support transactions"));
+        }
+
+        // an alias the server does not know -- should fail
+        final String beginWithUnknownAlias = "{\"gremlin\":\"g.tx().begin()\",\"g\":\"nonexistent\"}";
+        try (final CloseableHttpResponse response = postJson(client, beginWithUnknownAlias)) {
+            assertEquals(400, response.getStatusLine().getStatusCode());
+            final String message = extractStatusMessage(response);
+            assertTrue(message.contains("Could not alias"));
+        }
+
+        // positive case: a valid alias yields a non-blank transaction id
+        final String txId = beginTx(client, GTX);
+        assertNotNull(txId);
+        assertFalse(txId.isBlank());
+    }
+
+ // Sends g.tx().commit() and g.tx().rollback() without a transactionId and expects each to be
+ // rejected with 400.
+ @Test
+ public void shouldRequireTransactionIdOnCommitAndRollback() throws
Exception {
+ // commit with no transactionId -- the request names the "gtx" graph but no transaction, so
+ // there is nothing to commit; the expected rejection is a 400 whose status message contains
+ // "only allowed in transactional requests"
+ try (final CloseableHttpResponse response = postJson(client,
+ "{\"gremlin\":\"g.tx().commit()\",\"g\":\"gtx\"}")) {
+ final int status = response.getStatusLine().getStatusCode();
+ assertEquals(400, status);
+ assertTrue(extractStatusMessage(response).contains("only allowed
in transactional requests"));
+ }
+
+ // rollback with no transactionId -- expected to be rejected in exactly the same way
+ try (final CloseableHttpResponse response = postJson(client,
+ "{\"gremlin\":\"g.tx().rollback()\",\"g\":\"gtx\"}")) {
+ final int status = response.getStatusLine().getStatusCode();
+ assertEquals(400, status);
+ assertTrue(extractStatusMessage(response).contains("only allowed
in transactional requests"));
+ }
+ }
+
+ @Test
+ public void shouldRejectBeginOnNonTransactionalGraph() throws Exception {
+ // gclassic is backed by TinkerGraph (non-transactional)
+ try (final CloseableHttpResponse response = postJson(client,
+ "{\"gremlin\":\"g.tx().begin()\",\"g\":\"gclassic\"}")) {
+ final int status = response.getStatusLine().getStatusCode();
+ assertTrue("Expected error for non-transactional graph, got " +
status,
+ status == 400 || status == 500);
+ }
+ }
+
+ @Test
+ public void shouldRoundTripTransactionIdWithGraphBinary() throws Exception
{
+ final GraphBinaryMessageSerializerV4 serializer = new
GraphBinaryMessageSerializerV4();
+
+ // begin via GraphBinary
+ final ByteBuf beginReq = serializer.serializeRequestAsBinary(
+ RequestMessage.build("g.tx().begin()").addG(GTX).create(),
+ new UnpooledByteBufAllocator(false));
+ final HttpPost beginPost = new
HttpPost(TestClientFactory.createURLString());
+ beginPost.addHeader(HttpHeaders.CONTENT_TYPE,
Serializers.GRAPHBINARY_V4.getValue());
+ beginPost.addHeader(HttpHeaders.ACCEPT,
Serializers.GRAPHBINARY_V4.getValue());
+ beginPost.setEntity(new ByteArrayEntity(beginReq.array()));
+
+ String txId;
+ try (final CloseableHttpResponse response = client.execute(beginPost))
{
+ assertEquals(200, response.getStatusLine().getStatusCode());
+ final ResponseMessage rm =
serializer.readChunk(toByteBuf(response.getEntity()), true);
+ final List<?> data = rm.getResult().getData();
+ assertNotNull(data);
+ assertTrue(data.size() > 0);
+ // the data should contain a map with transactionId
+ final Map<?, ?> map = (java.util.Map<?, ?>) data.get(0);
+ txId = (String) map.get(Tokens.ARGS_TRANSACTION_ID);
+ assertNotNull(txId);
+ assertFalse(txId.isBlank());
+ }
+
+ // addV via GraphBinary
+ final ByteBuf addVReq = serializer.serializeRequestAsBinary(
+
RequestMessage.build("g.addV('binary_test')").addG(GTX).addTransactionId(txId).create(),
+ new UnpooledByteBufAllocator(false));
+ final HttpPost addVPost = new
HttpPost(TestClientFactory.createURLString());
+ addVPost.addHeader(HttpHeaders.CONTENT_TYPE,
Serializers.GRAPHBINARY_V4.getValue());
+ addVPost.addHeader(HttpHeaders.ACCEPT,
Serializers.GRAPHBINARY_V4.getValue());
+ addVPost.setEntity(new ByteArrayEntity(addVReq.array()));
+ try (final CloseableHttpResponse response = client.execute(addVPost)) {
+ assertEquals(200, response.getStatusLine().getStatusCode());
+ }
+
+ // commit via GraphBinary
+ final ByteBuf commitReq = serializer.serializeRequestAsBinary(
+
RequestMessage.build("g.tx().commit()").addG(GTX).addTransactionId(txId).create(),
+ new UnpooledByteBufAllocator(false));
+ final HttpPost commitPost = new
HttpPost(TestClientFactory.createURLString());
+ commitPost.addHeader(HttpHeaders.CONTENT_TYPE,
Serializers.GRAPHBINARY_V4.getValue());
+ commitPost.addHeader(HttpHeaders.ACCEPT,
Serializers.GRAPHBINARY_V4.getValue());
+ commitPost.setEntity(new ByteArrayEntity(commitReq.array()));
+ try (final CloseableHttpResponse response =
client.execute(commitPost)) {
+ assertEquals(200, response.getStatusLine().getStatusCode());
+ }
+
+ // verify data persisted
+ try (final CloseableHttpResponse r = submitNonTx(client,
"g.V().hasLabel('binary_test').count()", GTX)) {
+ assertEquals(200, r.getStatusLine().getStatusCode());
+ assertEquals(1, extractCount(r));
+ }
+ }
+
+    /**
+     * Reads the whole HTTP entity into memory and exposes the bytes as a {@link ByteBuf}.
+     *
+     * @param httpEntity the response entity to read
+     * @return a buffer wrapping the entity's bytes
+     * @throws java.io.IOException if reading the entity fails
+     */
+    private static ByteBuf toByteBuf(final org.apache.http.HttpEntity httpEntity) throws java.io.IOException {
+        return Unpooled.wrappedBuffer(EntityUtils.toByteArray(httpEntity));
+    }
+
+ @Test
+ public void shouldRoundTripTransactionIdWithGraphSON() throws Exception {
+ final GraphSONMessageSerializerV4 serializer = new
GraphSONMessageSerializerV4();
+
+ // begin via GraphSON
+ final ByteBuf beginReq = serializer.serializeRequestAsBinary(
+ RequestMessage.build("g.tx().begin()").addG(GTX).create(),
+ new UnpooledByteBufAllocator(false));
+ final HttpPost beginPost = new
HttpPost(TestClientFactory.createURLString());
+ beginPost.addHeader(HttpHeaders.CONTENT_TYPE,
Serializers.GRAPHSON_V4.getValue());
+ beginPost.addHeader(HttpHeaders.ACCEPT,
Serializers.GRAPHSON_V4.getValue());
+ beginPost.setEntity(new ByteArrayEntity(beginReq.array()));
+
+ String txId;
+ try (final CloseableHttpResponse response = client.execute(beginPost))
{
+ assertEquals(200, response.getStatusLine().getStatusCode());
+ assertEquals(Serializers.GRAPHSON_V4.getValue(),
response.getEntity().getContentType().getValue());
+ final String json = EntityUtils.toString(response.getEntity());
+ final JsonNode node = mapper.readTree(json);
+ txId = node.get("result").get(TOKEN_DATA)
+ .get(GraphSONTokens.VALUEPROP).get(0)
+ .get(GraphSONTokens.VALUEPROP).get(1)
+ .asText();
+
+ assertNotNull(txId);
+ assertFalse(txId.isBlank());
+ }
+
+ // addV via GraphSON
+ final ByteBuf addVReq = serializer.serializeRequestAsBinary(
+
RequestMessage.build("g.addV('graphson_test')").addG(GTX).addTransactionId(txId).create(),
+ new UnpooledByteBufAllocator(false));
+ final HttpPost addVPost = new
HttpPost(TestClientFactory.createURLString());
+ addVPost.addHeader(HttpHeaders.CONTENT_TYPE,
Serializers.GRAPHSON_V4.getValue());
+ addVPost.addHeader(HttpHeaders.ACCEPT,
Serializers.GRAPHSON_V4.getValue());
+ addVPost.setEntity(new ByteArrayEntity(addVReq.array()));
+ try (final CloseableHttpResponse response = client.execute(addVPost)) {
+ assertEquals(200, response.getStatusLine().getStatusCode());
+ }
+
+ // commit via GraphSON
+ final ByteBuf commitReq = serializer.serializeRequestAsBinary(
+
RequestMessage.build("g.tx().commit()").addG(GTX).addTransactionId(txId).create(),
+ new UnpooledByteBufAllocator(false));
+ final HttpPost commitPost = new
HttpPost(TestClientFactory.createURLString());
+ commitPost.addHeader(HttpHeaders.CONTENT_TYPE,
Serializers.GRAPHSON_V4.getValue());
+ commitPost.addHeader(HttpHeaders.ACCEPT,
Serializers.GRAPHSON_V4.getValue());
+ commitPost.setEntity(new ByteArrayEntity(commitReq.array()));
+ try (final CloseableHttpResponse response =
client.execute(commitPost)) {
+ assertEquals(200, response.getStatusLine().getStatusCode());
+ }
+
+ // verify data persisted
+ try (final CloseableHttpResponse r = submitNonTx(client,
"g.V().hasLabel('graphson_test').count()", GTX)) {
+ assertEquals(200, r.getStatusLine().getStatusCode());
+ assertEquals(1, extractCount(r));
+ }
+ }
+
+    /**
+     * Checks that the transaction id is returned in the {@link Tokens.Headers#TRANSACTION_ID}
+     * response header on begin, on a traversal inside the transaction and on commit, and that it
+     * matches the id found in the body of the begin and commit responses.
+     */
+    @Test
+    public void shouldReturnTransactionIdHeader() throws Exception {
+        final String beginJson = "{\"gremlin\":\"g.tx().begin()\",\"g\":\"" + GTX + "\"}";
+        String txIdFromBegin;
+        try (final CloseableHttpResponse response = postJson(client, beginJson)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+
+            // header must be present
+            assertNotNull(response.getFirstHeader(Tokens.Headers.TRANSACTION_ID));
+            final String txIdFromHeader = response.getFirstHeader(Tokens.Headers.TRANSACTION_ID).getValue();
+            assertNotNull(txIdFromHeader);
+            assertFalse(txIdFromHeader.isBlank());
+
+            // body must contain the same transaction ID
+            final String json = EntityUtils.toString(response.getEntity());
+            final JsonNode node = mapper.readTree(json);
+            txIdFromBegin = node.get(TOKEN_RESULT).get(TOKEN_DATA)
+                    .get(GraphSONTokens.VALUEPROP).get(0)
+                    .get(GraphSONTokens.VALUEPROP).get(1)
+                    .asText();
+            assertNotNull(txIdFromBegin);
+            assertFalse(txIdFromBegin.isBlank());
+
+            assertEquals("Transaction ID in header and body must match on begin",
+                    txIdFromHeader, txIdFromBegin);
+        }
+
+        try (final CloseableHttpResponse response = submitInTx(client, txIdFromBegin, "g.addV('dual_test')", GTX)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+
+            // Body doesn't contain the Transaction ID for Traversals in transactions, so only the header is checked.
+            assertEquals("Transaction ID in header must match on submit",
+                    txIdFromBegin, response.getFirstHeader(Tokens.Headers.TRANSACTION_ID).getValue());
+        }
+
+        try (final CloseableHttpResponse response = commitTx(client, txIdFromBegin, GTX)) {
+            assertEquals(200, response.getStatusLine().getStatusCode());
+
+            final JsonNode jsonResponse = mapper.readTree(EntityUtils.toString(response.getEntity()));
+            final String txIdFromCommit = jsonResponse.get(TOKEN_RESULT).get(TOKEN_DATA)
+                    .get(GraphSONTokens.VALUEPROP).get(0)
+                    .get(GraphSONTokens.VALUEPROP).get(1)
+                    .asText();
+            assertEquals(txIdFromBegin, txIdFromCommit);
+
+            // the failure message previously said "on submit" although this is the commit response
+            assertEquals("Transaction ID in header must match on commit",
+                    txIdFromBegin, response.getFirstHeader(Tokens.Headers.TRANSACTION_ID).getValue());
+        }
+    }
+}
diff --git
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/Tokens.java
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/Tokens.java
index 4e83032df4..2668b8c66d 100644
--- a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/Tokens.java
+++ b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/Tokens.java
@@ -29,6 +29,17 @@ import org.apache.tinkerpop.gremlin.structure.Graph;
public final class Tokens {
private Tokens() {}
+    /**
+     * Names of the HTTP headers used by the HTTP API. This class only holds constants and is not
+     * meant to be instantiated.
+     */
+    public static final class Headers {
+
+        // mirrors the private constructor of the enclosing Tokens class
+        private Headers() {}
+
+        /**
+         * The header for transaction IDs.
+         */
+        public static final String TRANSACTION_ID = "X-Transaction-Id";
+    }
+
/**
* Argument name that allows definition of the number of iterations each
HTTP chunk should contain -
* overrides the @{code resultIterationBatchSize} server setting.
@@ -95,4 +106,11 @@ public final class Tokens {
* identifying the kind of client it came from.
*/
public static final String ARGS_USER_AGENT = "userAgent";
+
+ /**
+ * Argument name for the transaction ID used to track multi-request transactions over HTTP.
+ * The transaction ID is included in every request that belongs to the transaction.
+ * NOTE(review): the integration tests in this change send the begin request without a
+ * transaction ID and read the ID from the begin response, so the ID appears to be issued by
+ * the server rather than generated by the client -- confirm.
+ */
+ public static final String ARGS_TRANSACTION_ID = "transactionId";
}
diff --git
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/RequestMessage.java
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/RequestMessage.java
index 30072dd3bd..f25aa3781c 100644
---
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/RequestMessage.java
+++
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/RequestMessage.java
@@ -165,6 +165,20 @@ public final class RequestMessage {
return this;
}
+        /**
+         * Sets the transaction ID field of the request being built. Requests that belong to the
+         * same multi-request HTTP transaction must all carry the same transaction ID.
+         *
+         * @param transactionId the unique transaction identifier (typically a UUID)
+         * @return this builder
+         * @throws NullPointerException if {@code transactionId} is null
+         */
+        public Builder addTransactionId(final String transactionId) {
+            final String checkedId = Objects.requireNonNull(transactionId, "transactionId argument cannot be null.");
+            this.fields.put(Tokens.ARGS_TRANSACTION_ID, checkedId);
+            return this;
+        }
+
/**
* Create the request message given the settings provided to the
{@link Builder}.
*/
diff --git
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/AbstractGraphSONMessageSerializerV4.java
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/AbstractGraphSONMessageSerializerV4.java
index bc0212dfdf..caa3def020 100644
---
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/AbstractGraphSONMessageSerializerV4.java
+++
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/AbstractGraphSONMessageSerializerV4.java
@@ -338,6 +338,9 @@ public abstract class AbstractGraphSONMessageSerializerV4
extends AbstractMessag
if (data.containsKey(Tokens.BULK_RESULTS)) {
builder.addBulkResults(Boolean.parseBoolean(data.get(Tokens.BULK_RESULTS).toString()));
}
+ if (data.containsKey(Tokens.ARGS_TRANSACTION_ID)) {
+
builder.addTransactionId(data.get(Tokens.ARGS_TRANSACTION_ID).toString());
+ }
return builder.create();
}
diff --git
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/RequestMessageSerializer.java
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/RequestMessageSerializer.java
index 4d37f40e33..9397e5afde 100644
---
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/RequestMessageSerializer.java
+++
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/RequestMessageSerializer.java
@@ -73,6 +73,9 @@ public class RequestMessageSerializer {
if (fields.containsKey(Tokens.BULK_RESULTS)) {
builder.addBulkResults(Boolean.parseBoolean(fields.get(Tokens.BULK_RESULTS).toString()));
}
+ if (fields.containsKey(Tokens.ARGS_TRANSACTION_ID)) {
+
builder.addTransactionId(fields.get(Tokens.ARGS_TRANSACTION_ID).toString());
+ }
return builder.create();
} catch (IOException ex) {
diff --git
a/gremlin-util/src/test/java/org/apache/tinkerpop/gremlin/util/message/RequestMessageTest.java
b/gremlin-util/src/test/java/org/apache/tinkerpop/gremlin/util/message/RequestMessageTest.java
index b674dd046c..d5577a48f0 100644
---
a/gremlin-util/src/test/java/org/apache/tinkerpop/gremlin/util/message/RequestMessageTest.java
+++
b/gremlin-util/src/test/java/org/apache/tinkerpop/gremlin/util/message/RequestMessageTest.java
@@ -122,12 +122,6 @@ public class RequestMessageTest {
assertEquals(g, msg.getField(Tokens.ARGS_G));
}
- @Test
- public void shouldGetGAsArgOrDefault() {
- final RequestMessage msg = RequestMessage.build("g").create();
- assertEquals("b", msg.getFieldOrDefault(Tokens.ARGS_G, "b"));
- }
-
@Test
public void shouldGetGAsArgAsOptional() {
final String g = "gmodern";