This is an automated email from the ASF dual-hosted git repository.
spetz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 87e22a1cc feat(go): add delete segments support (#3191)
87e22a1cc is described below
commit 87e22a1cc532f401e910ea728164f8a153520595
Author: matanper <[email protected]>
AuthorDate: Fri May 1 15:16:08 2026 +0300
feat(go): add delete segments support (#3191)
Expose the delete segments command in the Go SDK so clients can manage
partition segment cleanup over TCP. Closes #3190
---
bdd/go/tests/tcp_test/segments_feature_delete.go | 88 ++++++++++++++++++++++++
foreign/go/client/tcp/tcp_segment_management.go | 38 ++++++++++
foreign/go/contracts/client.go | 9 +++
foreign/go/internal/command/code.go | 1 +
foreign/go/internal/command/segment.go | 45 ++++++++++++
foreign/go/internal/command/segment_test.go | 64 +++++++++++++++++
6 files changed, 245 insertions(+)
diff --git a/bdd/go/tests/tcp_test/segments_feature_delete.go b/bdd/go/tests/tcp_test/segments_feature_delete.go
new file mode 100644
index 000000000..011ac9e61
--- /dev/null
+++ b/bdd/go/tests/tcp_test/segments_feature_delete.go
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tcp_test
+
+import (
+ iggcon "github.com/apache/iggy/foreign/go/contracts"
+ ierror "github.com/apache/iggy/foreign/go/errors"
+ "github.com/onsi/ginkgo/v2"
+)
+
+var _ = ginkgo.Describe("DELETE SEGMENTS:", func() { // spec container for client.DeleteSegments; the blank package-level var makes this call run at package initialization
+ prefix := "DeleteSegments" // name prefix handed to successfullyCreateStream in the contexts below
+ ginkgo.When("User is logged in", func() {
+ ginkgo.Context("and tries to delete zero segments for existing
topic partition", func() {
+ client := createAuthorizedConnection() // helper defined elsewhere; presumably returns a logged-in client - TODO confirm
+ streamId, _ := successfullyCreateStream(prefix, client)
+ defer deleteStreamAfterTests(streamId, client) // deferred: fires when this Context closure returns
+ topicId, _ := successfullyCreateTopic(streamId, client)
+
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId) // error result is discarded here
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId) // error result is discarded here
+ err := client.DeleteSegments(
+ streamIdentifier,
+ topicIdentifier,
+ 0, // partitionId
+ 0, // segmentsCount: zero segments requested
+ )
+
+ itShouldNotReturnError(err) // helper defined elsewhere; going by its name it asserts err is nil - TODO confirm
+ })
+
+ ginkgo.Context("and tries to delete segments for a non existing
stream", func() {
+ client := createAuthorizedConnection()
+ err := client.DeleteSegments(
+ randomU32Identifier(), // stream identifier; presumably matches no existing stream - TODO confirm helper
+ randomU32Identifier(), // topic identifier
+ 0, // partitionId
+ 1, // segmentsCount
+ )
+
+ itShouldReturnSpecificError(err,
ierror.ErrStreamIdNotFound)
+ })
+
+ ginkgo.Context("and tries to delete segments for a non existing
topic", func() {
+ client := createAuthorizedConnection()
+ streamId, _ := successfullyCreateStream(prefix, client) // a stream is created here, but no topic
+ defer deleteStreamAfterTests(streamId, client) // deferred: fires when this Context closure returns
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ err := client.DeleteSegments(
+ streamIdentifier,
+ randomU32Identifier(), // topic identifier; presumably matches no topic in the new stream - TODO confirm helper
+ 0, // partitionId
+ 1, // segmentsCount
+ )
+
+ itShouldReturnSpecificError(err,
ierror.ErrTopicIdNotFound)
+ })
+ })
+
+ ginkgo.When("User is not logged in", func() {
+ ginkgo.Context("and tries to delete segments", func() {
+ client := createClient() // helper defined elsewhere; presumably a connection without login - TODO confirm
+ err := client.DeleteSegments(
+ randomU32Identifier(),
+ randomU32Identifier(),
+ 0, // partitionId
+ 1, // segmentsCount
+ )
+
+ itShouldReturnUnauthenticatedError(err)
+ })
+ })
+})
diff --git a/foreign/go/client/tcp/tcp_segment_management.go b/foreign/go/client/tcp/tcp_segment_management.go
new file mode 100644
index 000000000..b4b4e7996
--- /dev/null
+++ b/foreign/go/client/tcp/tcp_segment_management.go
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tcp
+
+import (
+ iggcon "github.com/apache/iggy/foreign/go/contracts"
+ "github.com/apache/iggy/foreign/go/internal/command"
+)
+
+// DeleteSegments builds a command.DeleteSegments request from the given
+// stream, topic, partition and segment count, and sends it through c.do.
+// The response payload is discarded; only the error from c.do is returned.
+func (c *IggyTcpClient) DeleteSegments(
+	streamId, topicId iggcon.Identifier,
+	partitionId, segmentsCount uint32,
+) error {
+	request := command.DeleteSegments{
+		StreamId:      streamId,
+		TopicId:       topicId,
+		PartitionId:   partitionId,
+		SegmentsCount: segmentsCount,
+	}
+	_, err := c.do(&request)
+	return err
+}
diff --git a/foreign/go/contracts/client.go b/foreign/go/contracts/client.go
index f3b6dac65..40fb82d46 100644
--- a/foreign/go/contracts/client.go
+++ b/foreign/go/contracts/client.go
@@ -195,6 +195,15 @@ type Client interface {
partitionsCount uint32,
) error
+ // DeleteSegments deletes segmentsCount segments from the given partition of a topic, identified by stream and topic unique IDs or names.
+ // Authentication is required, as is the permission to manage the partitions.
+ DeleteSegments(
+ streamId Identifier,
+ topicId Identifier,
+ partitionId uint32,
+ segmentsCount uint32,
+ ) error
+
// GetUser get the info about a specific user by unique ID or username.
// Authentication is required, and the permission to read the users, unless the provided user ID is the same as the authenticated user.
GetUser(identifier Identifier) (*UserInfoDetails, error)
diff --git a/foreign/go/internal/command/code.go b/foreign/go/internal/command/code.go
index e4775eb2f..644abf2dd 100644
--- a/foreign/go/internal/command/code.go
+++ b/foreign/go/internal/command/code.go
@@ -57,6 +57,7 @@ const (
UpdateTopicCode Code = 304
CreatePartitionsCode Code = 402
DeletePartitionsCode Code = 403
+ DeleteSegmentsCode Code = 503
GetGroupCode Code = 600
GetGroupsCode Code = 601
CreateGroupCode Code = 602
diff --git a/foreign/go/internal/command/segment.go b/foreign/go/internal/command/segment.go
new file mode 100644
index 000000000..9aaac24f1
--- /dev/null
+++ b/foreign/go/internal/command/segment.go
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package command
+
+import (
+ "encoding/binary"
+
+ iggcon "github.com/apache/iggy/foreign/go/contracts"
+)
+
+// DeleteSegments is the request payload for the delete-segments command.
+type DeleteSegments struct {
+	// StreamId is the stream identifier; it is encoded first.
+	StreamId iggcon.Identifier `json:"streamId"`
+	// TopicId is the topic identifier; it is encoded right after StreamId.
+	TopicId iggcon.Identifier `json:"topicId"`
+	// PartitionId is written as a little-endian uint32 after the identifiers.
+	PartitionId uint32 `json:"partitionId"`
+	// SegmentsCount is written as a little-endian uint32 after PartitionId.
+	SegmentsCount uint32 `json:"segmentsCount"`
+}
+
+// Code reports the command code for this request, DeleteSegmentsCode.
+func (d *DeleteSegments) Code() Code { return DeleteSegmentsCode }
+
+// MarshalBinary encodes the request as the marshalled stream and topic
+// identifiers followed by PartitionId and SegmentsCount, each as a
+// little-endian uint32. An error from iggcon.MarshalIdentifiers is returned
+// unchanged together with a nil slice.
+func (d *DeleteSegments) MarshalBinary() ([]byte, error) {
+	payload, err := iggcon.MarshalIdentifiers(d.StreamId, d.TopicId)
+	if err != nil {
+		return nil, err
+	}
+	// Fixed-width trailer, appended in wire order.
+	for _, field := range [...]uint32{d.PartitionId, d.SegmentsCount} {
+		payload = binary.LittleEndian.AppendUint32(payload, field)
+	}
+	return payload, nil
+}
diff --git a/foreign/go/internal/command/segment_test.go b/foreign/go/internal/command/segment_test.go
new file mode 100644
index 000000000..e87a95622
--- /dev/null
+++ b/foreign/go/internal/command/segment_test.go
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package command
+
+import (
+ "bytes"
+ "testing"
+
+ iggcon "github.com/apache/iggy/foreign/go/contracts"
+)
+
+// TestSerialize_DeleteSegments pins the exact bytes produced by
+// DeleteSegments.MarshalBinary for a string stream identifier, a numeric
+// topic identifier, partition 2 and a segment count of 3.
+func TestSerialize_DeleteSegments(t *testing.T) {
+	streamId, err := iggcon.NewIdentifier("stream")
+	if err != nil {
+		t.Fatalf("Failed to create stream identifier: %v", err)
+	}
+	topicId, err := iggcon.NewIdentifier(uint32(1))
+	if err != nil {
+		t.Fatalf("Failed to create topic identifier: %v", err)
+	}
+	request := DeleteSegments{
+		StreamId:      streamId,
+		TopicId:       topicId,
+		PartitionId:   2,
+		SegmentsCount: 3,
+	}
+
+	serialized, err := request.MarshalBinary()
+	if err != nil {
+		// Stop here: comparing bytes after a failed marshal would only add a
+		// second, misleading failure on top of the real one.
+		t.Fatalf("Failed to serialize DeleteSegments: %v", err)
+	}
+
+	expected := []byte{
+		0x02,                               // StreamId Kind (StringId)
+		0x06,                               // StreamId Length (6)
+		0x73, 0x74, 0x72, 0x65, 0x61, 0x6D, // StreamId Value ("stream")
+		0x01,                   // TopicId Kind (NumericId)
+		0x04,                   // TopicId Length (4)
+		0x01, 0x00, 0x00, 0x00, // TopicId Value (1)
+		0x02, 0x00, 0x00, 0x00, // PartitionId (2)
+		0x03, 0x00, 0x00, 0x00, // SegmentsCount (3)
+	}
+
+	if !bytes.Equal(serialized, expected) {
+		t.Errorf("Test case failed. \nExpected:\t%v\nGot:\t\t%v", expected, serialized)
+	}
+}
+
+func TestDeleteSegments_Code(t *testing.T) {
+ request := DeleteSegments{}
+
+ if request.Code() != DeleteSegmentsCode {
+ t.Errorf("Expected command code %v, got %v",
DeleteSegmentsCode, request.Code())
+ }
+}