aaron-ai commented on code in PR #588: URL: https://github.com/apache/rocketmq-clients/pull/588#discussion_r1304202245
########## python/rocketmq/simple_consumer.py: ########## @@ -0,0 +1,280 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import random +import re +import threading +from datetime import timedelta +from threading import Lock +from typing import Dict + +import rocketmq +from rocketmq.client_config import ClientConfig +from rocketmq.consumer import Consumer +from rocketmq.definition import PermissionHelper +from rocketmq.filter_expression import FilterExpression +from rocketmq.log import logger +from rocketmq.message import MessageView +from rocketmq.protocol.definition_pb2 import Resource +from rocketmq.protocol.definition_pb2 import Resource as ProtoResource +from rocketmq.protocol.service_pb2 import \ + AckMessageEntry as ProtoAckMessageEntry +from rocketmq.protocol.service_pb2 import \ + AckMessageRequest as ProtoAckMessageRequest +from rocketmq.rpc_client import Endpoints +from rocketmq.session_credentials import (SessionCredentials, + SessionCredentialsProvider) +from rocketmq.simple_subscription_settings import SimpleSubscriptionSettings +from utils import get_positive_mod + + +class SubscriptionLoadBalancer: + """This class serves as a load balancer for message subscription. + It keeps track of a rotating index to help distribute the load evenly. + """ + + def __init__(self, topic_route_data): + #: current index for message queue selection + self._index = random.randint(0, 10000) # assuming a range of 0-10000 + #: thread lock to ensure atomic update to the index + self._index_lock = threading.Lock() + + #: filter the message queues which are readable and from the master broker + self._message_queues = [ + mq for mq in topic_route_data.message_queues + if PermissionHelper().is_readable(mq.permission) + and mq.broker.id == rocketmq.utils.master_broker_id + ] + + def update(self, topic_route_data): + """Updates the message queues based on the new topic route data.""" + self._index += 1 + self._message_queues = [ + mq for mq in topic_route_data.message_queues + if PermissionHelper().is_readable(mq.permission) + and mq.broker.id == rocketmq.utils.master_broker_id + ] + return self + + def take_message_queue(self): + """Fetches the next message queue based on the current index.""" + with self._index_lock: + index = get_positive_mod(self._index, len(self._message_queues)) + self._index += 1 + return self._message_queues[index] + + +class SimpleConsumer(Consumer): + """The SimpleConsumer class extends the Client class and is used to consume + messages from specific topics in RocketMQ. + """ + + def __init__(self, client_config: ClientConfig, consumer_group: str, await_duration: int, subscription_expressions: Dict[str, FilterExpression]): + """Create a new SimpleConsumer. + + :param client_config: The configuration for the client. + :param consumer_group: The consumer group. + :param await_duration: The await duration. + :param subscription_expressions: The subscription expressions. + """ + super().__init__(client_config, consumer_group) + + self._consumer_group = consumer_group + self._await_duration = await_duration + self._subscription_expressions = subscription_expressions + + self._simple_subscription_settings = SimpleSubscriptionSettings(self.client_id, self.endpoints, self._consumer_group, timedelta(seconds=10), 10, self._subscription_expressions) + self._subscription_route_data_cache = {} + self._topic_round_robin_index = 0 + self._state_lock = Lock() + self._state = "INIT" + self._subscription_load_balancer = {} # A dictionary to keep subscription load balancers + + def get_topics(self): + return set(self._subscription_expressions.keys()) + + def get_settings(self): + return self._simple_subscription_settings + + async def start(self): + """Start the RocketMQ consumer and log the operation.""" + logger.info(f"Begin to start the rocketmq consumer, client_id={self.client_id}") + with self._state_lock: + if self._state != "INIT": + raise Exception("Consumer already started") + await super().start() + # Start all necessary operations + self._state = "RUNNING" + logger.info(f"The rocketmq consumer starts successfully, client_id={self.client_id}") + + async def shutdown(self): + """Shutdown the RocketMQ consumer and log the operation.""" + logger.info(f"Begin to shutdown the rocketmq consumer, client_id={self.client_id}") + with self._state_lock: + if self._state != "RUNNING": + raise Exception("Consumer is not running") + # Shutdown all necessary operations + self._state = "SHUTDOWN" + await super().shutdown() + logger.info(f"Shutdown the rocketmq consumer successfully, client_id={self.client_id}") + + def update_subscription_load_balancer(self, topic, topic_route_data): + # 如果订阅路由数据缓存中已经存在该主题的负载均衡器,就更新它 Review Comment: Use english here. ########## python/rocketmq/simple_consumer.py: ########## @@ -0,0 +1,280 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import random +import re +import threading +from datetime import timedelta +from threading import Lock +from typing import Dict + +import rocketmq +from rocketmq.client_config import ClientConfig +from rocketmq.consumer import Consumer +from rocketmq.definition import PermissionHelper +from rocketmq.filter_expression import FilterExpression +from rocketmq.log import logger +from rocketmq.message import MessageView +from rocketmq.protocol.definition_pb2 import Resource +from rocketmq.protocol.definition_pb2 import Resource as ProtoResource +from rocketmq.protocol.service_pb2 import \ + AckMessageEntry as ProtoAckMessageEntry +from rocketmq.protocol.service_pb2 import \ + AckMessageRequest as ProtoAckMessageRequest +from rocketmq.rpc_client import Endpoints +from rocketmq.session_credentials import (SessionCredentials, + SessionCredentialsProvider) +from rocketmq.simple_subscription_settings import SimpleSubscriptionSettings +from utils import get_positive_mod + + +class SubscriptionLoadBalancer: + """This class serves as a load balancer for message subscription. + It keeps track of a rotating index to help distribute the load evenly. + """ + + def __init__(self, topic_route_data): + #: current index for message queue selection + self._index = random.randint(0, 10000) # assuming a range of 0-10000 + #: thread lock to ensure atomic update to the index + self._index_lock = threading.Lock() + + #: filter the message queues which are readable and from the master broker + self._message_queues = [ + mq for mq in topic_route_data.message_queues + if PermissionHelper().is_readable(mq.permission) + and mq.broker.id == rocketmq.utils.master_broker_id + ] + + def update(self, topic_route_data): + """Updates the message queues based on the new topic route data.""" + self._index += 1 + self._message_queues = [ + mq for mq in topic_route_data.message_queues + if PermissionHelper().is_readable(mq.permission) + and mq.broker.id == rocketmq.utils.master_broker_id + ] + return self + + def take_message_queue(self): + """Fetches the next message queue based on the current index.""" + with self._index_lock: + index = get_positive_mod(self._index, len(self._message_queues)) + self._index += 1 + return self._message_queues[index] + + +class SimpleConsumer(Consumer): + """The SimpleConsumer class extends the Client class and is used to consume + messages from specific topics in RocketMQ. + """ + + def __init__(self, client_config: ClientConfig, consumer_group: str, await_duration: int, subscription_expressions: Dict[str, FilterExpression]): + """Create a new SimpleConsumer. + + :param client_config: The configuration for the client. + :param consumer_group: The consumer group. + :param await_duration: The await duration. + :param subscription_expressions: The subscription expressions. + """ + super().__init__(client_config, consumer_group) + + self._consumer_group = consumer_group + self._await_duration = await_duration + self._subscription_expressions = subscription_expressions + + self._simple_subscription_settings = SimpleSubscriptionSettings(self.client_id, self.endpoints, self._consumer_group, timedelta(seconds=10), 10, self._subscription_expressions) + self._subscription_route_data_cache = {} + self._topic_round_robin_index = 0 + self._state_lock = Lock() + self._state = "INIT" + self._subscription_load_balancer = {} # A dictionary to keep subscription load balancers + + def get_topics(self): + return set(self._subscription_expressions.keys()) + + def get_settings(self): + return self._simple_subscription_settings + + async def start(self): + """Start the RocketMQ consumer and log the operation.""" + logger.info(f"Begin to start the rocketmq consumer, client_id={self.client_id}") + with self._state_lock: + if self._state != "INIT": + raise Exception("Consumer already started") + await super().start() + # Start all necessary operations + self._state = "RUNNING" + logger.info(f"The rocketmq consumer starts successfully, client_id={self.client_id}") + + async def shutdown(self): + """Shutdown the RocketMQ consumer and log the operation.""" + logger.info(f"Begin to shutdown the rocketmq consumer, client_id={self.client_id}") + with self._state_lock: + if self._state != "RUNNING": + raise Exception("Consumer is not running") + # Shutdown all necessary operations + self._state = "SHUTDOWN" + await super().shutdown() + logger.info(f"Shutdown the rocketmq consumer successfully, client_id={self.client_id}") + + def update_subscription_load_balancer(self, topic, topic_route_data): + # 如果订阅路由数据缓存中已经存在该主题的负载均衡器,就更新它 + subscription_load_balancer = self._subscription_route_data_cache.get(topic) + if subscription_load_balancer: + subscription_load_balancer.update(topic_route_data) + # 否则,新建一个订阅负载均衡器 + else: + subscription_load_balancer = SubscriptionLoadBalancer(topic_route_data) + + # 将新的或更新后的订阅负载均衡器存入订阅路由数据缓存 + self._subscription_route_data_cache[topic] = subscription_load_balancer + return subscription_load_balancer + + async def get_subscription_load_balancer(self, topic): + # 如果订阅路由数据缓存中已经存在该主题的负载均衡器,就返回它 + subscription_load_balancer = self._subscription_route_data_cache.get(topic) + if subscription_load_balancer: + return subscription_load_balancer + + # 否则,获取主题的路由数据 + topic_route_data = await self.get_route_data(topic) + # 更新订阅负载均衡器 + return self.update_subscription_load_balancer(topic, topic_route_data) + + async def receive(self, max_message_num, invisible_duration): + if self._state != "RUNNING": + raise Exception("Simple consumer is not running") + if max_message_num <= 0: + raise Exception("maxMessageNum must be greater than 0") + copy = dict(self._subscription_expressions) + topics = list(copy.keys()) + if len(topics) == 0: + raise ValueError("There is no topic to receive message") + + index = (self._topic_round_robin_index + 1) % len(topics) + self._topic_round_robin_index = index + topic = topics[index] + filter_expression = self._subscription_expressions[topic] + subscription_load_balancer = await self.get_subscription_load_balancer(topic) + mq = subscription_load_balancer.take_message_queue() + request = self.wrap_receive_message_request(max_message_num, mq, filter_expression, self._await_duration, invisible_duration) + result = await self.receive_message(request, mq, self._await_duration) + return result.messages + + async def change_invisible_duration(self, message_view, invisible_duration): + if self._state != "RUNNING": + raise Exception("Simple consumer is not running") Review Comment: What will happen if the state is `RUNNING`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org