Re: [I] [Enhancement] Add more test coverage for MqClientAdminImpl [rocketmq]
drpmma closed issue #8375: [Enhancement] Add more test coverage for MqClientAdminImpl URL: https://github.com/apache/rocketmq/issues/8375 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [ISSUE #8375] Add more test coverage for MqClientAdminImpl [rocketmq]
drpmma merged PR #8376: URL: https://github.com/apache/rocketmq/pull/8376 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(rocketmq) branch develop updated: [ISSUE #8375] Add more test coverage for MqClientAdminImpl (#8376)
This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git The following commit(s) were added to refs/heads/develop by this push: new 67ddc1dae0 [ISSUE #8375] Add more test coverage for MqClientAdminImpl (#8376) 67ddc1dae0 is described below commit 67ddc1dae0548a5a2e413a2c7ef155b39a64f2b6 Author: yx9o AuthorDate: Wed Jul 10 15:23:48 2024 +0800 [ISSUE #8375] Add more test coverage for MqClientAdminImpl (#8376) * Add more test coverage for MqClientAdminImpl --- .../client/impl/admin/MqClientAdminImplTest.java | 559 + 1 file changed, 559 insertions(+) diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImplTest.java new file mode 100644 index 00..71682fb52c --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImplTest.java @@ -0,0 +1,559 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl.admin; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; +import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.QueryConsumeTimeSpanBody; +import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan; +import org.apache.rocketmq.remoting.protocol.body.ResetOffsetBody; +import org.apache.rocketmq.remoting.protocol.body.TopicList; +import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.DeleteSubscriptionGroupRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryConsumeTimeSpanRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryTopicsByConsumerRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteTopicFromNamesrvRequestHeader; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.junit.Before; +imp
[GH] (rocketmq): Workflow run "PUSH-CI" failed!
The GitHub Actions job "PUSH-CI" on rocketmq.git has failed. Run started by GitHub user drpmma (triggered by drpmma). Head commit for run: 67ddc1dae0548a5a2e413a2c7ef155b39a64f2b6 / yx9o [ISSUE #8375] Add more test coverage for MqClientAdminImpl (#8376) * Add more test coverage for MqClientAdminImpl Report URL: https://github.com/apache/rocketmq/actions/runs/9869934315 With regards, GitHub Actions via GitBox
Re: [PR] [ISSUE #8366] Eliminate deadlocks during the client shutdown process. [rocketmq]
drpmma commented on PR #8367: URL: https://github.com/apache/rocketmq/pull/8367#issuecomment-2219815886 https://github.com/apache/rocketmq/blob/32f0e8481d1f5672b8d494de7607fe5b846ffa51/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java#L479-L484 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
svn commit: r70210 - /dev/rocketmq/5.3.0-rc1/
Author: lizhimin Date: Wed Jul 10 09:48:35 2024 New Revision: 70210 Log: release 5.3.0-rc1 Added: dev/rocketmq/5.3.0-rc1/ dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-bin-release.zip (with props) dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-bin-release.zip.asc (with props) dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-bin-release.zip.sha512 dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-source-release.zip (with props) dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-source-release.zip.asc (with props) dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-source-release.zip.sha512 Added: dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-bin-release.zip == Binary file - no diff available. Propchange: dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-bin-release.zip -- svn:mime-type = application/zip Added: dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-bin-release.zip.asc == Binary file - no diff available. Propchange: dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-bin-release.zip.asc -- svn:mime-type = application/pgp-signature Added: dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-bin-release.zip.sha512 == --- dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-bin-release.zip.sha512 (added) +++ dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-bin-release.zip.sha512 Wed Jul 10 09:48:35 2024 @@ -0,0 +1,4 @@ +rocketmq-all-5.3.0-bin-release.zip: 11285C3A 8F734849 84398ED0 13F5FF09 CE772F7B +C6C78899 1EEA8C1A 3DBC4634 62BBB246 3C19E973 +26DD0F88 733D88A1 F0CBC67D C9707457 1820D048 +44AD12EE Added: dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-source-release.zip == Binary file - no diff available. Propchange: dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-source-release.zip -- svn:mime-type = application/zip Added: dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-source-release.zip.asc == Binary file - no diff available. Propchange: dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-source-release.zip.asc -- svn:mime-type = application/pgp-signature Added: dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-source-release.zip.sha512 == --- dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-source-release.zip.sha512 (added) +++ dev/rocketmq/5.3.0-rc1/rocketmq-all-5.3.0-source-release.zip.sha512 Wed Jul 10 09:48:35 2024 @@ -0,0 +1,4 @@ +rocketmq-all-5.3.0-source-release.zip: 089E609E 201A908A 29B377EC 775B0735 + F6BAE026 AD5E6B7B 5F9E186E F9AA69C2 + 744435CB 9E88C677 50DEE125 2E6ED147 + 0F239AF9 77D05560 3A37E0CE 28C883B1
[I] 控制台发起重新消费报错 [rocketmq-dashboard]
jevonsnotes opened a new issue, #220: URL: https://github.com/apache/rocketmq-dashboard/issues/220 The issue tracker is **ONLY** used for bug report and feature request. Any question or RocketMQ proposal please use our [mailing lists](http://rocketmq.apache.org/about/contact/). **BUG REPORT** 1. Please describe the issue you observed: - What did you do (The steps to reproduce)? 我修改了源码,将proxy地址作为addr传入了以下方法,请求报错不支持309类型的请求 ``` public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr, String consumerGroup, String clientId, String topic, String msgId, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { ConsumeMessageDirectlyResultRequestHeader requestHeader = new ConsumeMessageDirectlyResultRequestHeader(); requestHeader.setTopic(topic); requestHeader.setConsumerGroup(consumerGroup); requestHeader.setClientId(clientId); requestHeader.setMsgId(msgId); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { byte[] body = response.getBody(); if (body != null) { ConsumeMessageDirectlyResult info = ConsumeMessageDirectlyResult.decode(body, ConsumeMessageDirectlyResult.class); return info; } } default: break; } throw new MQClientException(response.getCode(), response.getRemark()); } ``` - What did you expect to see? 能够正常消费 - What did you see instead? 目前无法从控制台触发重新消费 2. Please tell us about your environment: rocktmq 5.2.0 3. Other information (e.g. detailed explanation, logs, related issues, suggestions how to fix, etc): **FEATURE REQUEST** 1. Please describe the feature you are requesting. 2. Provide any additional detail on your proposed use case for this feature. 2. Indicate the importance of this issue to you (blocker, must-have, should-have, nice-to-have). Are you currently using any workarounds to address this issue? 4. If there are some sub-tasks using -[] for each subtask and create a corresponding issue to map to the sub task: - [sub-task1-issue-number](example_sub_issue1_link_here): sub-task1 description here, - [sub-task2-issue-number](example_sub_issue2_link_here): sub-task2 description here, - ... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] 无法查看消费终端 [rocketmq-dashboard]
jevonsnotes closed issue #218: 无法查看消费终端 URL: https://github.com/apache/rocketmq-dashboard/issues/218 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] 无法查看消费终端 [rocketmq-dashboard]
jevonsnotes commented on issue #218: URL: https://github.com/apache/rocketmq-dashboard/issues/218#issuecomment-2220282546 更新代码后,将VIP配置调整为false可以解决 ``` rocketmq: config: isVIPChannel: false ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Add release notes for apache rocketmq 5.3.0 [rocketmq-site]
lizhimins opened a new pull request, #667: URL: https://github.com/apache/rocketmq-site/pull/667 Please do not create a Pull Request without creating an issue first. ## What is the purpose of the change X ## Brief changelog XX ## Verifying this change Follow this checklist to help us incorporate your contribution quickly and easily: - [x] Make sure there is a Github issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue. - [ ] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body. - [ ] Write a pull request description that is detailed enough to understand what the pull request does, how, and why. - [ ] Write necessary unit-test to verify your logic correction, more mock a little better when cross module dependency exist. If the new feature or significant change is committed, please remember to add integration-test in [test module](https://github.com/apache/rocketmq/tree/master/test). - [ ] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit-test pass. Run `mvn clean test-compile failsafe:integration-test` to make sure integration-test pass. - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GH] (rocketmq-clients): Workflow run "Build" failed!
The GitHub Actions job "Build" on rocketmq-clients.git has failed. Run started by GitHub user tsunghanjacktsai (triggered by tsunghanjacktsai). Head commit for run: 835c8b80d9165212d6ce96a778c7c8a5a2622880 / tsaitsung-han.tht Add push consumer for normal message in c# sdk Report URL: https://github.com/apache/rocketmq-clients/actions/runs/9850194881 With regards, GitHub Actions via GitBox
[GH] (rocketmq-clients): Workflow run "Build" failed!
The GitHub Actions job "Build" on rocketmq-clients.git has failed. Run started by GitHub user tsunghanjacktsai (triggered by tsunghanjacktsai). Head commit for run: 3925413bd2eb996b3b966883f8fb93d9661fd457 / tsaitsung-han.tht Add namespace support for c# sdk Report URL: https://github.com/apache/rocketmq-clients/actions/runs/9851055417 With regards, GitHub Actions via GitBox
[GH] (rocketmq-clients): Workflow run "Build" failed!
The GitHub Actions job "Build" on rocketmq-clients.git has failed. Run started by GitHub user tsunghanjacktsai (triggered by tsunghanjacktsai). Head commit for run: 0a02034b1a74ba60cab486bac3d2c64bd371464f / tsaitsung-han.tht Add reentrant push consumer message receiving support for c# sdk Report URL: https://github.com/apache/rocketmq-clients/actions/runs/9850261052 With regards, GitHub Actions via GitBox
[GH] (rocketmq-clients): Workflow run "Build" failed!
The GitHub Actions job "Build" on rocketmq-clients.git has failed. Run started by GitHub user tsunghanjacktsai (triggered by tsunghanjacktsai). Head commit for run: 6292e81b094546d595bf9adef1498b22854eeac1 / tsaitsung-han.tht Add push consumer for fifo message in c# sdk Report URL: https://github.com/apache/rocketmq-clients/actions/runs/9850695382 With regards, GitHub Actions via GitBox
[GH] (rocketmq-clients): Workflow run "Build" failed!
The GitHub Actions job "Build" on rocketmq-clients.git has failed. Run started by GitHub user tsunghanjacktsai (triggered by tsunghanjacktsai). Head commit for run: cf2f696e64e74884f6c2f98b771ac56466cddc44 / tsaitsung-han.tht Add push consumer for fifo message in c# sdk Report URL: https://github.com/apache/rocketmq-clients/actions/runs/9850205843 With regards, GitHub Actions via GitBox
[GH] (rocketmq-clients): Workflow run "Build" failed!
The GitHub Actions job "Build" on rocketmq-clients.git has failed. Run started by GitHub user tsunghanjacktsai (triggered by tsunghanjacktsai). Head commit for run: 07b513299035729a2d5c979ac9157a2235952902 / tsaitsung-han.tht Add namespace support for c# sdk Report URL: https://github.com/apache/rocketmq-clients/actions/runs/9850237585 With regards, GitHub Actions via GitBox
[GH] (rocketmq-clients): Workflow run "Build" failed!
The GitHub Actions job "Build" on rocketmq-clients.git has failed. Run started by GitHub user tsunghanjacktsai (triggered by tsunghanjacktsai). Head commit for run: 98585a31240ede820e842605bb6cbc001de515cf / tsaitsung-han.tht Add push consumer for normal message in c# sdk Report URL: https://github.com/apache/rocketmq-clients/actions/runs/9850691202 With regards, GitHub Actions via GitBox
[GH] (rocketmq-clients): Workflow run "Build" failed!
The GitHub Actions job "Build" on rocketmq-clients.git has failed. Run started by GitHub user tsunghanjacktsai (triggered by tsunghanjacktsai). Head commit for run: e655b6bbf46c85b4114592849a049aaca41dac67 / tsaitsung-han.tht Add reentrant push consumer message receiving support for c# sdk Report URL: https://github.com/apache/rocketmq-clients/actions/runs/9851075418 With regards, GitHub Actions via GitBox
Re: [PR] [ISSUE #6795] feat: support to send transaction msg with lmqQueue property [rocketmq]
github-actions[bot] commented on PR #6796: URL: https://github.com/apache/rocketmq/pull/6796#issuecomment-2221744629 This PR is stale because it has been open for 365 days with no activity. It will be closed in 3 days if no further activity occurs. If you wish not to mark it as stale, please leave a comment in this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [ISSUE #5716] fix possible negative index value [rocketmq]
github-actions[bot] commented on PR #5718: URL: https://github.com/apache/rocketmq/pull/5718#issuecomment-2221744665 This PR is stale because it has been open for 365 days with no activity. It will be closed in 3 days if no further activity occurs. If you wish not to mark it as stale, please leave a comment in this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GH] (rocketmq): Workflow run "Snapshot Daily Release Automation" failed!
The GitHub Actions job "Snapshot Daily Release Automation" on rocketmq.git has failed. Run started by GitHub user lizhanhui (triggered by lizhanhui). Head commit for run: 67ddc1dae0548a5a2e413a2c7ef155b39a64f2b6 / yx9o [ISSUE #8375] Add more test coverage for MqClientAdminImpl (#8376) * Add more test coverage for MqClientAdminImpl Report URL: https://github.com/apache/rocketmq/actions/runs/9883373624 With regards, GitHub Actions via GitBox
[GH] (rocketmq-clients): Workflow run "Rust Coverage" failed!
The GitHub Actions job "Rust Coverage" on rocketmq-clients.git has failed. Run started by GitHub user lizhanhui (triggered by lizhanhui). Head commit for run: 25dd6abed56d3df3872f8bfc0e2579085aac98df / Zhanhui Li chore: update README doc, reflecting we are supporting push-consumer in Rust Signed-off-by: Zhanhui Li Report URL: https://github.com/apache/rocketmq-clients/actions/runs/9884284056 With regards, GitHub Actions via GitBox
Re: [PR] [rust] implement standard pushconsumer [rocketmq-clients]
lizhanhui commented on code in PR #767: URL: https://github.com/apache/rocketmq-clients/pull/767#discussion_r1673306010 ## rust/src/push_consumer.rs: ## @@ -0,0 +1,1303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use mockall::automock; +use mockall_double::double; +use parking_lot::{Mutex, RwLock}; +use prost_types::Duration; +use slog::Logger; +use slog::{debug, error, info, warn}; +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; +use tokio::select; +use tokio::sync::{mpsc, oneshot}; + +#[double] +use crate::client::Client; +use crate::conf::{BackOffRetryPolicy, ClientOption, PushConsumerOption}; +use crate::error::{ClientError, ErrorKind}; +use crate::model::common::{ClientType, ConsumeResult, FilterExpression, MessageQueue}; +use crate::model::message::{AckMessageEntry, MessageView}; +use crate::pb::receive_message_response::Content; +use crate::pb::{ +AckMessageRequest, Assignment, ChangeInvisibleDurationRequest, QueryAssignmentRequest, +ReceiveMessageRequest, Resource, +}; +use crate::session::{RPCClient, Session}; +use crate::util::{build_endpoints_by_message_queue, build_push_consumer_settings}; +use crate::{log, pb}; + +const OPERATION_NEW_PUSH_CONSUMER: &str = "push_consumer.new"; +const OPERATION_RECEIVE_MESSAGE: &str = "push_consumer.receive_message"; +const OPERATION_ACK_MESSAGE: &str = "push_consumer.ack_message"; +const OPERATION_START_PUSH_CONSUMER: &str = "push_consumer.start"; +const OPERATION_CHANGE_INVISIBLE_DURATION: &str = "push_consumer.change_invisible_duration"; + +pub type MessageListener = Box ConsumeResult + Send + Sync>; Review Comment: Fn requires immutability, kind of excessively strict. Use FnMut? Here is a good reference to read: https://stackoverflow.com/questions/41081240/idiomatic-callbacks-in-rust -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [rust] implement standard pushconsumer [rocketmq-clients]
lizhanhui commented on code in PR #767: URL: https://github.com/apache/rocketmq-clients/pull/767#discussion_r1673324426 ## rust/src/push_consumer.rs: ## @@ -0,0 +1,1303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use mockall::automock; +use mockall_double::double; +use parking_lot::{Mutex, RwLock}; +use prost_types::Duration; +use slog::Logger; +use slog::{debug, error, info, warn}; +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; +use tokio::select; +use tokio::sync::{mpsc, oneshot}; + +#[double] +use crate::client::Client; +use crate::conf::{BackOffRetryPolicy, ClientOption, PushConsumerOption}; +use crate::error::{ClientError, ErrorKind}; +use crate::model::common::{ClientType, ConsumeResult, FilterExpression, MessageQueue}; +use crate::model::message::{AckMessageEntry, MessageView}; +use crate::pb::receive_message_response::Content; +use crate::pb::{ +AckMessageRequest, Assignment, ChangeInvisibleDurationRequest, QueryAssignmentRequest, +ReceiveMessageRequest, Resource, +}; +use crate::session::{RPCClient, Session}; +use crate::util::{build_endpoints_by_message_queue, build_push_consumer_settings}; +use crate::{log, pb}; + +const OPERATION_NEW_PUSH_CONSUMER: &str = "push_consumer.new"; +const OPERATION_RECEIVE_MESSAGE: &str = "push_consumer.receive_message"; +const OPERATION_ACK_MESSAGE: &str = "push_consumer.ack_message"; +const OPERATION_START_PUSH_CONSUMER: &str = "push_consumer.start"; +const OPERATION_CHANGE_INVISIBLE_DURATION: &str = "push_consumer.change_invisible_duration"; + +pub type MessageListener = Box ConsumeResult + Send + Sync>; + +pub struct PushConsumer { +logger: Logger, +client: Client, +message_listener: Arc, +option: Arc>, +shutdown_tx: Option>, +} + +/* + * An actor is required for each message queue. + * It is responsible for polling messages from the message queue. It communicates with PushConsumer by channels. + */ +struct MessageQueueActor { +logger: Logger, +rpc_client: Session, +message_queue: MessageQueue, +shutdown_tx: Option>, +option: PushConsumerOption, +message_listener: Arc, +retry_policy: BackOffRetryPolicy, +} + +impl PushConsumer { +pub fn new( +client_option: ClientOption, +option: PushConsumerOption, +message_listener: MessageListener, +) -> Result { +if option.consumer_group().is_empty() { +return Err(ClientError::new( +ErrorKind::Config, +"consumer group is required.", +OPERATION_NEW_PUSH_CONSUMER, +)); +} +if option.subscription_expressions().is_empty() { +return Err(ClientError::new( +ErrorKind::Config, +"subscription expressions is required.", +OPERATION_NEW_PUSH_CONSUMER, +)); +} +let client_option = ClientOption { +client_type: ClientType::PushConsumer, +group: Some(option.consumer_group().to_string()), +..client_option +}; +let logger = log::logger(option.logging_format()); +let client = Client::new( +&logger, +client_option, +build_push_consumer_settings(&option), +)?; +Ok(Self { +logger, +client, +message_listener: Arc::new(message_listener), +option: Arc::new(RwLock::new(option)), +shutdown_tx: None, +}) +} + +pub async fn start(&mut self) -> Result<(), ClientError> { +let (telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16); +self.client.start(telemetry_command_tx).await?; +let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); +self.shutdown_tx = Some(shutdown_tx); +let option = Arc::clone(&self.option); +let mut rpc_client = self.client.get_session().await?; +let route_manager = self.client.get_route_manager(); +let topics; +{ +topics = option +.read() +.subscription_expressions() +.keys() +.cloned() +.collect()
Re: [PR] [rust] implement standard pushconsumer [rocketmq-clients]
lizhanhui commented on code in PR #767: URL: https://github.com/apache/rocketmq-clients/pull/767#discussion_r1673324842 ## rust/src/push_consumer.rs: ## @@ -0,0 +1,1303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use mockall::automock; +use mockall_double::double; +use parking_lot::{Mutex, RwLock}; +use prost_types::Duration; +use slog::Logger; +use slog::{debug, error, info, warn}; +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; +use tokio::select; +use tokio::sync::{mpsc, oneshot}; + +#[double] +use crate::client::Client; +use crate::conf::{BackOffRetryPolicy, ClientOption, PushConsumerOption}; +use crate::error::{ClientError, ErrorKind}; +use crate::model::common::{ClientType, ConsumeResult, FilterExpression, MessageQueue}; +use crate::model::message::{AckMessageEntry, MessageView}; +use crate::pb::receive_message_response::Content; +use crate::pb::{ +AckMessageRequest, Assignment, ChangeInvisibleDurationRequest, QueryAssignmentRequest, +ReceiveMessageRequest, Resource, +}; +use crate::session::{RPCClient, Session}; +use crate::util::{build_endpoints_by_message_queue, build_push_consumer_settings}; +use crate::{log, pb}; + +const OPERATION_NEW_PUSH_CONSUMER: &str = "push_consumer.new"; +const OPERATION_RECEIVE_MESSAGE: &str = "push_consumer.receive_message"; +const OPERATION_ACK_MESSAGE: &str = "push_consumer.ack_message"; +const OPERATION_START_PUSH_CONSUMER: &str = "push_consumer.start"; +const OPERATION_CHANGE_INVISIBLE_DURATION: &str = "push_consumer.change_invisible_duration"; + +pub type MessageListener = Box ConsumeResult + Send + Sync>; + +pub struct PushConsumer { +logger: Logger, +client: Client, +message_listener: Arc, +option: Arc>, +shutdown_tx: Option>, +} + +/* + * An actor is required for each message queue. + * It is responsible for polling messages from the message queue. It communicates with PushConsumer by channels. + */ +struct MessageQueueActor { +logger: Logger, +rpc_client: Session, +message_queue: MessageQueue, +shutdown_tx: Option>, +option: PushConsumerOption, +message_listener: Arc, +retry_policy: BackOffRetryPolicy, +} + +impl PushConsumer { +pub fn new( +client_option: ClientOption, +option: PushConsumerOption, +message_listener: MessageListener, +) -> Result { +if option.consumer_group().is_empty() { +return Err(ClientError::new( +ErrorKind::Config, +"consumer group is required.", +OPERATION_NEW_PUSH_CONSUMER, +)); +} +if option.subscription_expressions().is_empty() { +return Err(ClientError::new( +ErrorKind::Config, +"subscription expressions is required.", +OPERATION_NEW_PUSH_CONSUMER, +)); +} +let client_option = ClientOption { +client_type: ClientType::PushConsumer, +group: Some(option.consumer_group().to_string()), +..client_option +}; +let logger = log::logger(option.logging_format()); +let client = Client::new( +&logger, +client_option, +build_push_consumer_settings(&option), +)?; +Ok(Self { +logger, +client, +message_listener: Arc::new(message_listener), +option: Arc::new(RwLock::new(option)), +shutdown_tx: None, +}) +} + +pub async fn start(&mut self) -> Result<(), ClientError> { +let (telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16); +self.client.start(telemetry_command_tx).await?; +let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); +self.shutdown_tx = Some(shutdown_tx); +let option = Arc::clone(&self.option); +let mut rpc_client = self.client.get_session().await?; +let route_manager = self.client.get_route_manager(); +let topics; +{ +topics = option +.read() +.subscription_expressions() +.keys() +.cloned() +.collect()
Re: [PR] [rust] implement standard pushconsumer [rocketmq-clients]
lizhanhui commented on code in PR #767: URL: https://github.com/apache/rocketmq-clients/pull/767#discussion_r1673327287 ## rust/src/push_consumer.rs: ## @@ -0,0 +1,1303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use mockall::automock; +use mockall_double::double; +use parking_lot::{Mutex, RwLock}; +use prost_types::Duration; +use slog::Logger; +use slog::{debug, error, info, warn}; +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; +use tokio::select; +use tokio::sync::{mpsc, oneshot}; + +#[double] +use crate::client::Client; +use crate::conf::{BackOffRetryPolicy, ClientOption, PushConsumerOption}; +use crate::error::{ClientError, ErrorKind}; +use crate::model::common::{ClientType, ConsumeResult, FilterExpression, MessageQueue}; +use crate::model::message::{AckMessageEntry, MessageView}; +use crate::pb::receive_message_response::Content; +use crate::pb::{ +AckMessageRequest, Assignment, ChangeInvisibleDurationRequest, QueryAssignmentRequest, +ReceiveMessageRequest, Resource, +}; +use crate::session::{RPCClient, Session}; +use crate::util::{build_endpoints_by_message_queue, build_push_consumer_settings}; +use crate::{log, pb}; + +const OPERATION_NEW_PUSH_CONSUMER: &str = "push_consumer.new"; +const OPERATION_RECEIVE_MESSAGE: &str = "push_consumer.receive_message"; +const OPERATION_ACK_MESSAGE: &str = "push_consumer.ack_message"; +const OPERATION_START_PUSH_CONSUMER: &str = "push_consumer.start"; +const OPERATION_CHANGE_INVISIBLE_DURATION: &str = "push_consumer.change_invisible_duration"; + +pub type MessageListener = Box ConsumeResult + Send + Sync>; + +pub struct PushConsumer { +logger: Logger, +client: Client, +message_listener: Arc, +option: Arc>, +shutdown_tx: Option>, +} + +/* + * An actor is required for each message queue. + * It is responsible for polling messages from the message queue. It communicates with PushConsumer by channels. + */ +struct MessageQueueActor { +logger: Logger, +rpc_client: Session, +message_queue: MessageQueue, +shutdown_tx: Option>, +option: PushConsumerOption, +message_listener: Arc, +retry_policy: BackOffRetryPolicy, +} + +impl PushConsumer { +pub fn new( +client_option: ClientOption, +option: PushConsumerOption, +message_listener: MessageListener, +) -> Result { +if option.consumer_group().is_empty() { +return Err(ClientError::new( +ErrorKind::Config, +"consumer group is required.", +OPERATION_NEW_PUSH_CONSUMER, +)); +} +if option.subscription_expressions().is_empty() { +return Err(ClientError::new( +ErrorKind::Config, +"subscription expressions is required.", +OPERATION_NEW_PUSH_CONSUMER, +)); +} +let client_option = ClientOption { +client_type: ClientType::PushConsumer, +group: Some(option.consumer_group().to_string()), +..client_option +}; +let logger = log::logger(option.logging_format()); +let client = Client::new( +&logger, +client_option, +build_push_consumer_settings(&option), +)?; +Ok(Self { +logger, +client, +message_listener: Arc::new(message_listener), +option: Arc::new(RwLock::new(option)), +shutdown_tx: None, +}) +} + +pub async fn start(&mut self) -> Result<(), ClientError> { +let (telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16); +self.client.start(telemetry_command_tx).await?; +let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); +self.shutdown_tx = Some(shutdown_tx); +let option = Arc::clone(&self.option); +let mut rpc_client = self.client.get_session().await?; +let route_manager = self.client.get_route_manager(); +let topics; +{ +topics = option +.read() +.subscription_expressions() +.keys() +.cloned() +.collect()
Re: [PR] [rust] implement standard pushconsumer [rocketmq-clients]
lizhanhui commented on code in PR #767: URL: https://github.com/apache/rocketmq-clients/pull/767#discussion_r1673330758 ## rust/src/push_consumer.rs: ## @@ -0,0 +1,1303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use mockall::automock; +use mockall_double::double; +use parking_lot::{Mutex, RwLock}; +use prost_types::Duration; +use slog::Logger; +use slog::{debug, error, info, warn}; +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; +use tokio::select; +use tokio::sync::{mpsc, oneshot}; + +#[double] +use crate::client::Client; +use crate::conf::{BackOffRetryPolicy, ClientOption, PushConsumerOption}; +use crate::error::{ClientError, ErrorKind}; +use crate::model::common::{ClientType, ConsumeResult, FilterExpression, MessageQueue}; +use crate::model::message::{AckMessageEntry, MessageView}; +use crate::pb::receive_message_response::Content; +use crate::pb::{ +AckMessageRequest, Assignment, ChangeInvisibleDurationRequest, QueryAssignmentRequest, +ReceiveMessageRequest, Resource, +}; +use crate::session::{RPCClient, Session}; +use crate::util::{build_endpoints_by_message_queue, build_push_consumer_settings}; +use crate::{log, pb}; + +const OPERATION_NEW_PUSH_CONSUMER: &str = "push_consumer.new"; +const OPERATION_RECEIVE_MESSAGE: &str = "push_consumer.receive_message"; +const OPERATION_ACK_MESSAGE: &str = "push_consumer.ack_message"; +const OPERATION_START_PUSH_CONSUMER: &str = "push_consumer.start"; +const OPERATION_CHANGE_INVISIBLE_DURATION: &str = "push_consumer.change_invisible_duration"; + +pub type MessageListener = Box ConsumeResult + Send + Sync>; + +pub struct PushConsumer { +logger: Logger, +client: Client, +message_listener: Arc, +option: Arc>, +shutdown_tx: Option>, +} + +/* + * An actor is required for each message queue. + * It is responsible for polling messages from the message queue. It communicates with PushConsumer by channels. + */ +struct MessageQueueActor { +logger: Logger, +rpc_client: Session, +message_queue: MessageQueue, +shutdown_tx: Option>, +option: PushConsumerOption, +message_listener: Arc, +retry_policy: BackOffRetryPolicy, +} + +impl PushConsumer { +pub fn new( +client_option: ClientOption, +option: PushConsumerOption, +message_listener: MessageListener, +) -> Result { +if option.consumer_group().is_empty() { +return Err(ClientError::new( +ErrorKind::Config, +"consumer group is required.", +OPERATION_NEW_PUSH_CONSUMER, +)); +} +if option.subscription_expressions().is_empty() { +return Err(ClientError::new( +ErrorKind::Config, +"subscription expressions is required.", +OPERATION_NEW_PUSH_CONSUMER, +)); +} +let client_option = ClientOption { +client_type: ClientType::PushConsumer, +group: Some(option.consumer_group().to_string()), +..client_option +}; +let logger = log::logger(option.logging_format()); +let client = Client::new( +&logger, +client_option, +build_push_consumer_settings(&option), +)?; +Ok(Self { +logger, +client, +message_listener: Arc::new(message_listener), +option: Arc::new(RwLock::new(option)), +shutdown_tx: None, +}) +} + +pub async fn start(&mut self) -> Result<(), ClientError> { +let (telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16); +self.client.start(telemetry_command_tx).await?; +let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); +self.shutdown_tx = Some(shutdown_tx); +let option = Arc::clone(&self.option); +let mut rpc_client = self.client.get_session().await?; +let route_manager = self.client.get_route_manager(); +let topics; +{ +topics = option +.read() +.subscription_expressions() +.keys() +.cloned() +.collect()
Re: [PR] [rust] implement standard pushconsumer [rocketmq-clients]
lizhanhui commented on code in PR #767: URL: https://github.com/apache/rocketmq-clients/pull/767#discussion_r167147 ## rust/src/push_consumer.rs: ## @@ -0,0 +1,1303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use mockall::automock; +use mockall_double::double; +use parking_lot::{Mutex, RwLock}; +use prost_types::Duration; +use slog::Logger; +use slog::{debug, error, info, warn}; +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; +use tokio::select; +use tokio::sync::{mpsc, oneshot}; + +#[double] +use crate::client::Client; +use crate::conf::{BackOffRetryPolicy, ClientOption, PushConsumerOption}; +use crate::error::{ClientError, ErrorKind}; +use crate::model::common::{ClientType, ConsumeResult, FilterExpression, MessageQueue}; +use crate::model::message::{AckMessageEntry, MessageView}; +use crate::pb::receive_message_response::Content; +use crate::pb::{ +AckMessageRequest, Assignment, ChangeInvisibleDurationRequest, QueryAssignmentRequest, +ReceiveMessageRequest, Resource, +}; +use crate::session::{RPCClient, Session}; +use crate::util::{build_endpoints_by_message_queue, build_push_consumer_settings}; +use crate::{log, pb}; + +const OPERATION_NEW_PUSH_CONSUMER: &str = "push_consumer.new"; +const OPERATION_RECEIVE_MESSAGE: &str = "push_consumer.receive_message"; +const OPERATION_ACK_MESSAGE: &str = "push_consumer.ack_message"; +const OPERATION_START_PUSH_CONSUMER: &str = "push_consumer.start"; +const OPERATION_CHANGE_INVISIBLE_DURATION: &str = "push_consumer.change_invisible_duration"; + +pub type MessageListener = Box ConsumeResult + Send + Sync>; + +pub struct PushConsumer { +logger: Logger, +client: Client, +message_listener: Arc, +option: Arc>, +shutdown_tx: Option>, +} + +/* + * An actor is required for each message queue. + * It is responsible for polling messages from the message queue. It communicates with PushConsumer by channels. + */ +struct MessageQueueActor { +logger: Logger, +rpc_client: Session, +message_queue: MessageQueue, +shutdown_tx: Option>, +option: PushConsumerOption, +message_listener: Arc, +retry_policy: BackOffRetryPolicy, +} + +impl PushConsumer { +pub fn new( +client_option: ClientOption, +option: PushConsumerOption, +message_listener: MessageListener, +) -> Result { +if option.consumer_group().is_empty() { +return Err(ClientError::new( +ErrorKind::Config, +"consumer group is required.", +OPERATION_NEW_PUSH_CONSUMER, +)); +} +if option.subscription_expressions().is_empty() { +return Err(ClientError::new( +ErrorKind::Config, +"subscription expressions is required.", +OPERATION_NEW_PUSH_CONSUMER, +)); +} +let client_option = ClientOption { +client_type: ClientType::PushConsumer, +group: Some(option.consumer_group().to_string()), +..client_option +}; +let logger = log::logger(option.logging_format()); +let client = Client::new( +&logger, +client_option, +build_push_consumer_settings(&option), +)?; +Ok(Self { +logger, +client, +message_listener: Arc::new(message_listener), +option: Arc::new(RwLock::new(option)), +shutdown_tx: None, +}) +} + +pub async fn start(&mut self) -> Result<(), ClientError> { +let (telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16); +self.client.start(telemetry_command_tx).await?; +let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); +self.shutdown_tx = Some(shutdown_tx); +let option = Arc::clone(&self.option); +let mut rpc_client = self.client.get_session().await?; +let route_manager = self.client.get_route_manager(); +let topics; +{ +topics = option +.read() +.subscription_expressions() +.keys() +.cloned() +.collect()
Re: [PR] [rust] implement standard pushconsumer [rocketmq-clients]
lizhanhui commented on code in PR #767: URL: https://github.com/apache/rocketmq-clients/pull/767#discussion_r167147 ## rust/src/push_consumer.rs: ## @@ -0,0 +1,1303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use mockall::automock; +use mockall_double::double; +use parking_lot::{Mutex, RwLock}; +use prost_types::Duration; +use slog::Logger; +use slog::{debug, error, info, warn}; +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; +use tokio::select; +use tokio::sync::{mpsc, oneshot}; + +#[double] +use crate::client::Client; +use crate::conf::{BackOffRetryPolicy, ClientOption, PushConsumerOption}; +use crate::error::{ClientError, ErrorKind}; +use crate::model::common::{ClientType, ConsumeResult, FilterExpression, MessageQueue}; +use crate::model::message::{AckMessageEntry, MessageView}; +use crate::pb::receive_message_response::Content; +use crate::pb::{ +AckMessageRequest, Assignment, ChangeInvisibleDurationRequest, QueryAssignmentRequest, +ReceiveMessageRequest, Resource, +}; +use crate::session::{RPCClient, Session}; +use crate::util::{build_endpoints_by_message_queue, build_push_consumer_settings}; +use crate::{log, pb}; + +const OPERATION_NEW_PUSH_CONSUMER: &str = "push_consumer.new"; +const OPERATION_RECEIVE_MESSAGE: &str = "push_consumer.receive_message"; +const OPERATION_ACK_MESSAGE: &str = "push_consumer.ack_message"; +const OPERATION_START_PUSH_CONSUMER: &str = "push_consumer.start"; +const OPERATION_CHANGE_INVISIBLE_DURATION: &str = "push_consumer.change_invisible_duration"; + +pub type MessageListener = Box ConsumeResult + Send + Sync>; + +pub struct PushConsumer { +logger: Logger, +client: Client, +message_listener: Arc, +option: Arc>, +shutdown_tx: Option>, +} + +/* + * An actor is required for each message queue. + * It is responsible for polling messages from the message queue. It communicates with PushConsumer by channels. + */ +struct MessageQueueActor { +logger: Logger, +rpc_client: Session, +message_queue: MessageQueue, +shutdown_tx: Option>, +option: PushConsumerOption, +message_listener: Arc, +retry_policy: BackOffRetryPolicy, +} + +impl PushConsumer { +pub fn new( +client_option: ClientOption, +option: PushConsumerOption, +message_listener: MessageListener, +) -> Result { +if option.consumer_group().is_empty() { +return Err(ClientError::new( +ErrorKind::Config, +"consumer group is required.", +OPERATION_NEW_PUSH_CONSUMER, +)); +} +if option.subscription_expressions().is_empty() { +return Err(ClientError::new( +ErrorKind::Config, +"subscription expressions is required.", +OPERATION_NEW_PUSH_CONSUMER, +)); +} +let client_option = ClientOption { +client_type: ClientType::PushConsumer, +group: Some(option.consumer_group().to_string()), +..client_option +}; +let logger = log::logger(option.logging_format()); +let client = Client::new( +&logger, +client_option, +build_push_consumer_settings(&option), +)?; +Ok(Self { +logger, +client, +message_listener: Arc::new(message_listener), +option: Arc::new(RwLock::new(option)), +shutdown_tx: None, +}) +} + +pub async fn start(&mut self) -> Result<(), ClientError> { +let (telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16); +self.client.start(telemetry_command_tx).await?; +let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); +self.shutdown_tx = Some(shutdown_tx); +let option = Arc::clone(&self.option); +let mut rpc_client = self.client.get_session().await?; +let route_manager = self.client.get_route_manager(); +let topics; +{ +topics = option +.read() +.subscription_expressions() +.keys() +.cloned() +.collect()
[GH] (rocketmq): Workflow run "PR-CI" failed!
The GitHub Actions job "PR-CI" on rocketmq.git has failed. Run started by GitHub user qianye1001 (triggered by qianye1001). Head commit for run: 0f33126aa50a5fc67c212f20e94bdd236dd552b7 / qianye Merge remote-tracking branch 'origin/Enhancement#8365' into Enhancement#8365 # Conflicts: # client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java # client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java Report URL: https://github.com/apache/rocketmq/actions/runs/9884852631 With regards, GitHub Actions via GitBox
[GH] (rocketmq): Workflow run "E2E test for pull request" failed!
The GitHub Actions job "E2E test for pull request" on rocketmq.git has failed. Run started by GitHub user qianye1001 (triggered by qianye1001). Head commit for run: 67ddc1dae0548a5a2e413a2c7ef155b39a64f2b6 / yx9o [ISSUE #8375] Add more test coverage for MqClientAdminImpl (#8376) * Add more test coverage for MqClientAdminImpl Report URL: https://github.com/apache/rocketmq/actions/runs/9884860440 With regards, GitHub Actions via GitBox
[GH] (rocketmq): Workflow run "Build and Run Tests by Bazel" failed!
The GitHub Actions job "Build and Run Tests by Bazel" on rocketmq.git has failed. Run started by GitHub user qianye1001 (triggered by qianye1001). Head commit for run: 0f33126aa50a5fc67c212f20e94bdd236dd552b7 / qianye Merge remote-tracking branch 'origin/Enhancement#8365' into Enhancement#8365 # Conflicts: # client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java # client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java Report URL: https://github.com/apache/rocketmq/actions/runs/9884852629 With regards, GitHub Actions via GitBox
[GH] (rocketmq): Workflow run "Build and Run Tests by Maven" failed!
The GitHub Actions job "Build and Run Tests by Maven" on rocketmq.git has failed. Run started by GitHub user qianye1001 (triggered by qianye1001). Head commit for run: 0f33126aa50a5fc67c212f20e94bdd236dd552b7 / qianye Merge remote-tracking branch 'origin/Enhancement#8365' into Enhancement#8365 # Conflicts: # client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java # client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java Report URL: https://github.com/apache/rocketmq/actions/runs/9884852628 With regards, GitHub Actions via GitBox
[GH] (rocketmq): Workflow run "PR-CI" is working again!
The GitHub Actions job "PR-CI" on rocketmq.git has succeeded. Run started by GitHub user qianye1001 (triggered by qianye1001). Head commit for run: 181be3017a8f91b722d731ebbd36c572c0baaa2c / qianye fix style Report URL: https://github.com/apache/rocketmq/actions/runs/9884901042 With regards, GitHub Actions via GitBox
[GH] (rocketmq): Workflow run "Build and Run Tests by Bazel" is working again!
The GitHub Actions job "Build and Run Tests by Bazel" on rocketmq.git has succeeded. Run started by GitHub user qianye1001 (triggered by qianye1001). Head commit for run: 181be3017a8f91b722d731ebbd36c572c0baaa2c / qianye fix style Report URL: https://github.com/apache/rocketmq/actions/runs/9884901041 With regards, GitHub Actions via GitBox
[GH] (rocketmq): Workflow run "Coverage" failed!
The GitHub Actions job "Coverage" on rocketmq.git has failed. Run started by GitHub user qianye1001 (triggered by qianye1001). Head commit for run: 0f33126aa50a5fc67c212f20e94bdd236dd552b7 / qianye Merge remote-tracking branch 'origin/Enhancement#8365' into Enhancement#8365 # Conflicts: # client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java # client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java Report URL: https://github.com/apache/rocketmq/actions/runs/9884852627 With regards, GitHub Actions via GitBox
[GH] (rocketmq): Workflow run "E2E test for pull request" failed!
The GitHub Actions job "E2E test for pull request" on rocketmq.git has failed. Run started by GitHub user qianye1001 (triggered by qianye1001). Head commit for run: 67ddc1dae0548a5a2e413a2c7ef155b39a64f2b6 / yx9o [ISSUE #8375] Add more test coverage for MqClientAdminImpl (#8376) * Add more test coverage for MqClientAdminImpl Report URL: https://github.com/apache/rocketmq/actions/runs/9884922377 With regards, GitHub Actions via GitBox
Re: [PR] [ISSUE #8366] Eliminate deadlocks during the client shutdown process. [rocketmq]
YanYunyang commented on PR #8367: URL: https://github.com/apache/rocketmq/pull/8367#issuecomment-2221953973 > https://github.com/apache/rocketmq/blob/32f0e8481d1f5672b8d494de7607fe5b846ffa51/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java#L479-L484 > > It seems that this piece of code also may lead to deadlock. Here is the same channelWrapper. The lock in channelWrapper is reentrant. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [ISSUE #8366] Eliminate deadlocks during the client shutdown process. [rocketmq]
YanYunyang commented on PR #8367: URL: https://github.com/apache/rocketmq/pull/8367#issuecomment-2221957851 > There are two closeChannel method. > > This piece of code also needs modification. > > https://github.com/apache/rocketmq/blob/32f0e8481d1f5672b8d494de7607fe5b846ffa51/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java#L424-L428 Here it needs to be changed to `isWrapperOf` to match the meaning. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GH] (rocketmq): Workflow run "Coverage" is working again!
The GitHub Actions job "Coverage" on rocketmq.git has succeeded. Run started by GitHub user qianye1001 (triggered by qianye1001). Head commit for run: 181be3017a8f91b722d731ebbd36c572c0baaa2c / qianye fix style Report URL: https://github.com/apache/rocketmq/actions/runs/9884901036 With regards, GitHub Actions via GitBox
[GH] (rocketmq): Workflow run "Build and Run Tests by Maven" is working again!
The GitHub Actions job "Build and Run Tests by Maven" on rocketmq.git has succeeded. Run started by GitHub user qianye1001 (triggered by qianye1001). Head commit for run: 181be3017a8f91b722d731ebbd36c572c0baaa2c / qianye fix style Report URL: https://github.com/apache/rocketmq/actions/runs/9884901047 With regards, GitHub Actions via GitBox
Re: [PR] [ISSUE #8366] Eliminate deadlocks during the client shutdown process. [rocketmq]
drpmma commented on PR #8367: URL: https://github.com/apache/rocketmq/pull/8367#issuecomment-100844 > https://github.com/apache/rocketmq/blob/32f0e8481d1f5672b8d494de7607fe5b846ffa51/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java#L479-L484 > > It seems that this piece of code also may lead to deadlock. how about tryClose method which contains lock.readLock().lock(), it also may lead to deadlock. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [ISSUE #8058] Support for upgrading metadata in json to rocksdb (#8045) [rocketmq]
LetLetMe closed pull request #8116: [ISSUE #8058] Support for upgrading metadata in json to rocksdb (#8045) URL: https://github.com/apache/rocketmq/pull/8116 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GH] (rocketmq): Workflow run "E2E test for pull request" failed!
The GitHub Actions job "E2E test for pull request" on rocketmq.git has failed. Run started by GitHub user YanYunyang (triggered by YanYunyang). Head commit for run: 67ddc1dae0548a5a2e413a2c7ef155b39a64f2b6 / yx9o [ISSUE #8375] Add more test coverage for MqClientAdminImpl (#8376) * Add more test coverage for MqClientAdminImpl Report URL: https://github.com/apache/rocketmq/actions/runs/9886405353 With regards, GitHub Actions via GitBox
Re: [PR] [ISSUE #8366] Eliminate deadlocks during the client shutdown process. [rocketmq]
codecov-commenter commented on PR #8367: URL: https://github.com/apache/rocketmq/pull/8367#issuecomment-144132 ## [Codecov](https://app.codecov.io/gh/apache/rocketmq/pull/8367?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report Attention: Patch coverage is `75.0%` with `2 lines` in your changes missing coverage. Please review. > Project coverage is 44.25%. Comparing base [(`77d6633`)](https://app.codecov.io/gh/apache/rocketmq/commit/77d6633e622200ed52feef6fb2adeb12fba8e2c8?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`36d561f`)](https://app.codecov.io/gh/apache/rocketmq/commit/36d561f118de4eee06a9bea720dc1fae44d42e72?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache). > Report is 5 commits behind head on develop. | [Files](https://app.codecov.io/gh/apache/rocketmq/pull/8367?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines | |---|---|---| | [...e/rocketmq/remoting/netty/NettyRemotingClient.java](https://app.codecov.io/gh/apache/rocketmq/pull/8367?src=pr&el=tree&filepath=remoting%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Frocketmq%2Fremoting%2Fnetty%2FNettyRemotingClient.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL25ldHR5L05ldHR5UmVtb3RpbmdDbGllbnQuamF2YQ==) | 75.00% | [1 Missing and 1 partial :warning: ](https://app.codecov.io/gh/apache/rocketmq/pull/8367?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Additional details and impacted files ```diff @@ Coverage Diff @@ ## develop#8367 +/- ## = + Coverage 43.88% 44.25% +0.37% - Complexity 1064710742 +95 = Files 1274 1274 Lines 8893988941 +2 Branches 1143211432 = + Hits 3903339365 +332 + Misses 4498544618 -367 - Partials4921 4958 +37 ``` [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/rocketmq/pull/8367?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache). :loudspeaker: Have feedback on the report? [Share it here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [rust] implement standard pushconsumer [rocketmq-clients]
glcrazier commented on code in PR #767: URL: https://github.com/apache/rocketmq-clients/pull/767#discussion_r1673495957 ## rust/src/push_consumer.rs: ## @@ -0,0 +1,1303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use mockall::automock; +use mockall_double::double; +use parking_lot::{Mutex, RwLock}; +use prost_types::Duration; +use slog::Logger; +use slog::{debug, error, info, warn}; +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; +use tokio::select; +use tokio::sync::{mpsc, oneshot}; + +#[double] +use crate::client::Client; +use crate::conf::{BackOffRetryPolicy, ClientOption, PushConsumerOption}; +use crate::error::{ClientError, ErrorKind}; +use crate::model::common::{ClientType, ConsumeResult, FilterExpression, MessageQueue}; +use crate::model::message::{AckMessageEntry, MessageView}; +use crate::pb::receive_message_response::Content; +use crate::pb::{ +AckMessageRequest, Assignment, ChangeInvisibleDurationRequest, QueryAssignmentRequest, +ReceiveMessageRequest, Resource, +}; +use crate::session::{RPCClient, Session}; +use crate::util::{build_endpoints_by_message_queue, build_push_consumer_settings}; +use crate::{log, pb}; + +const OPERATION_NEW_PUSH_CONSUMER: &str = "push_consumer.new"; +const OPERATION_RECEIVE_MESSAGE: &str = "push_consumer.receive_message"; +const OPERATION_ACK_MESSAGE: &str = "push_consumer.ack_message"; +const OPERATION_START_PUSH_CONSUMER: &str = "push_consumer.start"; +const OPERATION_CHANGE_INVISIBLE_DURATION: &str = "push_consumer.change_invisible_duration"; + +pub type MessageListener = Box ConsumeResult + Send + Sync>; Review Comment: The FnMut requires exclusive access in threads which means incapable of concurrent consumption. Do you have any idea? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GH] (rocketmq): Workflow run "Build and Run Tests by Maven" failed!
The GitHub Actions job "Build and Run Tests by Maven" on rocketmq.git has failed. Run started by GitHub user YanYunyang (triggered by drpmma). Head commit for run: 36d561f118de4eee06a9bea720dc1fae44d42e72 / YanYunyang <313169...@qq.com> [ISSUE #8366] Compare channels for equality using `isWrapperOf`. Report URL: https://github.com/apache/rocketmq/actions/runs/9885151578 With regards, GitHub Actions via GitBox
Re: [PR] [ISSUE #8365] add remoting client non-oneway updateConsumerOffset function [rocketmq]
qianye1001 closed pull request #8368: [ISSUE #8365] add remoting client non-oneway updateConsumerOffset function URL: https://github.com/apache/rocketmq/pull/8368 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org