This is an automated email from the ASF dual-hosted git repository. Cole-Greer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit bded3921c95f26cc231d524a03bcf1dc94c371bf Merge: 0579a14402 4e791ed92f Author: Cole Greer <[email protected]> AuthorDate: Thu Jun 11 15:42:28 2026 -0700 Merge branch '3.8-dev' .github/workflows/build-test.yml | 7 +- .github/workflows/codeql.yml | 4 ++ CHANGELOG.asciidoc | 1 + gremlin-go/driver/traversal.go | 28 ++++++++ gremlin-go/driver/traversal_test.go | 127 ++++++++++++++++++++++++++++++++++++ 5 files changed, 165 insertions(+), 2 deletions(-) diff --cc .github/workflows/build-test.yml index 7fd28b8bcc,fe87fb9840..b70a77b25f --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@@ -4,8 -4,11 +4,11 @@@ env # modules commonly excluded from builds as they have their own independent non-JVM setups and can be run in parallel. # take care when modifying this list because GLVs use shell commands to remove themselves from this list and # modifications could break patterns of replacement they are searching for. - EXCLUDE_MODULES: '-:gremlin-dotnet-source,-:gremlin-dotnet-tests,-:gremlin-go,-:gremlin-javascript,-:gremlint,-:gremlin-mcp,-:gremlin-python' - EXCLUDE_FOR_GLV: '-:gremlin-archetype,-:gremlin-console,-:gremlin-coverage,-:hadoop-gremlin,-:neo4j-gremlin,-:spark-gremlin,-:sparql-gremlin' + EXCLUDE_MODULES: '-:gremlin-dotnet-source,-:gremlin-dotnet-tests,-:gremlin-go,-:gremlin-js,-:gremlin-javascript,-:gremlint,-:gremlin-mcp,-:gremlin-python' + EXCLUDE_FOR_GLV: '-:gremlin-console,-:gremlin-coverage,-:hadoop-gremlin,-:spark-gremlin' + concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: ${{ github.event_name == 'pull_request' }} jobs: smoke: name: smoke diff --cc gremlin-go/driver/traversal.go index a78f6ee9c2,6b4084b3f0..85ad547db7 --- a/gremlin-go/driver/traversal.go +++ b/gremlin-go/driver/traversal.go @@@ -189,34 -118,38 +189,62 @@@ func (t *Traversal) Next() (*Result, er } return nil, newError(err0903NextNoResultsLeftError) } - result, _, err := results.One() - return result, err + return &Result{val}, nil +} + +// NextN returns up to n results from the traversal. If the traversal has +// fewer than n results, only those results are returned. If n is non-positive, +// an empty slice is returned. +func (t *Traversal) NextN(n int) ([]*Result, error) { + if n <= 0 { + return []*Result{}, nil + } + _, err := t.GetResultSet() + if err != nil { + return nil, err + } + results := make([]*Result, 0, n) + for i := 0; i < n; i++ { + val, ok, err := t.nextValue() + if err != nil { + return results, err + } + if !ok { + break + } + results = append(results, &Result{val}) + } + return results, t.results.GetError() } + // NextN returns up to n results from the traversal. If the traversal has + // fewer than n results, only those results are returned. If n is non-positive, + // an empty slice is returned. + func (t *Traversal) NextN(n int) ([]*Result, error) { + if n <= 0 { + return []*Result{}, nil + } + results, err := t.GetResultSet() + if err != nil { + return nil, err + } + out := make([]*Result, 0, n) + for i := 0; i < n; i++ { + if results.IsEmpty() { + break + } + result, ok, err := results.One() + if err != nil { + return out, err + } + if !ok { + break + } + out = append(out, result) + } + return out, results.GetError() + } + // GetResultSet submits the traversal and returns the ResultSet. func (t *Traversal) GetResultSet() (ResultSet, error) { if t.results == nil { diff --cc gremlin-go/driver/traversal_test.go index 83c264ec33,e156cce700..63bab4f615 --- a/gremlin-go/driver/traversal_test.go +++ b/gremlin-go/driver/traversal_test.go @@@ -188,288 -526,134 +189,414 @@@ func TestTraversal(t *testing.T) assert.True(t, results[0].GetType().Kind() == reflect.Map) }) + + t.Run("Test should extract ID from Vertex", func(t *testing.T) { + g := cloneGraphTraversalSource(&Graph{}, NewGremlinLang(nil), nil) + + // Test basic V() step with mixed ID types + vStart := g.V(1, &Vertex{Element: Element{Id: 2}}) + assert.Equal(t, "g.V(1,2)", vStart.GremlinLang.GetGremlin()) + + // Test V() step in the middle of a traversal + vMid := g.Inject("foo").V(1, &Vertex{Element: Element{Id: 2}}) + assert.Equal(t, "g.inject(\"foo\").V(1,2)", vMid.GremlinLang.GetGremlin()) + + // Test edge creation with from/to vertices + fromTo := g.AddE("Edge").From(&Vertex{Element: Element{Id: 1}}).To(&Vertex{Element: Element{Id: 2}}) + assert.Equal(t, "g.addE(\"Edge\").from(__.V(1)).to(__.V(2))", fromTo.GremlinLang.GetGremlin()) + + // Test mergeE() with Vertex in map + mergeMap := map[interface{}]interface{}{ + T.Label: "knows", + Direction.Out: &Vertex{Element: Element{Id: 1}}, + Direction.In: &Vertex{Element: Element{Id: 2}}, + } + + mergeEStart := g.MergeE(mergeMap) + // No order guarantee in map arguments when creating GremlinLang, assert individually + assert.True(t, strings.HasPrefix(mergeEStart.GremlinLang.GetGremlin(), "g.mergeE(")) + assert.Contains(t, mergeEStart.GremlinLang.GetGremlin(), "label:\"knows\"") + assert.Contains(t, mergeEStart.GremlinLang.GetGremlin(), "Direction.OUT:1") + assert.Contains(t, mergeEStart.GremlinLang.GetGremlin(), "Direction.IN:2") + + // Test mergeE() in the middle of a traversal + mergeEMid := g.Inject("foo").MergeE(mergeMap) + // No order guarantee in map arguments when creating GremlinLang, assert individually + assert.True(t, strings.HasPrefix(mergeEMid.GremlinLang.GetGremlin(), "g.inject(\"foo\").mergeE(")) + assert.Contains(t, mergeEMid.GremlinLang.GetGremlin(), "label:\"knows\"") + assert.Contains(t, mergeEMid.GremlinLang.GetGremlin(), "Direction.OUT:1") + assert.Contains(t, mergeEMid.GremlinLang.GetGremlin(), "Direction.IN:2") + }) +} + +func TestTraversalNextValue(t *testing.T) { + // Helper to create a closed ResultSet pre-populated with results. + makeResultSet := func(results ...*Result) ResultSet { + rs := newChannelResultSetCapacity(len(results) + 1).(*channelResultSet) + for _, r := range results { + rs.channel <- r + } + rs.channelMutex.Lock() + rs.closed = true + close(rs.channel) + rs.channelMutex.Unlock() + return rs + } + + t.Run("unrolls Traverser with bulk > 1", func(t *testing.T) { + rs := makeResultSet( + &Result{&Traverser{Bulk: 3, Value: "marko"}}, + ) + trav := &Traversal{results: rs} + + var values []interface{} + for { + val, ok, err := trav.nextValue() + assert.Nil(t, err) + if !ok { + break + } + values = append(values, val) + } + assert.Equal(t, []interface{}{"marko", "marko", "marko"}, values) + }) + + t.Run("unrolls Traverser with bulk == 1", func(t *testing.T) { + rs := makeResultSet( + &Result{&Traverser{Bulk: 1, Value: 42}}, + ) + trav := &Traversal{results: rs} + + val, ok, err := trav.nextValue() + assert.Nil(t, err) + assert.True(t, ok) + assert.Equal(t, 42, val) + + // Should be exhausted + _, ok, err = trav.nextValue() + assert.Nil(t, err) + assert.False(t, ok) + }) + + t.Run("handles raw non-Traverser results", func(t *testing.T) { + rs := makeResultSet( + &Result{"hello"}, + &Result{int32(99)}, + ) + trav := &Traversal{results: rs} + + val, ok, err := trav.nextValue() + assert.Nil(t, err) + assert.True(t, ok) + assert.Equal(t, "hello", val) + + val, ok, err = trav.nextValue() + assert.Nil(t, err) + assert.True(t, ok) + assert.Equal(t, int32(99), val) + + _, ok, err = trav.nextValue() + assert.Nil(t, err) + assert.False(t, ok) + }) + + t.Run("skips Traverser with bulk == 0", func(t *testing.T) { + rs := makeResultSet( + &Result{&Traverser{Bulk: 0, Value: "skip-me"}}, + &Result{&Traverser{Bulk: 1, Value: "keep-me"}}, + ) + trav := &Traversal{results: rs} + + val, ok, err := trav.nextValue() + assert.Nil(t, err) + assert.True(t, ok) + assert.Equal(t, "keep-me", val) + + _, ok, err = trav.nextValue() + assert.Nil(t, err) + assert.False(t, ok) + }) + + t.Run("empty ResultSet returns not-ok", func(t *testing.T) { + rs := makeResultSet() + trav := &Traversal{results: rs} + + _, ok, err := trav.nextValue() + assert.Nil(t, err) + assert.False(t, ok) + }) + + t.Run("HasNext returns true when lastTraverser has remaining bulk", func(t *testing.T) { + rs := makeResultSet( + &Result{&Traverser{Bulk: 3, Value: "x"}}, + ) + trav := &Traversal{results: rs} + + // Consume first value to set lastTraverser + val, ok, err := trav.nextValue() + assert.Nil(t, err) + assert.True(t, ok) + assert.Equal(t, "x", val) + + // HasNext should return true from lastTraverser (bulk=2 remaining) + hasNext, err := trav.HasNext() + assert.Nil(t, err) + assert.True(t, hasNext) + + // Drain remaining + trav.nextValue() // bulk 2->1 + trav.nextValue() // bulk 1->0, lastTraverser cleared + + // Now should be empty + hasNext, err = trav.HasNext() + assert.Nil(t, err) + assert.False(t, hasNext) + }) +} + +func TestTraversalNextN(t *testing.T) { + makeResultSet := func(results ...*Result) ResultSet { + rs := newChannelResultSetCapacity(len(results) + 1).(*channelResultSet) + for _, r := range results { + rs.channel <- r + } + rs.channelMutex.Lock() + rs.closed = true + close(rs.channel) + rs.channelMutex.Unlock() + return rs + } + + t.Run("returns exactly n when n is less than available", func(t *testing.T) { + rs := makeResultSet(&Result{"a"}, &Result{"b"}, &Result{"c"}, &Result{"d"}) + trav := &Traversal{results: rs} + + got, err := trav.NextN(3) + assert.Nil(t, err) + assert.Equal(t, 3, len(got)) + assert.Equal(t, "a", got[0].Data) + assert.Equal(t, "b", got[1].Data) + assert.Equal(t, "c", got[2].Data) + }) + + t.Run("returns exactly n when n equals available", func(t *testing.T) { + rs := makeResultSet(&Result{"a"}, &Result{"b"}) + trav := &Traversal{results: rs} + + got, err := trav.NextN(2) + assert.Nil(t, err) + assert.Equal(t, 2, len(got)) + }) + + t.Run("returns all available when n exceeds available", func(t *testing.T) { + rs := makeResultSet(&Result{"a"}, &Result{"b"}) + trav := &Traversal{results: rs} + + got, err := trav.NextN(5) + assert.Nil(t, err) + assert.Equal(t, 2, len(got)) + assert.Equal(t, "a", got[0].Data) + assert.Equal(t, "b", got[1].Data) + }) + + t.Run("returns empty slice when n is zero", func(t *testing.T) { + rs := makeResultSet(&Result{"a"}) + trav := &Traversal{results: rs} + + got, err := trav.NextN(0) + assert.Nil(t, err) + assert.NotNil(t, got) + assert.Equal(t, 0, len(got)) + }) + + t.Run("returns empty slice when n is negative", func(t *testing.T) { + rs := makeResultSet(&Result{"a"}) + trav := &Traversal{results: rs} + + got, err := trav.NextN(-3) + assert.Nil(t, err) + assert.NotNil(t, got) + assert.Equal(t, 0, len(got)) + }) + + t.Run("returns empty slice when traversal is exhausted", func(t *testing.T) { + rs := makeResultSet() + trav := &Traversal{results: rs} + + got, err := trav.NextN(3) + assert.Nil(t, err) + assert.Equal(t, 0, len(got)) + }) + + t.Run("unrolls bulked Traverser across the batch", func(t *testing.T) { + rs := makeResultSet(&Result{&Traverser{Bulk: 3, Value: "x"}}) + trav := &Traversal{results: rs} + + got, err := trav.NextN(2) + assert.Nil(t, err) + assert.Equal(t, 2, len(got)) + assert.Equal(t, "x", got[0].Data) + assert.Equal(t, "x", got[1].Data) + }) + + t.Run("can be called repeatedly to drain in batches", func(t *testing.T) { + rs := makeResultSet(&Result{1}, &Result{2}, &Result{3}, &Result{4}, &Result{5}) + trav := &Traversal{results: rs} + + first, err := trav.NextN(2) + assert.Nil(t, err) + assert.Equal(t, 2, len(first)) + + second, err := trav.NextN(10) + assert.Nil(t, err) + assert.Equal(t, 3, len(second)) + + third, err := trav.NextN(1) + assert.Nil(t, err) + assert.Equal(t, 0, len(third)) + }) + + t.Run("propagates error from ResultSet", func(t *testing.T) { + rs := newChannelResultSetCapacity(1).(*channelResultSet) + rs.setError(assert.AnError) + rs.channelMutex.Lock() + rs.closed = true + close(rs.channel) + rs.channelMutex.Unlock() + trav := &Traversal{results: rs} + + got, err := trav.NextN(5) + assert.Equal(t, assert.AnError, err) + assert.Equal(t, 0, len(got)) + }) } + func TestTraversalNextN(t *testing.T) { + // makeClosedResultSet builds a channelResultSet that is already closed + // after the given results have been pushed onto the channel directly + // (i.e. without going through addResult, so no bulk unrolling). + makeClosedResultSet := func(results ...*Result) *channelResultSet { + rs := newChannelResultSetCapacity("test", &synchronizedMap{make(map[string]ResultSet), sync.Mutex{}}, len(results)+1).(*channelResultSet) + for _, r := range results { + rs.channel <- r + } + rs.channelMutex.Lock() + rs.closed = true + close(rs.channel) + rs.channelMutex.Unlock() + return rs + } + + t.Run("returns exactly n when n is less than available", func(t *testing.T) { + rs := makeClosedResultSet(&Result{"a"}, &Result{"b"}, &Result{"c"}, &Result{"d"}) + trav := &Traversal{results: rs} + + got, err := trav.NextN(3) + assert.Nil(t, err) + assert.Equal(t, 3, len(got)) + assert.Equal(t, "a", got[0].Data) + assert.Equal(t, "b", got[1].Data) + assert.Equal(t, "c", got[2].Data) + }) + + t.Run("returns exactly n when n equals available", func(t *testing.T) { + rs := makeClosedResultSet(&Result{"a"}, &Result{"b"}) + trav := &Traversal{results: rs} + + got, err := trav.NextN(2) + assert.Nil(t, err) + assert.Equal(t, 2, len(got)) + }) + + t.Run("returns all available when n exceeds available", func(t *testing.T) { + rs := makeClosedResultSet(&Result{"a"}, &Result{"b"}) + trav := &Traversal{results: rs} + + got, err := trav.NextN(5) + assert.Nil(t, err) + assert.Equal(t, 2, len(got)) + assert.Equal(t, "a", got[0].Data) + assert.Equal(t, "b", got[1].Data) + }) + + t.Run("returns empty slice when n is zero", func(t *testing.T) { + rs := makeClosedResultSet(&Result{"a"}) + trav := &Traversal{results: rs} + + got, err := trav.NextN(0) + assert.Nil(t, err) + assert.NotNil(t, got) + assert.Equal(t, 0, len(got)) + }) + + t.Run("returns empty slice when n is negative", func(t *testing.T) { + rs := makeClosedResultSet(&Result{"a"}) + trav := &Traversal{results: rs} + + got, err := trav.NextN(-3) + assert.Nil(t, err) + assert.NotNil(t, got) + assert.Equal(t, 0, len(got)) + }) + + t.Run("returns empty slice when traversal is exhausted", func(t *testing.T) { + rs := makeClosedResultSet() + trav := &Traversal{results: rs} + + got, err := trav.NextN(3) + assert.Nil(t, err) + assert.Equal(t, 0, len(got)) + }) + + t.Run("unrolls bulked Traverser across the batch", func(t *testing.T) { + // addResult unrolls bulks when the incoming Result wraps a slice of *Traverser. + rs := newChannelResultSetCapacity("test-bulk", &synchronizedMap{make(map[string]ResultSet), sync.Mutex{}}, 8).(*channelResultSet) + rs.addResult(&Result{[]interface{}{&Traverser{bulk: 3, value: "x"}}}) + rs.channelMutex.Lock() + rs.closed = true + close(rs.channel) + rs.channelMutex.Unlock() + trav := &Traversal{results: rs} + + got, err := trav.NextN(2) + assert.Nil(t, err) + assert.Equal(t, 2, len(got)) + assert.Equal(t, "x", got[0].Data) + assert.Equal(t, "x", got[1].Data) + }) + + t.Run("can be called repeatedly to drain in batches", func(t *testing.T) { + rs := makeClosedResultSet(&Result{1}, &Result{2}, &Result{3}, &Result{4}, &Result{5}) + trav := &Traversal{results: rs} + + first, err := trav.NextN(2) + assert.Nil(t, err) + assert.Equal(t, 2, len(first)) + + second, err := trav.NextN(10) + assert.Nil(t, err) + assert.Equal(t, 3, len(second)) + + third, err := trav.NextN(1) + assert.Nil(t, err) + assert.Equal(t, 0, len(third)) + }) + + t.Run("propagates error from ResultSet", func(t *testing.T) { + rs := newChannelResultSetCapacity("test-err", &synchronizedMap{make(map[string]ResultSet), sync.Mutex{}}, 1).(*channelResultSet) + rs.setError(assert.AnError) + rs.channelMutex.Lock() + rs.closed = true + close(rs.channel) + rs.channelMutex.Unlock() + trav := &Traversal{results: rs} + + got, err := trav.NextN(5) + assert.Equal(t, assert.AnError, err) + assert.Equal(t, 0, len(got)) + }) + } + func newWithOptionsConnection(t *testing.T) *GraphTraversalSource { // No authentication integration test with graphs loaded and alias configured server testNoAuthWithAliasUrl := getEnvOrDefaultString("GREMLIN_SERVER_URL", noAuthUrl)
