This is an automated email from the ASF dual-hosted git repository.

yuzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c9e197c  [ISSUE #950] support tls connection (#951)
c9e197c is described below

commit c9e197c3af45f846ba528cbc05f2228039be5f3e
Author: yuz10 <845238...@qq.com>
AuthorDate: Mon Nov 6 10:19:16 2023 +0800

    [ISSUE #950] support tls connection (#951)
    
    * [ISSUE #950] support tls connection
    
    * add tls to admin options
    
    * support go 1.13
---
 admin/admin.go                   |  6 ++++
 consumer/option.go               |  6 ++++
 examples/consumer/tls/main.go    | 59 ++++++++++++++++++++++++++++++++++++++
 examples/producer/tls/main.go    | 62 ++++++++++++++++++++++++++++++++++++++++
 internal/remote/remote_client.go |  1 +
 internal/remote/tcp_conn.go      | 13 +++++++--
 producer/option.go               |  6 ++++
 7 files changed, 151 insertions(+), 2 deletions(-)

diff --git a/admin/admin.go b/admin/admin.go
index 487c8b4..62175af 100644
--- a/admin/admin.go
+++ b/admin/admin.go
@@ -77,6 +77,12 @@ func WithNamespace(namespace string) AdminOption {
        }
 }
 
+func WithTls(useTls bool) AdminOption {
+       return func(options *adminOptions) {
+               options.ClientOptions.RemotingClientConfig.UseTls = useTls
+       }
+}
+
 type admin struct {
        cli internal.RMQClient
 
diff --git a/consumer/option.go b/consumer/option.go
index ac7dd93..24acf7c 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -381,3 +381,9 @@ func WithLimiter(limiter Limiter) Option {
                opts.Limiter = limiter
        }
 }
+
+func WithTls(useTls bool) Option {
+       return func(opts *consumerOptions) {
+               opts.ClientOptions.RemotingClientConfig.UseTls = useTls
+       }
+}
diff --git a/examples/consumer/tls/main.go b/examples/consumer/tls/main.go
new file mode 100644
index 0000000..248c837
--- /dev/null
+++ b/examples/consumer/tls/main.go
@@ -0,0 +1,59 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "time"
+
+       "github.com/apache/rocketmq-client-go/v2"
+       "github.com/apache/rocketmq-client-go/v2/consumer"
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+func main() {
+       c, _ := rocketmq.NewPushConsumer(
+               consumer.WithGroupName("testGroup"),
+               consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+               consumer.WithTls(true),
+       )
+       err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
+               msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
+               for i := range msgs {
+                       fmt.Printf("subscribe callback: %v \n", msgs[i])
+               }
+
+               return consumer.ConsumeSuccess, nil
+       })
+       if err != nil {
+               fmt.Println(err.Error())
+       }
+       // Note: start after subscribe
+       err = c.Start()
+       if err != nil {
+               fmt.Println(err.Error())
+               os.Exit(-1)
+       }
+       time.Sleep(time.Hour)
+       err = c.Shutdown()
+       if err != nil {
+               fmt.Printf("shutdown Consumer error: %s", err.Error())
+       }
+}
diff --git a/examples/producer/tls/main.go b/examples/producer/tls/main.go
new file mode 100644
index 0000000..c926c05
--- /dev/null
+++ b/examples/producer/tls/main.go
@@ -0,0 +1,62 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "strconv"
+
+       "github.com/apache/rocketmq-client-go/v2"
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+       "github.com/apache/rocketmq-client-go/v2/producer"
+)
+
+// Package main implements a simple producer to send message.
+func main() {
+       p, _ := rocketmq.NewProducer(
+               producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+               producer.WithRetry(2),
+               producer.WithTls(true),
+       )
+       err := p.Start()
+       if err != nil {
+               fmt.Printf("start producer error: %s", err.Error())
+               os.Exit(1)
+       }
+       topic := "test"
+
+       for i := 0; i < 10; i++ {
+               msg := &primitive.Message{
+                       Topic: topic,
+                       Body:  []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
+               }
+               res, err := p.SendSync(context.Background(), msg)
+
+               if err != nil {
+                       fmt.Printf("send message error: %s\n", err)
+               } else {
+                       fmt.Printf("send message success: result=%s\n", res.String())
+               }
+       }
+       err = p.Shutdown()
+       if err != nil {
+               fmt.Printf("shutdown producer error: %s", err.Error())
+       }
+}
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 4013689..45dfbbf 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -38,6 +38,7 @@ type TcpOption struct {
        ConnectionTimeout time.Duration
        ReadTimeout       time.Duration
        WriteTimeout      time.Duration
+       UseTls            bool
 }
 
 //go:generate mockgen -source remote_client.go -destination mock_remote_client.go -self_package github.com/apache/rocketmq-client-go/v2/internal/remote  --package remote RemotingClient
diff --git a/internal/remote/tcp_conn.go b/internal/remote/tcp_conn.go
index 93ed837..3b7d164 100644
--- a/internal/remote/tcp_conn.go
+++ b/internal/remote/tcp_conn.go
@@ -18,6 +18,7 @@ package remote
 
 import (
        "context"
+       "crypto/tls"
        "net"
        "sync"
        "time"
@@ -34,11 +35,19 @@ type tcpConnWrapper struct {
 
 func initConn(ctx context.Context, addr string, config *RemotingClientConfig) (*tcpConnWrapper, error) {
        var d net.Dialer
-
        d.KeepAlive = config.KeepAliveDuration
        d.Deadline = time.Now().Add(config.ConnectionTimeout)
 
-       conn, err := d.DialContext(ctx, "tcp", addr)
+       var conn net.Conn
+       var err error
+       if config.UseTls {
+               conn, err = tls.DialWithDialer(&d, "tcp", addr, &tls.Config{
+                       InsecureSkipVerify: true,
+               })
+       } else {
+               conn, err = d.DialContext(ctx, "tcp", addr)
+       }
+
        if err != nil {
                return nil, err
        }
diff --git a/producer/option.go b/producer/option.go
index c3a0dc4..6e43cc2 100644
--- a/producer/option.go
+++ b/producer/option.go
@@ -178,3 +178,9 @@ func WithCompressLevel(level int) Option {
                opts.CompressLevel = level
        }
 }
+
+func WithTls(useTls bool) Option {
+       return func(opts *producerOptions) {
+               opts.ClientOptions.RemotingClientConfig.UseTls = useTls
+       }
+}

Reply via email to