This is an automated email from the ASF dual-hosted git repository. spmallette pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit a76c72a3f6b91d1238aeee12c74f01c40a73efb3 Merge: be19534d73 213609d4ff Author: Stephen Mallette <[email protected]> AuthorDate: Wed Jun 24 16:53:52 2026 -0400 Merge branch '3.8-dev' Adapts the 3.x fail() step FailResponseException feature to the 4.0 HTTP protocol. The driver now creates a FailResponseException (a ResponseException that implements Failure) when a server response carries the ServerFailStepException exception name, replacing the 3.x reliance on the removed SERVER_ERROR_FAIL_STEP status code. Detection is centralized in ResponseException.create() and used by both the GraphBinary streaming reader and GremlinResponseHandler. Assisted-by: Claude Code:claude-opus-4-8 CHANGELOG.asciidoc | 1 + docs/src/reference/the-traversal.asciidoc | 12 +++ docs/src/upgrade/release-3.7.x.asciidoc | 17 ++++ .../gremlin/process/traversal/Failure.java | 105 ++++++++++++++------- .../gremlin/process/traversal/FailureTest.java | 101 ++++++++++++++++++++ .../driver/exception/FailResponseException.java | 61 ++++++++++++ .../driver/exception/ResponseException.java | 13 +++ .../driver/handler/GremlinResponseHandler.java | 2 +- .../stream/GraphBinaryStreamResponseReader.java | 2 +- .../exception/FailResponseExceptionTest.java | 83 ++++++++++++++++ .../gremlin/server/GremlinServerIntegrateTest.java | 4 + 11 files changed, 367 insertions(+), 34 deletions(-) diff --cc CHANGELOG.asciidoc index 0aba84c011,5a06e91af6..0fe949254e --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@@ -389,6 -242,7 +389,7 @@@ image::https://raw.githubusercontent.co * Added `NextN(n)` to `Traversal` in `gremlin-go` for batched result iteration, providing API parity with `next(n)` in the Java, Python, and .NET GLVs. * Added `next(n)` to `Traversal` in `gremlin-javascript` for batched result iteration, providing API parity with `next(n)` in the Java, Python, and .NET GLVs. * Added `WithComputer()` to `GraphTraversalSource` in `gremlin-go`, providing OLAP configuration parity with other language variants. -* Added `FailResponseException` to `gremlin-driver` which is thrown `fail()` step is triggered on the server making it more consistent with embedded behavior. ++* Added `FailResponseException` to `gremlin-driver` which is thrown when a `fail()` step is triggered on the server, making remote behavior more consistent with embedded. * Fixed conjoin has incorrect null handling. * Expanded `gremlin-python` CI matrix to test against Python 3.9, 3.10, 3.11, 3.12, and 3.13. * Add Node 26 support for `gremlin-javascript` and `gremlint`. diff --cc docs/src/reference/the-traversal.asciidoc index 46c53047a9,1ddb0dfd74..b8886a651a --- a/docs/src/reference/the-traversal.asciidoc +++ b/docs/src/reference/the-traversal.asciidoc @@@ -1856,6 -1855,20 +1856,18 @@@ rollback. Moreover, the ability to roll configured without transaction support, will simply be left in a partially mutated state whether the action to rollback on `fail()` was implemented or not. + The type of exception that `fail()` produces depends on how the traversal is executed: + + * *Embedded* - When executed directly against a `Graph` instance in the same JVM, `fail()` throws a + `FailStep.FailException` which implements the `Failure` interface. The `Failure` provides programmatic access to the + failure `Message`, `Traverser`, `Traversal` and `Metadata` as shown in the formatted output above. -* *Remote (HTTP)* - When executed against Gremlin Server over HTTP, the server returns a `500` error with a message that -indicates that the `fail()` step was triggered. The richer `Failure` data described above is not retained. -* *Remote (WebSockets)* - When executed against Gremlin Server over WebSockets, the server returns the -`595`/`SERVER_ERROR_FAIL_STEP` status code. In the Java driver this surfaces as a `FailResponseException`, a subclass of -`ResponseException` that also implements the `Failure` interface so that the failure can be caught and handled in a -manner more consistent with the embedded case. Note that the `Failure` data (e.g. `Traverser`, `Traversal`, `Metadata`) -is not transmitted from the server, so those accessors return empty or `null` values. Other Gremlin Language Variants -continue to surface this condition as their standard remote exception type. ++* *Remote* - When executed against Gremlin Server, the server returns a `500` error that indicates that the `fail()` ++step was triggered. In the Java driver this surfaces as a `FailResponseException`, a subclass of `ResponseException` ++that also implements the `Failure` interface so that the failure can be caught and handled in a manner more consistent ++with the embedded case. Note that the `Failure` data (e.g. `Traverser`, `Traversal`, `Metadata`) is not transmitted ++from the server, so those accessors return empty or `null` values. Other Gremlin Language Variants continue to surface ++this condition as their standard remote exception type. + *Additional References* link:++https://tinkerpop.apache.org/javadocs/x.y.z/core/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversal.html#fail()++[`fail()`], diff --cc gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Failure.java index 9b1d06eba6,3c5b2ed0e8..6c5d7ca9ea --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Failure.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Failure.java @@@ -18,7 -18,9 +18,8 @@@ */ package org.apache.tinkerpop.gremlin.process.traversal; + import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; -import org.apache.tinkerpop.gremlin.process.traversal.translator.GroovyTranslator; import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement; import java.util.ArrayList; @@@ -28,14 -30,42 +29,40 @@@ import java.util.Map import java.util.Set; import java.util.stream.Collectors; + /** + * Represents a failure raised by the {@link GraphTraversal#fail()} step which forces a traversal to immediately stop + * in an error state. Implementations carry the contextual information about where and why the failure occurred so that + * it can be inspected programmatically or rendered for display by way of {@link #format()}. + * <p/> + * When a {@code fail()} occurs in embedded mode the full context (the offending {@link Traverser} and {@link Traversal}) + * is available. When reconstructed from a remote server response, that context is not transmitted and the relevant + * accessors may return {@code null}. Implementations and callers should account for that possibility. + * + * @author Stephen Mallette (http://stephen.genoprime.com) + */ public interface Failure { - static Translator.ScriptTranslator TRANSLATOR = GroovyTranslator.of(""); - + /** + * Gets the message associated with the failure as provided to the {@code fail()} step. + */ String getMessage(); + /** + * Gets any additional metadata associated with the failure. Returns an empty {@code Map} when there is no metadata. + */ Map<String,Object> getMetadata(); + /** + * Gets the {@link Traverser} that was being processed when the failure was triggered. May return {@code null} when + * the {@code Failure} was reconstructed from a remote response where this context is not available. + */ Traverser.Admin getTraverser(); + /** + * Gets the {@link Traversal} that contained the {@code fail()} step that triggered the failure. May return + * {@code null} when the {@code Failure} was reconstructed from a remote response where this context is not + * available. + */ Traversal.Admin getTraversal(); /** @@@ -43,44 -73,58 +70,58 @@@ */ public default String format() { final List<String> lines = new ArrayList<>(); - final Step parentStep = (Step) getTraversal().getParent(); + + // some Failure implementations - notably those constructed from a remote response such as + // FailResponseException - do not carry the traversal/traverser context as that data is not transmitted + // from the server. guard against null returns so that format() degrades gracefully rather than throwing + // a NullPointerException. + final Traversal.Admin traversal = getTraversal(); + final Traverser.Admin traverser = getTraverser(); + final Step parentStep = traversal != null ? (Step) traversal.getParent() : null; lines.add(String.format("Message > %s", getMessage())); - lines.add(String.format("Traverser> %s", getTraverser().toString())); - final TraverserGenerator generator = getTraversal().getTraverserGenerator(); - final Traverser.Admin traverser = getTraverser(); - if (generator.getProvidedRequirements().contains(TraverserRequirement.BULK)) { - lines.add(String.format(" Bulk > %s", traverser.bulk())); - } - if (generator.getProvidedRequirements().contains(TraverserRequirement.SACK)) { - lines.add(String.format(" Sack > %s", traverser.sack())); - } - if (generator.getProvidedRequirements().contains(TraverserRequirement.PATH)) { - lines.add(String.format(" Path > %s", traverser.path())); - } - if (generator.getProvidedRequirements().contains(TraverserRequirement.SINGLE_LOOP) || - generator.getProvidedRequirements().contains(TraverserRequirement.NESTED_LOOP) ) { - final Set<String> loopNames = traverser.getLoopNames(); - final String loopsLine = loopNames.isEmpty() ? - String.valueOf(traverser.asAdmin().loops()) : - loopNames.stream().collect(Collectors.toMap(loopName -> loopName, traverser::loops)).toString(); - lines.add(String.format(" Loops > %s", loopsLine)); - } - if (generator.getProvidedRequirements().contains(TraverserRequirement.SIDE_EFFECTS)) { - final TraversalSideEffects tse = traverser.getSideEffects(); - final Set<String> keys = tse.keys(); - lines.add(String.format(" S/E > %s", keys.stream().collect(Collectors.toMap(k -> k, tse::get)))); + // not sure how you'd have one without the other really + if (traverser != null) { + lines.add(String.format("Traverser> %s", traverser.toString())); + + if (traversal != null) { + final TraverserGenerator generator = traversal.getTraverserGenerator(); + if (generator.getProvidedRequirements().contains(TraverserRequirement.BULK)) { + lines.add(String.format(" Bulk > %s", traverser.bulk())); + } + if (generator.getProvidedRequirements().contains(TraverserRequirement.SACK)) { + lines.add(String.format(" Sack > %s", traverser.sack())); + } + if (generator.getProvidedRequirements().contains(TraverserRequirement.PATH)) { + lines.add(String.format(" Path > %s", traverser.path())); + } + if (generator.getProvidedRequirements().contains(TraverserRequirement.SINGLE_LOOP) || + generator.getProvidedRequirements().contains(TraverserRequirement.NESTED_LOOP) ) { + final Set<String> loopNames = traverser.getLoopNames(); + final String loopsLine = loopNames.isEmpty() ? + String.valueOf(traverser.asAdmin().loops()) : + loopNames.stream().collect(Collectors.toMap(loopName -> loopName, traverser::loops)).toString(); + lines.add(String.format(" Loops > %s", loopsLine)); + } + if (generator.getProvidedRequirements().contains(TraverserRequirement.SIDE_EFFECTS)) { + final TraversalSideEffects tse = traverser.getSideEffects(); + final Set<String> keys = tse.keys(); + lines.add(String.format(" S/E > %s", keys.stream().collect(Collectors.toMap(k -> k, tse::get)))); + } + } } - // removes the starting period so that "__.out()" simply presents as "out()" - lines.add(String.format("Traversal> %s", getTraversal().getGremlinLang().getGremlin().substring(1))); + if (traversal != null) { + // removes the starting period so that "__.out()" simply presents as "out()" - lines.add(String.format("Traversal> %s", TRANSLATOR.translate(traversal).getScript().substring(1))); ++ lines.add(String.format("Traversal> %s", getTraversal().getGremlinLang().getGremlin().substring(1))); - // not sure there is a situation where fail() would be used where it was not wrapped in a parent, - // but on the odd case that it is it can be handled - if (parentStep != EmptyStep.instance()) { - lines.add(String.format("Parent > %s [%s]", - parentStep.getClass().getSimpleName(), getTraversal().getGremlinLang().getGremlin().substring(1))); + // not sure there is a situation where fail() would be used where it was not wrapped in a parent, + // but on the odd case that it is it can be handled + if (parentStep != null && parentStep != EmptyStep.instance()) { + lines.add(String.format("Parent > %s [%s]", - parentStep.getClass().getSimpleName(), TRANSLATOR.translate(parentStep.getTraversal()).getScript().substring(1))); ++ parentStep.getClass().getSimpleName(), getTraversal().getGremlinLang().getGremlin().substring(1))); + } } lines.add(String.format("Metadata > %s", getMetadata())); diff --cc gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/FailResponseException.java index 0000000000,2529349de1..272f0e83dd mode 000000,100644..100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/FailResponseException.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/FailResponseException.java @@@ -1,0 -1,60 +1,61 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.tinkerpop.gremlin.driver.exception; + ++import io.netty.handler.codec.http.HttpResponseStatus; + import org.apache.tinkerpop.gremlin.process.traversal.Failure; + import org.apache.tinkerpop.gremlin.process.traversal.Traversal; + import org.apache.tinkerpop.gremlin.process.traversal.Traverser; + import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; -import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode; + + import java.util.Collections; -import java.util.List; + import java.util.Map; + + /** + * Provides a {@link Failure} implementation for {@link ResponseException}. This exception is thrown instead of - * a {@code ResponseException} when the server returns a {@code status.code} of - * {@link ResponseStatusCode#SERVER_ERROR_FAIL_STEP} which indicates that a step in the traversal failed by way of - * {@link GraphTraversal#fail()}. This approach helps make remote exception handling for that step more consistent - * with the local {@link GraphTraversal#fail()} behavior. ++ * a {@code ResponseException} when the server response carries an {@code exception} of {@link #EXCEPTION_NAME} which ++ * indicates that a step in the traversal failed by way of {@link GraphTraversal#fail()}. This approach helps make ++ * remote exception handling for that step more consistent with the local {@link GraphTraversal#fail()} behavior. + */ + public class FailResponseException extends ResponseException implements Failure { - public FailResponseException(final String serverMessage, - final List<String> remoteExceptionHierarchy, final String remoteStackTrace, - final Map<String,Object> statusAttributes) { - super(ResponseStatusCode.SERVER_ERROR_FAIL_STEP, serverMessage, remoteExceptionHierarchy, - remoteStackTrace, statusAttributes); ++ ++ /** ++ * The {@code exception} name returned in a server response when a {@link GraphTraversal#fail()} step is triggered. ++ */ ++ public static final String EXCEPTION_NAME = "ServerFailStepException"; ++ ++ public FailResponseException(final HttpResponseStatus responseStatusCode, final String serverMessage) { ++ super(responseStatusCode, serverMessage, EXCEPTION_NAME); + } + + @Override + public Map<String, Object> getMetadata() { + return Collections.emptyMap(); + } + + @Override + public Traverser.Admin getTraverser() { + return null; + } + + @Override + public Traversal.Admin getTraversal() { + return null; + } + } diff --cc gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ResponseException.java index 1c0d8e77c8,35ca1e2bb1..978beeef02 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ResponseException.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ResponseException.java @@@ -24,22 -29,47 +24,35 @@@ import io.netty.handler.codec.http.Http * @author Stephen Mallette (http://stephen.genoprime.com) */ public class ResponseException extends Exception { - private final ResponseStatusCode responseStatusCode; - private final String remoteStackTrace; - private final List<String> remoteExceptionHierarchy; - private final Map<String,Object> attributes; - - public ResponseException(final ResponseStatusCode responseStatusCode, final String serverMessage) { - this(responseStatusCode, serverMessage, null, null); - } + private final HttpResponseStatus responseStatusCode; + private final String remoteException; - public ResponseException(final ResponseStatusCode responseStatusCode, final String serverMessage, - final List<String> remoteExceptionHierarchy, final String remoteStackTrace) { - this(responseStatusCode, serverMessage, remoteExceptionHierarchy, remoteStackTrace, null); + public ResponseException(final HttpResponseStatus responseStatusCode, final String serverMessage) { + this(responseStatusCode, serverMessage, null); } - public ResponseException(final ResponseStatusCode responseStatusCode, final String serverMessage, - final List<String> remoteExceptionHierarchy, final String remoteStackTrace, - final Map<String,Object> statusAttributes) { + public ResponseException(final HttpResponseStatus responseStatusCode, final String serverMessage, + final String remoteException) { super(serverMessage); this.responseStatusCode = responseStatusCode; - this.remoteExceptionHierarchy = remoteExceptionHierarchy != null ? Collections.unmodifiableList(remoteExceptionHierarchy) : null; - this.remoteStackTrace = remoteStackTrace; - this.attributes = statusAttributes != null ? Collections.unmodifiableMap(statusAttributes) : null; - } - - public ResponseStatusCode getResponseStatusCode() { - return responseStatusCode; + this.remoteException = remoteException; } + /** - * The stacktrace produced by the remote server. ++ * Creates a {@code ResponseException} from a server response, returning the more specific ++ * {@link FailResponseException} when the {@code remoteException} indicates that a {@link ++ * org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal#fail()} step was triggered on the ++ * server. This allows remote {@code fail()} handling to be made consistent with the local behavior. + */ - public Optional<String> getRemoteStackTrace() { - return Optional.ofNullable(remoteStackTrace); ++ public static ResponseException create(final HttpResponseStatus responseStatusCode, final String serverMessage, ++ final String remoteException) { ++ return FailResponseException.EXCEPTION_NAME.equals(remoteException) ++ ? new FailResponseException(responseStatusCode, serverMessage) ++ : new ResponseException(responseStatusCode, serverMessage, remoteException); + } + - /** - * The list of exceptions generated by the server starting with the top-most one followed by its "cause". That - * cause is then followed by its cause and so on down the line. - */ - public Optional<List<String>> getRemoteExceptionHierarchy() { - return Optional.ofNullable(remoteExceptionHierarchy); + public HttpResponseStatus getResponseStatusCode() { + return responseStatusCode; } /** diff --cc gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java index 94898783eb,0000000000..5b445941d0 mode 100644,000000..100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java @@@ -1,141 -1,0 +1,141 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.util.AttributeKey; +import javax.net.ssl.SSLException; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.tinkerpop.gremlin.driver.Result; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser; +import org.apache.tinkerpop.gremlin.util.ExceptionHelper; +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; +import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.util.ser.SerializationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE; + +/** + * Writes responses to the {@link ResultSet} of a request as the {@link ResponseMessage} objects are deserialized. + */ +public class GremlinResponseHandler extends SimpleChannelInboundHandler<ResponseMessage> { + public static final AttributeKey<Throwable> INBOUND_SSL_EXCEPTION = AttributeKey.valueOf("inboundSslException"); + private static final Logger logger = LoggerFactory.getLogger(GremlinResponseHandler.class); + private static final AttributeKey<ResponseException> CAUGHT_EXCEPTION = AttributeKey.valueOf("caughtException"); + private final AtomicReference<ResultSet> pendingResultSet; + private final Runnable onResponseComplete; + private final boolean streaming; + + public GremlinResponseHandler(final AtomicReference<ResultSet> pending, final Runnable onResponseComplete, final boolean streaming) { + this.pendingResultSet = pending; + this.onResponseComplete = onResponseComplete; + this.streaming = streaming; + } + + @Override + public void channelInactive(final ChannelHandlerContext ctx) throws Exception { + // occurs when the server shuts down in a disorderly fashion, otherwise in an orderly shutdown the server + // should fire off a close message which will properly release the driver. + super.channelInactive(ctx); + + final ResultSet current = pendingResultSet.getAndSet(null); + if (current != null) { + current.markError(new IllegalStateException("Connection to server is no longer active")); + } + } + + @Override + protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) { + final HttpResponseStatus statusCode = response.getStatus() == null ? null : response.getStatus().getCode(); + final ResultSet resultSet = pendingResultSet.get(); + + if ((null == statusCode) || (statusCode == HttpResponseStatus.OK)) { + final List<Object> data = response.getResult().getData(); + final boolean bulked = channelHandlerContext.channel().attr(HttpGremlinResponseStreamDecoder.IS_BULKED).get(); + // unrolls the collection into individual results to be handled by the ResultSet. + if (bulked) { + for (Iterator<Object> iter = data.iterator(); iter.hasNext(); ) { + final Object obj = iter.next(); + final long bulk = (long) iter.next(); + DefaultRemoteTraverser<Object> item = new DefaultRemoteTraverser<>(obj, bulk); + resultSet.add(new Result(item)); + } + } else { + data.forEach(item -> resultSet.add(new Result(item))); + } + + } else { + // this is a "success" but represents no results otherwise it is an error + if (statusCode != HttpResponseStatus.NO_CONTENT) { + // Save the error because there could be a subsequent HttpContent coming (probably just trailers). All + // content should be read first before marking the ResultSet or else this channel might get reused too early. + channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).set( - new ResponseException(response.getStatus().getCode(), response.getStatus().getMessage(), ++ ResponseException.create(response.getStatus().getCode(), response.getStatus().getMessage(), + response.getStatus().getException()) + ); + } + } + + // Stream is done when the last content signaling response message is read. + if (LAST_CONTENT_READ_RESPONSE == response) { + if (!streaming) { + final ResultSet rs = pendingResultSet.getAndSet(null); + if (rs != null) { + if (null == channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) { + rs.markComplete(); + } else { + rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null)); + } + } + } + onResponseComplete.run(); + } + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { + // if this happens enough times (like the client is unable to deserialize a response) the pending + // messages queue will not clear. wonder if there is some way to cope with that. of course, if + // there are that many failures someone would take notice and hopefully stop the client. + logger.error("Could not process the response", cause); + + final ResultSet resultSet = this.pendingResultSet.getAndSet(null); + if (resultSet != null) resultSet.markError(cause); + + if (ExceptionHelper.getRootCause(cause) instanceof SSLException) { + // inbound ssl error can happen with tls 1.3 because client certification auth can fail after the handshake completes + // store the inbound ssl error so that outbound can retrieve it + ctx.channel().attr(INBOUND_SSL_EXCEPTION).set(cause); + } + + // serialization exceptions should not close the channel - that's worth a retry + if (!IteratorUtils.anyMatch(ExceptionUtils.getThrowableList(cause).iterator(), t -> t instanceof SerializationException)) + if (ctx.channel().isActive()) ctx.close(); + } +} diff --cc gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java index 11491903e8,0000000000..8432d01562 mode 100644,000000..100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/GraphBinaryStreamResponseReader.java @@@ -1,105 -1,0 +1,105 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.stream; + +import io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.tinkerpop.gremlin.driver.Result; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser; +import org.apache.tinkerpop.gremlin.structure.io.Buffer; +import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; +import org.apache.tinkerpop.gremlin.structure.io.binary.Marker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * Performs pull-based streaming deserialization of a GraphBinary v4 response from an {@link InputStreamBuffer}. + * Reads one item at a time using the {@link GraphBinaryReader} and {@code TypeSerializer} infrastructure, + * pushing each result to the {@link ResultSet} as it is deserialized. + * <p> + * Wire format: {@code [version_byte][bulked_flag_byte][items...][EndOfStream marker][status_code][message][exception]} + */ +public class GraphBinaryStreamResponseReader implements Runnable { + + private static final Logger logger = LoggerFactory.getLogger(GraphBinaryStreamResponseReader.class); + + private final Buffer buffer; + private final GraphBinaryReader reader; + private final ResultSet resultSet; + private final AtomicReference<ResultSet> pendingResultSet; + + public GraphBinaryStreamResponseReader(final Buffer buffer, + final GraphBinaryReader reader, + final ResultSet resultSet, + final AtomicReference<ResultSet> pendingResultSet) { + this.buffer = buffer; + this.reader = reader; + this.resultSet = resultSet; + this.pendingResultSet = pendingResultSet; + } + + @Override + public void run() { + try { + // Read header: version byte (MSB must be 1) and bulking flag + final byte version = buffer.readByte(); + if ((version & 0x80) == 0) { + throw new RuntimeException("Invalid GraphBinary response version: " + version); + } + final boolean bulked = (buffer.readByte() & 1) == 1; + + // Read items until EndOfStream marker + while (true) { + final Object obj = reader.read(buffer); + if (obj instanceof Marker) { + break; + } + + if (bulked) { + final long bulk = reader.read(buffer); + resultSet.add(new Result(new DefaultRemoteTraverser<>(obj, bulk))); + } else { + resultSet.add(new Result(obj)); + } + } + + // Read footer: status code, nullable message, nullable exception + final int statusCode = reader.readValue(buffer, Integer.class, false); + final String message = reader.readValue(buffer, String.class, true); + final String exception = reader.readValue(buffer, String.class, true); + + // Status code 0 means success in GraphBinary v4 — the server omits the HTTP status code + // in the binary footer when the response is successful. + if (statusCode == 0 || statusCode == HttpResponseStatus.OK.code()) { + resultSet.markComplete(); + } else { - resultSet.markError(new ResponseException(HttpResponseStatus.valueOf(statusCode), message, exception)); ++ resultSet.markError(ResponseException.create(HttpResponseStatus.valueOf(statusCode), message, exception)); + } + } catch (Throwable t) { + logger.warn("Error reading streaming response", t); + resultSet.markError(t); + } finally { + pendingResultSet.compareAndSet(resultSet, null); + buffer.release(); + } + } +} diff --cc gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/exception/FailResponseExceptionTest.java index 0000000000,53dbea7247..e8e8959ed4 mode 000000,100644..100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/exception/FailResponseExceptionTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/exception/FailResponseExceptionTest.java @@@ -1,0 -1,65 +1,83 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.tinkerpop.gremlin.driver.exception; + ++import io.netty.handler.codec.http.HttpResponseStatus; + import org.apache.tinkerpop.gremlin.process.traversal.Failure; -import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode; + import org.junit.Test; + + import static org.hamcrest.MatcherAssert.assertThat; + import static org.hamcrest.Matchers.containsString; + import static org.hamcrest.Matchers.instanceOf; ++import static org.hamcrest.Matchers.not; + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertNotNull; + import static org.junit.Assert.assertNull; + import static org.junit.Assert.assertTrue; + + public class FailResponseExceptionTest { + + @Test + public void shouldBeAResponseExceptionAndFailure() { - final FailResponseException ex = new FailResponseException("make it stop", null, null, null); ++ final FailResponseException ex = new FailResponseException(HttpResponseStatus.INTERNAL_SERVER_ERROR, "make it stop"); + assertThat(ex, instanceOf(ResponseException.class)); + assertThat(ex, instanceOf(Failure.class)); - assertEquals(ResponseStatusCode.SERVER_ERROR_FAIL_STEP, ex.getResponseStatusCode()); ++ assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, ex.getResponseStatusCode()); ++ assertEquals(FailResponseException.EXCEPTION_NAME, ex.getRemoteException()); + assertEquals("make it stop", ex.getMessage()); + } + ++ @Test ++ public void shouldCreateFailResponseExceptionWhenRemoteExceptionIsFailStep() { ++ final ResponseException ex = ResponseException.create(HttpResponseStatus.INTERNAL_SERVER_ERROR, ++ "make it stop", FailResponseException.EXCEPTION_NAME); ++ assertThat(ex, instanceOf(FailResponseException.class)); ++ assertThat(ex, instanceOf(Failure.class)); ++ } ++ ++ @Test ++ public void shouldCreatePlainResponseExceptionWhenRemoteExceptionIsNotFailStep() { ++ final ResponseException ex = ResponseException.create(HttpResponseStatus.INTERNAL_SERVER_ERROR, ++ "boom", "ServerErrorException"); ++ assertThat(ex, instanceOf(ResponseException.class)); ++ assertThat(ex, not(instanceOf(FailResponseException.class))); ++ } ++ + @Test + public void shouldReturnEmptyFailureContextSinceItIsNotTransmittedFromServer() { - final FailResponseException ex = new FailResponseException("make it stop", null, null, null); ++ final FailResponseException ex = new FailResponseException(HttpResponseStatus.INTERNAL_SERVER_ERROR, "make it stop"); + assertTrue(ex.getMetadata().isEmpty()); + assertNull(ex.getTraverser()); + assertNull(ex.getTraversal()); + } + + @Test + public void shouldFormatWithoutThrowingWhenFailureContextIsAbsent() { - final FailResponseException ex = new FailResponseException("make it stop", null, null, null); ++ final FailResponseException ex = new FailResponseException(HttpResponseStatus.INTERNAL_SERVER_ERROR, "make it stop"); + + // format() must not NPE even though getTraversal()/getTraverser() return null for a remotely + // reconstructed Failure + final String formatted = ex.format(); + + assertNotNull(formatted); + assertThat(formatted, containsString("fail() Step Triggered")); + assertThat(formatted, containsString("Message > make it stop")); + assertThat(formatted, containsString("Metadata > {}")); + } + } diff --cc gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java index 7075e65ead,5e80a2d4d6..40489760c0 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java @@@ -25,10 -26,19 +25,12 @@@ import java.math.BigDecimal import nl.altindag.log.LogCaptor; import org.apache.commons.configuration2.BaseConfiguration; import org.apache.commons.configuration2.Configuration; -import org.apache.commons.lang3.RandomStringUtils; import org.apache.tinkerpop.gremlin.driver.RequestOptions; + import org.apache.tinkerpop.gremlin.driver.exception.FailResponseException; + import org.apache.tinkerpop.gremlin.process.traversal.Failure; import org.apache.tinkerpop.gremlin.server.channel.HttpTestChannelizer; import org.apache.tinkerpop.gremlin.server.channel.TestChannelizer; -import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer; -import org.apache.tinkerpop.gremlin.server.channel.UnifiedTestChannelizer; -import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer; -import org.apache.tinkerpop.gremlin.server.channel.WebSocketTestChannelizer; -import org.apache.tinkerpop.gremlin.server.channel.WsAndHttpTestChannelizer; import org.apache.tinkerpop.gremlin.structure.VertexProperty; -import org.apache.tinkerpop.gremlin.util.ExceptionHelper; import org.apache.tinkerpop.gremlin.TestHelper; import org.apache.tinkerpop.gremlin.driver.Client; import org.apache.tinkerpop.gremlin.driver.Cluster; @@@ -976,8 -1379,10 +978,10 @@@ public class GremlinServerIntegrateTes } catch (Exception ex) { final Throwable t = ex.getCause(); assertThat(t, instanceOf(ResponseException.class)); + assertThat(t, instanceOf(FailResponseException.class)); + assertThat(t, instanceOf(Failure.class)); - assertEquals("make it stop", t.getMessage()); - assertEquals(ResponseStatusCode.SERVER_ERROR_FAIL_STEP, ((ResponseException) t).getResponseStatusCode()); + assertThat(t.getMessage(), startsWith("make it stop")); + assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, ((ResponseException) t).getResponseStatusCode()); } }
