Re: [PR] fix slave acting master bug [rocketmq]
gaoyf commented on code in PR #7603: URL: https://github.com/apache/rocketmq/pull/7603#discussion_r1413285830 ## store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java: ## @@ -1140,8 +1146,8 @@ public MessageExtBrokerInner convertMessage(MessageExt msgExt, boolean needRoll) } else { msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)); msgInner.setQueueId(Integer.parseInt(msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); -MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC); -MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID); +// MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC); Review Comment: I extracted [TimerDequeuePutMessageService](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1486)‘s some key codes, as follows: ``` while (!isStopped() && !doRes) { try { MessageExtBrokerInner msg = convert(msgExt, tr.getEnqueueTime(), needRoll(tr.getMagic())); doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic())); } catch (Throwable t) { LOGGER.info("Unknown error", t); } } ``` If method `doPut` throw exception, the logic try to execute method `convert` in a loop。 However, `MessageExtBrokerInner` and `MessageExt` hold the same Map of `properties`, [convertMessage](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1122) FYI, so the following code will return null: ``` msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC) msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID) ``` Because property `MessageConst.PROPERTY_REAL_QUEUE_ID` has been removed in the first execution. **So canot remove these properties for the retry logic.** ## store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java: ## @@ -1140,8 +1146,8 @@ public MessageExtBrokerInner convertMessage(MessageExt msgExt, boolean needRoll) } else { msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)); msgInner.setQueueId(Integer.parseInt(msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); -MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC); -MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID); +// MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC); Review Comment: I extracted [TimerDequeuePutMessageService](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1486)‘s some key codes, as follows: ``` while (!isStopped() && !doRes) { try { MessageExtBrokerInner msg = convert(msgExt, tr.getEnqueueTime(), needRoll(tr.getMagic())); doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic())); } catch (Throwable t) { LOGGER.info("Unknown error", t); } } ``` If method `doPut` throw exception, the logic try to execute method `convert` in a loop。 However, `MessageExtBrokerInner` and `MessageExt` hold the same Map of `properties`, [convertMessage](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1122) FYI, so the following code will return null: ``` msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC) msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID) ``` Because property `MessageConst.PROPERTY_REAL_QUEUE_ID` has been removed in the first execution. **So canot remove these properties for the retry logic.** -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] fix slave acting master bug [rocketmq]
gaoyf commented on code in PR #7603: URL: https://github.com/apache/rocketmq/pull/7603#discussion_r1413285830 ## store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java: ## @@ -1140,8 +1146,8 @@ public MessageExtBrokerInner convertMessage(MessageExt msgExt, boolean needRoll) } else { msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)); msgInner.setQueueId(Integer.parseInt(msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); -MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC); -MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID); +// MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC); Review Comment: I extracted [TimerDequeuePutMessageService](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1486)‘s some key codes, as follows: ``` while (!isStopped() && !doRes) { try { MessageExtBrokerInner msg = convert(msgExt, tr.getEnqueueTime(), needRoll(tr.getMagic())); doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic())); } catch (Throwable t) { LOGGER.info("Unknown error", t); } } ``` If method `doPut` throw exception, the logic try to execute method `convert` in a loop。 However, `MessageExtBrokerInner` and `MessageExt` hold the same Map of `properties`, [convertMessage](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1122) FYI, so the following code will return null: ``` msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC) msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID) ``` Because property `MessageConst.PROPERTY_REAL_QUEUE_ID` has been removed in the first execution. **So canot remove these properties for the retry logic.** On the other hand,`MessageExtBrokerInner`‘s will reput to commitlog,it will contains property `MessageConst.PROPERTY_REAL_TOPIC` and `MessageConst.PROPERTY_REAL_TOPIC`,because the fllowing [code](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1127): ``` msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); ``` So I don't think commenting out these codes will affect the previous functionality -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] fix slave acting master bug [rocketmq]
gaoyf commented on code in PR #7603: URL: https://github.com/apache/rocketmq/pull/7603#discussion_r1413285830 ## store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java: ## @@ -1140,8 +1146,8 @@ public MessageExtBrokerInner convertMessage(MessageExt msgExt, boolean needRoll) } else { msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)); msgInner.setQueueId(Integer.parseInt(msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); -MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC); -MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID); +// MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC); Review Comment: I extracted [TimerDequeuePutMessageService](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1486)‘s some key codes, as follows: ``` while (!isStopped() && !doRes) { try { MessageExtBrokerInner msg = convert(msgExt, tr.getEnqueueTime(), needRoll(tr.getMagic())); doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic())); } catch (Throwable t) { LOGGER.info("Unknown error", t); } } ``` If method `doPut` throw exception, the logic try to execute method `convert` in a loop。 However, `MessageExtBrokerInner` and `MessageExt` hold the same Map of `properties`, [convertMessage](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1122) FYI, so the following code will return null: ``` msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC) msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID) ``` Because property `MessageConst.PROPERTY_REAL_QUEUE_ID` has been removed in the first execution. **So canot remove these properties for the retry logic.** On the other hand,`MessageExtBrokerInner` will be reputed to commitlog,it will contains property `MessageConst.PROPERTY_REAL_TOPIC` and `MessageConst.PROPERTY_REAL_TOPIC`,because the fllowing [code](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1127): ``` msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); ``` So I don't think commenting out these codes will affect the previous functionality -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] fix slave acting master bug [rocketmq]
gaoyf commented on code in PR #7603: URL: https://github.com/apache/rocketmq/pull/7603#discussion_r1413285830 ## store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java: ## @@ -1140,8 +1146,8 @@ public MessageExtBrokerInner convertMessage(MessageExt msgExt, boolean needRoll) } else { msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)); msgInner.setQueueId(Integer.parseInt(msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); -MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC); -MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID); +// MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC); Review Comment: I extracted [TimerDequeuePutMessageService](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1486)‘s some key codes, as follows: ``` while (!isStopped() && !doRes) { try { MessageExtBrokerInner msg = convert(msgExt, tr.getEnqueueTime(), needRoll(tr.getMagic())); doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic())); } catch (Throwable t) { LOGGER.info("Unknown error", t); } } ``` If method `doPut` throw exception, the logic try to execute method `convert` in a loop。 However, `MessageExtBrokerInner` and `MessageExt` hold the same Map of `properties`, [convertMessage](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1122) FYI, so the following code will return null: ``` msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC) msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID) ``` Because property `MessageConst.PROPERTY_REAL_QUEUE_ID` has been removed in the first execution. **So canot remove these properties for the retry logic.** On the other hand,**the original logic**: `MessageExtBrokerInner` will be reputed to commitlog,it will contains property `MessageConst.PROPERTY_REAL_TOPIC` and `MessageConst.PROPERTY_REAL_TOPIC`,because the fllowing [code](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1127): ``` msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); ``` That is to say, when retrieving this message, it must contains these two properties,So I don't think commenting out these codes will affect the previous functionality -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] fix slave acting master bug [rocketmq]
gaoyf commented on code in PR #7603: URL: https://github.com/apache/rocketmq/pull/7603#discussion_r1413285830 ## store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java: ## @@ -1140,8 +1146,8 @@ public MessageExtBrokerInner convertMessage(MessageExt msgExt, boolean needRoll) } else { msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)); msgInner.setQueueId(Integer.parseInt(msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); -MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC); -MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID); +// MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC); Review Comment: I extracted [TimerDequeuePutMessageService](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1486)‘s some key codes, as follows: ``` while (!isStopped() && !doRes) { try { MessageExtBrokerInner msg = convert(msgExt, tr.getEnqueueTime(), needRoll(tr.getMagic())); doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic())); } catch (Throwable t) { LOGGER.info("Unknown error", t); } } ``` If method `doPut` throw exception, the logic try to execute method `convert` in a loop。 However, `MessageExtBrokerInner` and `MessageExt` hold the same Map of `properties`, [convertMessage](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1122) FYI, so the following code will return null: ``` msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC) msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID) ``` Because property `MessageConst.PROPERTY_REAL_QUEUE_ID` has been removed in the first execution. **So canot remove these properties for the retry logic.** On the other hand,**the original logic**: `MessageExtBrokerInner` will be reputed to commitlog,it will contains property `MessageConst.PROPERTY_REAL_TOPIC` and `MessageConst.PROPERTY_REAL_TOPIC`,because the fllowing [code](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1127): ``` msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); ``` That is to say, when retrieving this message, it must contains these two properties,So I don't think commenting out these codes will affect the original logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[I] Inconsistent serviceAccountNames [rocketmq-operator]
sinloss opened a new issue, #196: URL: https://github.com/apache/rocketmq-operator/issues/196 **BUG REPORT** When trying to deploy the `rocketmq-operator` using the [chart](https://github.com/apache/rocketmq-operator/tree/master/charts/rocketmq-operator) , though rare, user may want to assign a different name for the `serviceAccountName` *1.1*. In those cases, the helm chart is expected to work properly*1.2*. Yet in the current configuration, use could run into an error saying **serviceaccount "rocketmq-operator" not found***1.3*. This issue was first encountered on a self-hosted `Kubernetes v1.25.4` cluster with `2` nodes. The deployment was created using `Helm version.BuildInfo{Version:"v3.12.0", GitCommit:"c9f554d75773799f72ceef38c51210f1842a1dea", GitTreeState:"clean", GoVersion:"go1.20.3"}`. The root of this issue is that on one hand in the [operator.yaml](https://github.com/apache/rocketmq-operator/blob/a8665716c8028741ec106814798e39fe07e6dcaa/charts/rocketmq-operator/templates/operator.yaml#L30) the `serviceAccountName` is expected to be `rocketmq-operator`, on the other hand in the [role_binding.yaml](https://github.com/apache/rocketmq-operator/blob/a8665716c8028741ec106814798e39fe07e6dcaa/charts/rocketmq-operator/templates/role_binding.yaml#L27C11-L27C66) and the [service_account.yaml](https://github.com/apache/rocketmq-operator/blob/a8665716c8028741ec106814798e39fe07e6dcaa/charts/rocketmq-operator/templates/service_account.yaml#L19) the `serviceAccountName` is expected to be `{{ template "rocketmq-operator.serviceAccountName" . }}`. Hence, the `{{ template "rocketmq-operator.serviceAccountName" . }}` could produce names the `operator.yaml` will never acknowledge. --- *1.1* What did you do (The steps to reproduce)? *1.2* What did you expect to see? *1.3* What did you see instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [ISSUE #196] Fix serviceAccountName inconsistency [rocketmq-operator]
sinloss opened a new pull request, #197: URL: https://github.com/apache/rocketmq-operator/pull/197 ## What is the purpose of the change Closes #196 ## Brief changelog Change the serviceAccountName in the `operator.yaml` to `{{ template "rocketmq-operator.serviceAccountName" . }}`, the same as in the `role_binding.yaml` and `service_account.yaml`. ## Verifying this change This works on a `Kubernetes v1.25.4` cluster with `2` nodes, via `Helm version.BuildInfo{Version:"v3.12.0", GitCommit:"c9f554d75773799f72ceef38c51210f1842a1dea", GitTreeState:"clean", GoVersion:"go1.20.3"}`. **Please go through this checklist to help us incorporate your contribution quickly and easily.** Notice: `It would be helpful if you could finish the following checklist (the last one is not necessary) before request the community to review your PR`. - [x] Make sure there is a [Github issue](https://github.com/apache/rocketmq-operator/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue. - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body. - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why. - [x] Check RBAC rights for Kubernetes roles. - [ ] Write necessary unit-test to verify your logic correction, more mock a little better when cross module dependency exist. - [ ] Run `make docker-build` to build docker image for operator, try your changes from Pod inside your Kubernetes cluster, **not just locally**. Also provide screenshots to show that the RocketMQ cluster is healthy after the changes. - [ ] Before committing your changes, remember to run `make manifests` to make sure the CRD files are updated. - [ ] Update documentation if necessary. - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[I] [Bug] Bug title [rocketmq]
smallemployees opened a new issue, #7610: URL: https://github.com/apache/rocketmq/issues/7610 ### Before Creating the Bug Report - [X] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq/discussions). - [X] I have searched the [GitHub Issues](https://github.com/apache/rocketmq/issues) and [GitHub Discussions](https://github.com/apache/rocketmq/discussions) of this repository and believe that this is not a duplicate. - [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ. ### Runtime platform environment CentOS Linux release 7.9.2009 ### RocketMQ version 4.9.2 ### JDK Version 1.8.0 ### Describe the Bug Broker Role=SLAVE and flushDiskType=ASYNC_ When both FLUSH and transientStorePoolEnable=true are configured simultaneously, a system busy request is reported ### Steps to Reproduce org.apache.rocketmq.store.DefaultMessageStore.DefaultMessageStore(MessageStoreConfig messageStoreConfig, BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener, BrokerConfig brokerConfig) throws IOException if (messageStoreConfig.isTransientStorePoolEnable()) { this.transientStorePool.init(); } boolean org.apache.rocketmq.store.config.MessageStoreConfig.isTransientStorePoolEnable() public boolean isTransientStorePoolEnable() { return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType() && BrokerRole.SLAVE != getBrokerRole(); } ### What Did You Expect to See? Can the initialization of dLeger mode remove the brokerRole ### What Did You See Instead? Removing broker Role is normal ### Additional Context _No response_ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GH] (rocketmq): Workflow run "Build and Run Tests by Maven" is working again!
The GitHub Actions job "Build and Run Tests by Maven" on rocketmq.git has succeeded. Run started by GitHub user lizhimins (triggered by lizhimins). Head commit for run: 956fdd8a32359344104851d15edaa79c17873ef4 / lizhimins <707364...@qq.com> [ISSUE #7585] Support message filtering in rocketmq tiered storage Report URL: https://github.com/apache/rocketmq/actions/runs/7029081773 With regards, GitHub Actions via GitBox
Re: [PR] [ISSUE #1112] feat: optimize producer send async [rocketmq-client-go]
ShannonDing merged PR #: URL: https://github.com/apache/rocketmq-client-go/pull/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(rocketmq-client-go) branch master updated: [ISSUE #1112] feat: optimize producer send async (#1111)
This is an automated email from the ASF dual-hosted git repository. dinglei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git The following commit(s) were added to refs/heads/master by this push: new 7ffb599 [ISSUE #1112] feat: optimize producer send async (#) 7ffb599 is described below commit 7ffb599169034023d9f14e38c7599dc62be1f140 Author: WeizhongTu AuthorDate: Mon Dec 4 11:12:31 2023 +0800 [ISSUE #1112] feat: optimize producer send async (#) * feat: optimize producer send async * fix: fix mq override bug --- internal/remote/remote_client.go | 25 + producer/producer.go | 2 +- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go index 36fbea7..c0ef6ce 100644 --- a/internal/remote/remote_client.go +++ b/internal/remote/remote_client.go @@ -112,23 +112,24 @@ func (c *remotingClient) InvokeSync(ctx context.Context, addr string, request *R // InvokeAsync send request without blocking, just return immediately. func (c *remotingClient) InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, callback func(*ResponseFuture)) error { - conn, err := c.connect(ctx, addr) - if err != nil { - return err - } - resp := NewResponseFuture(ctx, request.Opaque, callback) c.responseTable.Store(resp.Opaque, resp) - err = c.sendRequest(ctx, conn, request) - if err != nil { - c.responseTable.Delete(request.Opaque) - return err - } - go primitive.WithRecover(func() { + defer resp.executeInvokeCallback() + defer c.responseTable.Delete(request.Opaque) + + conn, err := c.connect(ctx, addr) + if err != nil { + resp.Err = err + return + } + err = c.sendRequest(ctx, conn, request) + if err != nil { + resp.Err = err + return + } c.receiveAsync(resp) - c.responseTable.Delete(request.Opaque) }) return nil diff --git a/producer/producer.go b/producer/producer.go index 70e8d01..eb3cd2e 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -329,7 +329,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, if mq != nil { lastBrokerName = mq.BrokerName } - mq := p.selectMessageQueue(msg, lastBrokerName) + mq = p.selectMessageQueue(msg, lastBrokerName) if mq == nil { err = fmt.Errorf("the topic=%s route info not found", msg.Topic) continue
Re: [PR] [ISSUE #1114] fix: response future should close channel before callback [rocketmq-client-go]
ShannonDing merged PR #1113: URL: https://github.com/apache/rocketmq-client-go/pull/1113 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(rocketmq-client-go) branch master updated: fix: response future should close channel before callback (#1113)
This is an automated email from the ASF dual-hosted git repository. dinglei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git The following commit(s) were added to refs/heads/master by this push: new 3f9c59f fix: response future should close channel before callback (#1113) 3f9c59f is described below commit 3f9c59fdf5d4cef0994939228f40886517d42d6f Author: WeizhongTu AuthorDate: Mon Dec 4 11:13:16 2023 +0800 fix: response future should close channel before callback (#1113) --- internal/remote/remote_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go index c0ef6ce..d433b8d 100644 --- a/internal/remote/remote_client.go +++ b/internal/remote/remote_client.go @@ -231,10 +231,10 @@ func (c *remotingClient) processCMD(cmd *RemotingCommand, r *tcpConnWrapper) { responseFuture := resp.(*ResponseFuture) go primitive.WithRecover(func() { responseFuture.ResponseCommand = cmd - responseFuture.executeInvokeCallback() if responseFuture.Done != nil { close(responseFuture.Done) } + responseFuture.executeInvokeCallback() }) } } else {
Re: [PR] fix WithRecover 无法recover问题 [rocketmq-client-go]
twz915 commented on PR #1074: URL: https://github.com/apache/rocketmq-client-go/pull/1074#issuecomment-1837775085 Previously, users could reassign the PanicHandler to their own handler, but now they can’t do it. Please keep the code compatible, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Support Timing Messages with Arbitrary Time Delay.(#83) [rocketmq-apis]
echooymxq commented on PR #84: URL: https://github.com/apache/rocketmq-apis/pull/84#issuecomment-1837784542 @lizhanhui Sorry for late reply, really respect your wrok. As I understand it, like (JMS(ActiveMQ), the delivery_delay should be send to broker and the broker calculated the actually delivery_time. I don't know if I'm understanding you correctly, you mean like pulsar? pulsar support `deliverAt` and `deliverAfter`, the actual delivery time is calculated by the client. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [ISSUE #7545] Fix set mapped file to null cause file can not destroy [rocketmq]
lizhimins opened a new pull request, #7612: URL: https://github.com/apache/rocketmq/pull/7612 ### Which Issue(s) This PR Fixes Fixes #7545, Fix set mapped file to null cause file can not destroy in tiered storage index ### Brief Description ### How Did You Test This Change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GH] (rocketmq): Workflow run "E2E test for pull request" failed!
The GitHub Actions job "E2E test for pull request" on rocketmq.git has failed. Run started by GitHub user lizhimins (triggered by lizhimins). Head commit for run: 65faea22fd54fd9875f2ca9d3088b4dc46d31cce / keranbingaa <397294...@qq.com> [ISSUE #7534] Use high performance concurrent set to replace copyonwriteset (#7583) * fix ISSUE #7534 * reformat code * Remove the useless unit test - Co-authored-by: RongtongJin Report URL: https://github.com/apache/rocketmq/actions/runs/7081741202 With regards, GitHub Actions via GitBox
Re: [PR] [ISSUE #7545] Fix set mapped file to null cause file can not destroy [rocketmq]
codecov-commenter commented on PR #7612: URL: https://github.com/apache/rocketmq/pull/7612#issuecomment-1837805978 ## [Codecov](https://app.codecov.io/gh/apache/rocketmq/pull/7612?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report Attention: `3 lines` in your changes are missing coverage. Please review. > Comparison is base [(`65faea2`)](https://app.codecov.io/gh/apache/rocketmq/commit/65faea22fd54fd9875f2ca9d3088b4dc46d31cce?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 43.19% compared to head [(`c9c80a5`)](https://app.codecov.io/gh/apache/rocketmq/pull/7612?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 43.22%. | [Files](https://app.codecov.io/gh/apache/rocketmq/pull/7612?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines | |---|---|---| | [.../rocketmq/tieredstore/index/IndexStoreService.java](https://app.codecov.io/gh/apache/rocketmq/pull/7612?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-dGllcmVkc3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3RpZXJlZHN0b3JlL2luZGV4L0luZGV4U3RvcmVTZXJ2aWNlLmphdmE=) | 50.00% | [2 Missing and 1 partial :warning: ](https://app.codecov.io/gh/apache/rocketmq/pull/7612?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Additional details and impacted files ```diff @@ Coverage Diff @@ ## develop#7612 +/- ## = + Coverage 43.19% 43.22% +0.02% - Complexity 9782 9792 +10 = Files 1162 1162 Lines 8437884382 +4 Branches 1095510956 +1 = + Hits 3645136477 +26 + Misses 4339743374 -23 - Partials4530 4531 +1 ``` [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/rocketmq/pull/7612?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache). :loudspeaker: Have feedback on the report? [Share it here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GH] (rocketmq): Workflow run "Build and Run Tests by Maven" failed!
The GitHub Actions job "Build and Run Tests by Maven" on rocketmq.git has failed. Run started by GitHub user lizhimins (triggered by lizhimins). Head commit for run: c9c80a5d49d57da0e9de6006d16679ec9b986a05 / lizhimins <707364...@qq.com> [ISSUE #7545] Fix set mapped file to null cause file can not destroy Report URL: https://github.com/apache/rocketmq/actions/runs/7081726564 With regards, GitHub Actions via GitBox
Re: [PR] Delete logback1.2.3 version conflict [rocketmq-mqtt]
pingww merged PR #227: URL: https://github.com/apache/rocketmq-mqtt/pull/227 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(rocketmq-mqtt) branch develop updated: delete logback1.2.3 version conflict
This is an automated email from the ASF dual-hosted git repository. pingww pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git The following commit(s) were added to refs/heads/develop by this push: new f66ecb0 delete logback1.2.3 version conflict new 0d51d3a Merge pull request #227 from EssinZhang/delete-logback-conflict f66ecb0 is described below commit f66ecb08dd46106adab62de1f5d519dd1fbfbbc9 Author: EssinZhang AuthorDate: Wed Nov 29 21:43:52 2023 +0800 delete logback1.2.3 version conflict --- pom.xml | 5 - 1 file changed, 5 deletions(-) diff --git a/pom.xml b/pom.xml index 7e9ac4e..c99cc65 100644 --- a/pom.xml +++ b/pom.xml @@ -127,11 +127,6 @@ slf4j-api 1.7.15 - -ch.qos.logback -logback-classic -1.2.3 - ch.qos.logback logback-classic
Re: [I] 项目升级rocketmq-client 5.1.4版本后,org.apache.rocketmq.mqtt.ds.store.LmqQueueStoreManager#maxOffset 方法,报throw new MQClientException("The broker[" + queue.getBrokerName() + "] not exist", null); [roc
DongyuanPan commented on issue #228: URL: https://github.com/apache/rocketmq-mqtt/issues/228#issuecomment-1837899458 下面这个逻辑也没走到吗?  -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [ISSUE #103] Fix seek offset and deserialize bug in newSource [rocketmq-flink]
lizhimins merged PR #104: URL: https://github.com/apache/rocketmq-flink/pull/104 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(rocketmq-flink) branch main updated: [ISSUE #103] Fix seek offset and deserialize bug in newSource (#104)
This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git The following commit(s) were added to refs/heads/main by this push: new 1576ae2 [ISSUE #103] Fix seek offset and deserialize bug in newSource (#104) 1576ae2 is described below commit 1576ae2d4d51604a464df0fb21074f67c9e50c0f Author: hejunjie <844028...@qq.com> AuthorDate: Mon Dec 4 14:34:52 2023 +0800 [ISSUE #103] Fix seek offset and deserialize bug in newSource (#104) --- pom.xml | 1 + .../flink/connector/rocketmq/source/InnerConsumerImpl.java | 13 ++--- .../enumerator/RocketMQSourceEnumStateSerializer.java | 5 ++--- .../source/enumerator/RocketMQSourceEnumerator.java | 11 ++- .../rocketmq/source/reader/RocketMQSplitReader.java | 3 --- 5 files changed, 19 insertions(+), 14 deletions(-) diff --git a/pom.xml b/pom.xml index 67f322d..2eddd1a 100644 --- a/pom.xml +++ b/pom.xml @@ -46,6 +46,7 @@ 4.13.2 5.9.2 1.7.4 +2.3.1 diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java b/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java index edc438b..317031f 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java @@ -17,6 +17,7 @@ package org.apache.flink.connector.rocketmq.source; +import com.alibaba.fastjson.JSON; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector; @@ -25,8 +26,6 @@ import org.apache.flink.connector.rocketmq.source.reader.MessageViewExt; import org.apache.flink.connector.rocketmq.source.util.UtilAll; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.StringUtils; - -import com.alibaba.fastjson.JSON; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; @@ -240,7 +239,15 @@ public class InnerConsumerImpl implements InnerConsumer { long offset = consumer.getOffsetStore() .readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE); -LOG.error( + +if (offset == -1) { +offset = adminExt.minOffset(messageQueue); +LOG.info( +"Consumer seek committed offset from remote, offset=-1,mq={},use minOffset={}", +UtilAll.getQueueDescription(messageQueue), +offset); +} +LOG.info( "Consumer seek committed offset from remote, mq={}, offset={}", UtilAll.getQueueDescription(messageQueue), offset); diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java index 7589ba4..805df1b 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java @@ -18,9 +18,8 @@ package org.apache.flink.connector.rocketmq.source.enumerator; -import org.apache.flink.core.io.SimpleVersionedSerializer; - import com.alibaba.fastjson.JSON; +import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.rocketmq.common.message.MessageQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,7 +92,7 @@ public class RocketMQSourceEnumStateSerializer String topic = in.readUTF(); int queueId = in.readInt(); -MessageQueue queue = new MessageQueue(brokerName, topic, queueId); +MessageQueue queue = new MessageQueue(topic, brokerName, queueId); result.add(queue); } diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java index 6103444..77c0c33 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnu
[GH] (rocketmq): Workflow run "Build and Run Tests by Maven" failed!
The GitHub Actions job "Build and Run Tests by Maven" on rocketmq.git has failed. Run started by GitHub user lizhimins (triggered by lollipopjin). Head commit for run: c9c80a5d49d57da0e9de6006d16679ec9b986a05 / lizhimins <707364...@qq.com> [ISSUE #7545] Fix set mapped file to null cause file can not destroy Report URL: https://github.com/apache/rocketmq/actions/runs/7081726564 With regards, GitHub Actions via GitBox
Re: [PR] [ISSUE #7585] Support message filtering in rocketmq tiered storage [rocketmq]
lollipopjin commented on code in PR #7594: URL: https://github.com/apache/rocketmq/pull/7594#discussion_r1413322104 ## tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java: ## @@ -303,74 +270,94 @@ public CompletableFuture getMessageFromCacheAsync(CompositeQue recordCacheAccess(flatFile, group, queueOffset, resultWrapperList); -// if cache hit, result will be returned immediately and asynchronously prefetch messages for later requests -if (!resultWrapperList.isEmpty()) { -LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit: " + -"topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit num: {}", -mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size()); -prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1); +if (resultWrapperList.isEmpty()) { +// If cache miss, pull messages immediately +LOGGER.info("MessageFetcher cache miss, group: {}, topic: {}, queueId: {}, offset: {}, maxCount: {}", +group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount); +} else { +// If cache hit, return buffer result immediately and asynchronously prefetch messages +LOGGER.debug("MessageFetcher cache hit, group: {}, topic: {}, queueId: {}, offset: {}, maxCount: {}, resultSize: {}", +group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size()); -GetMessageResult result = new GetMessageResult(); +GetMessageResultExt result = new GetMessageResultExt(); result.setStatus(GetMessageStatus.FOUND); result.setMinOffset(flatFile.getConsumeQueueMinOffset()); result.setMaxOffset(flatFile.getConsumeQueueCommitOffset()); result.setNextBeginOffset(queueOffset + resultWrapperList.size()); -resultWrapperList.forEach(wrapper -> result.addMessage(wrapper.getDuplicateResult(), wrapper.getCurOffset())); +resultWrapperList.forEach(wrapper -> result.addMessageExt( +wrapper.getDuplicateResult(), wrapper.getOffset(), wrapper.getTagCode())); + +if (lastGetOffset < result.getMaxOffset()) { +this.prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1); +} return CompletableFuture.completedFuture(result); } -// if cache is miss, immediately pull messages -LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " + -"topic: {}, queue: {}, queue offset: {}, max message num: {}", -mq.getTopic(), mq.getQueueId(), queueOffset, maxCount); - -CompletableFuture resultFuture; +CompletableFuture resultFuture; synchronized (flatFile) { int batchSize = maxCount * storeConfig.getReadAheadMinFactor(); resultFuture = getMessageFromTieredStoreAsync(flatFile, queueOffset, batchSize) -.thenApplyAsync(result -> { +.thenApply(result -> { if (result.getStatus() != GetMessageStatus.FOUND) { return result; } -GetMessageResult newResult = new GetMessageResult(); -newResult.setStatus(GetMessageStatus.FOUND); - newResult.setMinOffset(flatFile.getConsumeQueueMinOffset()); - newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset()); +GetMessageResultExt newResult = new GetMessageResultExt(); List offsetList = result.getMessageQueueOffset(); +List tagCodeList = result.getTagCodeList(); List msgList = result.getMessageMapedList(); -Long minOffset = offsetList.get(0); -Long maxOffset = offsetList.get(offsetList.size() - 1); -int size = offsetList.size(); + for (int i = 0; i < offsetList.size(); i++) { -Long offset = offsetList.get(i); SelectMappedBufferResult msg = msgList.get(i); -// put message into cache -SelectMappedBufferResultWrapper resultWrapper = putMessageToCache(flatFile, offset, msg, minOffset, maxOffset, size, true); -// try to meet maxCount +SelectBufferResultWrapper bufferResult = new SelectBufferResultWrapper( +msg, offsetList.get(i), tagCodeList.get(i), true); +this.putMessageToCache(flatFile, bufferResult); if (newResult.getMessageMapedList().size() < maxCount) { - newResult.addMessage(resultWrapper.getDuplicateResult(), offset); +
[GH] (rocketmq): Workflow run "Build and Run Tests by Maven" failed!
The GitHub Actions job "Build and Run Tests by Maven" on rocketmq.git has failed. Run started by GitHub user lizhimins (triggered by lizhimins). Head commit for run: c9c80a5d49d57da0e9de6006d16679ec9b986a05 / lizhimins <707364...@qq.com> [ISSUE #7545] Fix set mapped file to null cause file can not destroy Report URL: https://github.com/apache/rocketmq/actions/runs/7081726564 With regards, GitHub Actions via GitBox
Re: [PR] [ISSUE #7585] Support message filtering in rocketmq tiered storage [rocketmq]
lizhimins commented on code in PR #7594: URL: https://github.com/apache/rocketmq/pull/7594#discussion_r1413454902 ## tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java: ## @@ -303,74 +270,94 @@ public CompletableFuture getMessageFromCacheAsync(CompositeQue recordCacheAccess(flatFile, group, queueOffset, resultWrapperList); -// if cache hit, result will be returned immediately and asynchronously prefetch messages for later requests -if (!resultWrapperList.isEmpty()) { -LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit: " + -"topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit num: {}", -mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size()); -prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1); +if (resultWrapperList.isEmpty()) { +// If cache miss, pull messages immediately +LOGGER.info("MessageFetcher cache miss, group: {}, topic: {}, queueId: {}, offset: {}, maxCount: {}", +group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount); +} else { +// If cache hit, return buffer result immediately and asynchronously prefetch messages +LOGGER.debug("MessageFetcher cache hit, group: {}, topic: {}, queueId: {}, offset: {}, maxCount: {}, resultSize: {}", +group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size()); -GetMessageResult result = new GetMessageResult(); +GetMessageResultExt result = new GetMessageResultExt(); result.setStatus(GetMessageStatus.FOUND); result.setMinOffset(flatFile.getConsumeQueueMinOffset()); result.setMaxOffset(flatFile.getConsumeQueueCommitOffset()); result.setNextBeginOffset(queueOffset + resultWrapperList.size()); -resultWrapperList.forEach(wrapper -> result.addMessage(wrapper.getDuplicateResult(), wrapper.getCurOffset())); +resultWrapperList.forEach(wrapper -> result.addMessageExt( +wrapper.getDuplicateResult(), wrapper.getOffset(), wrapper.getTagCode())); + +if (lastGetOffset < result.getMaxOffset()) { +this.prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1); +} return CompletableFuture.completedFuture(result); } -// if cache is miss, immediately pull messages -LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " + -"topic: {}, queue: {}, queue offset: {}, max message num: {}", -mq.getTopic(), mq.getQueueId(), queueOffset, maxCount); - -CompletableFuture resultFuture; +CompletableFuture resultFuture; synchronized (flatFile) { int batchSize = maxCount * storeConfig.getReadAheadMinFactor(); resultFuture = getMessageFromTieredStoreAsync(flatFile, queueOffset, batchSize) -.thenApplyAsync(result -> { +.thenApply(result -> { if (result.getStatus() != GetMessageStatus.FOUND) { return result; } -GetMessageResult newResult = new GetMessageResult(); -newResult.setStatus(GetMessageStatus.FOUND); - newResult.setMinOffset(flatFile.getConsumeQueueMinOffset()); - newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset()); +GetMessageResultExt newResult = new GetMessageResultExt(); List offsetList = result.getMessageQueueOffset(); +List tagCodeList = result.getTagCodeList(); List msgList = result.getMessageMapedList(); -Long minOffset = offsetList.get(0); -Long maxOffset = offsetList.get(offsetList.size() - 1); -int size = offsetList.size(); + for (int i = 0; i < offsetList.size(); i++) { -Long offset = offsetList.get(i); SelectMappedBufferResult msg = msgList.get(i); -// put message into cache -SelectMappedBufferResultWrapper resultWrapper = putMessageToCache(flatFile, offset, msg, minOffset, maxOffset, size, true); -// try to meet maxCount +SelectBufferResultWrapper bufferResult = new SelectBufferResultWrapper( +msg, offsetList.get(i), tagCodeList.get(i), true); +this.putMessageToCache(flatFile, bufferResult); if (newResult.getMessageMapedList().size() < maxCount) { - newResult.addMessage(resultWrapper.getDuplicateResult(), offset); +
Re: [PR] [ISSUE #7585] Support message filtering in rocketmq tiered storage [rocketmq]
lizhimins commented on code in PR #7594: URL: https://github.com/apache/rocketmq/pull/7594#discussion_r1413454902 ## tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java: ## @@ -303,74 +270,94 @@ public CompletableFuture getMessageFromCacheAsync(CompositeQue recordCacheAccess(flatFile, group, queueOffset, resultWrapperList); -// if cache hit, result will be returned immediately and asynchronously prefetch messages for later requests -if (!resultWrapperList.isEmpty()) { -LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit: " + -"topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit num: {}", -mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size()); -prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1); +if (resultWrapperList.isEmpty()) { +// If cache miss, pull messages immediately +LOGGER.info("MessageFetcher cache miss, group: {}, topic: {}, queueId: {}, offset: {}, maxCount: {}", +group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount); +} else { +// If cache hit, return buffer result immediately and asynchronously prefetch messages +LOGGER.debug("MessageFetcher cache hit, group: {}, topic: {}, queueId: {}, offset: {}, maxCount: {}, resultSize: {}", +group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size()); -GetMessageResult result = new GetMessageResult(); +GetMessageResultExt result = new GetMessageResultExt(); result.setStatus(GetMessageStatus.FOUND); result.setMinOffset(flatFile.getConsumeQueueMinOffset()); result.setMaxOffset(flatFile.getConsumeQueueCommitOffset()); result.setNextBeginOffset(queueOffset + resultWrapperList.size()); -resultWrapperList.forEach(wrapper -> result.addMessage(wrapper.getDuplicateResult(), wrapper.getCurOffset())); +resultWrapperList.forEach(wrapper -> result.addMessageExt( +wrapper.getDuplicateResult(), wrapper.getOffset(), wrapper.getTagCode())); + +if (lastGetOffset < result.getMaxOffset()) { +this.prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1); +} return CompletableFuture.completedFuture(result); } -// if cache is miss, immediately pull messages -LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " + -"topic: {}, queue: {}, queue offset: {}, max message num: {}", -mq.getTopic(), mq.getQueueId(), queueOffset, maxCount); - -CompletableFuture resultFuture; +CompletableFuture resultFuture; synchronized (flatFile) { int batchSize = maxCount * storeConfig.getReadAheadMinFactor(); resultFuture = getMessageFromTieredStoreAsync(flatFile, queueOffset, batchSize) -.thenApplyAsync(result -> { +.thenApply(result -> { if (result.getStatus() != GetMessageStatus.FOUND) { return result; } -GetMessageResult newResult = new GetMessageResult(); -newResult.setStatus(GetMessageStatus.FOUND); - newResult.setMinOffset(flatFile.getConsumeQueueMinOffset()); - newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset()); +GetMessageResultExt newResult = new GetMessageResultExt(); List offsetList = result.getMessageQueueOffset(); +List tagCodeList = result.getTagCodeList(); List msgList = result.getMessageMapedList(); -Long minOffset = offsetList.get(0); -Long maxOffset = offsetList.get(offsetList.size() - 1); -int size = offsetList.size(); + for (int i = 0; i < offsetList.size(); i++) { -Long offset = offsetList.get(i); SelectMappedBufferResult msg = msgList.get(i); -// put message into cache -SelectMappedBufferResultWrapper resultWrapper = putMessageToCache(flatFile, offset, msg, minOffset, maxOffset, size, true); -// try to meet maxCount +SelectBufferResultWrapper bufferResult = new SelectBufferResultWrapper( +msg, offsetList.get(i), tagCodeList.get(i), true); +this.putMessageToCache(flatFile, bufferResult); if (newResult.getMessageMapedList().size() < maxCount) { - newResult.addMessage(resultWrapper.getDuplicateResult(), offset); +
Re: [PR] [ISSUE #7263& ISSUE #7264]Fix to delete synchronously the consumption offset data of the subscription group on the standby node, when execute command 'mqadmin deleteSubGroup -b xxx -g xxx -r
frinda commented on PR #7273: URL: https://github.com/apache/rocketmq/pull/7273#issuecomment-1837987681 > I think the title might be too long, suggest renaming `[ISSUE xx& ISSUE xxx] Fix: xxx` It's been changed. Please submit it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] golang: optimize the logic of obtaining routing tasks [rocketmq-clients]
leizhiyuan commented on PR #580: URL: https://github.com/apache/rocketmq-clients/pull/580#issuecomment-1837994026 After running for a period of time, the consumer may encounter a timeout in the request routing table due to network fluctuations or other abnormal reasons. In previous versions, an error message "messageQueues is empty" would occur and it seems unable to recover. If you encounter this issue, please upgrade to this version. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [ISSUE #7263& ISSUE #7264]Fix to delete synchronously the consumption offset data of the subscription group on the standby node, when execute command 'mqadmin deleteSubGroup -b xxx -g xxx -r
iamgd67 commented on PR #7273: URL: https://github.com/apache/rocketmq/pull/7273#issuecomment-1837998805 > > I think the title might be too long, suggest renaming `[ISSUE xx& ISSUE xxx] Fix: xxx` > > @ joeCarf It's been changed. Please submit it,Thanks still too long, maybe this? > [ISSUE #7263& ISSUE #7264] consumption offset data and topic consumerqueue not deleted on the standby node -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[I] [Bug] Consumer still consume new message but the offset of two queues always keep the old value [rocketmq]
tongtaodragon opened a new issue, #7613: URL: https://github.com/apache/rocketmq/issues/7613 ### Before Creating the Bug Report - [X] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq/discussions). - [X] I have searched the [GitHub Issues](https://github.com/apache/rocketmq/issues) and [GitHub Discussions](https://github.com/apache/rocketmq/discussions) of this repository and believe that this is not a duplicate. - [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ. ### Runtime platform environment Linux ### RocketMQ version Broker: 4.8.0 Client: 4.9.3 ### JDK Version java version "1.8.0_121" ### Describe the Bug A topic has 4 brokers and 16 queues totally. One consumer group has two instance. After the issue observed, find the cg lag keeps growing. We identify two queues in one broker have the issue. The offset of these two queues has no change. We use tcpdump check the message sent from client to broker and found the offset sent from client always use the old value. Then we check the dump file of client and find the offset in dump of these two queues use old value. At the same time we find consumer still consume message successfully from all queues. Since it's production environment we use admin tool reset offset. Now the problem has gone. ### Steps to Reproduce Don't know how to reproduce it currently. Some clues 1. Our client started in docker container. 2. Due to node issue we have to restart all nodes one by one. 3. The message of that offset was consumed by one old container but this message is still pulled by new started container. 4. We use pushConsumer model. 5. Other 14 queues offset is normal ### What Did You Expect to See? No lag and offset is normal ### What Did You See Instead? Lag keep growing and offset maintain old value ### Additional Context _No response_ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org