This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new 322e9a2d golang: support zlib decode (#957) 322e9a2d is described below commit 322e9a2d0b3f6b4b74d4eebb330c5e9238d93b36 Author: guyinyou <36399867+guyin...@users.noreply.github.com> AuthorDate: Mon Mar 10 11:49:56 2025 +0800 golang: support zlib decode (#957) Co-authored-by: guyinyou <guyinyou....@alibaba-inc.com> --- golang/message.go | 2 +- golang/pkg/utils/utils.go | 21 +++++++++++++++++++++ golang/pkg/utils/utils_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+), 1 deletion(-) diff --git a/golang/message.go b/golang/message.go index 9d1007f1..80675424 100644 --- a/golang/message.go +++ b/golang/message.go @@ -231,7 +231,7 @@ func fromProtobuf_MessageView2(message *v2.Message, messageQueue *v2.MessageQueu bodyEncoding := systemProperties.GetBodyEncoding() switch bodyEncoding { case v2.Encoding_GZIP: - unCompressBody, err := utils.GZIPDecode(message.GetBody()) + unCompressBody, err := utils.AutoDecode(message.GetBody()) if err != nil { sugarBaseLogger.Errorf("failed to uncompress message body, topic=%s, messageId=%s, err=%w", mv.topic, mv.messageId, err) corrupted = true diff --git a/golang/pkg/utils/utils.go b/golang/pkg/utils/utils.go index db145e4d..a8343ef5 100644 --- a/golang/pkg/utils/utils.go +++ b/golang/pkg/utils/utils.go @@ -20,6 +20,7 @@ package utils import ( "bytes" "compress/gzip" + "compress/zlib" "context" "encoding/hex" "fmt" @@ -140,6 +141,26 @@ func MatchMessageType(mq *v2.MessageQueue, messageType v2.MessageType) bool { return false } +func AutoDecode(in []byte) ([]byte, error) { + if len(in) < 2 { + return in, fmt.Errorf("unknown format") + } + if in[0] == 0x1f && in[1] == 0x8b { + return GZIPDecode(in) + } + return ZlibDecode(in) +} + +func ZlibDecode(in []byte) ([]byte, error) { + reader, err := zlib.NewReader(bytes.NewReader(in)) + if err != nil { + var out []byte + return out, err + } + defer reader.Close() + return ioutil.ReadAll(reader) +} + func GZIPDecode(in []byte) ([]byte, error) { reader, err := gzip.NewReader(bytes.NewReader(in)) if err != nil { diff --git a/golang/pkg/utils/utils_test.go b/golang/pkg/utils/utils_test.go index bf43349a..514f3746 100644 --- a/golang/pkg/utils/utils_test.go +++ b/golang/pkg/utils/utils_test.go @@ -19,6 +19,7 @@ package utils import ( "compress/gzip" + "compress/zlib" "testing" v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2" @@ -111,6 +112,33 @@ func TestMatchMessageType(t *testing.T) { } } +func TestAutoDecode(t *testing.T) { + _, err := AutoDecode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) + if err != zlib.ErrHeader { + t.Error() + } + _, err = AutoDecode([]byte{0}) + if err == nil { + t.Error() + } + // gzip + bytes, err := AutoDecode([]byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 42, 202, 79, 206, 78, 45, 201, 45, 212, 77, 206, 201, 76, 205, 43, 209, 77, 207, 7, 0, 0, 0, 255, 255, 1, 0, 0, 255, 255, 97, 36, 132, 114, 18, 0, 0, 0}) + if err != nil { + t.Error() + } + if string(bytes) != "rocketmq-client-go" { + t.Error() + } + // zlib + bytes, err = AutoDecode([]byte{120, 156, 42, 202, 79, 206, 78, 45, 201, 45, 212, 77, 206, 201, 76, 205, 43, 209, 77, 207, 7, 4, 0, 0, 255, 255, 68, 223, 7, 22}) + if err != nil { + t.Error() + } + if string(bytes) != "rocketmq-client-go" { + t.Error() + } +} + func TestGZIPDecode(t *testing.T) { _, err := GZIPDecode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) if err != gzip.ErrHeader { @@ -125,6 +153,20 @@ func TestGZIPDecode(t *testing.T) { } } +func TestZlibDecode(t *testing.T) { + _, err := ZlibDecode([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) + if err != zlib.ErrHeader { + t.Error() + } + bytes, err := ZlibDecode([]byte{120, 156, 42, 202, 79, 206, 78, 45, 201, 45, 212, 77, 206, 201, 76, 205, 43, 209, 77, 207, 7, 4, 0, 0, 255, 255, 68, 223, 7, 22}) + if err != nil { + t.Error() + } + if string(bytes) != "rocketmq-client-go" { + t.Error() + } +} + func TestSelectAnAddress(t *testing.T) { if SelectAnAddress(nil) != nil { t.Error()