This is an automated email from the ASF dual-hosted git repository.

hubcio pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 645a4e9d4 fix(go): add bounds checking to DeserializeStreams for payloads > 64KB (#3165)
645a4e9d4 is described below

commit 645a4e9d4143d43bd2ddca023b50172eb806ef1a
Author: Atharva Lade <[email protected]>
AuthorDate: Mon May 18 16:08:41 2026 -0500

    fix(go): add bounds checking to DeserializeStreams for payloads > 64KB (#3165)
---
 .../binary_response_deserializer.go                |  39 +++--
 .../binary_response_deserializer_test.go           | 160 +++++++++++++++++++++
 foreign/go/client/tcp/tcp_stream_management.go     |   2 +-
 3 files changed, 189 insertions(+), 12 deletions(-)

diff --git a/foreign/go/binary_serialization/binary_response_deserializer.go b/foreign/go/binary_serialization/binary_response_deserializer.go
index 3de147010..fa2bb938b 100644
--- a/foreign/go/binary_serialization/binary_response_deserializer.go
+++ b/foreign/go/binary_serialization/binary_response_deserializer.go
@@ -53,7 +53,10 @@ func DeserializeOffset(payload []byte) *iggcon.ConsumerOffsetInfo {
 }
 
 func DeserializeStream(payload []byte) (*iggcon.StreamDetails, error) {
-       stream, pos := DeserializeToStream(payload, 0)
+       stream, pos, err := DeserializeToStream(payload, 0)
+       if err != nil {
+               return nil, err
+       }
        topics := make([]iggcon.Topic, 0)
        for pos < len(payload) {
                topic, readBytes, err := DeserializeToTopic(payload, pos)
@@ -74,22 +77,32 @@ func DeserializeStream(payload []byte) (*iggcon.StreamDetails, error) {
        }, nil
 }
 
-func DeserializeStreams(payload []byte) []iggcon.Stream {
+func DeserializeStreams(payload []byte) ([]iggcon.Stream, error) {
        streams := make([]iggcon.Stream, 0)
        position := 0
 
-       //TODO there's a deserialization bug, investigate this
-       //it occurs only with payload greater than 2 pow 16
        for position < len(payload) {
-               stream, readBytes := DeserializeToStream(payload, position)
+               stream, readBytes, err := DeserializeToStream(payload, position)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to deserialize stream at 
offset %d: %w", position, err)
+               }
                streams = append(streams, stream)
                position += readBytes
        }
 
-       return streams
+       return streams, nil
 }
 
-func DeserializeToStream(payload []byte, position int) (iggcon.Stream, int) {
+const streamFixedSize = 4 + 8 + 4 + 8 + 8 + 1 // 33 bytes: id + created_at + 
topics_count + size_bytes + messages_count + name_len
+
+func DeserializeToStream(payload []byte, position int) (iggcon.Stream, int, 
error) {
+       remaining := len(payload) - position
+       if remaining < streamFixedSize {
+               return iggcon.Stream{}, 0, fmt.Errorf(
+                       "not enough data to read stream header: need %d bytes, 
got %d",
+                       streamFixedSize, remaining)
+       }
+
        id := binary.LittleEndian.Uint32(payload[position : position+4])
        createdAt := binary.LittleEndian.Uint64(payload[position+4 : 
position+12])
        topicsCount := binary.LittleEndian.Uint32(payload[position+12 : 
position+16])
@@ -97,10 +110,14 @@ func DeserializeToStream(payload []byte, position int) (iggcon.Stream, int) {
        messagesCount := binary.LittleEndian.Uint64(payload[position+24 : 
position+32])
        nameLength := int(payload[position+32])
 
-       nameBytes := payload[position+33 : position+33+nameLength]
-       name := string(nameBytes)
+       totalSize := streamFixedSize + nameLength
+       if remaining < totalSize {
+               return iggcon.Stream{}, 0, fmt.Errorf(
+                       "not enough data to read stream name: need %d bytes, 
got %d",
+                       totalSize, remaining)
+       }
 
-       readBytes := 4 + 8 + 4 + 8 + 8 + 1 + nameLength
+       name := string(payload[position+33 : position+33+nameLength])
 
        return iggcon.Stream{
                Id:            id,
@@ -109,7 +126,7 @@ func DeserializeToStream(payload []byte, position int) (iggcon.Stream, int) {
                SizeBytes:     sizeBytes,
                MessagesCount: messagesCount,
                CreatedAt:     createdAt,
-       }, readBytes
+       }, totalSize, nil
 }
 
 func DeserializeFetchMessagesResponse(payload []byte, compression 
iggcon.IggyMessageCompression) (*iggcon.PolledMessage, error) {
diff --git a/foreign/go/binary_serialization/binary_response_deserializer_test.go b/foreign/go/binary_serialization/binary_response_deserializer_test.go
index 1b4c0c023..9165c9946 100644
--- a/foreign/go/binary_serialization/binary_response_deserializer_test.go
+++ b/foreign/go/binary_serialization/binary_response_deserializer_test.go
@@ -19,6 +19,7 @@ package binaryserialization
 
 import (
        "encoding/binary"
+       "fmt"
        "strings"
        "testing"
 
@@ -86,3 +87,162 @@ func TestDeserializeFetchMessages_EmptyPayload(t *testing.T) {
                t.Fatalf("expected 0 messages, got %d", len(result.Messages))
        }
 }
+
+func encodeStream(id uint32, createdAt uint64, topicsCount uint32, sizeBytes, 
messagesCount uint64, name string) []byte {
+       nameBytes := []byte(name)
+       buf := make([]byte, streamFixedSize+len(nameBytes))
+       binary.LittleEndian.PutUint32(buf[0:4], id)
+       binary.LittleEndian.PutUint64(buf[4:12], createdAt)
+       binary.LittleEndian.PutUint32(buf[12:16], topicsCount)
+       binary.LittleEndian.PutUint64(buf[16:24], sizeBytes)
+       binary.LittleEndian.PutUint64(buf[24:32], messagesCount)
+       buf[32] = byte(len(nameBytes))
+       copy(buf[33:], nameBytes)
+       return buf
+}
+
+func assertStream(t *testing.T, label string, got iggcon.Stream, wantId 
uint32, wantCreatedAt uint64, wantTopicsCount uint32, wantSizeBytes, 
wantMessagesCount uint64, wantName string) {
+       t.Helper()
+       if got.Id != wantId {
+               t.Errorf("%s.Id = %d, want %d", label, got.Id, wantId)
+       }
+       if got.CreatedAt != wantCreatedAt {
+               t.Errorf("%s.CreatedAt = %d, want %d", label, got.CreatedAt, 
wantCreatedAt)
+       }
+       if got.TopicsCount != wantTopicsCount {
+               t.Errorf("%s.TopicsCount = %d, want %d", label, 
got.TopicsCount, wantTopicsCount)
+       }
+       if got.SizeBytes != wantSizeBytes {
+               t.Errorf("%s.SizeBytes = %d, want %d", label, got.SizeBytes, 
wantSizeBytes)
+       }
+       if got.MessagesCount != wantMessagesCount {
+               t.Errorf("%s.MessagesCount = %d, want %d", label, 
got.MessagesCount, wantMessagesCount)
+       }
+       if got.Name != wantName {
+               t.Errorf("%s.Name = %q, want %q", label, got.Name, wantName)
+       }
+}
+
+func TestDeserializeToStream_SingleStream(t *testing.T) {
+       payload := encodeStream(42, 1_710_000_000, 5, 2048, 100, "my-stream")
+
+       stream, readBytes, err := DeserializeToStream(payload, 0)
+       if err != nil {
+               t.Fatalf("unexpected error: %v", err)
+       }
+       if readBytes != len(payload) {
+               t.Fatalf("readBytes = %d, want %d", readBytes, len(payload))
+       }
+       assertStream(t, "stream", stream, 42, 1_710_000_000, 5, 2048, 100, 
"my-stream")
+}
+
+func TestDeserializeToStream_TruncatedHeader(t *testing.T) {
+       payload := make([]byte, streamFixedSize-1)
+       _, _, err := DeserializeToStream(payload, 0)
+       if err == nil {
+               t.Fatal("expected error for truncated header, got nil")
+       }
+}
+
+func TestDeserializeToStream_TruncatedName(t *testing.T) {
+       buf := make([]byte, streamFixedSize)
+       buf[32] = 10
+       _, _, err := DeserializeToStream(buf, 0)
+       if err == nil {
+               t.Fatal("expected error for truncated name, got nil")
+       }
+}
+
+func TestDeserializeStreams_Empty(t *testing.T) {
+       streams, err := DeserializeStreams([]byte{})
+       if err != nil {
+               t.Fatalf("unexpected error: %v", err)
+       }
+       if len(streams) != 0 {
+               t.Fatalf("expected 0 streams, got %d", len(streams))
+       }
+}
+
+func TestDeserializeStreams_MultipleStreams(t *testing.T) {
+       var payload []byte
+       payload = append(payload, encodeStream(1, 100, 2, 512, 50, 
"stream-one")...)
+       payload = append(payload, encodeStream(2, 200, 0, 0, 0, "s2")...)
+       payload = append(payload, encodeStream(3, 300, 1, 1024, 10, "third")...)
+
+       streams, err := DeserializeStreams(payload)
+       if err != nil {
+               t.Fatalf("unexpected error: %v", err)
+       }
+       if len(streams) != 3 {
+               t.Fatalf("expected 3 streams, got %d", len(streams))
+       }
+
+       assertStream(t, "stream[0]", streams[0], 1, 100, 2, 512, 50, 
"stream-one")
+       assertStream(t, "stream[1]", streams[1], 2, 200, 0, 0, 0, "s2")
+       assertStream(t, "stream[2]", streams[2], 3, 300, 1, 1024, 10, "third")
+}
+
+func TestDeserializeStreams_CorruptedPayload(t *testing.T) {
+       good := encodeStream(1, 100, 0, 0, 0, "ok")
+       truncated := make([]byte, streamFixedSize-5)
+       payload := append(good, truncated...)
+
+       _, err := DeserializeStreams(payload)
+       if err == nil {
+               t.Fatal("expected error for corrupted payload, got nil")
+       }
+}
+
+// Regression test for issue #3130: payloads > 64KB produced corrupted
+// stream lists because no bounds checking was performed.
+func TestDeserializeStreams_LargePayloadOver64KB(t *testing.T) {
+       const targetSize = 70_000
+       var payload []byte
+       var id uint32
+
+       for len(payload) < targetSize {
+               id++
+               name := fmt.Sprintf("stream-with-a-longer-name-for-padding-%d", 
id)
+               payload = append(payload, encodeStream(id, uint64(id)*1000, 
id%10, uint64(id)*512, uint64(id)*5, name)...)
+       }
+
+       if len(payload) <= 1<<16 {
+               t.Fatalf("payload size %d is not > 64KB; increase stream count 
or name length", len(payload))
+       }
+
+       streams, err := DeserializeStreams(payload)
+       if err != nil {
+               t.Fatalf("unexpected error deserializing %d-byte payload: %v", 
len(payload), err)
+       }
+
+       if uint32(len(streams)) != id {
+               t.Fatalf("expected %d streams, got %d", id, len(streams))
+       }
+
+       for i, s := range streams {
+               expectedId := uint32(i + 1)
+               expectedName := 
fmt.Sprintf("stream-with-a-longer-name-for-padding-%d", expectedId)
+               assertStream(t, fmt.Sprintf("stream[%d]", i), s,
+                       expectedId, uint64(expectedId)*1000, expectedId%10,
+                       uint64(expectedId)*512, uint64(expectedId)*5, 
expectedName)
+       }
+}
+
+func TestDeserializeStreams_MaxLengthName(t *testing.T) {
+       name := make([]byte, 255)
+       for i := range name {
+               name[i] = 'a' + byte(i%26)
+       }
+       payload := encodeStream(1, 999, 3, 4096, 200, string(name))
+
+       streams, err := DeserializeStreams(payload)
+       if err != nil {
+               t.Fatalf("unexpected error: %v", err)
+       }
+       if len(streams) != 1 {
+               t.Fatalf("expected 1 stream, got %d", len(streams))
+       }
+       if streams[0].Name != string(name) {
+               t.Errorf("Name length = %d, want 255", len(streams[0].Name))
+       }
+}
diff --git a/foreign/go/client/tcp/tcp_stream_management.go b/foreign/go/client/tcp/tcp_stream_management.go
index 9fbba5b4c..932c6ac4e 100644
--- a/foreign/go/client/tcp/tcp_stream_management.go
+++ b/foreign/go/client/tcp/tcp_stream_management.go
@@ -30,7 +30,7 @@ func (c *IggyTcpClient) GetStreams() ([]iggcon.Stream, error) {
                return nil, err
        }
 
-       return binaryserialization.DeserializeStreams(buffer), nil
+       return binaryserialization.DeserializeStreams(buffer)
 }
 
 func (c *IggyTcpClient) GetStream(streamId iggcon.Identifier) 
(*iggcon.StreamDetails, error) {

Reply via email to