This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 322e9a2d golang: support zlib decode (#957)
322e9a2d is described below

commit 322e9a2d0b3f6b4b74d4eebb330c5e9238d93b36
Author: guyinyou <36399867+guyin...@users.noreply.github.com>
AuthorDate: Mon Mar 10 11:49:56 2025 +0800

    golang: support zlib decode (#957)
    
    Co-authored-by: guyinyou <guyinyou....@alibaba-inc.com>
---
 golang/message.go              |  2 +-
 golang/pkg/utils/utils.go      | 21 +++++++++++++++++++++
 golang/pkg/utils/utils_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/golang/message.go b/golang/message.go
index 9d1007f1..80675424 100644
--- a/golang/message.go
+++ b/golang/message.go
@@ -231,7 +231,7 @@ func fromProtobuf_MessageView2(message *v2.Message, 
messageQueue *v2.MessageQueu
        bodyEncoding := systemProperties.GetBodyEncoding()
        switch bodyEncoding {
        case v2.Encoding_GZIP:
-               unCompressBody, err := utils.GZIPDecode(message.GetBody())
+               unCompressBody, err := utils.AutoDecode(message.GetBody())
                if err != nil {
                        sugarBaseLogger.Errorf("failed to uncompress message 
body, topic=%s, messageId=%s, err=%w", mv.topic, mv.messageId, err)
                        corrupted = true
diff --git a/golang/pkg/utils/utils.go b/golang/pkg/utils/utils.go
index db145e4d..a8343ef5 100644
--- a/golang/pkg/utils/utils.go
+++ b/golang/pkg/utils/utils.go
@@ -20,6 +20,7 @@ package utils
 import (
        "bytes"
        "compress/gzip"
+       "compress/zlib"
        "context"
        "encoding/hex"
        "fmt"
@@ -140,6 +141,26 @@ func MatchMessageType(mq *v2.MessageQueue, messageType 
v2.MessageType) bool {
        return false
 }
 
+func AutoDecode(in []byte) ([]byte, error) {
+       if len(in) < 2 {
+               return in, fmt.Errorf("unknown format")
+       }
+       if in[0] == 0x1f && in[1] == 0x8b {
+               return GZIPDecode(in)
+       }
+       return ZlibDecode(in)
+}
+
+func ZlibDecode(in []byte) ([]byte, error) {
+       reader, err := zlib.NewReader(bytes.NewReader(in))
+       if err != nil {
+               var out []byte
+               return out, err
+       }
+       defer reader.Close()
+       return ioutil.ReadAll(reader)
+}
+
 func GZIPDecode(in []byte) ([]byte, error) {
        reader, err := gzip.NewReader(bytes.NewReader(in))
        if err != nil {
diff --git a/golang/pkg/utils/utils_test.go b/golang/pkg/utils/utils_test.go
index bf43349a..514f3746 100644
--- a/golang/pkg/utils/utils_test.go
+++ b/golang/pkg/utils/utils_test.go
@@ -19,6 +19,7 @@ package utils
 
 import (
        "compress/gzip"
+       "compress/zlib"
        "testing"
 
        v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
@@ -111,6 +112,33 @@ func TestMatchMessageType(t *testing.T) {
        }
 }
 
+func TestAutoDecode(t *testing.T) {
+       _, err := AutoDecode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
+       if err != zlib.ErrHeader {
+               t.Error()
+       }
+       _, err = AutoDecode([]byte{0})
+       if err == nil {
+               t.Error()
+       }
+       // gzip
+       bytes, err := AutoDecode([]byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 42, 
202, 79, 206, 78, 45, 201, 45, 212, 77, 206, 201, 76, 205, 43, 209, 77, 207, 7, 
0, 0, 0, 255, 255, 1, 0, 0, 255, 255, 97, 36, 132, 114, 18, 0, 0, 0})
+       if err != nil {
+               t.Error()
+       }
+       if string(bytes) != "rocketmq-client-go" {
+               t.Error()
+       }
+       // zlib
+       bytes, err = AutoDecode([]byte{120, 156, 42, 202, 79, 206, 78, 45, 201, 
45, 212, 77, 206, 201, 76, 205, 43, 209, 77, 207, 7, 4, 0, 0, 255, 255, 68, 
223, 7, 22})
+       if err != nil {
+               t.Error()
+       }
+       if string(bytes) != "rocketmq-client-go" {
+               t.Error()
+       }
+}
+
 func TestGZIPDecode(t *testing.T) {
        _, err := GZIPDecode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
        if err != gzip.ErrHeader {
@@ -125,6 +153,20 @@ func TestGZIPDecode(t *testing.T) {
        }
 }
 
+func TestZlibDecode(t *testing.T) {
+       _, err := ZlibDecode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
+       if err != zlib.ErrHeader {
+               t.Error()
+       }
+       bytes, err := ZlibDecode([]byte{120, 156, 42, 202, 79, 206, 78, 45, 
201, 45, 212, 77, 206, 201, 76, 205, 43, 209, 77, 207, 7, 4, 0, 0, 255, 255, 
68, 223, 7, 22})
+       if err != nil {
+               t.Error()
+       }
+       if string(bytes) != "rocketmq-client-go" {
+               t.Error()
+       }
+}
+
 func TestSelectAnAddress(t *testing.T) {
        if SelectAnAddress(nil) != nil {
                t.Error()

Reply via email to