This is an automated email from the ASF dual-hosted git repository. xiazcy pushed a commit to branch go-bulk-fix in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 0ca0641453c52b43ae7cd8d1731930746e227c89 Author: Yang Xia <[email protected]> AuthorDate: Wed Apr 1 15:16:54 2026 -0700 preserve bulk counts as Traverser objects in gremlin-go and lazily unroll during traversal iteration, matching Java/Python/.NET/JS GLV behavior --- gremlin-go/driver/client.go | 7 ++ gremlin-go/driver/client_test.go | 31 ++++++ gremlin-go/driver/connection.go | 18 +++- gremlin-go/driver/connection_test.go | 36 ++++++- gremlin-go/driver/graphBinaryDeserializer.go | 22 +++- gremlin-go/driver/request.go | 5 + gremlin-go/driver/requestOptions.go | 10 ++ gremlin-go/driver/request_test.go | 6 ++ gremlin-go/driver/result.go | 10 +- gremlin-go/driver/resultSet.go | 9 +- gremlin-go/driver/result_test.go | 25 +++++ gremlin-go/driver/serializer.go | 15 ++- gremlin-go/driver/serializer_test.go | 65 ++++++++++++ gremlin-go/driver/traversal.go | 101 ++++++++++++++++--- gremlin-go/driver/traversal_test.go | 145 +++++++++++++++++++++++++++ 15 files changed, 470 insertions(+), 35 deletions(-) diff --git a/gremlin-go/driver/client.go b/gremlin-go/driver/client.go index 4d799b878d..e1ec2ad176 100644 --- a/gremlin-go/driver/client.go +++ b/gremlin-go/driver/client.go @@ -170,6 +170,12 @@ func (client *Client) submitGremlinLang(gremlinLang *GremlinLang) (ResultSet, er requestOptionsBuilder = applyOptionsConfig(requestOptionsBuilder, gremlinLang.optionsStrategies[0].configuration) } + // default bulkResults to true when using DRC through request options + // consistent with Java RequestOptions.getRequestOptions and Python extract_request_options + if requestOptionsBuilder.bulkResults == "" { + requestOptionsBuilder.SetBulkResults(true) + } + request := MakeStringRequest(gremlinLang.GetGremlin(), client.traversalSource, requestOptionsBuilder.Create()) return client.conn.submit(&request) } @@ -185,6 +191,7 @@ func applyOptionsConfig(builder *RequestOptionsBuilder, config map[string]interf "userAgent": "SetUserAgent", "bindings": "SetBindings", "materializeProperties": "SetMaterializeProperties", + "bulkResults": "SetBulkResults", } for key, value := range config { diff --git a/gremlin-go/driver/client_test.go b/gremlin-go/driver/client_test.go index 47d70626e5..ec3e148522 100644 --- a/gremlin-go/driver/client_test.go +++ b/gremlin-go/driver/client_test.go @@ -158,6 +158,37 @@ func TestClient(t *testing.T) { AssertMarkoVertexWithoutProperties(t, result) }) + t.Run("Test client.SubmitWithOptions() with bulkResults", func(t *testing.T) { + skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) + client, err := NewClient(testNoAuthUrl, + func(settings *ClientSettings) { + settings.TlsConfig = testNoAuthTlsConfig + }) + assert.NoError(t, err) + assert.NotNil(t, client) + defer client.Close() + + resultSet, err := client.SubmitWithOptions("g.inject(1,2,3,2,1)", + new(RequestOptionsBuilder).SetBulkResults(true).Create()) + assert.NoError(t, err) + assert.NotNil(t, resultSet) + results, err := resultSet.All() + assert.NoError(t, err) + // With bulkResults=true, the ResultSet contains Traverser objects (one per unique value). + // This is consistent with Java, Python, .NET, and JS which all expose Traverser objects + // at the ResultSet level when bulking is enabled. + totalBulk := int64(0) + for _, r := range results { + tr, err := r.GetTraverser() + if err == nil { + totalBulk += tr.Bulk + } else { + totalBulk++ + } + } + assert.Equal(t, int64(5), totalBulk) + }) + t.Run("Test deserialization of VertexProperty with properties", func(t *testing.T) { skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) client, err := NewClient(testNoAuthUrl, diff --git a/gremlin-go/driver/connection.go b/gremlin-go/driver/connection.go index 54def5821a..882161e36c 100644 --- a/gremlin-go/driver/connection.go +++ b/gremlin-go/driver/connection.go @@ -317,7 +317,23 @@ func (c *connection) streamToResultSet(reader io.Reader, rs ResultSet) { return } - rs.Channel() <- &Result{obj} + if d.IsBulked() { + bulkObj, err := d.ReadFullyQualified() + if err != nil { + c.logHandler.logf(Error, failedToReceiveResponse, err.Error()) + rs.setError(err) + return + } + bulk, ok := bulkObj.(int64) + if !ok { + c.logHandler.logf(Error, failedToReceiveResponse, "expected int64 bulk count") + rs.setError(fmt.Errorf("expected int64 bulk count, got %T", bulkObj)) + return + } + rs.Channel() <- &Result{&Traverser{Bulk: bulk, Value: obj}} + } else { + rs.Channel() <- &Result{obj} + } } } diff --git a/gremlin-go/driver/connection_test.go b/gremlin-go/driver/connection_test.go index 4a730b96f2..aebdeb992f 100644 --- a/gremlin-go/driver/connection_test.go +++ b/gremlin-go/driver/connection_test.go @@ -376,8 +376,11 @@ func TestConnection(t *testing.T) { var names []string for _, res := range allResults { assert.NotNil(t, res) - vp, err := res.GetVertexProperty() - assert.Nil(t, err) + // DRC defaults bulkResults=true, so results should be Traverser-wrapped. + tr, ok := res.Data.(*Traverser) + assert.True(t, ok, "expected *Traverser from DRC path with bulkResults=true, got %T", res.Data) + vp, ok := tr.Value.(*VertexProperty) + assert.True(t, ok) names = append(names, vp.Value.(string)) } assert.True(t, sortAndCompareTwoStringSlices(names, testNames)) @@ -775,6 +778,35 @@ func TestConnection(t *testing.T) { assert.Greater(t, len(props), 0) } }) + + t.Run("Test bulkResults with DRC request option", func(t *testing.T) { + skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) + + g := getModernGraph(t, testNoAuthUrl, &tls.Config{}) + defer g.remoteConnection.Close() + + // bulkResults is defaulted to true in submitGremlinLang, results should still be correct + results, err := g.Inject(1, 2, 3, 2, 1).ToList() + assert.Nil(t, err) + assert.Equal(t, 5, len(results)) + }) + + t.Run("Test bulkResults with explicit With option", func(t *testing.T) { + skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) + + g := getModernGraph(t, testNoAuthUrl, &tls.Config{}) + defer g.remoteConnection.Close() + + // explicitly set bulkResults to true via With + results, err := g.With("bulkResults", true).Inject(1, 2, 3, 2, 1).ToList() + assert.Nil(t, err) + assert.Equal(t, 5, len(results)) + + // explicitly set bulkResults to false via With + results, err = g.With("bulkResults", false).Inject(1, 2, 3, 2, 1).ToList() + assert.Nil(t, err) + assert.Equal(t, 5, len(results)) + }) } func submitCount(i int, client *Client, t *testing.T) { diff --git a/gremlin-go/driver/graphBinaryDeserializer.go b/gremlin-go/driver/graphBinaryDeserializer.go index dece9bcb3f..5939dbbda0 100644 --- a/gremlin-go/driver/graphBinaryDeserializer.go +++ b/gremlin-go/driver/graphBinaryDeserializer.go @@ -57,9 +57,10 @@ import ( // The bufio.Reader wrapper provides efficient buffering without affecting the // streaming semantics - it simply reduces the number of underlying read syscalls. type GraphBinaryDeserializer struct { - r *bufio.Reader - buf [8]byte - err error // sticky error + r *bufio.Reader + buf [8]byte + err error // sticky error + bulked bool // whether the response stream uses bulked encoding } // GraphBinary flag for bulked list/set @@ -133,13 +134,24 @@ func (d *GraphBinaryDeserializer) readInt64() (int64, error) { } // ReadHeader reads and validates the GraphBinary response header. +// The header consists of a version byte and a bulking flag byte (0x00 = not bulked, 0x01 = bulked). // This must be called before reading any objects from the stream. func (d *GraphBinaryDeserializer) ReadHeader() error { if _, err := d.readByte(); err != nil { return err } - _, err := d.readByte() - return err + flag, err := d.readByte() + if err != nil { + return err + } + d.bulked = flag == 0x01 + return nil +} + +// IsBulked returns whether the response stream uses bulked encoding. +// This is determined by the header flag read during ReadHeader(). +func (d *GraphBinaryDeserializer) IsBulked() bool { + return d.bulked } // ReadFullyQualified reads the next fully-qualified GraphBinary value from the stream. diff --git a/gremlin-go/driver/request.go b/gremlin-go/driver/request.go index eafc0a5071..49c8161eb5 100644 --- a/gremlin-go/driver/request.go +++ b/gremlin-go/driver/request.go @@ -71,6 +71,10 @@ func MakeStringRequest(stringGremlin string, traversalSource string, requestOpti newFields["materializeProperties"] = requestOptions.materializeProperties } + if requestOptions.bulkResults != "" { + newFields["bulkResults"] = requestOptions.bulkResults + } + return RequestMessage{ Gremlin: stringGremlin, Fields: newFields, @@ -85,4 +89,5 @@ var allowedReqArgs = map[string]bool{ "requestId": true, "userAgent": true, "materializeProperties": true, + "bulkResults": true, } diff --git a/gremlin-go/driver/requestOptions.go b/gremlin-go/driver/requestOptions.go index abca4fbf0b..ec5d0cb21a 100644 --- a/gremlin-go/driver/requestOptions.go +++ b/gremlin-go/driver/requestOptions.go @@ -20,6 +20,8 @@ under the License. package gremlingo import ( + "strconv" + "github.com/google/uuid" ) @@ -30,6 +32,7 @@ type RequestOptions struct { userAgent string bindings map[string]interface{} materializeProperties string + bulkResults string } type RequestOptionsBuilder struct { @@ -39,6 +42,7 @@ type RequestOptionsBuilder struct { userAgent string bindings map[string]interface{} materializeProperties string + bulkResults string } func (builder *RequestOptionsBuilder) SetRequestId(requestId uuid.UUID) *RequestOptionsBuilder { @@ -71,6 +75,11 @@ func (builder *RequestOptionsBuilder) SetMaterializeProperties(materializeProper return builder } +func (builder *RequestOptionsBuilder) SetBulkResults(bulkResults bool) *RequestOptionsBuilder { + builder.bulkResults = strconv.FormatBool(bulkResults) + return builder +} + func (builder *RequestOptionsBuilder) AddBinding(key string, binding interface{}) *RequestOptionsBuilder { if builder.bindings == nil { builder.bindings = make(map[string]interface{}) @@ -88,6 +97,7 @@ func (builder *RequestOptionsBuilder) Create() RequestOptions { requestOptions.userAgent = builder.userAgent requestOptions.bindings = builder.bindings requestOptions.materializeProperties = builder.materializeProperties + requestOptions.bulkResults = builder.bulkResults return *requestOptions } diff --git a/gremlin-go/driver/request_test.go b/gremlin-go/driver/request_test.go index 6bd91f5756..f14f75a236 100644 --- a/gremlin-go/driver/request_test.go +++ b/gremlin-go/driver/request_test.go @@ -51,4 +51,10 @@ func TestRequest(t *testing.T) { new(RequestOptionsBuilder).SetUserAgent("TestUserAgent").Create()) assert.Equal(t, "TestUserAgent", r.Fields["userAgent"]) }) + + t.Run("Test makeStringRequest() with bulkResults", func(t *testing.T) { + r := MakeStringRequest("g.V()", "g", + new(RequestOptionsBuilder).SetBulkResults(true).Create()) + assert.Equal(t, "true", r.Fields["bulkResults"]) + }) } diff --git a/gremlin-go/driver/result.go b/gremlin-go/driver/result.go index 79643a74e2..6c234083f4 100644 --- a/gremlin-go/driver/result.go +++ b/gremlin-go/driver/result.go @@ -204,11 +204,13 @@ func (r *Result) GetVertexProperty() (*VertexProperty, error) { // GetTraverser returns the Result if it is a Traverser, otherwise returns an error. func (r *Result) GetTraverser() (*Traverser, error) { - res, ok := r.Data.(Traverser) - if !ok { - return nil, newError(err0607ResultNotTraverserError) + if res, ok := r.Data.(*Traverser); ok { + return res, nil } - return &res, nil + if res, ok := r.Data.(Traverser); ok { + return &res, nil + } + return nil, newError(err0607ResultNotTraverserError) } // GetSlice returns the Result if it is a Slice, otherwise returns an error. diff --git a/gremlin-go/driver/resultSet.go b/gremlin-go/driver/resultSet.go index f274b5a807..79fcd67f6d 100644 --- a/gremlin-go/driver/resultSet.go +++ b/gremlin-go/driver/resultSet.go @@ -20,7 +20,6 @@ under the License. package gremlingo import ( - "reflect" "sync" ) @@ -154,13 +153,7 @@ func (channelResultSet *channelResultSet) addResult(r *Result) { channelResultSet.channelMutex.Lock() if data, ok := r.Data.([]interface{}); ok { for _, v := range data { - if reflect.TypeOf(v) == reflect.TypeOf(&Traverser{}) { - for i := int64(0); i < (v.(*Traverser)).bulk; i++ { - channelResultSet.channel <- &Result{(v.(*Traverser)).value} - } - } else { - channelResultSet.channel <- &Result{v} - } + channelResultSet.channel <- &Result{v} } } else { channelResultSet.channel <- &Result{r.Data} diff --git a/gremlin-go/driver/result_test.go b/gremlin-go/driver/result_test.go index d311f9bc15..815f51fbb1 100644 --- a/gremlin-go/driver/result_test.go +++ b/gremlin-go/driver/result_test.go @@ -379,4 +379,29 @@ func TestResult(t *testing.T) { res := r.IsNil() assert.True(t, res) }) + + t.Run("Test Result.GetTraverser() with pointer", func(t *testing.T) { + tr := &Traverser{Bulk: 3, Value: "marko"} + r := Result{tr} + res, err := r.GetTraverser() + assert.Nil(t, err) + assert.Equal(t, int64(3), res.Bulk) + assert.Equal(t, "marko", res.Value) + }) + + t.Run("Test Result.GetTraverser() with value", func(t *testing.T) { + tr := Traverser{Bulk: 1, Value: 42} + r := Result{tr} + res, err := r.GetTraverser() + assert.Nil(t, err) + assert.Equal(t, int64(1), res.Bulk) + assert.Equal(t, 42, res.Value) + }) + + t.Run("Test Result.GetTraverser() error expected", func(t *testing.T) { + r := Result{"not a traverser"} + res, err := r.GetTraverser() + assert.Nil(t, res) + assert.Error(t, err) + }) } diff --git a/gremlin-go/driver/serializer.go b/gremlin-go/driver/serializer.go index a6c4c9d79a..bd4a426208 100644 --- a/gremlin-go/driver/serializer.go +++ b/gremlin-go/driver/serializer.go @@ -22,6 +22,7 @@ package gremlingo import ( "bytes" "encoding/binary" + "fmt" "io" "sync" ) @@ -158,7 +159,19 @@ func (gs *GraphBinarySerializer) DeserializeMessage(message []byte) (Response, e if n == EndOfStream() { break } - results = append(results, n) + if d.IsBulked() { + bulkObj, err := d.ReadFullyQualified() + if err != nil { + return msg, err + } + bulk, ok := bulkObj.(int64) + if !ok { + return msg, fmt.Errorf("expected int64 bulk count, got %T", bulkObj) + } + results = append(results, &Traverser{Bulk: bulk, Value: n}) + } else { + results = append(results, n) + } } msg.ResponseResult.Data = results diff --git a/gremlin-go/driver/serializer_test.go b/gremlin-go/driver/serializer_test.go index 92225f25fb..14d15d2059 100644 --- a/gremlin-go/driver/serializer_test.go +++ b/gremlin-go/driver/serializer_test.go @@ -54,6 +54,71 @@ func TestSerializer(t *testing.T) { assert.Equal(t, "OK", response.ResponseStatus.message) assert.Equal(t, []interface{}{int32(0)}, response.ResponseResult.Data) }) + + t.Run("test deserialized bulked response message", func(t *testing.T) { + // Build a bulked response: version=0x84, bulked=0x01, + // value: int32(7) [type=0x01, flag=0x00, value=0x00000007], + // bulk: int64(3) [type=0x02, flag=0x00, value=0x0000000000000003], + // EndOfStream marker [0xFD, 0x00, 0x00], + // status code 200 [0x00, 0x00, 0x00, 0xC8], + // null message [0x01], null exception [0x01] + responseByteArray := []byte{ + 0x84, 0x01, // version, bulked=true + 0x01, 0x00, 0x00, 0x00, 0x00, 0x07, // int32(7) + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, // int64(3) bulk count + 0xFD, 0x00, 0x00, // EndOfStream marker + 0x00, 0x00, 0x00, 0xC8, // status code 200 + 0x01, // null message + 0x01, // null exception + } + serializer := newGraphBinarySerializer(newLogHandler(&defaultLogger{}, Error, language.English)) + response, err := serializer.DeserializeMessage(responseByteArray) + assert.Nil(t, err) + assert.Equal(t, uint32(200), response.ResponseStatus.code) + + // Bulked response should produce a single Traverser with Bulk=3, Value=int32(7) + data, ok := response.ResponseResult.Data.([]interface{}) + assert.True(t, ok) + assert.Equal(t, 1, len(data)) + tr, ok := data[0].(*Traverser) + assert.True(t, ok) + assert.Equal(t, int64(3), tr.Bulk) + assert.Equal(t, int32(7), tr.Value) + }) + + t.Run("test deserialized bulked response with multiple values", func(t *testing.T) { + // Bulked response with two values: + // int32(1) with bulk 2, int32(3) with bulk 1 + responseByteArray := []byte{ + 0x84, 0x01, // version, bulked=true + 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, // int32(1) + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, // int64(2) bulk count + 0x01, 0x00, 0x00, 0x00, 0x00, 0x03, // int32(3) + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, // int64(1) bulk count + 0xFD, 0x00, 0x00, // EndOfStream marker + 0x00, 0x00, 0x00, 0xC8, // status code 200 + 0x01, // null message + 0x01, // null exception + } + serializer := newGraphBinarySerializer(newLogHandler(&defaultLogger{}, Error, language.English)) + response, err := serializer.DeserializeMessage(responseByteArray) + assert.Nil(t, err) + assert.Equal(t, uint32(200), response.ResponseStatus.code) + + data, ok := response.ResponseResult.Data.([]interface{}) + assert.True(t, ok) + assert.Equal(t, 2, len(data)) + + tr0, ok := data[0].(*Traverser) + assert.True(t, ok) + assert.Equal(t, int64(2), tr0.Bulk) + assert.Equal(t, int32(1), tr0.Value) + + tr1, ok := data[1].(*Traverser) + assert.True(t, ok) + assert.Equal(t, int64(1), tr1.Bulk) + assert.Equal(t, int32(3), tr1.Value) + }) } func TestSerializerFailures(t *testing.T) { diff --git a/gremlin-go/driver/traversal.go b/gremlin-go/driver/traversal.go index ed25eea2fe..075d408984 100644 --- a/gremlin-go/driver/traversal.go +++ b/gremlin-go/driver/traversal.go @@ -25,17 +25,71 @@ import ( ) // Traverser is the objects propagating through the traversal. +// When bulking is enabled, each Traverser wraps a single value with a Bulk count +// indicating how many times that value appears in the result stream. type Traverser struct { - bulk int64 - value interface{} + Bulk int64 + Value interface{} } // Traversal is the primary way in which graphs are processed. type Traversal struct { - graph *Graph - GremlinLang *GremlinLang - remote *DriverRemoteConnection - results ResultSet + graph *Graph + GremlinLang *GremlinLang + remote *DriverRemoteConnection + results ResultSet + lastTraverser *Traverser // current traverser being lazily unrolled +} + +// nextValue returns the next individual value, lazily unrolling Traverser bulk counts. +// This mirrors Java's DriverRemoteTraversal.next() and Python's Traversal.__next__(). +// Returns (value, true, nil) on success, (nil, false, nil) when exhausted, +// or (nil, false, err) on error. +func (t *Traversal) nextValue() (interface{}, bool, error) { + for { + // If we have a traverser with remaining bulk, decrement and return + if t.lastTraverser != nil && t.lastTraverser.Bulk > 0 { + val := t.lastTraverser.Value + t.lastTraverser.Bulk-- + if t.lastTraverser.Bulk == 0 { + t.lastTraverser = nil + } + return val, true, nil + } + t.lastTraverser = nil + + // Get next result from the ResultSet + results, err := t.GetResultSet() + if err != nil { + return nil, false, err + } + if results.IsEmpty() { + return nil, false, results.GetError() + } + result, ok, err := results.One() + if err != nil { + return nil, false, err + } + if !ok { + return nil, false, nil + } + + // If it's a Traverser, start unrolling (loop back for bulk <= 0) + if tr, ok := result.Data.(*Traverser); ok { + if tr.Bulk <= 0 { + continue + } + val := tr.Value + tr.Bulk-- + if tr.Bulk > 0 { + t.lastTraverser = tr + } + return val, true, nil + } + + // Non-traverser result (unbulked response), return directly + return result.Data, true, nil + } } // ToList returns the result in a list. @@ -44,11 +98,23 @@ func (t *Traversal) ToList() ([]*Result, error) { return nil, newError(err0901ToListAnonTraversalError) } - results, err := t.remote.submitGremlinLang(t.GremlinLang) + _, err := t.GetResultSet() if err != nil { return nil, err } - return results.All() + + var results []*Result + for { + val, ok, err := t.nextValue() + if err != nil { + return nil, err + } + if !ok { + break + } + results = append(results, &Result{val}) + } + return results, t.results.GetError() } // ToSet returns the results in a set. @@ -95,6 +161,10 @@ func (t *Traversal) Iterate() <-chan error { // HasNext returns true if the result is not empty. func (t *Traversal) HasNext() (bool, error) { + // If we have a traverser with remaining bulk, there are more results + if t.lastTraverser != nil && t.lastTraverser.Bulk > 0 { + return true, nil + } results, err := t.GetResultSet() if err != nil { return false, err @@ -102,21 +172,24 @@ func (t *Traversal) HasNext() (bool, error) { return !results.IsEmpty(), nil } -// Next returns next result. +// Next returns the next result. func (t *Traversal) Next() (*Result, error) { - results, err := t.GetResultSet() + _, err := t.GetResultSet() + if err != nil { + return nil, err + } + val, ok, err := t.nextValue() if err != nil { return nil, err } - if results.IsEmpty() { - err = results.GetError() + if !ok { + err = t.results.GetError() if err != nil { return nil, err } return nil, newError(err0903NextNoResultsLeftError) } - result, _, err := results.One() - return result, err + return &Result{val}, nil } // GetResultSet submits the traversal and returns the ResultSet. diff --git a/gremlin-go/driver/traversal_test.go b/gremlin-go/driver/traversal_test.go index e0795567b6..0b814bc6a5 100644 --- a/gremlin-go/driver/traversal_test.go +++ b/gremlin-go/driver/traversal_test.go @@ -542,6 +542,151 @@ func TestTraversal(t *testing.T) { }) } +func TestTraversalNextValue(t *testing.T) { + // Helper to create a closed ResultSet pre-populated with results. + makeResultSet := func(results ...*Result) ResultSet { + rs := newChannelResultSetCapacity(len(results) + 1).(*channelResultSet) + for _, r := range results { + rs.channel <- r + } + rs.channelMutex.Lock() + rs.closed = true + close(rs.channel) + rs.channelMutex.Unlock() + return rs + } + + t.Run("unrolls Traverser with bulk > 1", func(t *testing.T) { + rs := makeResultSet( + &Result{&Traverser{Bulk: 3, Value: "marko"}}, + ) + trav := &Traversal{results: rs} + + var values []interface{} + for { + val, ok, err := trav.nextValue() + assert.Nil(t, err) + if !ok { + break + } + values = append(values, val) + } + assert.Equal(t, []interface{}{"marko", "marko", "marko"}, values) + }) + + t.Run("unrolls Traverser with bulk == 1", func(t *testing.T) { + rs := makeResultSet( + &Result{&Traverser{Bulk: 1, Value: 42}}, + ) + trav := &Traversal{results: rs} + + val, ok, err := trav.nextValue() + assert.Nil(t, err) + assert.True(t, ok) + assert.Equal(t, 42, val) + + // Should be exhausted + _, ok, err = trav.nextValue() + assert.Nil(t, err) + assert.False(t, ok) + }) + + t.Run("handles raw non-Traverser results", func(t *testing.T) { + rs := makeResultSet( + &Result{"hello"}, + &Result{int32(99)}, + ) + trav := &Traversal{results: rs} + + val, ok, err := trav.nextValue() + assert.Nil(t, err) + assert.True(t, ok) + assert.Equal(t, "hello", val) + + val, ok, err = trav.nextValue() + assert.Nil(t, err) + assert.True(t, ok) + assert.Equal(t, int32(99), val) + + _, ok, err = trav.nextValue() + assert.Nil(t, err) + assert.False(t, ok) + }) + + t.Run("handles mixed Traverser and raw results", func(t *testing.T) { + rs := makeResultSet( + &Result{&Traverser{Bulk: 2, Value: "a"}}, + &Result{"b"}, + &Result{&Traverser{Bulk: 1, Value: "c"}}, + ) + trav := &Traversal{results: rs} + + var values []interface{} + for { + val, ok, err := trav.nextValue() + assert.Nil(t, err) + if !ok { + break + } + values = append(values, val) + } + assert.Equal(t, []interface{}{"a", "a", "b", "c"}, values) + }) + + t.Run("skips Traverser with bulk == 0", func(t *testing.T) { + rs := makeResultSet( + &Result{&Traverser{Bulk: 0, Value: "skip-me"}}, + &Result{&Traverser{Bulk: 1, Value: "keep-me"}}, + ) + trav := &Traversal{results: rs} + + val, ok, err := trav.nextValue() + assert.Nil(t, err) + assert.True(t, ok) + assert.Equal(t, "keep-me", val) + + _, ok, err = trav.nextValue() + assert.Nil(t, err) + assert.False(t, ok) + }) + + t.Run("empty ResultSet returns not-ok", func(t *testing.T) { + rs := makeResultSet() + trav := &Traversal{results: rs} + + _, ok, err := trav.nextValue() + assert.Nil(t, err) + assert.False(t, ok) + }) + + t.Run("HasNext returns true when lastTraverser has remaining bulk", func(t *testing.T) { + rs := makeResultSet( + &Result{&Traverser{Bulk: 3, Value: "x"}}, + ) + trav := &Traversal{results: rs} + + // Consume first value to set lastTraverser + val, ok, err := trav.nextValue() + assert.Nil(t, err) + assert.True(t, ok) + assert.Equal(t, "x", val) + + // HasNext should return true from lastTraverser (bulk=2 remaining) + hasNext, err := trav.HasNext() + assert.Nil(t, err) + assert.True(t, hasNext) + + // Drain remaining + trav.nextValue() // bulk 2->1 + trav.nextValue() // bulk 1->0, lastTraverser cleared + + // Now should be empty + hasNext, err = trav.HasNext() + assert.Nil(t, err) + assert.False(t, hasNext) + }) +} + func newWithOptionsConnection(t *testing.T) *GraphTraversalSource { // No authentication integration test with graphs loaded and alias configured server testNoAuthWithAliasUrl := getEnvOrDefaultString("GREMLIN_SERVER_URL", noAuthUrl)
