This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit ae07eb2d741262e76d33fb4270e17dadeaf36d07 Author: Ken Hu <[email protected]> AuthorDate: Tue May 26 17:35:09 2026 -0700 Update gremlin-go submit to block until response headers arrive Split executeAndStream() into sendRequest() (synchronous HTTP call) and streamResponse() (async body streaming). submit() now blocks until the server acknowledges the request (response headers received), then streams the body in the background. Non-GraphBinary HTTP errors (400/500 with text/JSON bodies) are now returned directly from submit() instead of being embedded in the ResultSet. Tests updated accordingly. Assisted-by: Kiro:claude-opus-4-6 --- CHANGELOG.asciidoc | 1 + gremlin-go/driver/connection.go | 113 +++++++++++++++++++--------------- gremlin-go/driver/connection_test.go | 55 ++++++++--------- gremlin-go/driver/interceptor_test.go | 32 +++------- 4 files changed, 97 insertions(+), 104 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index e418c5e4db..92c5776ab0 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -30,6 +30,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima * Added typed numeric wrappers and `preciseNumbers` connection option to `gremlin-javascript` for explicit control over numeric type serialization and deserialization. * Added `NextN(n)` to `Traversal` in `gremlin-go` for batched result iteration, providing API parity with `next(n)` in the Java, Python, and .NET GLVs, and updated the Go translators in `gremlin-core` and `gremlin-javascript` to emit `NextN(n)` for the batched form. * Added Gremlator, a single page web application, that translates Gremlin into various programming languages like Javascript and Python. +* Refactored Go driver connection to block until response headers arrive, enabling synchronous error returns and proper transaction ordering. * Removed `uuid` dependency from `gremlin-javascript` in favor of the built-in `globalThis.crypto.randomUUID()`. * Added streaming HTTP response support to `gremlin-driver` for incremental result deserialization over GraphBinary. * Connected HTTP streaming response deserialization to the traversal API in `gremlin-javascript`, enabling `next()` to return the first result without waiting for the full response. diff --git a/gremlin-go/driver/connection.go b/gremlin-go/driver/connection.go index 882161e36c..8354ec0574 100644 --- a/gremlin-go/driver/connection.go +++ b/gremlin-go/driver/connection.go @@ -119,28 +119,71 @@ func (c *connection) AddInterceptor(interceptor RequestInterceptor) { c.interceptors = append(c.interceptors, interceptor) } -// submit sends request and streams results directly to ResultSet +// submit sends request and streams results directly to ResultSet. +// Blocks until response headers arrive, ensuring the server has acknowledged +// receipt of the request before returning. func (c *connection) submit(req *RequestMessage) (ResultSet, error) { rs := newChannelResultSet() + // Send the HTTP request synchronously — blocks until response headers arrive + resp, err := c.sendRequest(req) + if err != nil { + rs.Close() + return nil, err + } + + // If the HTTP status indicates an error and the response is not GraphBinary, + // read the body as a text/JSON error message instead of attempting binary + // deserialization which would produce cryptic errors. + contentType := resp.Header.Get(HeaderContentType) + if resp.StatusCode >= 400 && !strings.Contains(contentType, graphBinaryMimeType) { + defer func() { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + }() + bodyBytes, readErr := io.ReadAll(resp.Body) + if readErr != nil { + rs.Close() + return nil, fmt.Errorf("Gremlin Server returned HTTP %d and failed to read body: %w", + resp.StatusCode, readErr) + } + errorBody := string(bodyBytes) + errorMsg := tryExtractJSONError(errorBody) + if errorMsg == "" { + errorMsg = fmt.Sprintf("Gremlin Server returned HTTP %d: %s", resp.StatusCode, errorBody) + } + c.logHandler.logf(Error, failedToReceiveResponse, errorMsg) + rs.Close() + return nil, fmt.Errorf("%s", errorMsg) + } + + // Stream the response body into the ResultSet asynchronously c.wg.Add(1) go func() { defer c.wg.Done() - c.executeAndStream(req, rs) + // Drain any unread bytes so the connection can be reused gracefully. + // Without this, Go's HTTP client sends a TCP RST instead of FIN, + // causing "Connection reset by peer" errors on the server. + defer func() { + io.Copy(io.Discard, resp.Body) + if err := resp.Body.Close(); err != nil { + c.logHandler.logf(Debug, failedToCloseResponseBody, err.Error()) + } + }() + defer rs.Close() + c.streamResponse(resp, rs) }() return rs, nil } -func (c *connection) executeAndStream(req *RequestMessage, rs ResultSet) { - defer rs.Close() - +// sendRequest builds and sends the HTTP request, blocking until response headers arrive. +func (c *connection) sendRequest(req *RequestMessage) (*http.Response, error) { // Create HttpRequest for interceptors httpReq, err := NewHttpRequest(http.MethodPost, c.url) if err != nil { c.logHandler.logf(Error, failedToSendRequest, err.Error()) - rs.setError(err) - return + return nil, err } // Set default headers before interceptors @@ -154,8 +197,7 @@ func (c *connection) executeAndStream(req *RequestMessage, rs ResultSet) { for _, interceptor := range c.interceptors { if err := interceptor(httpReq); err != nil { c.logHandler.logf(Error, failedToSendRequest, err.Error()) - rs.setError(err) - return + return nil, err } } @@ -165,15 +207,13 @@ func (c *connection) executeAndStream(req *RequestMessage, rs ResultSet) { data, err := c.serializer.SerializeMessage(r) if err != nil { c.logHandler.logf(Error, failedToSendRequest, err.Error()) - rs.setError(err) - return + return nil, err } httpReq.Body = data } else { errMsg := "request body was not serialized; either provide a serializer or add an interceptor that serializes the request" c.logHandler.logf(Error, failedToSendRequest, errMsg) - rs.setError(fmt.Errorf("%s", errMsg)) - return + return nil, fmt.Errorf("%s", errMsg) } } @@ -184,16 +224,14 @@ func (c *connection) executeAndStream(req *RequestMessage, rs ResultSet) { httpGoReq, err = http.NewRequest(httpReq.Method, httpReq.URL.String(), bytes.NewReader(body)) if err != nil { c.logHandler.logf(Error, failedToSendRequest, err.Error()) - rs.setError(err) - return + return nil, err } httpGoReq.Header = httpReq.Headers case io.Reader: httpGoReq, err = http.NewRequest(httpReq.Method, httpReq.URL.String(), body) if err != nil { c.logHandler.logf(Error, failedToSendRequest, err.Error()) - rs.setError(err) - return + return nil, err } httpGoReq.Header = httpReq.Headers case *http.Request: @@ -201,48 +239,21 @@ func (c *connection) executeAndStream(req *RequestMessage, rs ResultSet) { default: errMsg := fmt.Sprintf("unsupported body type after interceptors: %T", body) c.logHandler.logf(Error, failedToSendRequest, errMsg) - rs.setError(fmt.Errorf("%s", errMsg)) - return + return nil, fmt.Errorf("%s", errMsg) } + // This blocks until response headers arrive resp, err := c.httpClient.Do(httpGoReq) if err != nil { c.logHandler.logf(Error, failedToSendRequest, err.Error()) - rs.setError(err) - return + return nil, err } - defer func() { - // Drain any unread bytes so the connection can be reused gracefully. - // Without this, Go's HTTP client sends a TCP RST instead of FIN, - // causing "Connection reset by peer" errors on the server. - io.Copy(io.Discard, resp.Body) - if err := resp.Body.Close(); err != nil { - c.logHandler.logf(Debug, failedToCloseResponseBody, err.Error()) - } - }() - // If the HTTP status indicates an error and the response is not GraphBinary, - // read the body as a text/JSON error message instead of attempting binary - // deserialization which would produce cryptic errors. - contentType := resp.Header.Get(HeaderContentType) - if resp.StatusCode >= 400 && !strings.Contains(contentType, graphBinaryMimeType) { - bodyBytes, readErr := io.ReadAll(resp.Body) - if readErr != nil { - c.logHandler.logf(Error, failedToReceiveResponse, readErr.Error()) - rs.setError(fmt.Errorf("Gremlin Server returned HTTP %d and failed to read body: %w", - resp.StatusCode, readErr)) - return - } - errorBody := string(bodyBytes) - errorMsg := tryExtractJSONError(errorBody) - if errorMsg == "" { - errorMsg = fmt.Sprintf("Gremlin Server returned HTTP %d: %s", resp.StatusCode, errorBody) - } - c.logHandler.logf(Error, failedToReceiveResponse, errorMsg) - rs.setError(fmt.Errorf("%s", errorMsg)) - return - } + return resp, nil +} +// streamResponse reads the response body and pushes results into the ResultSet. +func (c *connection) streamResponse(resp *http.Response, rs ResultSet) { reader, zlibReader, err := c.getReader(resp) if err != nil { c.logHandler.logf(Error, failedToReceiveResponse, err.Error()) diff --git a/gremlin-go/driver/connection_test.go b/gremlin-go/driver/connection_test.go index aebdeb992f..1b4620a376 100644 --- a/gremlin-go/driver/connection_test.go +++ b/gremlin-go/driver/connection_test.go @@ -990,12 +990,8 @@ func TestConnectionWithMockServer(t *testing.T) { connectionTimeout: 100 * time.Millisecond, }) - rs, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}}) - assert.NoError(t, err) // submit returns nil, error goes to ResultSet - - // All() blocks until stream closes, then we can check error - _, _ = rs.All() - assert.Error(t, rs.GetError()) + _, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}}) + assert.Error(t, err) // connection errors are now returned directly }) t.Run("receives headers from request", func(t *testing.T) { @@ -1035,14 +1031,10 @@ func TestConnectionWithMockServer(t *testing.T) { defer server.Close() conn := newConnection(newTestLogHandler(), server.URL, &connectionSettings{}) - rs, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}}) - require.NoError(t, err) - - _, _ = rs.All() - rsErr := rs.GetError() - require.Error(t, rsErr) - assert.Contains(t, rsErr.Error(), "HTTP 500") - assert.Contains(t, rsErr.Error(), "Internal Server Error") + _, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}}) + require.Error(t, err) + assert.Contains(t, err.Error(), "HTTP 500") + assert.Contains(t, err.Error(), "Internal Server Error") }) t.Run("extracts message from JSON error response", func(t *testing.T) { @@ -1054,13 +1046,9 @@ func TestConnectionWithMockServer(t *testing.T) { defer server.Close() conn := newConnection(newTestLogHandler(), server.URL, &connectionSettings{}) - rs, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}}) - require.NoError(t, err) - - _, _ = rs.All() - rsErr := rs.GetError() - require.Error(t, rsErr) - assert.Equal(t, "Authentication required", rsErr.Error()) + _, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}}) + require.Error(t, err) + assert.Equal(t, "Authentication required", err.Error()) }) t.Run("falls back to raw body for non-JSON error response", func(t *testing.T) { @@ -1072,14 +1060,10 @@ func TestConnectionWithMockServer(t *testing.T) { defer server.Close() conn := newConnection(newTestLogHandler(), server.URL, &connectionSettings{}) - rs, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}}) - require.NoError(t, err) - - _, _ = rs.All() - rsErr := rs.GetError() - require.Error(t, rsErr) - assert.Contains(t, rsErr.Error(), "HTTP 502") - assert.Contains(t, rsErr.Error(), "<html>Bad Gateway</html>") + _, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}}) + require.Error(t, err) + assert.Contains(t, err.Error(), "HTTP 502") + assert.Contains(t, err.Error(), "<html>Bad Gateway</html>") }) t.Run("falls through to GraphBinary deserialization for GraphBinary error responses", func(t *testing.T) { @@ -1127,9 +1111,18 @@ func TestConnectionWithMockServer(t *testing.T) { }) t.Run("close waits for in-flight requests", func(t *testing.T) { + // Server responds with headers immediately but streams body slowly. + // This tests that close() waits for the body-streaming goroutine. server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - time.Sleep(200 * time.Millisecond) + w.Header().Set("Content-Type", graphBinaryMimeType) w.WriteHeader(http.StatusOK) + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + // Delay before writing body so the streaming goroutine is still active + time.Sleep(200 * time.Millisecond) + // Write a minimal valid GraphBinary response (version + no-bulking flag) + w.Write([]byte{0x84, 0x00}) })) defer server.Close() @@ -1142,7 +1135,7 @@ func TestConnectionWithMockServer(t *testing.T) { conn.close() elapsed := time.Since(start) - // close() should have waited for the in-flight goroutine + // close() should have waited for the body-streaming goroutine to finish assert.GreaterOrEqual(t, elapsed.Milliseconds(), int64(150), "close() should wait for in-flight requests to complete") diff --git a/gremlin-go/driver/interceptor_test.go b/gremlin-go/driver/interceptor_test.go index 40a947ac9d..adb51786fc 100644 --- a/gremlin-go/driver/interceptor_test.go +++ b/gremlin-go/driver/interceptor_test.go @@ -246,13 +246,9 @@ func TestInterceptor_NilSerializerNoSerialization(t *testing.T) { conn := newConnection(newTestLogHandler(), server.URL, &connectionSettings{}) conn.serializer = nil // explicitly nil serializer - rs, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}}) - require.NoError(t, err) - - _, _ = rs.All() // drain — this triggers the async executeAndStream - rsErr := rs.GetError() - require.Error(t, rsErr, "should get an error when serializer is nil and no interceptor serializes") - assert.Contains(t, rsErr.Error(), "request body was not serialized", + _, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}}) + require.Error(t, err, "should get an error when serializer is nil and no interceptor serializes") + assert.Contains(t, err.Error(), "request body was not serialized", "error message should indicate the body was not serialized") } @@ -327,14 +323,10 @@ func TestInterceptor_ErrorPropagation(t *testing.T) { return fmt.Errorf("interceptor failed") }) - rs, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}}) - require.NoError(t, err) - - _, _ = rs.All() // drain — triggers async executeAndStream - rsErr := rs.GetError() - require.Error(t, rsErr, "interceptor error should propagate to ResultSet") - assert.Contains(t, rsErr.Error(), "interceptor failed", - "ResultSet error should contain the interceptor's error message") + _, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}}) + require.Error(t, err, "interceptor error should propagate") + assert.Contains(t, err.Error(), "interceptor failed", + "error should contain the interceptor's error message") } // TestInterceptor_UnsupportedBodyType verifies that setting Body to an unsupported type @@ -353,13 +345,9 @@ func TestInterceptor_UnsupportedBodyType(t *testing.T) { return nil }) - rs, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}}) - require.NoError(t, err) - - _, _ = rs.All() // drain - rsErr := rs.GetError() - require.Error(t, rsErr, "unsupported body type should produce an error") - assert.Contains(t, rsErr.Error(), "unsupported body type", + _, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}}) + require.Error(t, err, "unsupported body type should produce an error") + assert.Contains(t, err.Error(), "unsupported body type", "error message should indicate unsupported body type") }
