Re: [I] [Enhancement] Add more test coverage for MqClientAdminImpl [rocketmq]

2024-07-10 Thread via GitHub


drpmma closed issue #8375: [Enhancement] Add more test coverage for MqClientAdminImpl
URL: https://github.com/apache/rocketmq/issues/8375


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [ISSUE #8375] Add more test coverage for MqClientAdminImpl [rocketmq]

2024-07-10 Thread via GitHub


drpmma merged PR #8376:
URL: https://github.com/apache/rocketmq/pull/8376





(rocketmq) branch develop updated: [ISSUE #8375] Add more test coverage for MqClientAdminImpl (#8376)

2024-07-10 Thread zhouxzhan
This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
 new 67ddc1dae0 [ISSUE #8375] Add more test coverage for MqClientAdminImpl (#8376)
67ddc1dae0 is described below

commit 67ddc1dae0548a5a2e413a2c7ef155b39a64f2b6
Author: yx9o 
AuthorDate: Wed Jul 10 15:23:48 2024 +0800

[ISSUE #8375] Add more test coverage for MqClientAdminImpl (#8376)

* Add more test coverage for MqClientAdminImpl
---
 .../client/impl/admin/MqClientAdminImplTest.java   | 559 +
 1 file changed, 559 insertions(+)

diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImplTest.java
new file mode 100644
index 00..71682fb52c
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImplTest.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.admin;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
+import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.remoting.protocol.body.GroupList;
+import org.apache.rocketmq.remoting.protocol.body.QueryConsumeTimeSpanBody;
+import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.remoting.protocol.body.ResetOffsetBody;
+import org.apache.rocketmq.remoting.protocol.body.TopicList;
+import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.DeleteSubscriptionGroupRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.QueryConsumeTimeSpanRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.QueryTopicsByConsumerRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteTopicFromNamesrvRequestHeader;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.junit.Before;
+imp

[GH] (rocketmq): Workflow run "PUSH-CI" failed!

2024-07-10 Thread GitBox


The GitHub Actions job "PUSH-CI" on rocketmq.git has failed.
Run started by GitHub user drpmma (triggered by drpmma).

Head commit for run:
67ddc1dae0548a5a2e413a2c7ef155b39a64f2b6 / yx9o 
[ISSUE #8375] Add more test coverage for MqClientAdminImpl (#8376)

* Add more test coverage for MqClientAdminImpl

Report URL: https://github.com/apache/rocketmq/actions/runs/9869934315

With regards,
GitHub Actions via GitBox



Re: [PR] [ISSUE #8366] Eliminate deadlocks during the client shutdown process. [rocketmq]

2024-07-10 Thread via GitHub


drpmma commented on PR #8367:
URL: https://github.com/apache/rocketmq/pull/8367#issuecomment-2219815886

   
https://github.com/apache/rocketmq/blob/32f0e8481d1f5672b8d494de7607fe5b846ffa51/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java#L479-L484





svn commit: r70210 - /dev/rocketmq/5.3.0-rc1/

2024-07-10 Thread lizhimin
Author: lizhimin
Date: Wed Jul 10 09:48:35 2024
New Revision: 70210

Log:
release 5.3.0-rc1

Added:
dev/rocketmq/5.3.0-rc1/
dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-bin-release.zip   (with props)
dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-bin-release.zip.asc   (with props)
dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-bin-release.zip.sha512
dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-source-release.zip   (with props)
dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-source-release.zip.asc   (with props)
dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-source-release.zip.sha512

Added: dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-bin-release.zip
==
Binary file - no diff available.

Propchange: dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-bin-release.zip
--
svn:mime-type = application/zip

Added: dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-bin-release.zip.asc
==
Binary file - no diff available.

Propchange: dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-bin-release.zip.asc
--
svn:mime-type = application/pgp-signature

Added: dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-bin-release.zip.sha512
==
--- dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-bin-release.zip.sha512 (added)
+++ dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-bin-release.zip.sha512 Wed Jul 10 09:48:35 2024
@@ -0,0 +1,4 @@
+rocketmq-all-5.3.0-bin-release.zip: 11285C3A 8F734849 84398ED0 13F5FF09 CE772F7B
+C6C78899 1EEA8C1A 3DBC4634 62BBB246 3C19E973
+26DD0F88 733D88A1 F0CBC67D C9707457 1820D048
+44AD12EE

Added: dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-source-release.zip
==
Binary file - no diff available.

Propchange: dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-source-release.zip
--
svn:mime-type = application/zip

Added: dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-source-release.zip.asc
==
Binary file - no diff available.

Propchange: dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-source-release.zip.asc
--
svn:mime-type = application/pgp-signature

Added: dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-source-release.zip.sha512
==
--- dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-source-release.zip.sha512 (added)
+++ dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-source-release.zip.sha512 Wed Jul 10 09:48:35 2024
@@ -0,0 +1,4 @@
+rocketmq-all-5.3.0-source-release.zip: 089E609E 201A908A 29B377EC 775B0735
+   F6BAE026 AD5E6B7B 5F9E186E F9AA69C2
+   744435CB 9E88C677 50DEE125 2E6ED147
+   0F239AF9 77D05560 3A37E0CE 28C883B1




[I] Error when triggering re-consumption from the console [rocketmq-dashboard]

2024-07-10 Thread via GitHub


jevonsnotes opened a new issue, #220:
URL: https://github.com/apache/rocketmq-dashboard/issues/220

   The issue tracker is **ONLY** used for bug report and feature request. 
   
   Any question or RocketMQ proposal please use our [mailing 
lists](http://rocketmq.apache.org/about/contact/).
   
   **BUG REPORT**
   
   1. Please describe the issue you observed:
   
   - What did you do (The steps to reproduce)?
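   I modified the source code to pass the proxy address as `addr` to the method below; the request fails with an error saying that request type 309 is not supported.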
   ```
   public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr,
       String consumerGroup,
       String clientId,
       String topic,
       String msgId,
       final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
       ConsumeMessageDirectlyResultRequestHeader requestHeader = new ConsumeMessageDirectlyResultRequestHeader();
       requestHeader.setTopic(topic);
       requestHeader.setConsumerGroup(consumerGroup);
       requestHeader.setClientId(clientId);
       requestHeader.setMsgId(msgId);
   
       RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, requestHeader);
   
       RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
           request, timeoutMillis);
       assert response != null;
       switch (response.getCode()) {
           case ResponseCode.SUCCESS: {
               byte[] body = response.getBody();
               if (body != null) {
                   ConsumeMessageDirectlyResult info = ConsumeMessageDirectlyResult.decode(body, ConsumeMessageDirectlyResult.class);
                   return info;
               }
           }
           default:
               break;
       }
   
       throw new MQClientException(response.getCode(), response.getRemark());
   }
   
   ```
   - What did you expect to see?
   The message is consumed normally.
   - What did you see instead?
   Re-consumption currently cannot be triggered from the console.
   2. Please tell us about your environment:
   rocketmq 5.2.0
   
   3. Other information (e.g. detailed explanation, logs, related issues, 
suggestions how to fix, etc):
   
   **FEATURE REQUEST**
   
   1. Please describe the feature you are requesting.
   
   2. Provide any additional detail on your proposed use case for this feature.
   
   2. Indicate the importance of this issue to you (blocker, must-have, 
should-have, nice-to-have). Are you currently using any workarounds to address 
this issue?
   
   4. If there are some sub-tasks using -[] for each subtask and create a 
corresponding issue to map to the sub task:
   
   - [sub-task1-issue-number](example_sub_issue1_link_here): sub-task1 
description here, 
   - [sub-task2-issue-number](example_sub_issue2_link_here): sub-task2 
description here,
   - ...





Re: [I] Unable to view consumer clients [rocketmq-dashboard]

2024-07-10 Thread via GitHub


jevonsnotes closed issue #218: Unable to view consumer clients
URL: https://github.com/apache/rocketmq-dashboard/issues/218





Re: [I] Unable to view consumer clients [rocketmq-dashboard]

2024-07-10 Thread via GitHub


jevonsnotes commented on issue #218:
URL: 
https://github.com/apache/rocketmq-dashboard/issues/218#issuecomment-2220282546

   After updating the code, setting the VIP channel option to false resolves the problem.
   ```
   rocketmq:
     config:
       isVIPChannel: false
   ```





[PR] Add release notes for apache rocketmq 5.3.0 [rocketmq-site]

2024-07-10 Thread via GitHub


lizhimins opened a new pull request, #667:
URL: https://github.com/apache/rocketmq-site/pull/667

   Please do not create a Pull Request without creating an issue first. 
   
   ## What is the purpose of the change
   
   X
   
   ## Brief changelog
   
   XX
   
   ## Verifying this change
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
   - [x] Make sure there is a Github issue filed for the change (usually before 
you start working on it). Trivial changes like typos do not require a Github 
issue. Your pull request should address just this issue, without pulling in 
other changes - one PR resolves one issue. 
   - [ ] Format the pull request title like `[ISSUE #123] Fix UnknownException 
when host config not exist`. Each commit in the pull request should have a 
meaningful subject line and body.
   - [ ] Write a pull request description that is detailed enough to understand 
what the pull request does, how, and why.
   - [ ] Write necessary unit-test to verify your logic correction, more mock a 
little better when cross module dependency exist. If the new feature or 
significant change is committed, please remember to add integration-test in 
[test module](https://github.com/apache/rocketmq/tree/master/test).
   - [ ] Run `mvn -B clean apache-rat:check findbugs:findbugs 
checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install 
-DskipITs` to make sure unit-test pass. Run `mvn clean test-compile 
failsafe:integration-test`  to make sure integration-test pass.
   - [ ] If this contribution is large, please file an [Apache Individual 
Contributor License Agreement](http://www.apache.org/licenses/#clas).
   





[GH] (rocketmq-clients): Workflow run "Build" failed!

2024-07-10 Thread GitBox


The GitHub Actions job "Build" on rocketmq-clients.git has failed.
Run started by GitHub user tsunghanjacktsai (triggered by tsunghanjacktsai).

Head commit for run:
835c8b80d9165212d6ce96a778c7c8a5a2622880 / tsaitsung-han.tht 

Add push consumer for normal message in c# sdk

Report URL: https://github.com/apache/rocketmq-clients/actions/runs/9850194881

With regards,
GitHub Actions via GitBox



[GH] (rocketmq-clients): Workflow run "Build" failed!

2024-07-10 Thread GitBox


The GitHub Actions job "Build" on rocketmq-clients.git has failed.
Run started by GitHub user tsunghanjacktsai (triggered by tsunghanjacktsai).

Head commit for run:
3925413bd2eb996b3b966883f8fb93d9661fd457 / tsaitsung-han.tht 

Add namespace support for c# sdk

Report URL: https://github.com/apache/rocketmq-clients/actions/runs/9851055417

With regards,
GitHub Actions via GitBox



[GH] (rocketmq-clients): Workflow run "Build" failed!

2024-07-10 Thread GitBox


The GitHub Actions job "Build" on rocketmq-clients.git has failed.
Run started by GitHub user tsunghanjacktsai (triggered by tsunghanjacktsai).

Head commit for run:
0a02034b1a74ba60cab486bac3d2c64bd371464f / tsaitsung-han.tht 

Add reentrant push consumer message receiving support for c# sdk

Report URL: https://github.com/apache/rocketmq-clients/actions/runs/9850261052

With regards,
GitHub Actions via GitBox



[GH] (rocketmq-clients): Workflow run "Build" failed!

2024-07-10 Thread GitBox


The GitHub Actions job "Build" on rocketmq-clients.git has failed.
Run started by GitHub user tsunghanjacktsai (triggered by tsunghanjacktsai).

Head commit for run:
6292e81b094546d595bf9adef1498b22854eeac1 / tsaitsung-han.tht 

Add push consumer for fifo message in c# sdk

Report URL: https://github.com/apache/rocketmq-clients/actions/runs/9850695382

With regards,
GitHub Actions via GitBox



[GH] (rocketmq-clients): Workflow run "Build" failed!

2024-07-10 Thread GitBox


The GitHub Actions job "Build" on rocketmq-clients.git has failed.
Run started by GitHub user tsunghanjacktsai (triggered by tsunghanjacktsai).

Head commit for run:
cf2f696e64e74884f6c2f98b771ac56466cddc44 / tsaitsung-han.tht 

Add push consumer for fifo message in c# sdk

Report URL: https://github.com/apache/rocketmq-clients/actions/runs/9850205843

With regards,
GitHub Actions via GitBox



[GH] (rocketmq-clients): Workflow run "Build" failed!

2024-07-10 Thread GitBox


The GitHub Actions job "Build" on rocketmq-clients.git has failed.
Run started by GitHub user tsunghanjacktsai (triggered by tsunghanjacktsai).

Head commit for run:
07b513299035729a2d5c979ac9157a2235952902 / tsaitsung-han.tht 

Add namespace support for c# sdk

Report URL: https://github.com/apache/rocketmq-clients/actions/runs/9850237585

With regards,
GitHub Actions via GitBox



[GH] (rocketmq-clients): Workflow run "Build" failed!

2024-07-10 Thread GitBox


The GitHub Actions job "Build" on rocketmq-clients.git has failed.
Run started by GitHub user tsunghanjacktsai (triggered by tsunghanjacktsai).

Head commit for run:
98585a31240ede820e842605bb6cbc001de515cf / tsaitsung-han.tht 

Add push consumer for normal message in c# sdk

Report URL: https://github.com/apache/rocketmq-clients/actions/runs/9850691202

With regards,
GitHub Actions via GitBox



[GH] (rocketmq-clients): Workflow run "Build" failed!

2024-07-10 Thread GitBox


The GitHub Actions job "Build" on rocketmq-clients.git has failed.
Run started by GitHub user tsunghanjacktsai (triggered by tsunghanjacktsai).

Head commit for run:
e655b6bbf46c85b4114592849a049aaca41dac67 / tsaitsung-han.tht 

Add reentrant push consumer message receiving support for c# sdk

Report URL: https://github.com/apache/rocketmq-clients/actions/runs/9851075418

With regards,
GitHub Actions via GitBox



Re: [PR] [ISSUE #6795] feat: support to send transaction msg with lmqQueue property [rocketmq]

2024-07-10 Thread via GitHub


github-actions[bot] commented on PR #6796:
URL: https://github.com/apache/rocketmq/pull/6796#issuecomment-2221744629

   This PR is stale because it has been open for 365 days with no activity. It 
will be closed in 3 days if no further activity occurs. If you wish not to mark 
it as stale, please leave a comment in this PR.





Re: [PR] [ISSUE #5716] fix possible negative index value [rocketmq]

2024-07-10 Thread via GitHub


github-actions[bot] commented on PR #5718:
URL: https://github.com/apache/rocketmq/pull/5718#issuecomment-2221744665

   This PR is stale because it has been open for 365 days with no activity. It 
will be closed in 3 days if no further activity occurs. If you wish not to mark 
it as stale, please leave a comment in this PR.





[GH] (rocketmq): Workflow run "Snapshot Daily Release Automation" failed!

2024-07-10 Thread GitBox


The GitHub Actions job "Snapshot Daily Release Automation" on rocketmq.git has 
failed.
Run started by GitHub user lizhanhui (triggered by lizhanhui).

Head commit for run:
67ddc1dae0548a5a2e413a2c7ef155b39a64f2b6 / yx9o 
[ISSUE #8375] Add more test coverage for MqClientAdminImpl (#8376)

* Add more test coverage for MqClientAdminImpl

Report URL: https://github.com/apache/rocketmq/actions/runs/9883373624

With regards,
GitHub Actions via GitBox



[GH] (rocketmq-clients): Workflow run "Rust Coverage" failed!

2024-07-10 Thread GitBox


The GitHub Actions job "Rust Coverage" on rocketmq-clients.git has failed.
Run started by GitHub user lizhanhui (triggered by lizhanhui).

Head commit for run:
25dd6abed56d3df3872f8bfc0e2579085aac98df / Zhanhui Li 
chore: update README doc, reflecting we are supporting push-consumer in Rust

Signed-off-by: Zhanhui Li 

Report URL: https://github.com/apache/rocketmq-clients/actions/runs/9884284056

With regards,
GitHub Actions via GitBox



Re: [PR] [rust] implement standard pushconsumer [rocketmq-clients]

2024-07-10 Thread via GitHub


lizhanhui commented on code in PR #767:
URL: https://github.com/apache/rocketmq-clients/pull/767#discussion_r1673306010


##
rust/src/push_consumer.rs:
##
@@ -0,0 +1,1303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use mockall::automock;
+use mockall_double::double;
+use parking_lot::{Mutex, RwLock};
+use prost_types::Duration;
+use slog::Logger;
+use slog::{debug, error, info, warn};
+use std::collections::{HashMap, VecDeque};
+use std::sync::Arc;
+use tokio::select;
+use tokio::sync::{mpsc, oneshot};
+
+#[double]
+use crate::client::Client;
+use crate::conf::{BackOffRetryPolicy, ClientOption, PushConsumerOption};
+use crate::error::{ClientError, ErrorKind};
+use crate::model::common::{ClientType, ConsumeResult, FilterExpression, 
MessageQueue};
+use crate::model::message::{AckMessageEntry, MessageView};
+use crate::pb::receive_message_response::Content;
+use crate::pb::{
+AckMessageRequest, Assignment, ChangeInvisibleDurationRequest, 
QueryAssignmentRequest,
+ReceiveMessageRequest, Resource,
+};
+use crate::session::{RPCClient, Session};
+use crate::util::{build_endpoints_by_message_queue, 
build_push_consumer_settings};
+use crate::{log, pb};
+
+const OPERATION_NEW_PUSH_CONSUMER: &str = "push_consumer.new";
+const OPERATION_RECEIVE_MESSAGE: &str = "push_consumer.receive_message";
+const OPERATION_ACK_MESSAGE: &str = "push_consumer.ack_message";
+const OPERATION_START_PUSH_CONSUMER: &str = "push_consumer.start";
+const OPERATION_CHANGE_INVISIBLE_DURATION: &str = 
"push_consumer.change_invisible_duration";
+
+pub type MessageListener = Box<dyn Fn(&MessageView) -> ConsumeResult + Send + Sync>;

Review Comment:
   `Fn` only allows calls through a shared reference, so the listener cannot mutate captured state, which seems excessively strict. Use `FnMut` instead?
   Here is a good reference to read:
   https://stackoverflow.com/questions/41081240/idiomatic-callbacks-in-rust
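
To make the trade-off raised in this review comment concrete, here is a minimal sketch; it is not the PR's code. `MessageView` and `ConsumeResult` are simplified stand-ins for the crate's types, and `FnListener`, `FnMutListener`, `dispatch_shared`, `dispatch_locked`, and `counting_listener` are hypothetical names introduced only for illustration. It assumes the listener is shared across tasks through an `Arc`, and shows that an `Fn` listener can be called through the `Arc` directly, while an `FnMut` listener can mutate captured state but needs a lock around it to be shared.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-ins for the crate's `MessageView` and `ConsumeResult`.
pub struct MessageView {
    pub body: Vec<u8>,
}

#[derive(Debug, PartialEq)]
pub enum ConsumeResult {
    Success,
    Failure,
}

// `Fn` listener: callable through a shared reference, so a plain `Arc` is
// enough to share it. Captured state can only change via interior mutability.
pub type FnListener = Box<dyn Fn(&MessageView) -> ConsumeResult + Send + Sync>;

// `FnMut` listener: may mutate captured state directly, but every call needs
// exclusive (`&mut`) access to the closure.
pub type FnMutListener = Box<dyn FnMut(&MessageView) -> ConsumeResult + Send>;

// Calling an `Fn` listener needs no lock.
pub fn dispatch_shared(listener: &Arc<FnListener>, msg: &MessageView) -> ConsumeResult {
    (**listener)(msg)
}

// Calling an `FnMut` listener requires taking the lock first, which
// serializes all calls to that listener.
pub fn dispatch_locked(
    listener: &Arc<Mutex<FnMutListener>>,
    msg: &MessageView,
) -> ConsumeResult {
    let mut guard = listener.lock().unwrap();
    (*guard)(msg)
}

// Example `FnMut` listener: accepts the first `limit` messages, then reports
// failure. The counter lives inside the closure and is mutated on each call.
pub fn counting_listener(limit: usize) -> FnMutListener {
    let mut seen = 0usize;
    Box::new(move |_msg: &MessageView| {
        seen += 1;
        if seen <= limit {
            ConsumeResult::Success
        } else {
            ConsumeResult::Failure
        }
    })
}
```

With the `FnMut` variant the lock means one listener cannot run concurrently for several message queues; whether that cost is acceptable is a design decision for the PR, not something this sketch settles.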






Re: [PR] [rust] implement standard pushconsumer [rocketmq-clients]

2024-07-10 Thread via GitHub


lizhanhui commented on code in PR #767:
URL: https://github.com/apache/rocketmq-clients/pull/767#discussion_r1673324426


##
rust/src/push_consumer.rs:
##
@@ -0,0 +1,1303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use mockall::automock;
+use mockall_double::double;
+use parking_lot::{Mutex, RwLock};
+use prost_types::Duration;
+use slog::Logger;
+use slog::{debug, error, info, warn};
+use std::collections::{HashMap, VecDeque};
+use std::sync::Arc;
+use tokio::select;
+use tokio::sync::{mpsc, oneshot};
+
+#[double]
+use crate::client::Client;
+use crate::conf::{BackOffRetryPolicy, ClientOption, PushConsumerOption};
+use crate::error::{ClientError, ErrorKind};
+use crate::model::common::{ClientType, ConsumeResult, FilterExpression, 
MessageQueue};
+use crate::model::message::{AckMessageEntry, MessageView};
+use crate::pb::receive_message_response::Content;
+use crate::pb::{
+AckMessageRequest, Assignment, ChangeInvisibleDurationRequest, 
QueryAssignmentRequest,
+ReceiveMessageRequest, Resource,
+};
+use crate::session::{RPCClient, Session};
+use crate::util::{build_endpoints_by_message_queue, 
build_push_consumer_settings};
+use crate::{log, pb};
+
+const OPERATION_NEW_PUSH_CONSUMER: &str = "push_consumer.new";
+const OPERATION_RECEIVE_MESSAGE: &str = "push_consumer.receive_message";
+const OPERATION_ACK_MESSAGE: &str = "push_consumer.ack_message";
+const OPERATION_START_PUSH_CONSUMER: &str = "push_consumer.start";
+const OPERATION_CHANGE_INVISIBLE_DURATION: &str = 
"push_consumer.change_invisible_duration";
+
+pub type MessageListener = Box<dyn Fn(&MessageView) -> ConsumeResult + Send + Sync>;
+
+pub struct PushConsumer {
+logger: Logger,
+client: Client,
+message_listener: Arc<MessageListener>,
+option: Arc<RwLock<PushConsumerOption>>,
+shutdown_tx: Option>,
+}
+
+/*
+ * An actor is required for each message queue.
+ * It is responsible for polling messages from the message queue. It communicates with PushConsumer by channels.
+ */
+struct MessageQueueActor {
+logger: Logger,
+rpc_client: Session,
+message_queue: MessageQueue,
+shutdown_tx: Option>,
+option: PushConsumerOption,
+message_listener: Arc<MessageListener>,
+retry_policy: BackOffRetryPolicy,
+}
+
+impl PushConsumer {
+pub fn new(
+client_option: ClientOption,
+option: PushConsumerOption,
+message_listener: MessageListener,
+) -> Result<Self, ClientError> {
+if option.consumer_group().is_empty() {
+return Err(ClientError::new(
+ErrorKind::Config,
+"consumer group is required.",
+OPERATION_NEW_PUSH_CONSUMER,
+));
+}
+if option.subscription_expressions().is_empty() {
+return Err(ClientError::new(
+ErrorKind::Config,
+"subscription expressions is required.",
+OPERATION_NEW_PUSH_CONSUMER,
+));
+}
+let client_option = ClientOption {
+client_type: ClientType::PushConsumer,
+group: Some(option.consumer_group().to_string()),
+..client_option
+};
+let logger = log::logger(option.logging_format());
+let client = Client::new(
+&logger,
+client_option,
+build_push_consumer_settings(&option),
+)?;
+Ok(Self {
+logger,
+client,
+message_listener: Arc::new(message_listener),
+option: Arc::new(RwLock::new(option)),
+shutdown_tx: None,
+})
+}
+
+pub async fn start(&mut self) -> Result<(), ClientError> {
+let (telemetry_command_tx, mut telemetry_command_rx) = 
mpsc::channel(16);
+self.client.start(telemetry_command_tx).await?;
+let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+self.shutdown_tx = Some(shutdown_tx);
+let option = Arc::clone(&self.option);
+let mut rpc_client = self.client.get_session().await?;
+let route_manager = self.client.get_route_manager();
+let topics;
+{
+topics = option
+.read()
+.subscription_expressions()
+.keys()
+.cloned()
+.collect()

Re: [PR] [rust] implement standard pushconsumer [rocketmq-clients]

2024-07-10 Thread via GitHub


lizhanhui commented on code in PR #767:
URL: https://github.com/apache/rocketmq-clients/pull/767#discussion_r1673324842


##
rust/src/push_consumer.rs:
##
@@ -0,0 +1,1303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use mockall::automock;
+use mockall_double::double;
+use parking_lot::{Mutex, RwLock};
+use prost_types::Duration;
+use slog::Logger;
+use slog::{debug, error, info, warn};
+use std::collections::{HashMap, VecDeque};
+use std::sync::Arc;
+use tokio::select;
+use tokio::sync::{mpsc, oneshot};
+
+#[double]
+use crate::client::Client;
+use crate::conf::{BackOffRetryPolicy, ClientOption, PushConsumerOption};
+use crate::error::{ClientError, ErrorKind};
+use crate::model::common::{ClientType, ConsumeResult, FilterExpression, 
MessageQueue};
+use crate::model::message::{AckMessageEntry, MessageView};
+use crate::pb::receive_message_response::Content;
+use crate::pb::{
+AckMessageRequest, Assignment, ChangeInvisibleDurationRequest, 
QueryAssignmentRequest,
+ReceiveMessageRequest, Resource,
+};
+use crate::session::{RPCClient, Session};
+use crate::util::{build_endpoints_by_message_queue, build_push_consumer_settings};
+use crate::{log, pb};
+
+const OPERATION_NEW_PUSH_CONSUMER: &str = "push_consumer.new";
+const OPERATION_RECEIVE_MESSAGE: &str = "push_consumer.receive_message";
+const OPERATION_ACK_MESSAGE: &str = "push_consumer.ack_message";
+const OPERATION_START_PUSH_CONSUMER: &str = "push_consumer.start";
+const OPERATION_CHANGE_INVISIBLE_DURATION: &str = "push_consumer.change_invisible_duration";
+
+pub type MessageListener = Box<dyn Fn(&MessageView) -> ConsumeResult + Send + Sync>;
+
+pub struct PushConsumer {
+logger: Logger,
+client: Client,
+message_listener: Arc<MessageListener>,
+option: Arc<RwLock<PushConsumerOption>>,
+shutdown_tx: Option<oneshot::Sender<()>>,
+}
+
+/*
+ * An actor is required for each message queue.
+ * It is responsible for polling messages from the message queue. It communicates with PushConsumer by channels.
+ */
+struct MessageQueueActor {
+logger: Logger,
+rpc_client: Session,
+message_queue: MessageQueue,
+shutdown_tx: Option<oneshot::Sender<()>>,
+option: PushConsumerOption,
+message_listener: Arc<MessageListener>,
+retry_policy: BackOffRetryPolicy,
+}
+
+impl PushConsumer {
+pub fn new(
+client_option: ClientOption,
+option: PushConsumerOption,
+message_listener: MessageListener,
+) -> Result<Self, ClientError> {
+if option.consumer_group().is_empty() {
+return Err(ClientError::new(
+ErrorKind::Config,
+"consumer group is required.",
+OPERATION_NEW_PUSH_CONSUMER,
+));
+}
+if option.subscription_expressions().is_empty() {
+return Err(ClientError::new(
+ErrorKind::Config,
+"subscription expressions is required.",
+OPERATION_NEW_PUSH_CONSUMER,
+));
+}
+let client_option = ClientOption {
+client_type: ClientType::PushConsumer,
+group: Some(option.consumer_group().to_string()),
+..client_option
+};
+let logger = log::logger(option.logging_format());
+let client = Client::new(
+&logger,
+client_option,
+build_push_consumer_settings(&option),
+)?;
+Ok(Self {
+logger,
+client,
+message_listener: Arc::new(message_listener),
+option: Arc::new(RwLock::new(option)),
+shutdown_tx: None,
+})
+}
+
+pub async fn start(&mut self) -> Result<(), ClientError> {
+let (telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16);
+self.client.start(telemetry_command_tx).await?;
+let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+self.shutdown_tx = Some(shutdown_tx);
+let option = Arc::clone(&self.option);
+let mut rpc_client = self.client.get_session().await?;
+let route_manager = self.client.get_route_manager();
+let topics;
+{
+topics = option
+.read()
+.subscription_expressions()
+.keys()
+.cloned()
+.collect()

Re: [PR] [rust] implement standard pushconsumer [rocketmq-clients]

2024-07-10 Thread via GitHub


lizhanhui commented on code in PR #767:
URL: https://github.com/apache/rocketmq-clients/pull/767#discussion_r1673327287


##
rust/src/push_consumer.rs:
##
@@ -0,0 +1,1303 @@

Re: [PR] [rust] implement standard pushconsumer [rocketmq-clients]

2024-07-10 Thread via GitHub


lizhanhui commented on code in PR #767:
URL: https://github.com/apache/rocketmq-clients/pull/767#discussion_r1673330758


##
rust/src/push_consumer.rs:
##
@@ -0,0 +1,1303 @@

Re: [PR] [rust] implement standard pushconsumer [rocketmq-clients]

2024-07-10 Thread via GitHub


lizhanhui commented on code in PR #767:
URL: https://github.com/apache/rocketmq-clients/pull/767#discussion_r167147


##
rust/src/push_consumer.rs:
##
@@ -0,0 +1,1303 @@

[GH] (rocketmq): Workflow run "PR-CI" failed!

2024-07-10 Thread GitBox


The GitHub Actions job "PR-CI" on rocketmq.git has failed.
Run started by GitHub user qianye1001 (triggered by qianye1001).

Head commit for run:
0f33126aa50a5fc67c212f20e94bdd236dd552b7 / qianye 

Merge remote-tracking branch 'origin/Enhancement#8365' into Enhancement#8365

# Conflicts:
#   client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
#   client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java

Report URL: https://github.com/apache/rocketmq/actions/runs/9884852631

With regards,
GitHub Actions via GitBox



[GH] (rocketmq): Workflow run "E2E test for pull request" failed!

2024-07-10 Thread GitBox


The GitHub Actions job "E2E test for pull request" on rocketmq.git has failed.
Run started by GitHub user qianye1001 (triggered by qianye1001).

Head commit for run:
67ddc1dae0548a5a2e413a2c7ef155b39a64f2b6 / yx9o 
[ISSUE #8375] Add more test coverage for MqClientAdminImpl (#8376)

* Add more test coverage for MqClientAdminImpl

Report URL: https://github.com/apache/rocketmq/actions/runs/9884860440

With regards,
GitHub Actions via GitBox



[GH] (rocketmq): Workflow run "Build and Run Tests by Bazel" failed!

2024-07-10 Thread GitBox


The GitHub Actions job "Build and Run Tests by Bazel" on rocketmq.git has 
failed.
Run started by GitHub user qianye1001 (triggered by qianye1001).

Head commit for run:
0f33126aa50a5fc67c212f20e94bdd236dd552b7 / qianye 

Merge remote-tracking branch 'origin/Enhancement#8365' into Enhancement#8365

# Conflicts:
#   client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
#   client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java

Report URL: https://github.com/apache/rocketmq/actions/runs/9884852629

With regards,
GitHub Actions via GitBox



[GH] (rocketmq): Workflow run "Build and Run Tests by Maven" failed!

2024-07-10 Thread GitBox


The GitHub Actions job "Build and Run Tests by Maven" on rocketmq.git has 
failed.
Run started by GitHub user qianye1001 (triggered by qianye1001).

Head commit for run:
0f33126aa50a5fc67c212f20e94bdd236dd552b7 / qianye 

Merge remote-tracking branch 'origin/Enhancement#8365' into Enhancement#8365

# Conflicts:
#   client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
#   client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java

Report URL: https://github.com/apache/rocketmq/actions/runs/9884852628

With regards,
GitHub Actions via GitBox



[GH] (rocketmq): Workflow run "PR-CI" is working again!

2024-07-10 Thread GitBox


The GitHub Actions job "PR-CI" on rocketmq.git has succeeded.
Run started by GitHub user qianye1001 (triggered by qianye1001).

Head commit for run:
181be3017a8f91b722d731ebbd36c572c0baaa2c / qianye 

fix style

Report URL: https://github.com/apache/rocketmq/actions/runs/9884901042

With regards,
GitHub Actions via GitBox



[GH] (rocketmq): Workflow run "Build and Run Tests by Bazel" is working again!

2024-07-10 Thread GitBox


The GitHub Actions job "Build and Run Tests by Bazel" on rocketmq.git has 
succeeded.
Run started by GitHub user qianye1001 (triggered by qianye1001).

Head commit for run:
181be3017a8f91b722d731ebbd36c572c0baaa2c / qianye 

fix style

Report URL: https://github.com/apache/rocketmq/actions/runs/9884901041

With regards,
GitHub Actions via GitBox



[GH] (rocketmq): Workflow run "Coverage" failed!

2024-07-10 Thread GitBox


The GitHub Actions job "Coverage" on rocketmq.git has failed.
Run started by GitHub user qianye1001 (triggered by qianye1001).

Head commit for run:
0f33126aa50a5fc67c212f20e94bdd236dd552b7 / qianye 

Merge remote-tracking branch 'origin/Enhancement#8365' into Enhancement#8365

# Conflicts:
#   client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
#   client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java

Report URL: https://github.com/apache/rocketmq/actions/runs/9884852627

With regards,
GitHub Actions via GitBox



[GH] (rocketmq): Workflow run "E2E test for pull request" failed!

2024-07-10 Thread GitBox


The GitHub Actions job "E2E test for pull request" on rocketmq.git has failed.
Run started by GitHub user qianye1001 (triggered by qianye1001).

Head commit for run:
67ddc1dae0548a5a2e413a2c7ef155b39a64f2b6 / yx9o 
[ISSUE #8375] Add more test coverage for MqClientAdminImpl (#8376)

* Add more test coverage for MqClientAdminImpl

Report URL: https://github.com/apache/rocketmq/actions/runs/9884922377

With regards,
GitHub Actions via GitBox



Re: [PR] [ISSUE #8366] Eliminate deadlocks during the client shutdown process. [rocketmq]

2024-07-10 Thread via GitHub


YanYunyang commented on PR #8367:
URL: https://github.com/apache/rocketmq/pull/8367#issuecomment-2221953973

   > https://github.com/apache/rocketmq/blob/32f0e8481d1f5672b8d494de7607fe5b846ffa51/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java#L479-L484
   > 
   > It seems that this piece of code also may lead to deadlock.
   
   This is the same channelWrapper, and the lock in channelWrapper is reentrant.
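
To make the reentrancy argument concrete, here is a minimal sketch. It assumes the lock inside channelWrapper behaves like `java.util.concurrent.locks.ReentrantReadWriteLock`; that is an assumption made for illustration, and the class and method names below are hypothetical rather than taken from NettyRemotingClient. It only shows that one thread can re-acquire a lock it already holds.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in: not RocketMQ code, only the JDK lock it is assumed to use.
final class ReentrantLockSketch {

    /** Takes the write lock twice on the same thread and reports the hold count. */
    static int nestedWriteHolds() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        lock.writeLock().lock();
        try {
            lock.writeLock().lock(); // re-entry by the owning thread does not block
            try {
                return lock.getWriteHoldCount();
            } finally {
                lock.writeLock().unlock();
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** The thread that owns the write lock may also take the read lock. */
    static boolean readWhileHoldingWrite() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        lock.writeLock().lock();
        try {
            boolean acquired = lock.readLock().tryLock();
            if (acquired) {
                lock.readLock().unlock();
            }
            return acquired;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

Reentrancy of this kind only helps when the nested acquisition happens on the same thread and on the same lock instance.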





Re: [PR] [ISSUE #8366] Eliminate deadlocks during the client shutdown process. [rocketmq]

2024-07-10 Thread via GitHub


YanYunyang commented on PR #8367:
URL: https://github.com/apache/rocketmq/pull/8367#issuecomment-2221957851

   > There are two closeChannel methods.
   > 
   > This piece of code also needs modification.
   > 
   > https://github.com/apache/rocketmq/blob/32f0e8481d1f5672b8d494de7607fe5b846ffa51/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java#L424-L428
   
   Here it needs to be changed to `isWrapperOf` to match the meaning. Thank you.
   





[GH] (rocketmq): Workflow run "Coverage" is working again!

2024-07-10 Thread GitBox


The GitHub Actions job "Coverage" on rocketmq.git has succeeded.
Run started by GitHub user qianye1001 (triggered by qianye1001).

Head commit for run:
181be3017a8f91b722d731ebbd36c572c0baaa2c / qianye 

fix style

Report URL: https://github.com/apache/rocketmq/actions/runs/9884901036

With regards,
GitHub Actions via GitBox



[GH] (rocketmq): Workflow run "Build and Run Tests by Maven" is working again!

2024-07-10 Thread GitBox


The GitHub Actions job "Build and Run Tests by Maven" on rocketmq.git has 
succeeded.
Run started by GitHub user qianye1001 (triggered by qianye1001).

Head commit for run:
181be3017a8f91b722d731ebbd36c572c0baaa2c / qianye 

fix style

Report URL: https://github.com/apache/rocketmq/actions/runs/9884901047

With regards,
GitHub Actions via GitBox



Re: [PR] [ISSUE #8366] Eliminate deadlocks during the client shutdown process. [rocketmq]

2024-07-10 Thread via GitHub


drpmma commented on PR #8367:
URL: https://github.com/apache/rocketmq/pull/8367#issuecomment-100844

   > https://github.com/apache/rocketmq/blob/32f0e8481d1f5672b8d494de7607fe5b846ffa51/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java#L479-L484
   > 
   > It seems that this piece of code also may lead to deadlock.
   
   How about the tryClose method, which contains lock.readLock().lock()? It may also lead to deadlock.
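
As a rough illustration of when a `readLock().lock()` call can block, the sketch below uses a plain `ReentrantReadWriteLock`. The lock type is an assumption, the class and method names are hypothetical, and it says nothing about whether tryClose is ever reached in such a state. Timed and non-blocking attempts stand in for the blocking call so the example terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in: not RocketMQ code.
final class ReadLockBlockingSketch {

    /** Same thread: holding the read lock does not allow taking the write lock. */
    static boolean upgradeReadToWrite() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        lock.readLock().lock();
        try {
            boolean acquired = lock.writeLock().tryLock();
            if (acquired) {
                lock.writeLock().unlock();
            }
            return acquired;
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Another thread owns the write lock: a timed read attempt gives up. */
    static boolean readWhileOtherThreadWrites() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        CountDownLatch writerHolds = new CountDownLatch(1);
        CountDownLatch writerMayRelease = new CountDownLatch(1);
        Thread writer = new Thread(() -> {
            lock.writeLock().lock();
            try {
                writerHolds.countDown();
                writerMayRelease.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.writeLock().unlock();
            }
        });
        writer.start();
        boolean acquired = false;
        try {
            writerHolds.await();
            acquired = lock.readLock().tryLock(100, TimeUnit.MILLISECONDS);
            if (acquired) {
                lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            writerMayRelease.countDown(); // always let the writer finish
        }
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired;
    }
}
```

Both methods are expected to return false. With an untimed `lock()` the calling thread would wait instead, which becomes a deadlock only if the lock holder is in turn waiting on the caller.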





Re: [PR] [ISSUE #8058] Support for upgrading metadata in json to rocksdb (#8045) [rocketmq]

2024-07-10 Thread via GitHub


LetLetMe closed pull request #8116: [ISSUE #8058] Support for upgrading 
metadata in json to rocksdb (#8045)
URL: https://github.com/apache/rocketmq/pull/8116





[GH] (rocketmq): Workflow run "E2E test for pull request" failed!

2024-07-10 Thread GitBox


The GitHub Actions job "E2E test for pull request" on rocketmq.git has failed.
Run started by GitHub user YanYunyang (triggered by YanYunyang).

Head commit for run:
67ddc1dae0548a5a2e413a2c7ef155b39a64f2b6 / yx9o 
[ISSUE #8375] Add more test coverage for MqClientAdminImpl (#8376)

* Add more test coverage for MqClientAdminImpl

Report URL: https://github.com/apache/rocketmq/actions/runs/9886405353

With regards,
GitHub Actions via GitBox



Re: [PR] [ISSUE #8366] Eliminate deadlocks during the client shutdown process. [rocketmq]

2024-07-10 Thread via GitHub


codecov-commenter commented on PR #8367:
URL: https://github.com/apache/rocketmq/pull/8367#issuecomment-144132

   ## [Codecov](https://app.codecov.io/gh/apache/rocketmq/pull/8367?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   Attention: Patch coverage is `75.0%` with `2 lines` in your changes missing coverage. Please review.
   > Project coverage is 44.25%. Comparing base [(`77d6633`)](https://app.codecov.io/gh/apache/rocketmq/commit/77d6633e622200ed52feef6fb2adeb12fba8e2c8?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`36d561f`)](https://app.codecov.io/gh/apache/rocketmq/commit/36d561f118de4eee06a9bea720dc1fae44d42e72?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   > Report is 5 commits behind head on develop.
   
   | [Files](https://app.codecov.io/gh/apache/rocketmq/pull/8367?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
   |---|---|---|
   | [...e/rocketmq/remoting/netty/NettyRemotingClient.java](https://app.codecov.io/gh/apache/rocketmq/pull/8367?src=pr&el=tree&filepath=remoting%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Frocketmq%2Fremoting%2Fnetty%2FNettyRemotingClient.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL25ldHR5L05ldHR5UmVtb3RpbmdDbGllbnQuamF2YQ==) | 75.00% | [1 Missing and 1 partial :warning: ](https://app.codecov.io/gh/apache/rocketmq/pull/8367?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   Additional details and impacted files
   
   
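   ```diff
   @@             Coverage Diff              @@
   ##            develop    #8367      +/-   ##
   ============================================
   + Coverage     43.88%   44.25%   +0.37%
   - Complexity    10647    10742      +95
   ============================================
     Files          1274     1274
     Lines         88939    88941       +2
     Branches      11432    11432
   ============================================
   + Hits          39033    39365     +332
   + Misses        44985    44618     -367
   - Partials       4921     4958      +37
   ```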
   
   
   
   [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/rocketmq/pull/8367?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).

   :loudspeaker: Have feedback on the report? [Share it here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   





Re: [PR] [rust] implement standard pushconsumer [rocketmq-clients]

2024-07-10 Thread via GitHub


glcrazier commented on code in PR #767:
URL: https://github.com/apache/rocketmq-clients/pull/767#discussion_r1673495957


##
rust/src/push_consumer.rs:
##
@@ -0,0 +1,1303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use mockall::automock;
+use mockall_double::double;
+use parking_lot::{Mutex, RwLock};
+use prost_types::Duration;
+use slog::Logger;
+use slog::{debug, error, info, warn};
+use std::collections::{HashMap, VecDeque};
+use std::sync::Arc;
+use tokio::select;
+use tokio::sync::{mpsc, oneshot};
+
+#[double]
+use crate::client::Client;
+use crate::conf::{BackOffRetryPolicy, ClientOption, PushConsumerOption};
+use crate::error::{ClientError, ErrorKind};
+use crate::model::common::{ClientType, ConsumeResult, FilterExpression, MessageQueue};
+use crate::model::message::{AckMessageEntry, MessageView};
+use crate::pb::receive_message_response::Content;
+use crate::pb::{
+    AckMessageRequest, Assignment, ChangeInvisibleDurationRequest, QueryAssignmentRequest,
+    ReceiveMessageRequest, Resource,
+};
+use crate::session::{RPCClient, Session};
+use crate::util::{build_endpoints_by_message_queue, build_push_consumer_settings};
+use crate::{log, pb};
+
+const OPERATION_NEW_PUSH_CONSUMER: &str = "push_consumer.new";
+const OPERATION_RECEIVE_MESSAGE: &str = "push_consumer.receive_message";
+const OPERATION_ACK_MESSAGE: &str = "push_consumer.ack_message";
+const OPERATION_START_PUSH_CONSUMER: &str = "push_consumer.start";
+const OPERATION_CHANGE_INVISIBLE_DURATION: &str = "push_consumer.change_invisible_duration";
+
+pub type MessageListener = Box<dyn FnMut(&MessageView) -> ConsumeResult + Send + Sync>;

Review Comment:
   `FnMut` requires exclusive (`&mut`) access each time it is called, so the 
listener cannot be invoked from several threads at once, which rules out 
concurrent consumption. Do you have any ideas for addressing this?
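
One possible direction, sketched here under assumptions rather than taken from the PR: declare the listener as `Fn` behind an `Arc`, so it can be called through a shared reference from several worker threads, and keep any mutable state inside the closure in atomics (or a lock). `MessageView` and `ConsumeResult` below are simplified stand-ins for the crate's types, and `SharedListener`, `consume_concurrently`, and `counting_listener` are hypothetical names used only for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Simplified stand-in for the crate's message type (assumption).
pub struct MessageView {
    pub body: Vec<u8>,
}

// Simplified stand-in for the crate's consume result (assumption).
#[derive(Debug, PartialEq, Eq)]
pub enum ConsumeResult {
    Success,
    Failure,
}

// `Fn` is callable through `&self`, so one `Arc` can be shared by many
// threads without a lock. With `FnMut`, each call would need `&mut` access.
pub type SharedListener = Arc<dyn Fn(&MessageView) -> ConsumeResult + Send + Sync>;

/// Calls `listener` once on each of `workers` threads and returns how many
/// calls reported `ConsumeResult::Success`.
pub fn consume_concurrently(listener: SharedListener, workers: usize) -> usize {
    let handles: Vec<_> = (0..workers)
        .map(|i| {
            let listener = Arc::clone(&listener);
            thread::spawn(move || {
                let message = MessageView { body: vec![i as u8] };
                (*listener)(&message)
            })
        })
        .collect();

    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .filter(|result| *result == ConsumeResult::Success)
        .count()
}

/// Builds a listener that records how often it was called. The counter is
/// an atomic, so the closure only needs shared access and stays `Fn`.
pub fn counting_listener(counter: Arc<AtomicUsize>) -> SharedListener {
    Arc::new(move |message: &MessageView| {
        counter.fetch_add(1, Ordering::SeqCst);
        if message.body.is_empty() {
            ConsumeResult::Failure
        } else {
            ConsumeResult::Success
        }
    })
}
```

The trade-off is that callers who want to mutate captured state must do so through interior mutability (`AtomicUsize`, `Mutex`, and so on). Keeping `FnMut` and wrapping the listener in a `Mutex` would also compile, but it would serialize every call and so would not give concurrent consumption.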






[GH] (rocketmq): Workflow run "Build and Run Tests by Maven" failed!

2024-07-10 Thread GitBox


The GitHub Actions job "Build and Run Tests by Maven" on rocketmq.git has failed.
Run started by GitHub user YanYunyang (triggered by drpmma).

Head commit for run:
36d561f118de4eee06a9bea720dc1fae44d42e72 / YanYunyang <313169...@qq.com>
[ISSUE #8366] Compare channels for equality using `isWrapperOf`.

Report URL: https://github.com/apache/rocketmq/actions/runs/9885151578

With regards,
GitHub Actions via GitBox



Re: [PR] [ISSUE #8365] add remoting client non-oneway updateConsumerOffset function [rocketmq]

2024-07-10 Thread via GitHub


qianye1001 closed pull request #8368: [ISSUE #8365] add remoting client non-oneway updateConsumerOffset function
URL: https://github.com/apache/rocketmq/pull/8368

