aaron-ai commented on code in PR #588: URL: https://github.com/apache/rocketmq-clients/pull/588#discussion_r1304208429
########## python/rocketmq/simple_consumer.py: ########## @@ -0,0 +1,280 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import random +import re +import threading +from datetime import timedelta +from threading import Lock +from typing import Dict + +import rocketmq +from rocketmq.client_config import ClientConfig +from rocketmq.consumer import Consumer +from rocketmq.definition import PermissionHelper +from rocketmq.filter_expression import FilterExpression +from rocketmq.log import logger +from rocketmq.message import MessageView +from rocketmq.protocol.definition_pb2 import Resource +from rocketmq.protocol.definition_pb2 import Resource as ProtoResource +from rocketmq.protocol.service_pb2 import \ + AckMessageEntry as ProtoAckMessageEntry +from rocketmq.protocol.service_pb2 import \ + AckMessageRequest as ProtoAckMessageRequest +from rocketmq.rpc_client import Endpoints +from rocketmq.session_credentials import (SessionCredentials, + SessionCredentialsProvider) +from rocketmq.simple_subscription_settings import SimpleSubscriptionSettings +from utils import get_positive_mod + + +class SubscriptionLoadBalancer: + """This class serves as a load balancer for message subscription. 
+ It keeps track of a rotating index to help distribute the load evenly. + """ + + def __init__(self, topic_route_data): + #: current index for message queue selection + self._index = random.randint(0, 10000) # assuming a range of 0-10000 + #: thread lock to ensure atomic update to the index + self._index_lock = threading.Lock() + + #: filter the message queues which are readable and from the master broker + self._message_queues = [ + mq for mq in topic_route_data.message_queues + if PermissionHelper().is_readable(mq.permission) + and mq.broker.id == rocketmq.utils.master_broker_id + ] + + def update(self, topic_route_data): + """Updates the message queues based on the new topic route data.""" + self._index += 1 + self._message_queues = [ + mq for mq in topic_route_data.message_queues + if PermissionHelper().is_readable(mq.permission) + and mq.broker.id == rocketmq.utils.master_broker_id + ] + return self + + def take_message_queue(self): + """Fetches the next message queue based on the current index.""" + with self._index_lock: + index = get_positive_mod(self._index, len(self._message_queues)) + self._index += 1 + return self._message_queues[index] + + +class SimpleConsumer(Consumer): + """The SimpleConsumer class extends the Client class and is used to consume + messages from specific topics in RocketMQ. + """ + + def __init__(self, client_config: ClientConfig, consumer_group: str, await_duration: int, subscription_expressions: Dict[str, FilterExpression]): + """Create a new SimpleConsumer. + + :param client_config: The configuration for the client. + :param consumer_group: The consumer group. + :param await_duration: The await duration. + :param subscription_expressions: The subscription expressions. 
+ """ + super().__init__(client_config, consumer_group) + + self._consumer_group = consumer_group + self._await_duration = await_duration + self._subscription_expressions = subscription_expressions + + self._simple_subscription_settings = SimpleSubscriptionSettings(self.client_id, self.endpoints, self._consumer_group, timedelta(seconds=10), 10, self._subscription_expressions) + self._subscription_route_data_cache = {} + self._topic_round_robin_index = 0 + self._state_lock = Lock() + self._state = "INIT" Review Comment: Could we use an enum rather than a string? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org