This is an automated email from the ASF dual-hosted git repository.
xiazcy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/master by this push:
new 0531cbb7d0 Fix bulking behavior in gremlin-go for TP 4.0 (#3397)
0531cbb7d0 is described below
commit 0531cbb7d01476f57f6c446bb3a20588d6d532cd
Author: Yang Xia <[email protected]>
AuthorDate: Fri May 1 15:58:29 2026 -0700
Fix bulking behavior in gremlin-go for TP 4.0 (#3397)
---
docs/src/reference/gremlin-variants.asciidoc | 32 +++++--
gremlin-go/driver/client.go | 8 +-
gremlin-go/driver/client_test.go | 54 ++++++++++++
gremlin-go/driver/connection.go | 18 +++-
gremlin-go/driver/connection_test.go | 36 +++++++-
gremlin-go/driver/graphBinaryDeserializer.go | 22 +++--
gremlin-go/driver/request.go | 8 +-
gremlin-go/driver/requestOptions.go | 20 ++---
gremlin-go/driver/requestOptions_test.go | 6 --
gremlin-go/driver/request_test.go | 6 ++
gremlin-go/driver/result.go | 10 ++-
gremlin-go/driver/resultSet.go | 9 +-
gremlin-go/driver/result_test.go | 25 ++++++
gremlin-go/driver/serializer.go | 15 +++-
gremlin-go/driver/serializer_test.go | 65 ++++++++++++++
gremlin-go/driver/traversal.go | 101 +++++++++++++++++++---
gremlin-go/driver/traversal_test.go | 125 +++++++++++++++++++++++++++
17 files changed, 499 insertions(+), 61 deletions(-)
diff --git a/docs/src/reference/gremlin-variants.asciidoc
b/docs/src/reference/gremlin-variants.asciidoc
index 32c4a64661..6500a61e10 100644
--- a/docs/src/reference/gremlin-variants.asciidoc
+++ b/docs/src/reference/gremlin-variants.asciidoc
@@ -216,9 +216,13 @@ Some connection options can also be set on individual
requests made through the
results, err := g.With("evaluationTimeout", 500).V().Out("knows").ToList()
----
-The following options are allowed on a per-request basis in this fashion:
`batchSize`, `userAgent` and
+The following options are allowed on a per-request basis in this fashion:
`batchSize`, `bulkResults`, `userAgent` and
`evaluationTimeout`.
+NOTE: When submitting traversals through `DriverRemoteConnection`,
`bulkResults` defaults to `true` per-request
+to optimize result transfer. This does not apply to direct `Client.Submit()`
calls, where `bulkResults` must be
+set explicitly if desired.
+
anchor:go-imports[]
[[gremlin-go-imports]]
=== Common Imports
@@ -351,7 +355,7 @@ options := new(RequestOptionsBuilder).
resultSet, err := client.SubmitWithOptions("g.V(x).count()", options)
----
-The following options are allowed on a per-request basis in this fashion:
`batchSize`, `userAgent`,
+The following options are allowed on a per-request basis in this fashion:
`batchSize`, `bulkResults`, `userAgent`,
`evaluationTimeout` and `materializeProperties`.
`RequestOptions` may also contain a map of variable `bindings` to be applied
to the supplied
traversal string.
@@ -813,9 +817,13 @@ GraphTraversalSource g = traversal().with(conf);
List<Vertex> vertices = g.with(Tokens.ARGS_EVAL_TIMEOUT,
500L).V().out("knows").toList()
----
-The following options are allowed on a per-request basis in this fashion:
`batchSize`, `userAgent`,
+The following options are allowed on a per-request basis in this fashion:
`batchSize`, `bulkResults`, `userAgent`,
`materializeProperties` and `evaluationTimeout`. Use of `Tokens` to reference
these options is preferred.
+NOTE: When submitting traversals through `DriverRemoteConnection`,
`bulkResults` defaults to `true` per-request
+to optimize result transfer. This does not apply to direct `Client.submit()`
calls, where `bulkResults` must be
+set explicitly if desired.
+
anchor:java-imports[]
[[gremlin-java-imports]]
=== Common Imports
@@ -1590,6 +1598,10 @@ const vertices = await g.with_('evaluationTimeout',
500).V().out('knows').toList
The following options are allowed on a per-request basis in this fashion:
`batchSize`, `requestId`, `userAgent`,
`bulkResults`, `materializeProperties` and `evaluationTimeout`.
+NOTE: When submitting traversals through `DriverRemoteConnection`,
`bulkResults` defaults to `true` per-request
+to optimize result transfer. This does not apply to direct `Client.submit()`
calls, where `bulkResults` must be
+set explicitly if desired.
+
[[gremlin-javascript-imports]]
=== Common Imports
@@ -2124,10 +2136,14 @@ For instance to set request timeout to 500 milliseconds:
var l = g.With(Tokens.ArgsEvalTimeout, 500).V().Out("knows").Count().ToList();
----
-The following options are allowed on a per-request basis in this fashion:
`batchSize`, `userAgent`,
+The following options are allowed on a per-request basis in this fashion:
`batchSize`, `bulkResults`, `userAgent`,
`materializeProperties`, and `evaluationTimeout`. These options are available
as constants on the
`Gremlin.Net.Driver.Tokens` class.
+NOTE: When submitting traversals through `DriverRemoteConnection`,
`bulkResults` defaults to `true` per-request
+to optimize result transfer. This does not apply to direct
`GremlinClient.SubmitAsync()` calls, where `bulkResults`
+must be set explicitly if desired.
+
[[gremlin-dotnet-imports]]
=== Common Imports
@@ -2270,7 +2286,7 @@ feature is to set a per-request override to the
`evaluationTimeout` so that it o
include::../../../gremlin-dotnet/test/Gremlin.Net.IntegrationTest/Docs/Reference/GremlinVariantsTests.cs[tags=submittingScriptsWithTimeout]
----
-The following options are allowed on a per-request basis in this fashion:
`batchSize`, `userAgent`, `materializeProperties`
+The following options are allowed on a per-request basis in this fashion:
`batchSize`, `bulkResults`, `userAgent`, `materializeProperties`
and `evaluationTimeout`. These options are available as constants on the
`Gremlin.Net.Driver.Tokens` class.
==== Request Interceptors
@@ -2582,6 +2598,10 @@ vertices = g.with_('evaluationTimeout',
500).V().out('knows').to_list()
The following options are allowed on a per-request basis in this fashion:
`batchSize`, `bulkResults`, `language`,
`materializeProperties`, `userAgent`, and `evaluationTimeout`.
+NOTE: When submitting traversals through `DriverRemoteConnection`,
`bulkResults` defaults to `True` per-request
+to optimize result transfer. This does not apply to direct `Client.submit()`
calls, where `bulkResults` must be
+set explicitly if desired.
+
anchor:python-imports[]
[[gremlin-python-imports]]
=== Common Imports
@@ -2808,7 +2828,7 @@ request.
result_set = client.submit('g.V().repeat(both()).times(100)',
request_options={'evaluationTimeout': 5000})
----
-The following options are allowed on a per-request basis in this fashion:
`batchSize`, `requestId`, `userAgent`,
+The following options are allowed on a per-request basis in this fashion:
`batchSize`, `bulkResults`, `requestId`, `userAgent`,
`materializeProperties` and `evaluationTimeout` (formerly
`scriptEvaluationTimeout` which is also supported but now deprecated).
IMPORTANT: The preferred method for setting a per-request timeout for scripts
is demonstrated above, but those familiar
diff --git a/gremlin-go/driver/client.go b/gremlin-go/driver/client.go
index 4d799b878d..444635033c 100644
--- a/gremlin-go/driver/client.go
+++ b/gremlin-go/driver/client.go
@@ -170,6 +170,12 @@ func (client *Client) submitGremlinLang(gremlinLang
*GremlinLang) (ResultSet, er
requestOptionsBuilder =
applyOptionsConfig(requestOptionsBuilder,
gremlinLang.optionsStrategies[0].configuration)
}
+ // default bulkResults to true when using DRC through request options
+ // consistent with Java RequestOptions.getRequestOptions and Python
extract_request_options
+ if requestOptionsBuilder.bulkResults == nil {
+ requestOptionsBuilder.SetBulkResults(true)
+ }
+
request := MakeStringRequest(gremlinLang.GetGremlin(),
client.traversalSource, requestOptionsBuilder.Create())
return client.conn.submit(&request)
}
@@ -179,12 +185,12 @@ func applyOptionsConfig(builder *RequestOptionsBuilder,
config map[string]interf
// Map configuration keys to setter method names
setterMap := map[string]string{
- "requestId": "SetRequestId",
"evaluationTimeout": "SetEvaluationTimeout",
"batchSize": "SetBatchSize",
"userAgent": "SetUserAgent",
"bindings": "SetBindings",
"materializeProperties": "SetMaterializeProperties",
+ "bulkResults": "SetBulkResults",
}
for key, value := range config {
diff --git a/gremlin-go/driver/client_test.go b/gremlin-go/driver/client_test.go
index 47d70626e5..9b201efb3f 100644
--- a/gremlin-go/driver/client_test.go
+++ b/gremlin-go/driver/client_test.go
@@ -158,6 +158,60 @@ func TestClient(t *testing.T) {
AssertMarkoVertexWithoutProperties(t, result)
})
+ t.Run("Test client.SubmitWithOptions() with bulkResults true", func(t
*testing.T) {
+ skipTestsIfNotEnabled(t, integrationTestSuiteName,
testNoAuthEnable)
+ client, err := NewClient(testNoAuthUrl,
+ func(settings *ClientSettings) {
+ settings.TlsConfig = testNoAuthTlsConfig
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, client)
+ defer client.Close()
+
+ resultSet, err :=
client.SubmitWithOptions("g.inject(1,2,3,2,1)",
+
new(RequestOptionsBuilder).SetBulkResults(true).Create())
+ assert.NoError(t, err)
+ assert.NotNil(t, resultSet)
+ results, err := resultSet.All()
+ assert.NoError(t, err)
+ // With bulkResults=true, the ResultSet contains Traverser
objects (one per unique value).
+ // This is consistent with Java, Python, .NET, and JS which all
expose Traverser objects
+ // at the ResultSet level when bulking is enabled.
+ // g.inject(1,2,3,2,1) has 3 unique values: 1 (bulk=2), 2
(bulk=2), 3 (bulk=1)
+ assert.Equal(t, 3, len(results))
+ totalBulk := int64(0)
+ for _, r := range results {
+ tr, err := r.GetTraverser()
+ assert.NoError(t, err)
+ totalBulk += tr.Bulk
+ }
+ assert.Equal(t, int64(5), totalBulk)
+ })
+
+ t.Run("Test client.SubmitWithOptions() with bulkResults false", func(t
*testing.T) {
+ skipTestsIfNotEnabled(t, integrationTestSuiteName,
testNoAuthEnable)
+ client, err := NewClient(testNoAuthUrl,
+ func(settings *ClientSettings) {
+ settings.TlsConfig = testNoAuthTlsConfig
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, client)
+ defer client.Close()
+
+ resultSet, err :=
client.SubmitWithOptions("g.inject(1,2,3,2,1)",
+
new(RequestOptionsBuilder).SetBulkResults(false).Create())
+ assert.NoError(t, err)
+ assert.NotNil(t, resultSet)
+ results, err := resultSet.All()
+ assert.NoError(t, err)
+ // With bulkResults=false, the ResultSet contains raw values
(no Traverser wrapping).
+ assert.Equal(t, 5, len(results))
+ for _, r := range results {
+ _, err := r.GetTraverser()
+ assert.Error(t, err, "expected raw value, not
Traverser")
+ }
+ })
+
t.Run("Test deserialization of VertexProperty with properties", func(t
*testing.T) {
skipTestsIfNotEnabled(t, integrationTestSuiteName,
testNoAuthEnable)
client, err := NewClient(testNoAuthUrl,
diff --git a/gremlin-go/driver/connection.go b/gremlin-go/driver/connection.go
index 54def5821a..882161e36c 100644
--- a/gremlin-go/driver/connection.go
+++ b/gremlin-go/driver/connection.go
@@ -317,7 +317,23 @@ func (c *connection) streamToResultSet(reader io.Reader,
rs ResultSet) {
return
}
- rs.Channel() <- &Result{obj}
+ if d.IsBulked() {
+ bulkObj, err := d.ReadFullyQualified()
+ if err != nil {
+ c.logHandler.logf(Error,
failedToReceiveResponse, err.Error())
+ rs.setError(err)
+ return
+ }
+ bulk, ok := bulkObj.(int64)
+ if !ok {
+ c.logHandler.logf(Error,
failedToReceiveResponse, "expected int64 bulk count")
+ rs.setError(fmt.Errorf("expected int64 bulk
count, got %T", bulkObj))
+ return
+ }
+ rs.Channel() <- &Result{&Traverser{Bulk: bulk, Value:
obj}}
+ } else {
+ rs.Channel() <- &Result{obj}
+ }
}
}
diff --git a/gremlin-go/driver/connection_test.go
b/gremlin-go/driver/connection_test.go
index 4a730b96f2..aebdeb992f 100644
--- a/gremlin-go/driver/connection_test.go
+++ b/gremlin-go/driver/connection_test.go
@@ -376,8 +376,11 @@ func TestConnection(t *testing.T) {
var names []string
for _, res := range allResults {
assert.NotNil(t, res)
- vp, err := res.GetVertexProperty()
- assert.Nil(t, err)
+ // DRC defaults bulkResults=true, so results should be
Traverser-wrapped.
+ tr, ok := res.Data.(*Traverser)
+ assert.True(t, ok, "expected *Traverser from DRC path
with bulkResults=true, got %T", res.Data)
+ vp, ok := tr.Value.(*VertexProperty)
+ assert.True(t, ok)
names = append(names, vp.Value.(string))
}
assert.True(t, sortAndCompareTwoStringSlices(names, testNames))
@@ -775,6 +778,35 @@ func TestConnection(t *testing.T) {
assert.Greater(t, len(props), 0)
}
})
+
+ t.Run("Test bulkResults with DRC request option", func(t *testing.T) {
+ skipTestsIfNotEnabled(t, integrationTestSuiteName,
testNoAuthEnable)
+
+ g := getModernGraph(t, testNoAuthUrl, &tls.Config{})
+ defer g.remoteConnection.Close()
+
+ // bulkResults is defaulted to true in submitGremlinLang,
results should still be correct
+ results, err := g.Inject(1, 2, 3, 2, 1).ToList()
+ assert.Nil(t, err)
+ assert.Equal(t, 5, len(results))
+ })
+
+ t.Run("Test bulkResults with explicit With option", func(t *testing.T) {
+ skipTestsIfNotEnabled(t, integrationTestSuiteName,
testNoAuthEnable)
+
+ g := getModernGraph(t, testNoAuthUrl, &tls.Config{})
+ defer g.remoteConnection.Close()
+
+ // explicitly set bulkResults to true via With
+ results, err := g.With("bulkResults", true).Inject(1, 2, 3, 2,
1).ToList()
+ assert.Nil(t, err)
+ assert.Equal(t, 5, len(results))
+
+ // explicitly set bulkResults to false via With
+ results, err = g.With("bulkResults", false).Inject(1, 2, 3, 2,
1).ToList()
+ assert.Nil(t, err)
+ assert.Equal(t, 5, len(results))
+ })
}
func submitCount(i int, client *Client, t *testing.T) {
diff --git a/gremlin-go/driver/graphBinaryDeserializer.go
b/gremlin-go/driver/graphBinaryDeserializer.go
index dece9bcb3f..5939dbbda0 100644
--- a/gremlin-go/driver/graphBinaryDeserializer.go
+++ b/gremlin-go/driver/graphBinaryDeserializer.go
@@ -57,9 +57,10 @@ import (
// The bufio.Reader wrapper provides efficient buffering without affecting the
// streaming semantics - it simply reduces the number of underlying read
syscalls.
type GraphBinaryDeserializer struct {
- r *bufio.Reader
- buf [8]byte
- err error // sticky error
+ r *bufio.Reader
+ buf [8]byte
+ err error // sticky error
+ bulked bool // whether the response stream uses bulked encoding
}
// GraphBinary flag for bulked list/set
@@ -133,13 +134,24 @@ func (d *GraphBinaryDeserializer) readInt64() (int64,
error) {
}
// ReadHeader reads and validates the GraphBinary response header.
+// The header consists of a version byte and a bulking flag byte (0x00 = not
bulked, 0x01 = bulked).
// This must be called before reading any objects from the stream.
func (d *GraphBinaryDeserializer) ReadHeader() error {
if _, err := d.readByte(); err != nil {
return err
}
- _, err := d.readByte()
- return err
+ flag, err := d.readByte()
+ if err != nil {
+ return err
+ }
+ d.bulked = flag == 0x01
+ return nil
+}
+
+// IsBulked returns whether the response stream uses bulked encoding.
+// This is determined by the header flag read during ReadHeader().
+func (d *GraphBinaryDeserializer) IsBulked() bool {
+ return d.bulked
}
// ReadFullyQualified reads the next fully-qualified GraphBinary value from
the stream.
diff --git a/gremlin-go/driver/request.go b/gremlin-go/driver/request.go
index eafc0a5071..df02ed4b22 100644
--- a/gremlin-go/driver/request.go
+++ b/gremlin-go/driver/request.go
@@ -19,6 +19,8 @@ under the License.
package gremlingo
+import "strconv"
+
// RequestMessage represents a request to the server.
type RequestMessage struct {
Gremlin string
@@ -71,6 +73,10 @@ func MakeStringRequest(stringGremlin string, traversalSource
string, requestOpti
newFields["materializeProperties"] =
requestOptions.materializeProperties
}
+ if requestOptions.bulkResults != nil {
+ newFields["bulkResults"] =
strconv.FormatBool(*requestOptions.bulkResults)
+ }
+
return RequestMessage{
Gremlin: stringGremlin,
Fields: newFields,
@@ -82,7 +88,7 @@ func MakeStringRequest(stringGremlin string, traversalSource
string, requestOpti
var allowedReqArgs = map[string]bool{
"evaluationTimeout": true,
"batchSize": true,
- "requestId": true,
"userAgent": true,
"materializeProperties": true,
+ "bulkResults": true,
}
diff --git a/gremlin-go/driver/requestOptions.go
b/gremlin-go/driver/requestOptions.go
index abca4fbf0b..a622abca82 100644
--- a/gremlin-go/driver/requestOptions.go
+++ b/gremlin-go/driver/requestOptions.go
@@ -19,31 +19,22 @@ under the License.
package gremlingo
-import (
- "github.com/google/uuid"
-)
-
type RequestOptions struct {
- requestID uuid.UUID
evaluationTimeout int
batchSize int
userAgent string
bindings map[string]interface{}
materializeProperties string
+ bulkResults *bool
}
type RequestOptionsBuilder struct {
- requestID uuid.UUID
evaluationTimeout int
batchSize int
userAgent string
bindings map[string]interface{}
materializeProperties string
-}
-
-func (builder *RequestOptionsBuilder) SetRequestId(requestId uuid.UUID)
*RequestOptionsBuilder {
- builder.requestID = requestId
- return builder
+ bulkResults *bool
}
func (builder *RequestOptionsBuilder) SetEvaluationTimeout(evaluationTimeout
int) *RequestOptionsBuilder {
@@ -71,6 +62,11 @@ func (builder *RequestOptionsBuilder)
SetMaterializeProperties(materializeProper
return builder
}
+func (builder *RequestOptionsBuilder) SetBulkResults(bulkResults bool)
*RequestOptionsBuilder {
+ builder.bulkResults = &bulkResults
+ return builder
+}
+
func (builder *RequestOptionsBuilder) AddBinding(key string, binding
interface{}) *RequestOptionsBuilder {
if builder.bindings == nil {
builder.bindings = make(map[string]interface{})
@@ -82,12 +78,12 @@ func (builder *RequestOptionsBuilder) AddBinding(key
string, binding interface{}
func (builder *RequestOptionsBuilder) Create() RequestOptions {
requestOptions := new(RequestOptions)
- requestOptions.requestID = builder.requestID
requestOptions.evaluationTimeout = builder.evaluationTimeout
requestOptions.batchSize = builder.batchSize
requestOptions.userAgent = builder.userAgent
requestOptions.bindings = builder.bindings
requestOptions.materializeProperties = builder.materializeProperties
+ requestOptions.bulkResults = builder.bulkResults
return *requestOptions
}
diff --git a/gremlin-go/driver/requestOptions_test.go
b/gremlin-go/driver/requestOptions_test.go
index 104a7e6ed2..b715fe01f7 100644
--- a/gremlin-go/driver/requestOptions_test.go
+++ b/gremlin-go/driver/requestOptions_test.go
@@ -20,18 +20,12 @@ under the License.
package gremlingo
import (
- "github.com/google/uuid"
"testing"
"github.com/stretchr/testify/assert"
)
func TestRequestOptions(t *testing.T) {
- t.Run("Test RequestOptionsBuilder with custom requestID", func(t
*testing.T) {
- requestId := uuid.New()
- r := new(RequestOptionsBuilder).SetRequestId(requestId).Create()
- assert.Equal(t, requestId, r.requestID)
- })
t.Run("Test RequestOptionsBuilder with custom evaluationTimeout",
func(t *testing.T) {
r :=
new(RequestOptionsBuilder).SetEvaluationTimeout(1234).Create()
assert.Equal(t, 1234, r.evaluationTimeout)
diff --git a/gremlin-go/driver/request_test.go
b/gremlin-go/driver/request_test.go
index 6bd91f5756..f14f75a236 100644
--- a/gremlin-go/driver/request_test.go
+++ b/gremlin-go/driver/request_test.go
@@ -51,4 +51,10 @@ func TestRequest(t *testing.T) {
new(RequestOptionsBuilder).SetUserAgent("TestUserAgent").Create())
assert.Equal(t, "TestUserAgent", r.Fields["userAgent"])
})
+
+ t.Run("Test makeStringRequest() with bulkResults", func(t *testing.T) {
+ r := MakeStringRequest("g.V()", "g",
+
new(RequestOptionsBuilder).SetBulkResults(true).Create())
+ assert.Equal(t, "true", r.Fields["bulkResults"])
+ })
}
diff --git a/gremlin-go/driver/result.go b/gremlin-go/driver/result.go
index 79643a74e2..6c234083f4 100644
--- a/gremlin-go/driver/result.go
+++ b/gremlin-go/driver/result.go
@@ -204,11 +204,13 @@ func (r *Result) GetVertexProperty() (*VertexProperty,
error) {
// GetTraverser returns the Result if it is a Traverser, otherwise returns an
error.
func (r *Result) GetTraverser() (*Traverser, error) {
- res, ok := r.Data.(Traverser)
- if !ok {
- return nil, newError(err0607ResultNotTraverserError)
+ if res, ok := r.Data.(*Traverser); ok {
+ return res, nil
}
- return &res, nil
+ if res, ok := r.Data.(Traverser); ok {
+ return &res, nil
+ }
+ return nil, newError(err0607ResultNotTraverserError)
}
// GetSlice returns the Result if it is a Slice, otherwise returns an error.
diff --git a/gremlin-go/driver/resultSet.go b/gremlin-go/driver/resultSet.go
index f274b5a807..79fcd67f6d 100644
--- a/gremlin-go/driver/resultSet.go
+++ b/gremlin-go/driver/resultSet.go
@@ -20,7 +20,6 @@ under the License.
package gremlingo
import (
- "reflect"
"sync"
)
@@ -154,13 +153,7 @@ func (channelResultSet *channelResultSet) addResult(r
*Result) {
channelResultSet.channelMutex.Lock()
if data, ok := r.Data.([]interface{}); ok {
for _, v := range data {
- if reflect.TypeOf(v) == reflect.TypeOf(&Traverser{}) {
- for i := int64(0); i < (v.(*Traverser)).bulk;
i++ {
- channelResultSet.channel <-
&Result{(v.(*Traverser)).value}
- }
- } else {
- channelResultSet.channel <- &Result{v}
- }
+ channelResultSet.channel <- &Result{v}
}
} else {
channelResultSet.channel <- &Result{r.Data}
diff --git a/gremlin-go/driver/result_test.go b/gremlin-go/driver/result_test.go
index d311f9bc15..815f51fbb1 100644
--- a/gremlin-go/driver/result_test.go
+++ b/gremlin-go/driver/result_test.go
@@ -379,4 +379,29 @@ func TestResult(t *testing.T) {
res := r.IsNil()
assert.True(t, res)
})
+
+ t.Run("Test Result.GetTraverser() with pointer", func(t *testing.T) {
+ tr := &Traverser{Bulk: 3, Value: "marko"}
+ r := Result{tr}
+ res, err := r.GetTraverser()
+ assert.Nil(t, err)
+ assert.Equal(t, int64(3), res.Bulk)
+ assert.Equal(t, "marko", res.Value)
+ })
+
+ t.Run("Test Result.GetTraverser() with value", func(t *testing.T) {
+ tr := Traverser{Bulk: 1, Value: 42}
+ r := Result{tr}
+ res, err := r.GetTraverser()
+ assert.Nil(t, err)
+ assert.Equal(t, int64(1), res.Bulk)
+ assert.Equal(t, 42, res.Value)
+ })
+
+ t.Run("Test Result.GetTraverser() error expected", func(t *testing.T) {
+ r := Result{"not a traverser"}
+ res, err := r.GetTraverser()
+ assert.Nil(t, res)
+ assert.Error(t, err)
+ })
}
diff --git a/gremlin-go/driver/serializer.go b/gremlin-go/driver/serializer.go
index a6c4c9d79a..bd4a426208 100644
--- a/gremlin-go/driver/serializer.go
+++ b/gremlin-go/driver/serializer.go
@@ -22,6 +22,7 @@ package gremlingo
import (
"bytes"
"encoding/binary"
+ "fmt"
"io"
"sync"
)
@@ -158,7 +159,19 @@ func (gs *GraphBinarySerializer)
DeserializeMessage(message []byte) (Response, e
if n == EndOfStream() {
break
}
- results = append(results, n)
+ if d.IsBulked() {
+ bulkObj, err := d.ReadFullyQualified()
+ if err != nil {
+ return msg, err
+ }
+ bulk, ok := bulkObj.(int64)
+ if !ok {
+ return msg, fmt.Errorf("expected int64 bulk
count, got %T", bulkObj)
+ }
+ results = append(results, &Traverser{Bulk: bulk, Value:
n})
+ } else {
+ results = append(results, n)
+ }
}
msg.ResponseResult.Data = results
diff --git a/gremlin-go/driver/serializer_test.go
b/gremlin-go/driver/serializer_test.go
index 92225f25fb..14d15d2059 100644
--- a/gremlin-go/driver/serializer_test.go
+++ b/gremlin-go/driver/serializer_test.go
@@ -54,6 +54,71 @@ func TestSerializer(t *testing.T) {
assert.Equal(t, "OK", response.ResponseStatus.message)
assert.Equal(t, []interface{}{int32(0)},
response.ResponseResult.Data)
})
+
+ t.Run("test deserialized bulked response message", func(t *testing.T) {
+ // Build a bulked response: version=0x84, bulked=0x01,
+ // value: int32(7) [type=0x01, flag=0x00, value=0x00000007],
+ // bulk: int64(3) [type=0x02, flag=0x00,
value=0x0000000000000003],
+ // EndOfStream marker [0xFD, 0x00, 0x00],
+ // status code 200 [0x00, 0x00, 0x00, 0xC8],
+ // null message [0x01], null exception [0x01]
+ responseByteArray := []byte{
+ 0x84, 0x01, // version, bulked=true
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x07, // int32(7)
+ 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x03, // int64(3) bulk count
+ 0xFD, 0x00, 0x00, // EndOfStream marker
+ 0x00, 0x00, 0x00, 0xC8, // status code 200
+ 0x01, // null message
+ 0x01, // null exception
+ }
+ serializer :=
newGraphBinarySerializer(newLogHandler(&defaultLogger{}, Error,
language.English))
+ response, err :=
serializer.DeserializeMessage(responseByteArray)
+ assert.Nil(t, err)
+ assert.Equal(t, uint32(200), response.ResponseStatus.code)
+
+ // Bulked response should produce a single Traverser with
Bulk=3, Value=int32(7)
+ data, ok := response.ResponseResult.Data.([]interface{})
+ assert.True(t, ok)
+ assert.Equal(t, 1, len(data))
+ tr, ok := data[0].(*Traverser)
+ assert.True(t, ok)
+ assert.Equal(t, int64(3), tr.Bulk)
+ assert.Equal(t, int32(7), tr.Value)
+ })
+
+ t.Run("test deserialized bulked response with multiple values", func(t
*testing.T) {
+ // Bulked response with two values:
+ // int32(1) with bulk 2, int32(3) with bulk 1
+ responseByteArray := []byte{
+ 0x84, 0x01, // version, bulked=true
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, // int32(1)
+ 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x02, // int64(2) bulk count
+ 0x01, 0x00, 0x00, 0x00, 0x00, 0x03, // int32(3)
+ 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x01, // int64(1) bulk count
+ 0xFD, 0x00, 0x00, // EndOfStream marker
+ 0x00, 0x00, 0x00, 0xC8, // status code 200
+ 0x01, // null message
+ 0x01, // null exception
+ }
+ serializer :=
newGraphBinarySerializer(newLogHandler(&defaultLogger{}, Error,
language.English))
+ response, err :=
serializer.DeserializeMessage(responseByteArray)
+ assert.Nil(t, err)
+ assert.Equal(t, uint32(200), response.ResponseStatus.code)
+
+ data, ok := response.ResponseResult.Data.([]interface{})
+ assert.True(t, ok)
+ assert.Equal(t, 2, len(data))
+
+ tr0, ok := data[0].(*Traverser)
+ assert.True(t, ok)
+ assert.Equal(t, int64(2), tr0.Bulk)
+ assert.Equal(t, int32(1), tr0.Value)
+
+ tr1, ok := data[1].(*Traverser)
+ assert.True(t, ok)
+ assert.Equal(t, int64(1), tr1.Bulk)
+ assert.Equal(t, int32(3), tr1.Value)
+ })
}
func TestSerializerFailures(t *testing.T) {
diff --git a/gremlin-go/driver/traversal.go b/gremlin-go/driver/traversal.go
index ed25eea2fe..075d408984 100644
--- a/gremlin-go/driver/traversal.go
+++ b/gremlin-go/driver/traversal.go
@@ -25,17 +25,71 @@ import (
)
// Traverser is the objects propagating through the traversal.
+// When bulking is enabled, each Traverser wraps a single value with a Bulk
count
+// indicating how many times that value appears in the result stream.
type Traverser struct {
- bulk int64
- value interface{}
+ Bulk int64
+ Value interface{}
}
// Traversal is the primary way in which graphs are processed.
type Traversal struct {
- graph *Graph
- GremlinLang *GremlinLang
- remote *DriverRemoteConnection
- results ResultSet
+ graph *Graph
+ GremlinLang *GremlinLang
+ remote *DriverRemoteConnection
+ results ResultSet
+ lastTraverser *Traverser // current traverser being lazily unrolled
+}
+
+// nextValue returns the next individual value, lazily unrolling Traverser
bulk counts.
+// This mirrors Java's DriverRemoteTraversal.next() and Python's
Traversal.__next__().
+// Returns (value, true, nil) on success, (nil, false, nil) when exhausted,
+// or (nil, false, err) on error.
+func (t *Traversal) nextValue() (interface{}, bool, error) {
+ for {
+ // If we have a traverser with remaining bulk, decrement and
return
+ if t.lastTraverser != nil && t.lastTraverser.Bulk > 0 {
+ val := t.lastTraverser.Value
+ t.lastTraverser.Bulk--
+ if t.lastTraverser.Bulk == 0 {
+ t.lastTraverser = nil
+ }
+ return val, true, nil
+ }
+ t.lastTraverser = nil
+
+ // Get next result from the ResultSet
+ results, err := t.GetResultSet()
+ if err != nil {
+ return nil, false, err
+ }
+ if results.IsEmpty() {
+ return nil, false, results.GetError()
+ }
+ result, ok, err := results.One()
+ if err != nil {
+ return nil, false, err
+ }
+ if !ok {
+ return nil, false, nil
+ }
+
+ // If it's a Traverser, start unrolling (loop back for bulk <=
0)
+ if tr, ok := result.Data.(*Traverser); ok {
+ if tr.Bulk <= 0 {
+ continue
+ }
+ val := tr.Value
+ tr.Bulk--
+ if tr.Bulk > 0 {
+ t.lastTraverser = tr
+ }
+ return val, true, nil
+ }
+
+ // Non-traverser result (unbulked response), return directly
+ return result.Data, true, nil
+ }
}
// ToList returns the result in a list.
@@ -44,11 +98,23 @@ func (t *Traversal) ToList() ([]*Result, error) {
return nil, newError(err0901ToListAnonTraversalError)
}
- results, err := t.remote.submitGremlinLang(t.GremlinLang)
+ _, err := t.GetResultSet()
if err != nil {
return nil, err
}
- return results.All()
+
+ var results []*Result
+ for {
+ val, ok, err := t.nextValue()
+ if err != nil {
+ return nil, err
+ }
+ if !ok {
+ break
+ }
+ results = append(results, &Result{val})
+ }
+ return results, t.results.GetError()
}
// ToSet returns the results in a set.
@@ -95,6 +161,10 @@ func (t *Traversal) Iterate() <-chan error {
// HasNext returns true if the result is not empty.
func (t *Traversal) HasNext() (bool, error) {
+ // If we have a traverser with remaining bulk, there are more results
+ if t.lastTraverser != nil && t.lastTraverser.Bulk > 0 {
+ return true, nil
+ }
results, err := t.GetResultSet()
if err != nil {
return false, err
@@ -102,21 +172,24 @@ func (t *Traversal) HasNext() (bool, error) {
return !results.IsEmpty(), nil
}
-// Next returns next result.
+// Next returns the next result.
func (t *Traversal) Next() (*Result, error) {
- results, err := t.GetResultSet()
+ _, err := t.GetResultSet()
+ if err != nil {
+ return nil, err
+ }
+ val, ok, err := t.nextValue()
if err != nil {
return nil, err
}
- if results.IsEmpty() {
- err = results.GetError()
+ if !ok {
+ err = t.results.GetError()
if err != nil {
return nil, err
}
return nil, newError(err0903NextNoResultsLeftError)
}
- result, _, err := results.One()
- return result, err
+ return &Result{val}, nil
}
// GetResultSet submits the traversal and returns the ResultSet.
diff --git a/gremlin-go/driver/traversal_test.go
b/gremlin-go/driver/traversal_test.go
index e0795567b6..8d60d42116 100644
--- a/gremlin-go/driver/traversal_test.go
+++ b/gremlin-go/driver/traversal_test.go
@@ -542,6 +542,131 @@ func TestTraversal(t *testing.T) {
})
}
+func TestTraversalNextValue(t *testing.T) {
+ // Helper to create a closed ResultSet pre-populated with results.
+ makeResultSet := func(results ...*Result) ResultSet {
+ rs := newChannelResultSetCapacity(len(results) +
1).(*channelResultSet)
+ for _, r := range results {
+ rs.channel <- r
+ }
+ rs.channelMutex.Lock()
+ rs.closed = true
+ close(rs.channel)
+ rs.channelMutex.Unlock()
+ return rs
+ }
+
+ t.Run("unrolls Traverser with bulk > 1", func(t *testing.T) {
+ rs := makeResultSet(
+ &Result{&Traverser{Bulk: 3, Value: "marko"}},
+ )
+ trav := &Traversal{results: rs}
+
+ var values []interface{}
+ for {
+ val, ok, err := trav.nextValue()
+ assert.Nil(t, err)
+ if !ok {
+ break
+ }
+ values = append(values, val)
+ }
+ assert.Equal(t, []interface{}{"marko", "marko", "marko"},
values)
+ })
+
+ t.Run("unrolls Traverser with bulk == 1", func(t *testing.T) {
+ rs := makeResultSet(
+ &Result{&Traverser{Bulk: 1, Value: 42}},
+ )
+ trav := &Traversal{results: rs}
+
+ val, ok, err := trav.nextValue()
+ assert.Nil(t, err)
+ assert.True(t, ok)
+ assert.Equal(t, 42, val)
+
+ // Should be exhausted
+ _, ok, err = trav.nextValue()
+ assert.Nil(t, err)
+ assert.False(t, ok)
+ })
+
+ t.Run("handles raw non-Traverser results", func(t *testing.T) {
+ rs := makeResultSet(
+ &Result{"hello"},
+ &Result{int32(99)},
+ )
+ trav := &Traversal{results: rs}
+
+ val, ok, err := trav.nextValue()
+ assert.Nil(t, err)
+ assert.True(t, ok)
+ assert.Equal(t, "hello", val)
+
+ val, ok, err = trav.nextValue()
+ assert.Nil(t, err)
+ assert.True(t, ok)
+ assert.Equal(t, int32(99), val)
+
+ _, ok, err = trav.nextValue()
+ assert.Nil(t, err)
+ assert.False(t, ok)
+ })
+
+ t.Run("skips Traverser with bulk == 0", func(t *testing.T) {
+ rs := makeResultSet(
+ &Result{&Traverser{Bulk: 0, Value: "skip-me"}},
+ &Result{&Traverser{Bulk: 1, Value: "keep-me"}},
+ )
+ trav := &Traversal{results: rs}
+
+ val, ok, err := trav.nextValue()
+ assert.Nil(t, err)
+ assert.True(t, ok)
+ assert.Equal(t, "keep-me", val)
+
+ _, ok, err = trav.nextValue()
+ assert.Nil(t, err)
+ assert.False(t, ok)
+ })
+
+ t.Run("empty ResultSet returns not-ok", func(t *testing.T) {
+ rs := makeResultSet()
+ trav := &Traversal{results: rs}
+
+ _, ok, err := trav.nextValue()
+ assert.Nil(t, err)
+ assert.False(t, ok)
+ })
+
+ t.Run("HasNext returns true when lastTraverser has remaining bulk",
func(t *testing.T) {
+ rs := makeResultSet(
+ &Result{&Traverser{Bulk: 3, Value: "x"}},
+ )
+ trav := &Traversal{results: rs}
+
+ // Consume first value to set lastTraverser
+ val, ok, err := trav.nextValue()
+ assert.Nil(t, err)
+ assert.True(t, ok)
+ assert.Equal(t, "x", val)
+
+ // HasNext should return true from lastTraverser (bulk=2
remaining)
+ hasNext, err := trav.HasNext()
+ assert.Nil(t, err)
+ assert.True(t, hasNext)
+
+ // Drain remaining
+ trav.nextValue() // bulk 2->1
+ trav.nextValue() // bulk 1->0, lastTraverser cleared
+
+ // Now should be empty
+ hasNext, err = trav.HasNext()
+ assert.Nil(t, err)
+ assert.False(t, hasNext)
+ })
+}
+
func newWithOptionsConnection(t *testing.T) *GraphTraversalSource {
// No authentication integration test with graphs loaded and alias
configured server
testNoAuthWithAliasUrl := getEnvOrDefaultString("GREMLIN_SERVER_URL",
noAuthUrl)