This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

The following commit(s) were added to refs/heads/master by this push:
     new 1381f0af golang: add SetRequestTimeout implements producer/simpleConsumer/pushConsumer (#1016)
1381f0af is described below

commit 1381f0af882c9725b98f7f47b863c623725d6f42
Author: guyinyou <36399867+guyin...@users.noreply.github.com>
AuthorDate: Thu Jun 12 19:44:03 2025 +0800

    golang: add SetRequestTimeout implements producer/simpleConsumer/pushConsumer (#1016)

    Co-authored-by: guyinyou <guyinyou....@alibaba-inc.com>
---
 golang/client.go          | 1 +
 golang/producer.go        | 5 +++++
 golang/push_consumer.go   | 5 +++++
 golang/simple_consumer.go | 5 +++++
 4 files changed, 16 insertions(+)

diff --git a/golang/client.go b/golang/client.go
index 2d92b161..eb7bed4f 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -46,6 +46,7 @@ type Client interface {

 type isClient interface {
 	isClient()
+	SetRequestTimeout(timeout time.Duration)
 	wrapHeartbeatRequest() *v2.HeartbeatRequest
 	onRecoverOrphanedTransactionCommand(endpoints *v2.Endpoints, command *v2.RecoverOrphanedTransactionCommand) error
 	onVerifyMessageCommand(endpoints *v2.Endpoints, command *v2.VerifyMessageCommand) error
diff --git a/golang/producer.go b/golang/producer.go
index 5f0a355d..58d4a226 100644
--- a/golang/producer.go
+++ b/golang/producer.go
@@ -455,3 +455,8 @@ func (p *defaultProducer) onRecoverOrphanedTransactionCommand(endpoints *v2.Endp
 func (p *defaultProducer) onVerifyMessageCommand(endpoints *v2.Endpoints, command *v2.VerifyMessageCommand) error {
 	return nil
 }
+
+func (p *defaultProducer) SetRequestTimeout(timeout time.Duration) {
+	p.cli.opts.timeout = timeout
+	p.pSetting.requestTimeout = p.cli.opts.timeout
+}
diff --git a/golang/push_consumer.go b/golang/push_consumer.go
index f0e9f13d..75a9c26e 100644
--- a/golang/push_consumer.go
+++ b/golang/push_consumer.go
@@ -70,6 +70,11 @@ type defaultPushConsumer struct {
 	consumptionErrorQuantity atomic.Int64
 }

+func (pc *defaultPushConsumer) SetRequestTimeout(timeout time.Duration) {
+	pc.cli.opts.timeout = timeout
+	pc.pcSettings.requestTimeout = pc.cli.opts.timeout
+}
+
 func (pc *defaultPushConsumer) isOn() bool {
 	return pc.cli.on.Load()
 }
diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go
index abb41936..4db351e9 100644
--- a/golang/simple_consumer.go
+++ b/golang/simple_consumer.go
@@ -61,6 +61,11 @@ type defaultSimpleConsumer struct {
 	subTopicRouteDataResultCache sync.Map
 }

+func (sc *defaultSimpleConsumer) SetRequestTimeout(timeout time.Duration) {
+	sc.cli.opts.timeout = timeout
+	sc.scSettings.requestTimeout = sc.cli.opts.timeout
+}
+
 func (sc *defaultSimpleConsumer) isOn() bool {
 	return sc.cli.on.Load()
 }