Re: [PR] fix slave acting master bug [rocketmq]

2023-12-03 Thread via GitHub


gaoyf commented on code in PR #7603:
URL: https://github.com/apache/rocketmq/pull/7603#discussion_r1413285830


##
store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java:
##
@@ -1140,8 +1146,8 @@ public MessageExtBrokerInner convertMessage(MessageExt 
msgExt, boolean needRoll)
 } else {
 
msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
 
msgInner.setQueueId(Integer.parseInt(msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
-MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_REAL_TOPIC);
-MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_REAL_QUEUE_ID);
+// MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_REAL_TOPIC);

Review Comment:
   I extracted 
[TimerDequeuePutMessageService](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1486)‘s
 some key codes, as follows:
   ```
   while (!isStopped() && !doRes) {
   try {
   MessageExtBrokerInner msg = convert(msgExt, tr.getEnqueueTime(), 
needRoll(tr.getMagic()));
   doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
   } catch (Throwable t) {
   LOGGER.info("Unknown error", t);
   }
   }
   ```
   If method  `doPut` throw exception, the logic try to execute method 
`convert` in a loop。
   However, `MessageExtBrokerInner` and `MessageExt` hold the same Map of 
`properties`, 
[convertMessage](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1122)
 FYI, so the following code will return null:
   ```
   msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)
   msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)
   ```
   Because property `MessageConst.PROPERTY_REAL_QUEUE_ID` has been removed in 
the first execution.
   **So canot remove these properties for the retry logic.**



##
store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java:
##
@@ -1140,8 +1146,8 @@ public MessageExtBrokerInner convertMessage(MessageExt 
msgExt, boolean needRoll)
 } else {
 
msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
 
msgInner.setQueueId(Integer.parseInt(msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
-MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_REAL_TOPIC);
-MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_REAL_QUEUE_ID);
+// MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_REAL_TOPIC);

Review Comment:
   I extracted 
[TimerDequeuePutMessageService](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1486)‘s
 some key codes, as follows:
   ```
   while (!isStopped() && !doRes) {
   try {
   MessageExtBrokerInner msg = convert(msgExt, tr.getEnqueueTime(), 
needRoll(tr.getMagic()));
   doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
   } catch (Throwable t) {
   LOGGER.info("Unknown error", t);
   }
   }
   ```
   If method  `doPut` throw exception, the logic try to execute method 
`convert` in a loop。
   However, `MessageExtBrokerInner` and `MessageExt` hold the same Map of 
`properties`, 
[convertMessage](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1122)
 FYI, so the following code will return null:
   ```
   msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)
   msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)
   ```
   Because property `MessageConst.PROPERTY_REAL_QUEUE_ID` has been removed in 
the first execution.
   **So canot remove these properties for the retry logic.**



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] fix slave acting master bug [rocketmq]

2023-12-03 Thread via GitHub


gaoyf commented on code in PR #7603:
URL: https://github.com/apache/rocketmq/pull/7603#discussion_r1413285830


##
store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java:
##
@@ -1140,8 +1146,8 @@ public MessageExtBrokerInner convertMessage(MessageExt 
msgExt, boolean needRoll)
 } else {
 
msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
 
msgInner.setQueueId(Integer.parseInt(msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
-MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_REAL_TOPIC);
-MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_REAL_QUEUE_ID);
+// MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_REAL_TOPIC);

Review Comment:
   I extracted 
[TimerDequeuePutMessageService](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1486)‘s
 some key codes, as follows:
   ```
   while (!isStopped() && !doRes) {
   try {
   MessageExtBrokerInner msg = convert(msgExt, tr.getEnqueueTime(), 
needRoll(tr.getMagic()));
   doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
   } catch (Throwable t) {
   LOGGER.info("Unknown error", t);
   }
   }
   ```
   If method  `doPut` throw exception, the logic try to execute method 
`convert` in a loop。
   However, `MessageExtBrokerInner` and `MessageExt` hold the same Map of 
`properties`, 
[convertMessage](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1122)
 FYI, so the following code will return null:
   ```
   msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)
   msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)
   ```
   Because property `MessageConst.PROPERTY_REAL_QUEUE_ID` has been removed in 
the first execution.
   **So canot remove these properties for the retry logic.**
   On the other hand,`MessageExtBrokerInner`‘s will reput to commitlog,it will 
contains property `MessageConst.PROPERTY_REAL_TOPIC` and 
`MessageConst.PROPERTY_REAL_TOPIC`,because the fllowing 
[code](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1127):
   ```
   
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
   ```
   So I don't think commenting out these codes will affect the previous 
functionality



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] fix slave acting master bug [rocketmq]

2023-12-03 Thread via GitHub


gaoyf commented on code in PR #7603:
URL: https://github.com/apache/rocketmq/pull/7603#discussion_r1413285830


##
store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java:
##
@@ -1140,8 +1146,8 @@ public MessageExtBrokerInner convertMessage(MessageExt 
msgExt, boolean needRoll)
 } else {
 
msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
 
msgInner.setQueueId(Integer.parseInt(msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
-MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_REAL_TOPIC);
-MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_REAL_QUEUE_ID);
+// MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_REAL_TOPIC);

Review Comment:
   I extracted 
[TimerDequeuePutMessageService](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1486)‘s
 some key codes, as follows:
   ```
   while (!isStopped() && !doRes) {
   try {
   MessageExtBrokerInner msg = convert(msgExt, tr.getEnqueueTime(), 
needRoll(tr.getMagic()));
   doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
   } catch (Throwable t) {
   LOGGER.info("Unknown error", t);
   }
   }
   ```
   If method  `doPut` throw exception, the logic try to execute method 
`convert` in a loop。
   However, `MessageExtBrokerInner` and `MessageExt` hold the same Map of 
`properties`, 
[convertMessage](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1122)
 FYI, so the following code will return null:
   ```
   msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)
   msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)
   ```
   Because property `MessageConst.PROPERTY_REAL_QUEUE_ID` has been removed in 
the first execution.
   **So canot remove these properties for the retry logic.**
   On the other hand,`MessageExtBrokerInner` will be reputed to commitlog,it 
will contains property `MessageConst.PROPERTY_REAL_TOPIC` and 
`MessageConst.PROPERTY_REAL_TOPIC`,because the fllowing 
[code](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1127):
   ```
   
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
   ```
   So I don't think commenting out these codes will affect the previous 
functionality



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] fix slave acting master bug [rocketmq]

2023-12-03 Thread via GitHub


gaoyf commented on code in PR #7603:
URL: https://github.com/apache/rocketmq/pull/7603#discussion_r1413285830


##
store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java:
##
@@ -1140,8 +1146,8 @@ public MessageExtBrokerInner convertMessage(MessageExt 
msgExt, boolean needRoll)
 } else {
 
msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
 
msgInner.setQueueId(Integer.parseInt(msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
-MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_REAL_TOPIC);
-MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_REAL_QUEUE_ID);
+// MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_REAL_TOPIC);

Review Comment:
   I extracted 
[TimerDequeuePutMessageService](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1486)‘s
 some key codes, as follows:
   ```
   while (!isStopped() && !doRes) {
   try {
   MessageExtBrokerInner msg = convert(msgExt, tr.getEnqueueTime(), 
needRoll(tr.getMagic()));
   doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
   } catch (Throwable t) {
   LOGGER.info("Unknown error", t);
   }
   }
   ```
   If method  `doPut` throw exception, the logic try to execute method 
`convert` in a loop。
   However, `MessageExtBrokerInner` and `MessageExt` hold the same Map of 
`properties`, 
[convertMessage](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1122)
 FYI, so the following code will return null:
   ```
   msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)
   msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)
   ```
   Because property `MessageConst.PROPERTY_REAL_QUEUE_ID` has been removed in 
the first execution.
   **So canot remove these properties for the retry logic.**
   On the other hand,**the original logic**: `MessageExtBrokerInner` will be 
reputed to commitlog,it will contains property 
`MessageConst.PROPERTY_REAL_TOPIC` and 
`MessageConst.PROPERTY_REAL_TOPIC`,because the fllowing 
[code](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1127):
   ```
   
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
   ```
   That is to say, when retrieving this message, it must contains these two 
properties,So I don't think commenting out these codes will affect the previous 
functionality



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] fix slave acting master bug [rocketmq]

2023-12-03 Thread via GitHub


gaoyf commented on code in PR #7603:
URL: https://github.com/apache/rocketmq/pull/7603#discussion_r1413285830


##
store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java:
##
@@ -1140,8 +1146,8 @@ public MessageExtBrokerInner convertMessage(MessageExt 
msgExt, boolean needRoll)
 } else {
 
msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
 
msgInner.setQueueId(Integer.parseInt(msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
-MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_REAL_TOPIC);
-MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_REAL_QUEUE_ID);
+// MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_REAL_TOPIC);

Review Comment:
   I extracted 
[TimerDequeuePutMessageService](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1486)‘s
 some key codes, as follows:
   ```
   while (!isStopped() && !doRes) {
   try {
   MessageExtBrokerInner msg = convert(msgExt, tr.getEnqueueTime(), 
needRoll(tr.getMagic()));
   doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
   } catch (Throwable t) {
   LOGGER.info("Unknown error", t);
   }
   }
   ```
   If method  `doPut` throw exception, the logic try to execute method 
`convert` in a loop。
   However, `MessageExtBrokerInner` and `MessageExt` hold the same Map of 
`properties`, 
[convertMessage](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1122)
 FYI, so the following code will return null:
   ```
   msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)
   msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)
   ```
   Because property `MessageConst.PROPERTY_REAL_QUEUE_ID` has been removed in 
the first execution.
   **So canot remove these properties for the retry logic.**
   On the other hand,**the original logic**: `MessageExtBrokerInner` will be 
reputed to commitlog,it will contains property 
`MessageConst.PROPERTY_REAL_TOPIC` and 
`MessageConst.PROPERTY_REAL_TOPIC`,because the fllowing 
[code](https://github.com/apache/rocketmq/blob/develop/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java#L1127):
   ```
   
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
   ```
   That is to say, when retrieving this message, it must contains these two 
properties,So I don't think commenting out these codes will affect the original 
logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[I] Inconsistent serviceAccountNames [rocketmq-operator]

2023-12-03 Thread via GitHub


sinloss opened a new issue, #196:
URL: https://github.com/apache/rocketmq-operator/issues/196

   **BUG REPORT**
   
   When trying to deploy the `rocketmq-operator` using the 
[chart](https://github.com/apache/rocketmq-operator/tree/master/charts/rocketmq-operator)
 , though rare, user may want to assign a different name for the 
`serviceAccountName` *1.1*. In those cases, the helm chart is 
expected to work properly*1.2*. Yet in the current configuration, 
use could run into an error saying **serviceaccount "rocketmq-operator" not 
found***1.3*.
   
   This issue was first encountered on a self-hosted `Kubernetes v1.25.4` 
cluster with `2` nodes. The deployment was created using `Helm 
version.BuildInfo{Version:"v3.12.0", 
GitCommit:"c9f554d75773799f72ceef38c51210f1842a1dea", GitTreeState:"clean", 
GoVersion:"go1.20.3"}`.
   
   The root of this issue is that on one hand in the 
[operator.yaml](https://github.com/apache/rocketmq-operator/blob/a8665716c8028741ec106814798e39fe07e6dcaa/charts/rocketmq-operator/templates/operator.yaml#L30)
 the `serviceAccountName` is expected to be `rocketmq-operator`, on the other 
hand in the 
[role_binding.yaml](https://github.com/apache/rocketmq-operator/blob/a8665716c8028741ec106814798e39fe07e6dcaa/charts/rocketmq-operator/templates/role_binding.yaml#L27C11-L27C66)
 and the 
[service_account.yaml](https://github.com/apache/rocketmq-operator/blob/a8665716c8028741ec106814798e39fe07e6dcaa/charts/rocketmq-operator/templates/service_account.yaml#L19)
 the `serviceAccountName` is expected to be `{{ template 
"rocketmq-operator.serviceAccountName" . }}`. Hence, the `{{ template 
"rocketmq-operator.serviceAccountName" . }}` could produce names the 
`operator.yaml` will never acknowledge.
   
   ---
   *1.1* What did you do (The steps to reproduce)?
   *1.2* What did you expect to see?
   *1.3* What did you see instead?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [ISSUE #196] Fix serviceAccountName inconsistency [rocketmq-operator]

2023-12-03 Thread via GitHub


sinloss opened a new pull request, #197:
URL: https://github.com/apache/rocketmq-operator/pull/197

   ## What is the purpose of the change
   
   Closes #196 
   
   ## Brief changelog
   
   Change the serviceAccountName in the `operator.yaml` to `{{ template 
"rocketmq-operator.serviceAccountName" . }}`, the same as in the 
`role_binding.yaml` and `service_account.yaml`.
   
   ## Verifying this change
   
   This works on a `Kubernetes v1.25.4` cluster with `2` nodes, via `Helm 
version.BuildInfo{Version:"v3.12.0", 
GitCommit:"c9f554d75773799f72ceef38c51210f1842a1dea", GitTreeState:"clean", 
GoVersion:"go1.20.3"}`.
   
   **Please go through this checklist to help us incorporate your contribution 
quickly and easily.**
   
   Notice: `It would be helpful if you could finish the following checklist 
(the last one is not necessary) before request the community to review your PR`.
   
   - [x] Make sure there is a [Github 
issue](https://github.com/apache/rocketmq-operator/issues) filed for the change 
(usually before you start working on it). Trivial changes like typos do not 
require a Github issue. Your pull request should address just this issue, 
without pulling in other changes - one PR resolves one issue. 
   - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException 
when host config not exist`. Each commit in the pull request should have a 
meaningful subject line and body.
   - [x] Write a pull request description that is detailed enough to understand 
what the pull request does, how, and why.
   - [x] Check RBAC rights for Kubernetes roles.
   - [ ] Write necessary unit-test to verify your logic correction, more mock a 
little better when cross module dependency exist. 
   - [ ] Run `make docker-build` to build docker image for operator, try your 
changes from Pod inside your Kubernetes cluster, **not just locally**. Also 
provide screenshots to show that the RocketMQ cluster is healthy after the 
changes. 
   - [ ] Before committing your changes, remember to run `make manifests` to 
make sure the CRD files are updated. 
   - [ ] Update documentation if necessary.
   - [ ] If this contribution is large, please file an [Apache Individual 
Contributor License Agreement](http://www.apache.org/licenses/#clas).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[I] [Bug] Bug title [rocketmq]

2023-12-03 Thread via GitHub


smallemployees opened a new issue, #7610:
URL: https://github.com/apache/rocketmq/issues/7610

   ### Before Creating the Bug Report
   
   - [X] I found a bug, not just asking a question, which should be created in 
[GitHub Discussions](https://github.com/apache/rocketmq/discussions).
   
   - [X] I have searched the [GitHub 
Issues](https://github.com/apache/rocketmq/issues) and [GitHub 
Discussions](https://github.com/apache/rocketmq/discussions)  of this 
repository and believe that this is not a duplicate.
   
   - [X] I have confirmed that this bug belongs to the current repository, not 
other repositories of RocketMQ.
   
   
   ### Runtime platform environment
   
   CentOS Linux release 7.9.2009
   
   ### RocketMQ version
   
   4.9.2
   
   ### JDK Version
   
   1.8.0
   
   ### Describe the Bug
   
   Broker Role=SLAVE and flushDiskType=ASYNC_ When both FLUSH and 
transientStorePoolEnable=true are configured simultaneously, a system busy 
request is reported
   
   ### Steps to Reproduce
   
   
org.apache.rocketmq.store.DefaultMessageStore.DefaultMessageStore(MessageStoreConfig
 messageStoreConfig, BrokerStatsManager brokerStatsManager, 
MessageArrivingListener messageArrivingListener, BrokerConfig brokerConfig) 
throws IOException
   if (messageStoreConfig.isTransientStorePoolEnable()) {
   this.transientStorePool.init();
   }
   
   boolean 
org.apache.rocketmq.store.config.MessageStoreConfig.isTransientStorePoolEnable()
   public boolean isTransientStorePoolEnable() {
   return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == 
getFlushDiskType()
   && BrokerRole.SLAVE != getBrokerRole();
   }
   
   ### What Did You Expect to See?
   
   Can the initialization of dLeger mode remove the brokerRole
   
   ### What Did You See Instead?
   
   Removing broker Role is normal
   
   ### Additional Context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GH] (rocketmq): Workflow run "Build and Run Tests by Maven" is working again!

2023-12-03 Thread GitBox


The GitHub Actions job "Build and Run Tests by Maven" on rocketmq.git has 
succeeded.
Run started by GitHub user lizhimins (triggered by lizhimins).

Head commit for run:
956fdd8a32359344104851d15edaa79c17873ef4 / lizhimins <707364...@qq.com>
[ISSUE #7585] Support message filtering in rocketmq tiered storage

Report URL: https://github.com/apache/rocketmq/actions/runs/7029081773

With regards,
GitHub Actions via GitBox



Re: [PR] [ISSUE #1112] feat: optimize producer send async [rocketmq-client-go]

2023-12-03 Thread via GitHub


ShannonDing merged PR #:
URL: https://github.com/apache/rocketmq-client-go/pull/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(rocketmq-client-go) branch master updated: [ISSUE #1112] feat: optimize producer send async (#1111)

2023-12-03 Thread dinglei
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 7ffb599  [ISSUE #1112] feat: optimize producer send async (#)
7ffb599 is described below

commit 7ffb599169034023d9f14e38c7599dc62be1f140
Author: WeizhongTu 
AuthorDate: Mon Dec 4 11:12:31 2023 +0800

[ISSUE #1112] feat: optimize producer send async (#)

* feat: optimize producer send async

* fix: fix mq override bug
---
 internal/remote/remote_client.go | 25 +
 producer/producer.go |  2 +-
 2 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 36fbea7..c0ef6ce 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -112,23 +112,24 @@ func (c *remotingClient) InvokeSync(ctx context.Context, 
addr string, request *R
 
 // InvokeAsync send request without blocking, just return immediately.
 func (c *remotingClient) InvokeAsync(ctx context.Context, addr string, request 
*RemotingCommand, callback func(*ResponseFuture)) error {
-   conn, err := c.connect(ctx, addr)
-   if err != nil {
-   return err
-   }
-
resp := NewResponseFuture(ctx, request.Opaque, callback)
c.responseTable.Store(resp.Opaque, resp)
 
-   err = c.sendRequest(ctx, conn, request)
-   if err != nil {
-   c.responseTable.Delete(request.Opaque)
-   return err
-   }
-
go primitive.WithRecover(func() {
+   defer resp.executeInvokeCallback()
+   defer c.responseTable.Delete(request.Opaque)
+
+   conn, err := c.connect(ctx, addr)
+   if err != nil {
+   resp.Err = err
+   return
+   }
+   err = c.sendRequest(ctx, conn, request)
+   if err != nil {
+   resp.Err = err
+   return
+   }
c.receiveAsync(resp)
-   c.responseTable.Delete(request.Opaque)
})
 
return nil
diff --git a/producer/producer.go b/producer/producer.go
index 70e8d01..eb3cd2e 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -329,7 +329,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg 
*primitive.Message,
if mq != nil {
lastBrokerName = mq.BrokerName
}
-   mq := p.selectMessageQueue(msg, lastBrokerName)
+   mq = p.selectMessageQueue(msg, lastBrokerName)
if mq == nil {
err = fmt.Errorf("the topic=%s route info not found", 
msg.Topic)
continue



Re: [PR] [ISSUE #1114] fix: response future should close channel before callback [rocketmq-client-go]

2023-12-03 Thread via GitHub


ShannonDing merged PR #1113:
URL: https://github.com/apache/rocketmq-client-go/pull/1113


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(rocketmq-client-go) branch master updated: fix: response future should close channel before callback (#1113)

2023-12-03 Thread dinglei
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 3f9c59f  fix: response future should close channel before callback 
(#1113)
3f9c59f is described below

commit 3f9c59fdf5d4cef0994939228f40886517d42d6f
Author: WeizhongTu 
AuthorDate: Mon Dec 4 11:13:16 2023 +0800

fix: response future should close channel before callback (#1113)
---
 internal/remote/remote_client.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index c0ef6ce..d433b8d 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -231,10 +231,10 @@ func (c *remotingClient) processCMD(cmd *RemotingCommand, 
r *tcpConnWrapper) {
responseFuture := resp.(*ResponseFuture)
go primitive.WithRecover(func() {
responseFuture.ResponseCommand = cmd
-   responseFuture.executeInvokeCallback()
if responseFuture.Done != nil {
close(responseFuture.Done)
}
+   responseFuture.executeInvokeCallback()
})
}
} else {



Re: [PR] fix WithRecover 无法recover问题 [rocketmq-client-go]

2023-12-03 Thread via GitHub


twz915 commented on PR #1074:
URL: 
https://github.com/apache/rocketmq-client-go/pull/1074#issuecomment-1837775085

   Previously, users could reassign the PanicHandler to their own handler, but 
now they can’t do it.
   Please keep the code compatible, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Support Timing Messages with Arbitrary Time Delay.(#83) [rocketmq-apis]

2023-12-03 Thread via GitHub


echooymxq commented on PR #84:
URL: https://github.com/apache/rocketmq-apis/pull/84#issuecomment-1837784542

   @lizhanhui Sorry for late reply, really respect your wrok. As I understand 
it, like (JMS(ActiveMQ), the delivery_delay should be send to broker and the 
broker calculated the actually delivery_time. I don't know if I'm understanding 
you correctly, you mean like pulsar? pulsar support `deliverAt` and 
`deliverAfter`, the actual delivery time is calculated by the client.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [ISSUE #7545] Fix set mapped file to null cause file can not destroy [rocketmq]

2023-12-03 Thread via GitHub


lizhimins opened a new pull request, #7612:
URL: https://github.com/apache/rocketmq/pull/7612

   
   
   ### Which Issue(s) This PR Fixes
   
   
   
   Fixes #7545, Fix set mapped file to null cause file can not destroy in 
tiered storage index
   
   ### Brief Description
   
   
   
   ### How Did You Test This Change?
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GH] (rocketmq): Workflow run "E2E test for pull request" failed!

2023-12-03 Thread GitBox


The GitHub Actions job "E2E test for pull request" on rocketmq.git has failed.
Run started by GitHub user lizhimins (triggered by lizhimins).

Head commit for run:
65faea22fd54fd9875f2ca9d3088b4dc46d31cce / keranbingaa <397294...@qq.com>
[ISSUE #7534] Use high performance concurrent set to replace copyonwriteset 
(#7583)

* fix ISSUE #7534

* reformat code

* Remove the useless unit test

-

Co-authored-by: RongtongJin 

Report URL: https://github.com/apache/rocketmq/actions/runs/7081741202

With regards,
GitHub Actions via GitBox



Re: [PR] [ISSUE #7545] Fix set mapped file to null cause file can not destroy [rocketmq]

2023-12-03 Thread via GitHub


codecov-commenter commented on PR #7612:
URL: https://github.com/apache/rocketmq/pull/7612#issuecomment-1837805978

   ## 
[Codecov](https://app.codecov.io/gh/apache/rocketmq/pull/7612?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 Report
   Attention: `3 lines` in your changes are missing coverage. Please review.
   > Comparison is base 
[(`65faea2`)](https://app.codecov.io/gh/apache/rocketmq/commit/65faea22fd54fd9875f2ca9d3088b4dc46d31cce?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 43.19% compared to head 
[(`c9c80a5`)](https://app.codecov.io/gh/apache/rocketmq/pull/7612?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 43.22%.
   
   | 
[Files](https://app.codecov.io/gh/apache/rocketmq/pull/7612?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | Patch % | Lines |
   |---|---|---|
   | 
[.../rocketmq/tieredstore/index/IndexStoreService.java](https://app.codecov.io/gh/apache/rocketmq/pull/7612?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-dGllcmVkc3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3RpZXJlZHN0b3JlL2luZGV4L0luZGV4U3RvcmVTZXJ2aWNlLmphdmE=)
 | 50.00% | [2 Missing and 1 partial :warning: 
](https://app.codecov.io/gh/apache/rocketmq/pull/7612?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 |
   
   Additional details and impacted files
   
   
   ```diff
   @@  Coverage Diff  @@
   ## develop#7612  +/-   ##
   =
   + Coverage  43.19%   43.22%   +0.02% 
   - Complexity  9782 9792  +10 
   =
 Files   1162 1162  
 Lines  8437884382   +4 
 Branches   1095510956   +1 
   =
   + Hits   3645136477  +26 
   + Misses 4339743374  -23 
   - Partials4530 4531   +1 
   ```
   
   
   
   
   
   [:umbrella: View full report in Codecov by 
Sentry](https://app.codecov.io/gh/apache/rocketmq/pull/7612?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   
   :loudspeaker: Have feedback on the report? [Share it 
here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GH] (rocketmq): Workflow run "Build and Run Tests by Maven" failed!

2023-12-03 Thread GitBox


The GitHub Actions job "Build and Run Tests by Maven" on rocketmq.git has 
failed.
Run started by GitHub user lizhimins (triggered by lizhimins).

Head commit for run:
c9c80a5d49d57da0e9de6006d16679ec9b986a05 / lizhimins <707364...@qq.com>
[ISSUE #7545] Fix set mapped file to null cause file can not destroy

Report URL: https://github.com/apache/rocketmq/actions/runs/7081726564

With regards,
GitHub Actions via GitBox



Re: [PR] Delete logback1.2.3 version conflict [rocketmq-mqtt]

2023-12-03 Thread via GitHub


pingww merged PR #227:
URL: https://github.com/apache/rocketmq-mqtt/pull/227


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(rocketmq-mqtt) branch develop updated: delete logback1.2.3 version conflict

2023-12-03 Thread pingww
This is an automated email from the ASF dual-hosted git repository.

pingww pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git


The following commit(s) were added to refs/heads/develop by this push:
 new f66ecb0  delete logback1.2.3 version conflict
 new 0d51d3a  Merge pull request #227 from 
EssinZhang/delete-logback-conflict
f66ecb0 is described below

commit f66ecb08dd46106adab62de1f5d519dd1fbfbbc9
Author: EssinZhang 
AuthorDate: Wed Nov 29 21:43:52 2023 +0800

delete logback1.2.3 version conflict
---
 pom.xml | 5 -
 1 file changed, 5 deletions(-)

diff --git a/pom.xml b/pom.xml
index 7e9ac4e..c99cc65 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,11 +127,6 @@
 slf4j-api
 1.7.15
 
-
-ch.qos.logback
-logback-classic
-1.2.3
-
 
 ch.qos.logback
 logback-classic



Re: [I] 项目升级rocketmq-client 5.1.4版本后,org.apache.rocketmq.mqtt.ds.store.LmqQueueStoreManager#maxOffset 方法,报throw new MQClientException("The broker[" + queue.getBrokerName() + "] not exist", null); [roc

2023-12-03 Thread via GitHub


DongyuanPan commented on issue #228:
URL: https://github.com/apache/rocketmq-mqtt/issues/228#issuecomment-1837899458

   下面这个逻辑也没走到吗?
   
![image](https://github.com/apache/rocketmq-mqtt/assets/18721508/47af0495-3af1-4a4a-9424-5ee54a1cb8f6)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [ISSUE #103] Fix seek offset and deserialize bug in newSource [rocketmq-flink]

2023-12-03 Thread via GitHub


lizhimins merged PR #104:
URL: https://github.com/apache/rocketmq-flink/pull/104


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(rocketmq-flink) branch main updated: [ISSUE #103] Fix seek offset and deserialize bug in newSource (#104)

2023-12-03 Thread lizhimin
This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git


The following commit(s) were added to refs/heads/main by this push:
 new 1576ae2  [ISSUE #103] Fix seek offset and deserialize bug in newSource 
(#104)
1576ae2 is described below

commit 1576ae2d4d51604a464df0fb21074f67c9e50c0f
Author: hejunjie <844028...@qq.com>
AuthorDate: Mon Dec 4 14:34:52 2023 +0800

[ISSUE #103] Fix seek offset and deserialize bug in newSource (#104)
---
 pom.xml |  1 +
 .../flink/connector/rocketmq/source/InnerConsumerImpl.java  | 13 ++---
 .../enumerator/RocketMQSourceEnumStateSerializer.java   |  5 ++---
 .../source/enumerator/RocketMQSourceEnumerator.java | 11 ++-
 .../rocketmq/source/reader/RocketMQSplitReader.java |  3 ---
 5 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/pom.xml b/pom.xml
index 67f322d..2eddd1a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,7 @@
 4.13.2
 5.9.2
 1.7.4
+2.3.1
 
 
 
diff --git 
a/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
 
b/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
index edc438b..317031f 100644
--- 
a/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
+++ 
b/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.connector.rocketmq.source;
 
+import com.alibaba.fastjson.JSON;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector;
@@ -25,8 +26,6 @@ import 
org.apache.flink.connector.rocketmq.source.reader.MessageViewExt;
 import org.apache.flink.connector.rocketmq.source.util.UtilAll;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.StringUtils;
-
-import com.alibaba.fastjson.JSON;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
@@ -240,7 +239,15 @@ public class InnerConsumerImpl implements InnerConsumer {
 long offset =
 consumer.getOffsetStore()
 .readOffset(messageQueue, 
ReadOffsetType.READ_FROM_STORE);
-LOG.error(
+
+if (offset == -1) {
+offset = adminExt.minOffset(messageQueue);
+LOG.info(
+"Consumer seek committed offset from 
remote, offset=-1,mq={},use minOffset={}",
+UtilAll.getQueueDescription(messageQueue),
+offset);
+}
+LOG.info(
 "Consumer seek committed offset from remote, 
mq={}, offset={}",
 UtilAll.getQueueDescription(messageQueue),
 offset);
diff --git 
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
 
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
index 7589ba4..805df1b 100644
--- 
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
+++ 
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
@@ -18,9 +18,8 @@
 
 package org.apache.flink.connector.rocketmq.source.enumerator;
 
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-
 import com.alibaba.fastjson.JSON;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -93,7 +92,7 @@ public class RocketMQSourceEnumStateSerializer
 String topic = in.readUTF();
 int queueId = in.readInt();
 
-MessageQueue queue = new MessageQueue(brokerName, topic, 
queueId);
+MessageQueue queue = new MessageQueue(topic, brokerName, 
queueId);
 result.add(queue);
 }
 
diff --git 
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java
 
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java
index 6103444..77c0c33 100644
--- 
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java
+++ 
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnu

[GH] (rocketmq): Workflow run "Build and Run Tests by Maven" failed!

2023-12-03 Thread GitBox


The GitHub Actions job "Build and Run Tests by Maven" on rocketmq.git has 
failed.
Run started by GitHub user lizhimins (triggered by lollipopjin).

Head commit for run:
c9c80a5d49d57da0e9de6006d16679ec9b986a05 / lizhimins <707364...@qq.com>
[ISSUE #7545] Fix set mapped file to null cause file can not destroy

Report URL: https://github.com/apache/rocketmq/actions/runs/7081726564

With regards,
GitHub Actions via GitBox



Re: [PR] [ISSUE #7585] Support message filtering in rocketmq tiered storage [rocketmq]

2023-12-03 Thread via GitHub


lollipopjin commented on code in PR #7594:
URL: https://github.com/apache/rocketmq/pull/7594#discussion_r1413322104


##
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java:
##
@@ -303,74 +270,94 @@ public CompletableFuture 
getMessageFromCacheAsync(CompositeQue
 
 recordCacheAccess(flatFile, group, queueOffset, resultWrapperList);
 
-// if cache hit, result will be returned immediately and 
asynchronously prefetch messages for later requests
-if (!resultWrapperList.isEmpty()) {
-LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit: 
" +
-"topic: {}, queue: {}, queue offset: {}, max message num: 
{}, cache hit num: {}",
-mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, 
resultWrapperList.size());
-prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1);
+if (resultWrapperList.isEmpty()) {
+// If cache miss, pull messages immediately
+LOGGER.info("MessageFetcher cache miss, group: {}, topic: {}, 
queueId: {}, offset: {}, maxCount: {}",
+group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
+} else {
+// If cache hit, return buffer result immediately and 
asynchronously prefetch messages
+LOGGER.debug("MessageFetcher cache hit, group: {}, topic: {}, 
queueId: {}, offset: {}, maxCount: {}, resultSize: {}",
+group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, 
resultWrapperList.size());
 
-GetMessageResult result = new GetMessageResult();
+GetMessageResultExt result = new GetMessageResultExt();
 result.setStatus(GetMessageStatus.FOUND);
 result.setMinOffset(flatFile.getConsumeQueueMinOffset());
 result.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
 result.setNextBeginOffset(queueOffset + resultWrapperList.size());
-resultWrapperList.forEach(wrapper -> 
result.addMessage(wrapper.getDuplicateResult(), wrapper.getCurOffset()));
+resultWrapperList.forEach(wrapper -> result.addMessageExt(
+wrapper.getDuplicateResult(), wrapper.getOffset(), 
wrapper.getTagCode()));
+
+if (lastGetOffset < result.getMaxOffset()) {
+this.prefetchMessage(flatFile, group, maxCount, lastGetOffset 
+ 1);
+}
 return CompletableFuture.completedFuture(result);
 }
 
-// if cache is miss, immediately pull messages
-LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache 
miss: " +
-"topic: {}, queue: {}, queue offset: {}, max message num: {}",
-mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
-
-CompletableFuture resultFuture;
+CompletableFuture resultFuture;
 synchronized (flatFile) {
 int batchSize = maxCount * storeConfig.getReadAheadMinFactor();
 resultFuture = getMessageFromTieredStoreAsync(flatFile, 
queueOffset, batchSize)
-.thenApplyAsync(result -> {
+.thenApply(result -> {
 if (result.getStatus() != GetMessageStatus.FOUND) {
 return result;
 }
-GetMessageResult newResult = new GetMessageResult();
-newResult.setStatus(GetMessageStatus.FOUND);
-
newResult.setMinOffset(flatFile.getConsumeQueueMinOffset());
-
newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
 
+GetMessageResultExt newResult = new GetMessageResultExt();
 List offsetList = result.getMessageQueueOffset();
+List tagCodeList = result.getTagCodeList();
 List msgList = 
result.getMessageMapedList();
-Long minOffset = offsetList.get(0);
-Long maxOffset = offsetList.get(offsetList.size() - 1);
-int size = offsetList.size();
+
 for (int i = 0; i < offsetList.size(); i++) {
-Long offset = offsetList.get(i);
 SelectMappedBufferResult msg = msgList.get(i);
-// put message into cache
-SelectMappedBufferResultWrapper resultWrapper = 
putMessageToCache(flatFile, offset, msg, minOffset, maxOffset, size, true);
-// try to meet maxCount
+SelectBufferResultWrapper bufferResult = new 
SelectBufferResultWrapper(
+msg, offsetList.get(i), tagCodeList.get(i), true);
+this.putMessageToCache(flatFile, bufferResult);
 if (newResult.getMessageMapedList().size() < maxCount) 
{
-
newResult.addMessage(resultWrapper.getDuplicateResult(), offset);
+

[GH] (rocketmq): Workflow run "Build and Run Tests by Maven" failed!

2023-12-03 Thread GitBox


The GitHub Actions job "Build and Run Tests by Maven" on rocketmq.git has 
failed.
Run started by GitHub user lizhimins (triggered by lizhimins).

Head commit for run:
c9c80a5d49d57da0e9de6006d16679ec9b986a05 / lizhimins <707364...@qq.com>
[ISSUE #7545] Fix set mapped file to null cause file can not destroy

Report URL: https://github.com/apache/rocketmq/actions/runs/7081726564

With regards,
GitHub Actions via GitBox



Re: [PR] [ISSUE #7585] Support message filtering in rocketmq tiered storage [rocketmq]

2023-12-03 Thread via GitHub


lizhimins commented on code in PR #7594:
URL: https://github.com/apache/rocketmq/pull/7594#discussion_r1413454902


##
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java:
##
@@ -303,74 +270,94 @@ public CompletableFuture 
getMessageFromCacheAsync(CompositeQue
 
 recordCacheAccess(flatFile, group, queueOffset, resultWrapperList);
 
-// if cache hit, result will be returned immediately and 
asynchronously prefetch messages for later requests
-if (!resultWrapperList.isEmpty()) {
-LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit: 
" +
-"topic: {}, queue: {}, queue offset: {}, max message num: 
{}, cache hit num: {}",
-mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, 
resultWrapperList.size());
-prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1);
+if (resultWrapperList.isEmpty()) {
+// If cache miss, pull messages immediately
+LOGGER.info("MessageFetcher cache miss, group: {}, topic: {}, 
queueId: {}, offset: {}, maxCount: {}",
+group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
+} else {
+// If cache hit, return buffer result immediately and 
asynchronously prefetch messages
+LOGGER.debug("MessageFetcher cache hit, group: {}, topic: {}, 
queueId: {}, offset: {}, maxCount: {}, resultSize: {}",
+group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, 
resultWrapperList.size());
 
-GetMessageResult result = new GetMessageResult();
+GetMessageResultExt result = new GetMessageResultExt();
 result.setStatus(GetMessageStatus.FOUND);
 result.setMinOffset(flatFile.getConsumeQueueMinOffset());
 result.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
 result.setNextBeginOffset(queueOffset + resultWrapperList.size());
-resultWrapperList.forEach(wrapper -> 
result.addMessage(wrapper.getDuplicateResult(), wrapper.getCurOffset()));
+resultWrapperList.forEach(wrapper -> result.addMessageExt(
+wrapper.getDuplicateResult(), wrapper.getOffset(), 
wrapper.getTagCode()));
+
+if (lastGetOffset < result.getMaxOffset()) {
+this.prefetchMessage(flatFile, group, maxCount, lastGetOffset 
+ 1);
+}
 return CompletableFuture.completedFuture(result);
 }
 
-// if cache is miss, immediately pull messages
-LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache 
miss: " +
-"topic: {}, queue: {}, queue offset: {}, max message num: {}",
-mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
-
-CompletableFuture resultFuture;
+CompletableFuture resultFuture;
 synchronized (flatFile) {
 int batchSize = maxCount * storeConfig.getReadAheadMinFactor();
 resultFuture = getMessageFromTieredStoreAsync(flatFile, 
queueOffset, batchSize)
-.thenApplyAsync(result -> {
+.thenApply(result -> {
 if (result.getStatus() != GetMessageStatus.FOUND) {
 return result;
 }
-GetMessageResult newResult = new GetMessageResult();
-newResult.setStatus(GetMessageStatus.FOUND);
-
newResult.setMinOffset(flatFile.getConsumeQueueMinOffset());
-
newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
 
+GetMessageResultExt newResult = new GetMessageResultExt();
 List offsetList = result.getMessageQueueOffset();
+List tagCodeList = result.getTagCodeList();
 List msgList = 
result.getMessageMapedList();
-Long minOffset = offsetList.get(0);
-Long maxOffset = offsetList.get(offsetList.size() - 1);
-int size = offsetList.size();
+
 for (int i = 0; i < offsetList.size(); i++) {
-Long offset = offsetList.get(i);
 SelectMappedBufferResult msg = msgList.get(i);
-// put message into cache
-SelectMappedBufferResultWrapper resultWrapper = 
putMessageToCache(flatFile, offset, msg, minOffset, maxOffset, size, true);
-// try to meet maxCount
+SelectBufferResultWrapper bufferResult = new 
SelectBufferResultWrapper(
+msg, offsetList.get(i), tagCodeList.get(i), true);
+this.putMessageToCache(flatFile, bufferResult);
 if (newResult.getMessageMapedList().size() < maxCount) 
{
-
newResult.addMessage(resultWrapper.getDuplicateResult(), offset);
+  

Re: [PR] [ISSUE #7585] Support message filtering in rocketmq tiered storage [rocketmq]

2023-12-03 Thread via GitHub


lizhimins commented on code in PR #7594:
URL: https://github.com/apache/rocketmq/pull/7594#discussion_r1413454902


##
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java:
##
@@ -303,74 +270,94 @@ public CompletableFuture 
getMessageFromCacheAsync(CompositeQue
 
 recordCacheAccess(flatFile, group, queueOffset, resultWrapperList);
 
-// if cache hit, result will be returned immediately and 
asynchronously prefetch messages for later requests
-if (!resultWrapperList.isEmpty()) {
-LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit: 
" +
-"topic: {}, queue: {}, queue offset: {}, max message num: 
{}, cache hit num: {}",
-mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, 
resultWrapperList.size());
-prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1);
+if (resultWrapperList.isEmpty()) {
+// If cache miss, pull messages immediately
+LOGGER.info("MessageFetcher cache miss, group: {}, topic: {}, 
queueId: {}, offset: {}, maxCount: {}",
+group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
+} else {
+// If cache hit, return buffer result immediately and 
asynchronously prefetch messages
+LOGGER.debug("MessageFetcher cache hit, group: {}, topic: {}, 
queueId: {}, offset: {}, maxCount: {}, resultSize: {}",
+group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, 
resultWrapperList.size());
 
-GetMessageResult result = new GetMessageResult();
+GetMessageResultExt result = new GetMessageResultExt();
 result.setStatus(GetMessageStatus.FOUND);
 result.setMinOffset(flatFile.getConsumeQueueMinOffset());
 result.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
 result.setNextBeginOffset(queueOffset + resultWrapperList.size());
-resultWrapperList.forEach(wrapper -> 
result.addMessage(wrapper.getDuplicateResult(), wrapper.getCurOffset()));
+resultWrapperList.forEach(wrapper -> result.addMessageExt(
+wrapper.getDuplicateResult(), wrapper.getOffset(), 
wrapper.getTagCode()));
+
+if (lastGetOffset < result.getMaxOffset()) {
+this.prefetchMessage(flatFile, group, maxCount, lastGetOffset 
+ 1);
+}
 return CompletableFuture.completedFuture(result);
 }
 
-// if cache is miss, immediately pull messages
-LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache 
miss: " +
-"topic: {}, queue: {}, queue offset: {}, max message num: {}",
-mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
-
-CompletableFuture resultFuture;
+CompletableFuture resultFuture;
 synchronized (flatFile) {
 int batchSize = maxCount * storeConfig.getReadAheadMinFactor();
 resultFuture = getMessageFromTieredStoreAsync(flatFile, 
queueOffset, batchSize)
-.thenApplyAsync(result -> {
+.thenApply(result -> {
 if (result.getStatus() != GetMessageStatus.FOUND) {
 return result;
 }
-GetMessageResult newResult = new GetMessageResult();
-newResult.setStatus(GetMessageStatus.FOUND);
-
newResult.setMinOffset(flatFile.getConsumeQueueMinOffset());
-
newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
 
+GetMessageResultExt newResult = new GetMessageResultExt();
 List offsetList = result.getMessageQueueOffset();
+List tagCodeList = result.getTagCodeList();
 List msgList = 
result.getMessageMapedList();
-Long minOffset = offsetList.get(0);
-Long maxOffset = offsetList.get(offsetList.size() - 1);
-int size = offsetList.size();
+
 for (int i = 0; i < offsetList.size(); i++) {
-Long offset = offsetList.get(i);
 SelectMappedBufferResult msg = msgList.get(i);
-// put message into cache
-SelectMappedBufferResultWrapper resultWrapper = 
putMessageToCache(flatFile, offset, msg, minOffset, maxOffset, size, true);
-// try to meet maxCount
+SelectBufferResultWrapper bufferResult = new 
SelectBufferResultWrapper(
+msg, offsetList.get(i), tagCodeList.get(i), true);
+this.putMessageToCache(flatFile, bufferResult);
 if (newResult.getMessageMapedList().size() < maxCount) 
{
-
newResult.addMessage(resultWrapper.getDuplicateResult(), offset);
+  

Re: [PR] [ISSUE #7263& ISSUE #7264]Fix to delete synchronously the consumption offset data of the subscription group on the standby node, when execute command 'mqadmin deleteSubGroup -b xxx -g xxx -r

2023-12-03 Thread via GitHub


frinda commented on PR #7273:
URL: https://github.com/apache/rocketmq/pull/7273#issuecomment-1837987681

   > I think the title might be too long, suggest renaming `[ISSUE xx& ISSUE 
xxx] Fix: xxx`
   
   It's been changed. Please submit it


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] golang: optimize the logic of obtaining routing tasks [rocketmq-clients]

2023-12-03 Thread via GitHub


leizhiyuan commented on PR #580:
URL: https://github.com/apache/rocketmq-clients/pull/580#issuecomment-1837994026

   After running for a period of time, the consumer may encounter a timeout in 
the request routing table due to network fluctuations or other abnormal 
reasons. In previous versions, an error message "messageQueues is empty" would 
occur and it seems unable to recover. If you encounter this issue, please 
upgrade to this version.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [ISSUE #7263& ISSUE #7264]Fix to delete synchronously the consumption offset data of the subscription group on the standby node, when execute command 'mqadmin deleteSubGroup -b xxx -g xxx -r

2023-12-03 Thread via GitHub


iamgd67 commented on PR #7273:
URL: https://github.com/apache/rocketmq/pull/7273#issuecomment-1837998805

   > > I think the title might be too long, suggest renaming `[ISSUE xx& ISSUE 
xxx] Fix: xxx`
   > 
   > @ joeCarf It's been changed. Please submit it,Thanks
   
   still too long, maybe this? 
   
   > [ISSUE #7263& ISSUE #7264] consumption offset data  and topic 
consumerqueue not deleted  on the standby node


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[I] [Bug] Consumer still consume new message but the offset of two queues always keep the old value [rocketmq]

2023-12-03 Thread via GitHub


tongtaodragon opened a new issue, #7613:
URL: https://github.com/apache/rocketmq/issues/7613

   ### Before Creating the Bug Report
   
   - [X] I found a bug, not just asking a question, which should be created in 
[GitHub Discussions](https://github.com/apache/rocketmq/discussions).
   
   - [X] I have searched the [GitHub 
Issues](https://github.com/apache/rocketmq/issues) and [GitHub 
Discussions](https://github.com/apache/rocketmq/discussions)  of this 
repository and believe that this is not a duplicate.
   
   - [X] I have confirmed that this bug belongs to the current repository, not 
other repositories of RocketMQ.
   
   
   ### Runtime platform environment
   
   Linux 
   
   ### RocketMQ version
   
   Broker: 4.8.0
   Client: 4.9.3
   
   ### JDK Version
   
   java version "1.8.0_121"
   
   ### Describe the Bug
   
   A topic has 4 brokers and 16 queues totally. One consumer group has two 
instance.
   After the issue observed, find the cg lag keeps growing. We identify two 
queues in one broker have the issue. The offset of these two queues has no 
change. We use tcpdump check the message sent from client to broker and found 
the offset sent from client always use the old value. Then we check the dump 
file of client and find the offset in dump of these two queues use old value. 
At the same time we find consumer still consume message successfully from all 
queues.
   
   Since it's production environment we use admin tool reset offset. Now the 
problem has gone.
   
   ### Steps to Reproduce
   
   Don't know how to reproduce it currently.
   
   Some clues
   1. Our client started in docker container.
   2. Due to node issue we have to restart all nodes one by one.
   3. The message of that offset was consumed by one old container but this 
message is still pulled by new started container.
   4. We use pushConsumer model.
   5. Other 14 queues offset is normal
   
   ### What Did You Expect to See?
   
   No lag and offset is normal
   
   ### What Did You See Instead?
   
   Lag keep growing and offset maintain old value
   
   ### Additional Context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org