This is an automated email from the ASF dual-hosted git repository. lollipop pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new 04251964 python 5.0.3 (#937) 04251964 is described below commit 0425196498f9936013c13d6408159ae106e9096d Author: zhouli11 <04081...@163.com> AuthorDate: Tue Feb 11 13:58:00 2025 +0800 python 5.0.3 (#937) * update to 5.0.3 --- .../rocketmq/v5/client/balancer/queue_selector.py | 44 +++- python/rocketmq/v5/client/client.py | 235 ++++++++++++------ python/rocketmq/v5/client/client_configuration.py | 17 +- .../rocketmq/v5/client/connection/rpc_channel.py | 79 ++++-- python/rocketmq/v5/client/connection/rpc_client.py | 275 ++++++++++++++------- .../rocketmq/v5/client/metrics/client_metrics.py | 60 +++-- python/rocketmq/v5/consumer/simple_consumer.py | 193 +++++++++++---- python/rocketmq/v5/exception/client_exception.py | 1 + python/rocketmq/v5/log/log_config.py | 38 ++- python/rocketmq/v5/model/callback_result.py | 24 +- python/rocketmq/v5/model/filter_expression.py | 6 +- python/rocketmq/v5/model/message.py | 57 +++-- python/rocketmq/v5/model/metrics.py | 10 +- python/rocketmq/v5/model/send_receipt.py | 1 + python/rocketmq/v5/model/topic_route.py | 32 ++- python/rocketmq/v5/producer/producer.py | 270 ++++++++++++++------ python/rocketmq/v5/test/test_base.py | 25 +- python/rocketmq/v5/test/test_consumer.py | 40 ++- python/rocketmq/v5/test/test_producer.py | 14 +- python/rocketmq/v5/util/client_id.py | 10 +- python/rocketmq/v5/util/message_id_codec.py | 26 +- .../rocketmq/v5/util/messaging_result_checker.py | 56 ++++- python/rocketmq/v5/util/misc.py | 22 +- python/rocketmq/v5/util/signature.py | 36 +-- python/setup.py | 2 +- 25 files changed, 1113 insertions(+), 460 deletions(-) diff --git a/python/rocketmq/v5/client/balancer/queue_selector.py b/python/rocketmq/v5/client/balancer/queue_selector.py index add42089..94ed97f7 100644 --- a/python/rocketmq/v5/client/balancer/queue_selector.py +++ b/python/rocketmq/v5/client/balancer/queue_selector.py @@ -33,18 +33,36 @@ class QueueSelector: @classmethod def producer_queue_selector(cls, topic_route: TopicRouteData): - return cls(list(filter(lambda queue: queue.is_writable() and queue.is_master_broker(), topic_route.message_queues)), - QueueSelector.PRODUCER_QUEUE_SELECTOR) + return cls( + list( + filter( + lambda queue: queue.is_writable() and queue.is_master_broker(), + topic_route.message_queues, + ) + ), + QueueSelector.PRODUCER_QUEUE_SELECTOR, + ) @classmethod def simple_consumer_queue_selector(cls, topic_route: TopicRouteData): - return cls(list(filter(lambda queue: queue.is_readable() and queue.is_master_broker(), topic_route.message_queues)), - QueueSelector.SIMPLE_CONSUMER_QUEUE_SELECTOR) + return cls( + list( + filter( + lambda queue: queue.is_readable() and queue.is_master_broker(), + topic_route.message_queues, + ) + ), + QueueSelector.SIMPLE_CONSUMER_QUEUE_SELECTOR, + ) def select_next_queue(self): if self.__selector_type == QueueSelector.NONE_TYPE_SELECTOR: - raise IllegalArgumentException("error type for queue selector, type is NONE_TYPE_SELECTOR.") - return self.__message_queues[self.__index.get_and_increment() % len(self.__message_queues)] + raise IllegalArgumentException( + "error type for queue selector, type is NONE_TYPE_SELECTOR." + ) + return self.__message_queues[ + self.__index.get_and_increment() % len(self.__message_queues) + ] def all_queues(self): index = self.__index.get_and_increment() % len(self.__message_queues) @@ -54,6 +72,16 @@ class QueueSelector: if topic_route.message_queues == self.__message_queues: return if self.__selector_type == QueueSelector.PRODUCER_QUEUE_SELECTOR: - self.__message_queues = list(filter(lambda queue: queue.is_writable() and queue.is_master_broker(), topic_route.message_queues)) + self.__message_queues = list( + filter( + lambda queue: queue.is_writable() and queue.is_master_broker(), + topic_route.message_queues, + ) + ) elif self.__selector_type == QueueSelector.SIMPLE_CONSUMER_QUEUE_SELECTOR: - self.__message_queues = list(filter(lambda queue: queue.is_readable() and queue.is_master_broker(), topic_route.message_queues)) + self.__message_queues = list( + filter( + lambda queue: queue.is_readable() and queue.is_master_broker(), + topic_route.message_queues, + ) + ) diff --git a/python/rocketmq/v5/client/client.py b/python/rocketmq/v5/client/client.py index 57cf2a0b..b6c40f93 100644 --- a/python/rocketmq/v5/client/client.py +++ b/python/rocketmq/v5/client/client.py @@ -36,7 +36,9 @@ class Client: CALLBACK_THREADS_NUM = 5 - def __init__(self, client_configuration, topics, client_type: ClientType, tls_enable=False): + def __init__( + self, client_configuration, topics, client_type: ClientType, tls_enable=False + ): if client_configuration is None: raise IllegalArgumentException("clientConfiguration should not be null.") self.__client_configuration = client_configuration @@ -55,7 +57,9 @@ class Client: self.__sync_setting_scheduler_threading_event = None self.__clear_idle_rpc_channels_threading_event = None if topics is not None: - self.__topics = set(filter(lambda topic: Misc.is_valid_topic(topic), topics)) + self.__topics = set( + filter(lambda topic: Misc.is_valid_topic(topic), topics) + ) else: self.__topics = set() self.__callback_result_queue = Queue() @@ -67,7 +71,9 @@ class Client: def startup(self): try: if self.__had_shutdown is True: - raise Exception(f"client:{self.__client_id} had shutdown, can't startup again.") + raise Exception( + f"client:{self.__client_id} had shutdown, can't startup again." + ) try: # pre update topic route for producer or consumer @@ -76,7 +82,8 @@ class Client: except Exception as e: # ignore this exception and retrieve again when calling send or receive logger.warn( - f"update topic exception when client startup, ignore it, try it again in scheduler. exception: {e}") + f"update topic exception when client startup, ignore it, try it again in scheduler. exception: {e}" + ) self.__start_scheduler() self.__start_async_rpc_callback_handler() self.__is_running = True @@ -110,27 +117,27 @@ class Client: """ abstract """ def _start_success(self): - """ each subclass implements its own actions after a successful startup """ + """each subclass implements its own actions after a successful startup""" pass def _start_failure(self): - """ each subclass implements its own actions after a startup failure """ + """each subclass implements its own actions after a startup failure""" pass def _sync_setting_req(self, endpoints): - """ each subclass implements its own telemetry settings scheme """ + """each subclass implements its own telemetry settings scheme""" pass def _heartbeat_req(self): - """ each subclass implements its own heartbeat request """ + """each subclass implements its own heartbeat request""" pass def _notify_client_termination_req(self): - """ each subclass implements its own client termination request """ + """each subclass implements its own client termination request""" pass def _update_queue_selector(self, topic, topic_route): - """ each subclass implements its own queue selector """ + """each subclass implements its own queue selector""" pass """ scheduler """ @@ -140,29 +147,36 @@ class Client: try: # update topic route every 30 seconds self.__client_thread_task_enabled = True - self.__topic_route_scheduler = threading.Thread(target=self.__schedule_update_topic_route_cache, - name="update_topic_route_schedule_thread") + self.__topic_route_scheduler = threading.Thread( + target=self.__schedule_update_topic_route_cache, + name="update_topic_route_schedule_thread", + ) self.__topic_route_scheduler_threading_event = threading.Event() self.__topic_route_scheduler.start() logger.info("start topic route scheduler success.") # send heartbeat to all endpoints every 10 seconds - self.__heartbeat_scheduler = threading.Thread(target=self.__schedule_heartbeat, - name="heartbeat_schedule_thread") + self.__heartbeat_scheduler = threading.Thread( + target=self.__schedule_heartbeat, name="heartbeat_schedule_thread" + ) self.__heartbeat_scheduler_threading_event = threading.Event() self.__heartbeat_scheduler.start() logger.info("start heartbeat scheduler success.") # send client setting to all endpoints every 5 seconds - self.__sync_setting_scheduler = threading.Thread(target=self.__schedule_update_setting, - name="sync_setting_schedule_thread") + self.__sync_setting_scheduler = threading.Thread( + target=self.__schedule_update_setting, + name="sync_setting_schedule_thread", + ) self.__sync_setting_scheduler_threading_event = threading.Event() self.__sync_setting_scheduler.start() logger.info("start sync setting scheduler success.") # clear unused grpc channel(>30 minutes) every 60 seconds - self.__clear_idle_rpc_channels_scheduler = threading.Thread(target=self.__schedule_clear_idle_rpc_channels, - name="clear_idle_rpc_channel_schedule_thread") + self.__clear_idle_rpc_channels_scheduler = threading.Thread( + target=self.__schedule_clear_idle_rpc_channels, + name="clear_idle_rpc_channel_schedule_thread", + ) self.__clear_idle_rpc_channels_threading_event = threading.Event() self.__clear_idle_rpc_channels_scheduler.start() logger.info("start clear idle rpc channels scheduler success.") @@ -176,7 +190,7 @@ class Client: while True: if self.__client_thread_task_enabled is True: self.__topic_route_scheduler_threading_event.wait(30) - logger.debug(f"{self.__str__()} run scheduler for update topic route cache.") + logger.debug(f"{self.__str__()} run update topic route in scheduler.") # update topic route for each topic in cache topics = self.__topic_route_cache.keys() for topic in topics: @@ -185,24 +199,29 @@ class Client: self.__update_topic_route_async(topic) except Exception as e: logger.error( - f"{self.__str__()} run scheduler for update topic:{topic} route cache exception: {e}") + f"{self.__str__()} scheduler update topic:{topic} route raise exception: {e}" + ) else: break - logger.info(f"{self.__str__()} stop scheduler for update topic route cache success.") + logger.info( + f"{self.__str__()} stop scheduler for update topic route cache success." + ) def __schedule_heartbeat(self): asyncio.set_event_loop(self._rpc_channel_io_loop()) while True: if self.__client_thread_task_enabled is True: self.__heartbeat_scheduler_threading_event.wait(10) - logger.debug(f"{self.__str__()} run scheduler for heartbeat.") + logger.debug(f"{self.__str__()} run send heartbeat in scheduler.") all_endpoints = self.__get_all_endpoints().values() try: for endpoints in all_endpoints: if self.__client_thread_task_enabled is True: self.__heartbeat_async(endpoints) except Exception as e: - logger.error(f"{self.__str__()} run scheduler for heartbeat exception: {e}") + logger.error( + f"{self.__str__()} scheduler send heartbeat raise exception: {e}" + ) else: break logger.info(f"{self.__str__()} stop scheduler for heartbeat success.") @@ -212,16 +231,16 @@ class Client: while True: if self.__client_thread_task_enabled is True: self.__sync_setting_scheduler_threading_event.wait(5) - logger.debug(f"{self.__str__()} run scheduler for update setting.") + logger.debug(f"{self.__str__()} run update setting in scheduler.") try: all_endpoints = self.__get_all_endpoints().values() for endpoints in all_endpoints: if self.__client_thread_task_enabled is True: - # if stream_stream_call for grpc channel is none, create a new one, otherwise use the existing one - self.__retrieve_telemetry_stream_stream_call(endpoints) self.__setting_write(endpoints) except Exception as e: - logger.error(f"{self.__str__()} run scheduler for update setting exception: {e}") + logger.error( + f"{self.__str__()} scheduler set setting raise exception: {e}" + ) else: break logger.info(f"{self.__str__()} stop scheduler for update setting success.") @@ -230,15 +249,21 @@ class Client: while True: if self.__client_thread_task_enabled is True: self.__clear_idle_rpc_channels_threading_event.wait(60) - logger.debug(f"{self.__str__()} run scheduler for clear idle rpc channels.") + logger.debug( + f"{self.__str__()} run scheduler for clear idle rpc channels." + ) try: if self.__client_thread_task_enabled is True: self.__rpc_client.clear_idle_rpc_channels() except Exception as e: - logger.error(f"{self.__str__()} run scheduler for clear idle rpc channels: {e}") + logger.error( + f"{self.__str__()} run scheduler for clear idle rpc channels: {e}" + ) else: break - logger.info(f"{self.__str__()} stop scheduler for clear idle rpc channels success.") + logger.info( + f"{self.__str__()} stop scheduler for clear idle rpc channels success." + ) """ callback handler for async method """ @@ -247,11 +272,15 @@ class Client: # this handler switches user's callback thread from RpcClient's _io_loop_thread to client's callback_handler_thread try: for i in range(Client.CALLBACK_THREADS_NUM): - th = threading.Thread(name=f"callback_handler_thread-{i}", target=self.__handle_callback) + th = threading.Thread( + name=f"callback_handler_thread-{i}", target=self.__handle_callback + ) th.daemon = True self.__callback_threads.append(th) th.start() - logger.info(f"{self.__str__()} start async rpc callback thread:{th} success.") + logger.info( + f"{self.__str__()} start async rpc callback thread:{th} success." + ) except Exception as e: print(f"{self.__str__()} start async rpc callback raise exception: {e}") raise e @@ -260,7 +289,10 @@ class Client: while True: if self.__client_thread_task_enabled is True: callback_result = self.__callback_result_queue.get() - if callback_result.result_type == CallbackResultType.END_CALLBACK_THREAD_RESULT: + if ( + callback_result.result_type + == CallbackResultType.END_CALLBACK_THREAD_RESULT + ): # end infinite loop when client shutdown self.__callback_result_queue.task_done() break @@ -272,7 +304,9 @@ class Client: self.__callback_result_queue.task_done() else: break - logger.info(f"{self.__str__()} stop client callback result handler thread:{threading.current_thread()} success.") + logger.info( + f"{self.__str__()} stop client callback result handler thread:{threading.current_thread()} success." + ) """ protect """ @@ -282,10 +316,12 @@ class Client: return route else: route = self.__update_topic_route(topic) - logger.info(f"{self.__str__()} update topic:{topic} route success.") if route is not None: + logger.info(f"{self.__str__()} update topic:{topic} route success.") self.__topics.add(topic) - return route + return route + else: + raise Exception(f"failed to fetch topic:{topic} route.") def _remove_unused_topic_route_data(self, topic): self.__topic_route_cache.remove(topic) @@ -306,31 +342,41 @@ class Client: # topic route # def __update_topic_route(self, topic): - try: - future = self.__rpc_client.query_topic_route_async(self.__client_configuration.rpc_endpoints, - self.__topic_route_req(topic), metadata=self._sign(), - timeout=self.__client_configuration.request_timeout) - res = future.result() - route = self.__handle_topic_route_res(res, topic) - return route - except Exception as e: - logger.error(f"update topic route error, topic:{topic}, {e}") - raise e + event = threading.Event() + callback = functools.partial( + self.__query_topic_route_async_callback, topic=topic, event=event + ) + future = self.__rpc_client.query_topic_route_async( + self.__client_configuration.rpc_endpoints, + self.__topic_route_req(topic), + metadata=self._sign(), + timeout=self.__client_configuration.request_timeout, + ) + future.add_done_callback(callback) + event.wait() + return self.__topic_route_cache.get(topic) def __update_topic_route_async(self, topic): - callback = functools.partial(self.__query_topic_route_async_callback, topic=topic) - future = self.__rpc_client.query_topic_route_async(self.__client_configuration.rpc_endpoints, - self.__topic_route_req(topic), - metadata=self._sign(), - timeout=self.__client_configuration.request_timeout) + callback = functools.partial( + self.__query_topic_route_async_callback, topic=topic + ) + future = self.__rpc_client.query_topic_route_async( + self.__client_configuration.rpc_endpoints, + self.__topic_route_req(topic), + metadata=self._sign(), + timeout=self.__client_configuration.request_timeout, + ) future.add_done_callback(callback) - def __query_topic_route_async_callback(self, future, topic): + def __query_topic_route_async_callback(self, future, topic, event=None): try: res = future.result() self.__handle_topic_route_res(res, topic) except Exception as e: raise e + finally: + if event is not None: + event.set() def __topic_route_req(self, topic): req = QueryRouteRequest() @@ -344,7 +390,9 @@ class Client: MessagingResultChecker.check(res.status) if res.status.code == Code.OK: topic_route = TopicRouteData(res.message_queues) - logger.debug(f"{self.__str__()} update topic:{topic} route, route info: {topic_route.__str__()}") + logger.info( + f"{self.__str__()} update topic:{topic} route, route info: {topic_route.__str__()}" + ) # if topic route has new endpoint, connect self.__check_topic_route_endpoints_changed(topic, topic_route) self.__topic_route_cache.put(topic, topic_route) @@ -359,27 +407,50 @@ class Client: def __heartbeat_async(self, endpoints): req = self._heartbeat_req() callback = functools.partial(self.__heartbeat_callback, endpoints=endpoints) - future = self.__rpc_client.heartbeat_async(endpoints, req, metadata=self._sign(), - timeout=self.__client_configuration.request_timeout) + future = self.__rpc_client.heartbeat_async( + endpoints, + req, + metadata=self._sign(), + timeout=self.__client_configuration.request_timeout, + ) future.add_done_callback(callback) def __heartbeat_callback(self, future, endpoints): try: res = future.result() if res is not None and res.status.code == Code.OK: - logger.info(f"{self.__str__()} send heartbeat to {endpoints.__str__()} success.") + logger.info( + f"{self.__str__()} send heartbeat to {endpoints.__str__()} success." + ) else: if res is not None: logger.error( - f"{self.__str__()} send heartbeat to {endpoints.__str__()} error, code:{res.status.code}, message:{res.status.message}.") + f"{self.__str__()} send heartbeat to {endpoints.__str__()} error, code:{res.status.code}, message:{res.status.message}." + ) else: - logger.error(f"{self.__str__()} send heartbeat to {endpoints.__str__()} error, response is none.") + logger.error( + f"{self.__str__()} send heartbeat to {endpoints.__str__()} error, response is none." + ) except Exception as e: - logger.error(f"{self.__str__()} send heartbeat to {endpoints.__str__()} exception, e: {e}") + logger.error( + f"{self.__str__()} send heartbeat to {endpoints.__str__()} exception, e: {e}" + ) raise e # sync settings # + def __retrieve_telemetry_stream_stream_call(self, endpoints, rebuild=False): + try: + self.__rpc_client.telemetry_stream( + endpoints, self, self._sign(), rebuild, timeout=60 * 60 * 24 * 365 + ) + except Exception as e: + logger.error( + f"{self.__str__()} rebuild stream_steam_call to {endpoints.__str__()} exception: {e}" + if rebuild + else f"{self.__str__()} create stream_steam_call to {endpoints.__str__()} exception: {e}" + ) + def __setting_write(self, endpoints): req = self._sync_setting_req(endpoints) callback = functools.partial(self.__setting_write_callback, endpoints=endpoints) @@ -387,26 +458,26 @@ class Client: logger.debug(f"{self.__str__()} send setting to {endpoints.__str__()}, {req}") future.add_done_callback(callback) - def __retrieve_telemetry_stream_stream_call(self, endpoints, rebuild=False): - try: - self.__rpc_client.telemetry_stream(endpoints, self, metadata=self._sign(), timeout=60 * 60 * 24 * 365, - rebuild=rebuild) - except Exception as e: - logger.error( - f"{self.__str__()} rebuild stream_steam_call to {endpoints.__str__()} exception: {e}" if rebuild else f"{self.__str__()} create stream_steam_call to {endpoints.__str__()} exception: {e}") - def __setting_write_callback(self, future, endpoints): try: future.result() - logger.debug(f"{self.__str__()} send setting to {endpoints.__str__()} success.") + logger.info( + f"{self.__str__()} send setting to {endpoints.__str__()} success." + ) except InvalidStateError as e: - logger.warn(f"{self.__str__()} send setting to {endpoints.__str__()} occurred InvalidStateError: {e}") + logger.warn( + f"{self.__str__()} send setting to {endpoints.__str__()} occurred InvalidStateError: {e}" + ) self.__retrieve_telemetry_stream_stream_call(endpoints, rebuild=True) except AioRpcError as e: - logger.warn(f"{self.__str__()} send setting to {endpoints.__str__()} occurred AioRpcError: {e}") + logger.warn( + f"{self.__str__()} send setting to {endpoints.__str__()} occurred AioRpcError: {e}" + ) self.__retrieve_telemetry_stream_stream_call(endpoints, rebuild=True) except Exception as e: - logger.error(f"{self.__str__()} send setting to {endpoints.__str__()} exception: {e}") + logger.error( + f"{self.__str__()} send setting to {endpoints.__str__()} exception: {e}" + ) self.__retrieve_telemetry_stream_stream_call(endpoints, rebuild=True) # metrics # @@ -418,8 +489,12 @@ class Client: def __client_termination(self, endpoints): req = self._notify_client_termination_req() - future = self.__rpc_client.notify_client_termination(endpoints, req, metadata=self._sign(), - timeout=self.__client_configuration.request_timeout) + future = self.__rpc_client.notify_client_termination( + endpoints, + req, + metadata=self._sign(), + timeout=self.__client_configuration.request_timeout, + ) future.result() # others ## @@ -435,11 +510,15 @@ class Client: old_route = self.__topic_route_cache.get(topic) if old_route is None or old_route != route: logger.info( - f"topic:{topic} route changed for {self.__str__()}. old route is {old_route}, new route is {route}") + f"topic:{topic} route changed for {self.__str__()}. old route is {old_route}, new route is {route}" + ) all_endpoints = self.__get_all_endpoints() # the existing endpoints - topic_route_endpoints = route.all_endpoints() # the latest endpoints for topic route + topic_route_endpoints = ( + route.all_endpoints() + ) # the latest endpoints for topic route diff = set(topic_route_endpoints.keys()).difference( - set(all_endpoints.keys())) # the diff between existing and latest + set(all_endpoints.keys()) + ) # the diff between existing and latest # create grpc channel, stream_stream_call for new endpoints, send setting to new endpoints for address in diff: endpoints = topic_route_endpoints[address] @@ -477,7 +556,9 @@ class Client: self.__clear_idle_rpc_channels_scheduler.join() for i in range(Client.CALLBACK_THREADS_NUM): - self._set_future_callback_result(CallbackResult.end_callback_thread_result()) + self._set_future_callback_result( + CallbackResult.end_callback_thread_result() + ) for i in range(Client.CALLBACK_THREADS_NUM): self.__callback_threads[i].join() diff --git a/python/rocketmq/v5/client/client_configuration.py b/python/rocketmq/v5/client/client_configuration.py index 7c9c67f9..922fe6be 100644 --- a/python/rocketmq/v5/client/client_configuration.py +++ b/python/rocketmq/v5/client/client_configuration.py @@ -37,8 +37,12 @@ class Credentials: class ClientConfiguration: - def __init__(self, endpoints: str, credentials: Credentials, namespace="", request_timeout=3): - self.__rpc_endpoints = RpcEndpoints(ClientConfiguration.__parse_endpoints(endpoints)) + def __init__( + self, endpoints: str, credentials: Credentials, namespace="", request_timeout=3 + ): + self.__rpc_endpoints = RpcEndpoints( + ClientConfiguration.__parse_endpoints(endpoints) + ) self.__credentials = credentials self.__request_timeout = request_timeout # seconds self.__namespace = namespace @@ -52,7 +56,10 @@ class ClientConfiguration: endpoints = Endpoints() addresses = endpoints_str.split(";") endpoints.scheme = ClientConfiguration.__parse_endpoints_scheme_type( - ClientConfiguration.__parse_endpoints_prefix(addresses[0].split(":")[0])) + ClientConfiguration.__parse_endpoints_prefix( + addresses[0].split(":")[0] + ) + ) for address in addresses: if len(address) == 0: continue @@ -62,7 +69,9 @@ class ClientConfiguration: ad.port = int(address.split(":")[1]) return endpoints except Exception as e: - logger.error(f"client configuration parse {endpoints_str} exception: {e}") + logger.error( + f"client configuration parse {endpoints_str} exception: {e}" + ) return None @staticmethod diff --git a/python/rocketmq/v5/client/connection/rpc_channel.py b/python/rocketmq/v5/client/connection/rpc_channel.py index d79f5478..246e102b 100644 --- a/python/rocketmq/v5/client/connection/rpc_channel.py +++ b/python/rocketmq/v5/client/connection/rpc_channel.py @@ -60,9 +60,13 @@ class RpcEndpoints: def __init__(self, endpoints: Endpoints): self.__endpoints = endpoints self.__scheme = endpoints.scheme - self.__addresses = set(map(lambda address: RpcAddress(address), endpoints.addresses)) + self.__addresses = set( + map(lambda address: RpcAddress(address), endpoints.addresses) + ) if self.__scheme == AddressScheme.DOMAIN_NAME and len(self.__addresses) > 1: - raise UnsupportedException("Multiple addresses not allowed in domain schema") + raise UnsupportedException( + "Multiple addresses not allowed in domain schema" + ) self.__facade, self.__endpoint_desc = self.__facade() def __hash__(self) -> int: @@ -79,8 +83,11 @@ class RpcEndpoints: """ private """ def __facade(self): - if self.__scheme is None or len( - self.__addresses) == 0 or self.__scheme == AddressScheme.ADDRESS_SCHEME_UNSPECIFIED: + if ( + self.__scheme is None + or len(self.__addresses) == 0 + or self.__scheme == AddressScheme.ADDRESS_SCHEME_UNSPECIFIED + ): return "" prefix = "dns:" @@ -94,7 +101,7 @@ class RpcEndpoints: ret = "" for address in sorted_list: ret = ret + address.__str__() + "," - return prefix + ret[0: len(ret) - 1], ret[0: len(ret) - 1] + return prefix + ret[0:len(ret) - 1], ret[0:len(ret) - 1] """ property """ @@ -123,22 +130,33 @@ class RpcStreamStreamCall: if res.HasField("settings"): # read a response for send setting result if res is not None and res.status.code == Code.OK: - logger.debug(f"async setting success. response status code: {res.status.code}") - if res.settings is not None and res.settings.metric is not None: + logger.debug( + f"{ self.__handler.__str__()} sync setting success. response status code: {res.status.code}" + ) + if ( + res.settings is not None + and res.settings.metric is not None + ): # reset metrics if needed self.__handler.reset_metric(res.settings.metric) elif res.HasField("recover_orphaned_transaction_command"): # sever check for a transaction message if self.__handler is not None: - transaction_id = res.recover_orphaned_transaction_command.transaction_id + transaction_id = ( + res.recover_orphaned_transaction_command.transaction_id + ) message = res.recover_orphaned_transaction_command.message - await self.__handler.on_recover_orphaned_transaction_command(self.__endpoints, message, - transaction_id) + await self.__handler.on_recover_orphaned_transaction_command( + self.__endpoints, message, transaction_id + ) except AioRpcError as e: logger.warn( - f"stream read from endpoints {self.__endpoints.__str__()} occurred AioRpcError. code: {e.code()}, message: {e.details()}") + f"{ self.__handler.__str__()} read stream from endpoints {self.__endpoints.__str__()} occurred AioRpcError. code: {e.code()}, message: {e.details()}" + ) except Exception as e: - logger.error(f"stream read from endpoints {self.__endpoints.__str__()} exception, {e}") + logger.error( + f"{ self.__handler.__str__()} read stream from endpoints {self.__endpoints.__str__()} exception, {e}" + ) async def stream_write(self, req): if self.__stream_stream_call is not None: @@ -164,6 +182,7 @@ class RpcChannel: def create_channel(self, loop): # create grpc channel with the given loop + # assert loop == RpcClient._io_loop asyncio.set_event_loop(loop) self.__create_aio_channel() @@ -173,7 +192,9 @@ class RpcChannel: if self.__telemetry_stream_stream_call is not None: self.__telemetry_stream_stream_call.close() self.__telemetry_stream_stream_call = None - logger.info(f"channel[{self.__endpoints.__str__()}] close stream_stream_call success.") + logger.info( + f"channel[{self.__endpoints.__str__()}] close stream_stream_call success." + ) if self.channel_state() is not ChannelConnectivity.SHUTDOWN: # close grpc channel asyncio.run_coroutine_threadsafe(self.__async_channel.close(), loop) @@ -189,29 +210,43 @@ class RpcChannel: def register_telemetry_stream_stream_call(self, stream_stream_call, handler): if self.__telemetry_stream_stream_call is not None: self.__telemetry_stream_stream_call.close() - self.__telemetry_stream_stream_call = RpcStreamStreamCall(self.__endpoints, stream_stream_call, handler) + self.__telemetry_stream_stream_call = RpcStreamStreamCall( + self.__endpoints, stream_stream_call, handler + ) """ private """ def __create_aio_channel(self): try: if self.__endpoints is None: - raise IllegalArgumentException("create_aio_channel exception, endpoints is None") + raise IllegalArgumentException( + "create_aio_channel exception, endpoints is None" + ) else: - options = [('grpc.enable_retries', 0), ("grpc.max_send_message_length", -1), - ("grpc.max_receive_message_length", -1)] + options = [ + ("grpc.enable_retries", 0), + ("grpc.max_send_message_length", -1), + ("grpc.max_receive_message_length", -1), + ] if self.__tls_enabled: - self.__async_channel = aio.secure_channel(self.__endpoints.facade, grpc.ssl_channel_credentials(), - options) + self.__async_channel = aio.secure_channel( + self.__endpoints.facade, grpc.ssl_channel_credentials(), options + ) else: - self.__async_channel = aio.insecure_channel(self.__endpoints.facade, options) + self.__async_channel = aio.insecure_channel( + self.__endpoints.facade, options + ) self.__async_stub = MessagingServiceStub(self.__async_channel) logger.debug( - f"create_aio_channel to [{self.__endpoints.__str__()}] success. channel state:{self.__async_channel.get_state()}") + f"create_aio_channel to [{self.__endpoints.__str__()}] success. channel state:{self.__async_channel.get_state()}" + ) except Exception as e: - logger.error(f"create_aio_channel to [{self.__endpoints.__str__()}] exception: {e}") + logger.error( + f"create_aio_channel to [{self.__endpoints.__str__()}] exception: {e}" + ) raise e + # """ property """ @property diff --git a/python/rocketmq/v5/client/connection/rpc_client.py b/python/rocketmq/v5/client/connection/rpc_client.py index 3bcd3e16..77e6473c 100644 --- a/python/rocketmq/v5/client/connection/rpc_client.py +++ b/python/rocketmq/v5/client/connection/rpc_client.py @@ -42,8 +42,11 @@ class RpcClient: # start an event loop for async io if RpcClient._io_loop is None: initialized_event = threading.Event() - RpcClient._io_loop_thread = threading.Thread(target=RpcClient.__init_io_loop, args=(initialized_event,), - name="channel_io_loop_thread") + RpcClient._io_loop_thread = threading.Thread( + target=RpcClient.__init_io_loop, + args=(initialized_event,), + name="channel_io_loop_thread", + ) RpcClient._io_loop_thread.daemon = True RpcClient._io_loop_thread.start() # waiting for thread start success @@ -57,18 +60,17 @@ class RpcClient: def retrieve_or_create_channel(self, endpoints: RpcEndpoints): if self.__enable_retrieve_channel is False: raise Exception("RpcClient is not running.") - try: # get or create a new grpc channel - with RpcClient._channel_lock: - channel = self.__get_channel(endpoints) - if channel is not None: - channel.update_time = int(time.time()) - else: + channel = self.__get_channel(endpoints) + if channel is not None: + channel.update_time = int(time.time()) + else: + with RpcClient._channel_lock: channel = RpcChannel(endpoints, self.__tls_enable) channel.create_channel(RpcClient.get_channel_io_loop()) self.__put_channel(endpoints, channel) - return channel + return channel except Exception as e: logger.error(f"retrieve or create channel exception: {e}") raise e @@ -80,11 +82,12 @@ class RpcClient: for endpoints, channel in items: if now - channel.update_time > RpcClient.RPC_CLIENT_MAX_IDLE_SECONDS: idle_endpoints.append(endpoints) - with RpcClient._channel_lock: - for endpoints in idle_endpoints: - logger.info(f"remove idle channel {endpoints.__str__()}") - self.__close_rpc_channel(endpoints) - self.channels.remove(endpoints) + if len(idle_endpoints) > 0: + with RpcClient._channel_lock: + for endpoints in idle_endpoints: + logger.info(f"remove idle channel {endpoints.__str__()}") + self.__close_rpc_channel(endpoints) + self.channels.remove(endpoints) def stop(self): with RpcClient._channel_lock: @@ -99,103 +102,190 @@ class RpcClient: """ grpc MessageService """ - def query_topic_route_async(self, endpoints: RpcEndpoints, req: QueryRouteRequest, metadata, timeout=3): + def query_topic_route_async( + self, endpoints: RpcEndpoints, req: QueryRouteRequest, metadata, timeout=3 + ): return RpcClient.__run_message_service_async( - self.__query_route_async_0(endpoints, req, metadata=metadata, timeout=timeout)) - - def send_message_async(self, endpoints: RpcEndpoints, req: SendMessageRequest, metadata, timeout=3): + self.__query_route_async_0( + endpoints, req, metadata=metadata, timeout=timeout + ) + ) + + def send_message_async( + self, endpoints: RpcEndpoints, req: SendMessageRequest, metadata, timeout=3 + ): return RpcClient.__run_message_service_async( - self.__send_message_0(endpoints, req, metadata=metadata, timeout=timeout)) + self.__send_message_0(endpoints, req, metadata=metadata, timeout=timeout) + ) - def receive_message_async(self, endpoints: RpcEndpoints, req: ReceiveMessageRequest, metadata, timeout=3): + def receive_message_async( + self, endpoints: RpcEndpoints, req: ReceiveMessageRequest, metadata, timeout=3 + ): return RpcClient.__run_message_service_async( - self.__receive_message_0(endpoints, req, metadata=metadata, timeout=timeout)) + self.__receive_message_0(endpoints, req, metadata=metadata, timeout=timeout) + ) - def ack_message_async(self, endpoints: RpcEndpoints, req: AckMessageRequest, metadata, timeout=3): + def ack_message_async( + self, endpoints: RpcEndpoints, req: AckMessageRequest, metadata, timeout=3 + ): return RpcClient.__run_message_service_async( - self.__ack_message_0(endpoints, req, metadata=metadata, timeout=timeout)) - - def change_invisible_duration_async(self, endpoints: RpcEndpoints, req: ChangeInvisibleDurationRequest, metadata, - timeout=3): + self.__ack_message_0(endpoints, req, metadata=metadata, timeout=timeout) + ) + + def change_invisible_duration_async( + self, + endpoints: RpcEndpoints, + req: ChangeInvisibleDurationRequest, + metadata, + timeout=3, + ): return RpcClient.__run_message_service_async( - self.__change_invisible_duration_0(endpoints, req, metadata=metadata, timeout=timeout)) - - def heartbeat_async(self, endpoints: RpcEndpoints, req: HeartbeatRequest, metadata, timeout=3): + self.__change_invisible_duration_0( + endpoints, req, metadata=metadata, timeout=timeout + ) + ) + + def heartbeat_async( + self, endpoints: RpcEndpoints, req: HeartbeatRequest, metadata, timeout=3 + ): return RpcClient.__run_message_service_async( - self.__heartbeat_async_0(endpoints, req, metadata=metadata, timeout=timeout)) + self.__heartbeat_async_0(endpoints, req, metadata=metadata, timeout=timeout) + ) def telemetry_write_async(self, endpoints: RpcEndpoints, req: TelemetryCommand): return RpcClient.__run_message_service_async( - self.retrieve_or_create_channel(endpoints).telemetry_stream_stream_call.stream_write(req)) - - def end_transaction_async(self, endpoints: RpcEndpoints, req: EndTransactionRequest, metadata, timeout=3): + self.retrieve_or_create_channel( + endpoints + ).telemetry_stream_stream_call.stream_write(req) + ) + + def end_transaction_async( + self, endpoints: RpcEndpoints, req: EndTransactionRequest, metadata, timeout=3 + ): return RpcClient.__run_message_service_async( - self.__end_transaction_0(endpoints, req, metadata=metadata, timeout=timeout)) - - def notify_client_termination(self, endpoints: RpcEndpoints, req: NotifyClientTerminationRequest, metadata, - timeout=3): + self.__end_transaction_0(endpoints, req, metadata=metadata, timeout=timeout) + ) + + def notify_client_termination( + self, + endpoints: RpcEndpoints, + req: NotifyClientTerminationRequest, + metadata, + timeout=3, + ): return RpcClient.__run_message_service_async( - self.__notify_client_termination_0(endpoints, req, metadata=metadata, timeout=timeout)) - - def telemetry_stream(self, endpoints: RpcEndpoints, client, metadata, timeout=3000, rebuild=False): + self.__notify_client_termination_0( + endpoints, req, metadata=metadata, timeout=timeout + ) + ) + + def end_transaction_for_server_check( + self, endpoints: RpcEndpoints, req: EndTransactionRequest, metadata, timeout=3 + ): + # assert asyncio.get_running_loop() == RpcClient._io_loop try: - channel = self.retrieve_or_create_channel(endpoints) - if channel.telemetry_stream_stream_call is None or rebuild is True: - stream = channel.async_stub.Telemetry(metadata=metadata, timeout=timeout) - channel.register_telemetry_stream_stream_call(stream, client) - asyncio.run_coroutine_threadsafe(channel.telemetry_stream_stream_call.start_stream_read(), - RpcClient.get_channel_io_loop()) - logger.info( - f"{client.__str__()} rebuild stream_steam_call to {endpoints.__str__()} success." if rebuild else f"{client.__str__()} create stream_steam_call to {endpoints.__str__()} success.") + return self.__end_transaction_0( + endpoints, req, metadata=metadata, timeout=timeout + ) except Exception as e: + logger.error( + f"end transaction exception, topic:{req.topic.name}, message_id:{req.message_id}, transaction_id:{req.transaction_id}: {e}" + ) raise e - def end_transaction_for_server_check(self, endpoints: RpcEndpoints, req: EndTransactionRequest, metadata, - timeout=3): + """ build stream_stream_call """ + + def telemetry_stream( + self, endpoints: RpcEndpoints, client, metadata, rebuild, timeout=3000 + ): + # assert asyncio.get_running_loop() == RpcClient._io_loop try: - return self.__end_transaction_0(endpoints, req, metadata=metadata, timeout=timeout) + channel = self.retrieve_or_create_channel(endpoints) + stream = channel.async_stub.Telemetry( + metadata=metadata, timeout=timeout, wait_for_ready=True + ) + channel.register_telemetry_stream_stream_call(stream, client) + asyncio.run_coroutine_threadsafe( + channel.telemetry_stream_stream_call.start_stream_read(), + RpcClient.get_channel_io_loop(), + ) + logger.info( + f"{client.__str__()} rebuild stream_steam_call to {endpoints.__str__()}." + if rebuild + else f"{client.__str__()} create stream_steam_call to {endpoints.__str__()}." + ) + return channel except Exception as e: - logger.error( - f"end transaction exception, topic:{req.topic.name}, message_id:{req.message_id}, transaction_id:{req.transaction_id}: {e}") raise e """ MessageService.stub impl """ - async def __query_route_async_0(self, endpoints: RpcEndpoints, req: QueryRouteRequest, metadata, timeout=3): - return await self.retrieve_or_create_channel(endpoints).async_stub.QueryRoute(req, metadata=metadata, - timeout=timeout) - - async def __send_message_0(self, endpoints: RpcEndpoints, req: SendMessageRequest, metadata, timeout=3): - return await self.retrieve_or_create_channel(endpoints).async_stub.SendMessage(req, metadata=metadata, - timeout=timeout) - - async def __receive_message_0(self, endpoints: RpcEndpoints, req: ReceiveMessageRequest, metadata, timeout=3): - return self.retrieve_or_create_channel(endpoints).async_stub.ReceiveMessage(req, metadata=metadata, - timeout=timeout) - - async def __ack_message_0(self, endpoints: RpcEndpoints, req: AckMessageRequest, metadata, timeout=3): - return await self.retrieve_or_create_channel(endpoints).async_stub.AckMessage(req, metadata=metadata, - timeout=timeout) - - async def __heartbeat_async_0(self, endpoints: RpcEndpoints, req: HeartbeatRequest, metadata, timeout=3): - return await self.retrieve_or_create_channel(endpoints).async_stub.Heartbeat(req, metadata=metadata, - timeout=timeout) - - async def __change_invisible_duration_0(self, endpoints: RpcEndpoints, req: ChangeInvisibleDurationRequest, - metadata, timeout=3): - return await self.retrieve_or_create_channel(endpoints).async_stub.ChangeInvisibleDuration(req, - metadata=metadata, - timeout=timeout) - - async def __end_transaction_0(self, endpoints: RpcEndpoints, req: EndTransactionRequest, metadata, timeout=3): - return await self.retrieve_or_create_channel(endpoints).async_stub.EndTransaction(req, metadata=metadata, - timeout=timeout) - - async def __notify_client_termination_0(self, endpoints: RpcEndpoints, req: NotifyClientTerminationRequest, - metadata, timeout=3): - return await self.retrieve_or_create_channel(endpoints).async_stub.NotifyClientTermination(req, - metadata=metadata, - timeout=timeout) + async def __query_route_async_0( + self, endpoints: RpcEndpoints, req: QueryRouteRequest, metadata, timeout=3 + ): + return await self.retrieve_or_create_channel(endpoints).async_stub.QueryRoute( + req, metadata=metadata, timeout=timeout + ) + + async def __send_message_0( + self, endpoints: RpcEndpoints, req: SendMessageRequest, metadata, timeout=3 + ): + return await self.retrieve_or_create_channel(endpoints).async_stub.SendMessage( + req, metadata=metadata, timeout=timeout + ) + + async def __receive_message_0( + self, endpoints: RpcEndpoints, req: ReceiveMessageRequest, metadata, timeout=3 + ): + return self.retrieve_or_create_channel(endpoints).async_stub.ReceiveMessage( + req, metadata=metadata, timeout=timeout + ) + + async def __ack_message_0( + self, endpoints: RpcEndpoints, req: AckMessageRequest, metadata, timeout=3 + ): + return await self.retrieve_or_create_channel(endpoints).async_stub.AckMessage( + req, metadata=metadata, timeout=timeout + ) + + async def __heartbeat_async_0( + self, endpoints: RpcEndpoints, req: HeartbeatRequest, metadata, timeout=3 + ): + return await self.retrieve_or_create_channel(endpoints).async_stub.Heartbeat( + req, metadata=metadata, timeout=timeout + ) + + async def __change_invisible_duration_0( + self, + endpoints: RpcEndpoints, + req: ChangeInvisibleDurationRequest, + metadata, + timeout=3, + ): + return await self.retrieve_or_create_channel( + endpoints + ).async_stub.ChangeInvisibleDuration(req, metadata=metadata, timeout=timeout) + + async def __end_transaction_0( + self, endpoints: RpcEndpoints, req: EndTransactionRequest, metadata, timeout=3 + ): + return await self.retrieve_or_create_channel( + endpoints + ).async_stub.EndTransaction(req, metadata=metadata, timeout=timeout) + + async def __notify_client_termination_0( + self, + endpoints: RpcEndpoints, + req: NotifyClientTerminationRequest, + metadata, + timeout=3, + ): + return await self.retrieve_or_create_channel( + endpoints + ).async_stub.NotifyClientTermination(req, metadata=metadata, timeout=timeout) + + async def __create_channel_async(self, endpoints: RpcEndpoints): + return self.retrieve_or_create_channel(endpoints) """ private """ @@ -207,7 +297,10 @@ class RpcClient: def __close_rpc_channel(self, endpoints: RpcEndpoints): channel = self.__get_channel(endpoints) - if channel is not None and channel.channel_state() is not ChannelConnectivity.SHUTDOWN: + if ( + channel is not None + and channel.channel_state() is not ChannelConnectivity.SHUTDOWN + ): try: channel.close_channel(RpcClient.get_channel_io_loop()) self.channels.remove(endpoints) @@ -234,7 +327,9 @@ class RpcClient: def __run_message_service_async(func): try: # execute grpc call in RpcClient._io_loop - return asyncio.run_coroutine_threadsafe(func, RpcClient.get_channel_io_loop()) + return asyncio.run_coroutine_threadsafe( + func, RpcClient.get_channel_io_loop() + ) except Exception as e: future = Future() future.set_exception(e) diff --git a/python/rocketmq/v5/client/metrics/client_metrics.py b/python/rocketmq/v5/client/metrics/client_metrics.py index 7aa94433..210631ee 100644 --- a/python/rocketmq/v5/client/metrics/client_metrics.py +++ b/python/rocketmq/v5/client/metrics/client_metrics.py @@ -76,16 +76,21 @@ class ClientMetrics: def send_after(self, send_context: MetricContext, success: bool): if send_context is None: - logger.warn("metrics do send after exception. send_context must not be none.") + logger.warn( + "metrics do send after exception. send_context must not be none." + ) return if send_context.metric_type != MessageMetricType.SEND: logger.warn( - f"metric type must be MessageMetricType.SEND. current send_context type is {send_context.metric_type}") + f"metric type must be MessageMetricType.SEND. current send_context type is {send_context.metric_type}" + ) return if send_context.get_attr("send_stopwatch") is None: - logger.warn("metrics do send after exception. send_stopwatch must not be none.") + logger.warn( + "metrics do send after exception. send_stopwatch must not be none." + ) return if send_context.get_attr("topic") is None: @@ -108,7 +113,11 @@ class ClientMetrics: if metric.endpoints is None: return True # if metrics endpoints changed, return False - if self.__enabled and metric.on and self.__endpoints == RpcEndpoints(metric.endpoints): + if ( + self.__enabled + and metric.on + and self.__endpoints == RpcEndpoints(metric.endpoints) + ): return True return not self.__enabled and not metric.on @@ -122,36 +131,51 @@ class ClientMetrics: def __meter_provider_start(self): if self.__endpoints is None: - logger.warn(f"client:{self.__client_id} can't create meter provider, because endpoints is none.") + logger.warn( + f"client:{self.__client_id} can't create meter provider, because endpoints is none." + ) return try: # setup OTLP exporter - exporter = OTLPMetricExporter(endpoint=self.__endpoints.__str__(), insecure=True, - timeout=ClientMetrics.METRIC_EXPORTER_RPC_TIMEOUT) + exporter = OTLPMetricExporter( + endpoint=self.__endpoints.__str__(), + insecure=True, + timeout=ClientMetrics.METRIC_EXPORTER_RPC_TIMEOUT, + ) # create a metric reader and set the export interval - reader = PeriodicExportingMetricReader(exporter, - export_interval_millis=ClientMetrics.METRIC_READER_INTERVAL) + reader = PeriodicExportingMetricReader( + exporter, export_interval_millis=ClientMetrics.METRIC_READER_INTERVAL + ) # create an empty resource resource = Resource.get_empty() # create view - send_cost_time_view = View(instrument_type=Histogram, - instrument_name=HistogramEnum.SEND_COST_TIME.histogram_name, - aggregation=ExplicitBucketHistogramAggregation( - HistogramEnum.SEND_COST_TIME.buckets)) + send_cost_time_view = View( + instrument_type=Histogram, + instrument_name=HistogramEnum.SEND_COST_TIME.histogram_name, + aggregation=ExplicitBucketHistogramAggregation( + HistogramEnum.SEND_COST_TIME.buckets + ), + ) # create MeterProvider - self.__meter_provider = MeterProvider(metric_readers=[reader], resource=resource, - views=[send_cost_time_view]) + self.__meter_provider = MeterProvider( + metric_readers=[reader], resource=resource, views=[send_cost_time_view] + ) # define the histogram instruments self.__send_success_cost_time_instrument = self.__meter_provider.get_meter( - ClientMetrics.METRIC_INSTRUMENTATION_NAME).create_histogram(HistogramEnum.SEND_COST_TIME.histogram_name) + ClientMetrics.METRIC_INSTRUMENTATION_NAME + ).create_histogram(HistogramEnum.SEND_COST_TIME.histogram_name) except Exception as e: - logger.error(f"client:{self.__client_id} start meter provider exception: {e}") + logger.error( + f"client:{self.__client_id} start meter provider exception: {e}" + ) def __record_send_success_cost_time(self, context, amount): if self.__enabled: try: # record send message cost time and result - self.__send_success_cost_time_instrument.record(amount, context.attributes) + self.__send_success_cost_time_instrument.record( + amount, context.attributes + ) except Exception as e: logger.error(f"record send message cost time exception, e:{e}") diff --git a/python/rocketmq/v5/consumer/simple_consumer.py b/python/rocketmq/v5/consumer/simple_consumer.py index 11a09d1b..6b1483db 100644 --- a/python/rocketmq/v5/consumer/simple_consumer.py +++ b/python/rocketmq/v5/consumer/simple_consumer.py @@ -36,18 +36,27 @@ from rocketmq.v5.util import (AtomicInteger, ConcurrentMap, class SimpleConsumer(Client): - def __init__(self, client_configuration: ClientConfiguration, consumer_group, subscription: dict = None, - await_duration=20): - if consumer_group is None or consumer_group.strip() == '': + def __init__( + self, + client_configuration: ClientConfiguration, + consumer_group, + subscription: dict = None, + await_duration=20, + ): + if consumer_group is None or consumer_group.strip() == "": raise IllegalArgumentException("consumerGroup should not be null") if Misc.is_valid_consumer_group(consumer_group) is False: raise IllegalArgumentException( - f"consumerGroup does not match the regex [regex={Misc.CONSUMER_GROUP_PATTERN}]") + f"consumerGroup does not match the regex [regex={Misc.CONSUMER_GROUP_PATTERN}]" + ) if await_duration is None: raise IllegalArgumentException("awaitDuration should not be null") - super().__init__(client_configuration, None if subscription is None else subscription.keys(), - ClientType.SIMPLE_CONSUMER) + super().__init__( + client_configuration, + None if subscription is None else subscription.keys(), + ClientType.SIMPLE_CONSUMER, + ) self.__consumer_group = consumer_group self.__await_duration = await_duration # long polling timeout, seconds # <String /* topic */, FilterExpression> @@ -64,19 +73,30 @@ class SimpleConsumer(Client): def subscribe(self, topic, filter_expression: FilterExpression = None): if self.is_running is False: - raise IllegalStateException("unable to add subscription because simple consumer is not running") + raise IllegalStateException( + "unable to add subscription because simple consumer is not running" + ) try: if not self.__subscriptions.contains(topic): self._retrieve_topic_route_data(topic) - self.__subscriptions.put(topic, filter_expression if filter_expression is not None else FilterExpression()) + self.__subscriptions.put( + topic, + ( + filter_expression + if filter_expression is not None + else FilterExpression() + ), + ) except Exception as e: - logger.error(f"subscribe exception: {e}") + logger.error(f"subscribe raise exception: {e}") raise e def unsubscribe(self, topic): if self.is_running is False: - raise IllegalStateException("unable to remove subscription because simple consumer is not running") + raise IllegalStateException( + "unable to remove subscription because simple consumer is not running" + ) if topic in self.__subscriptions: self.__subscriptions.remove(topic) @@ -84,43 +104,57 @@ class SimpleConsumer(Client): def receive(self, max_message_num, invisible_duration): if self.is_running is False: - raise IllegalStateException("unable to receive messages because simple consumer is not running") + raise IllegalStateException( + "unable to receive messages because simple consumer is not running" + ) return self.__receive(max_message_num, invisible_duration) def receive_async(self, max_message_num, invisible_duration): if self.is_running is False: - raise IllegalStateException("unable to receive messages because simple consumer is not running") + raise IllegalStateException( + "unable to receive messages because simple consumer is not running" + ) return self.__receive_async(max_message_num, invisible_duration) def ack(self, message: Message): if self.is_running is False: - raise IllegalStateException("unable to ack message because simple consumer is not running") + raise IllegalStateException( + "unable to ack message because simple consumer is not running" + ) queue = self.__select_topic_queue(message.topic) return self.__ack(message, queue) def ack_async(self, message: Message): if self.is_running is False: - raise IllegalStateException("unable to ack message because simple consumer is not running") + raise IllegalStateException( + "unable to ack message because simple consumer is not running" + ) queue = self.__select_topic_queue(message.topic) return self.__ack_async(message, queue) def change_invisible_duration(self, message: Message, invisible_duration): if self.is_running is False: - raise IllegalStateException("unable to change invisible duration because simple consumer is not running") + raise IllegalStateException( + "unable to change invisible duration because simple consumer is not running" + ) queue = self.__select_topic_queue(message.topic) return self.__change_invisible_duration(message, queue, invisible_duration) def change_invisible_duration_async(self, message: Message, invisible_duration): if self.is_running is False: - raise IllegalStateException("unable to change invisible duration because simple consumer is not running") + raise IllegalStateException( + "unable to change invisible duration because simple consumer is not running" + ) queue = self.__select_topic_queue(message.topic) - return self.__change_invisible_duration_async(message, queue, invisible_duration) + return self.__change_invisible_duration_async( + message, queue, invisible_duration + ) """ override """ @@ -190,21 +224,25 @@ class SimpleConsumer(Client): def __select_topic_for_receive(self): try: # select the next topic for receive - mod_index = self.__topic_index.get_and_increment() % len(self.__subscriptions.keys()) + mod_index = self.__topic_index.get_and_increment() % len( + self.__subscriptions.keys() + ) return list(self.__subscriptions.keys())[mod_index] except Exception as e: - logger.error(f"simple consumer select topic for receive message exception: {e}") + logger.error( + f"simple consumer select topic for receive message exception: {e}" + ) raise e def __select_topic_queue(self, topic): try: route = self._retrieve_topic_route_data(topic) - queue_selector = self.__receive_queue_selectors.put_if_absent(topic, - QueueSelector.simple_consumer_queue_selector( - route)) + queue_selector = self.__receive_queue_selectors.put_if_absent( + topic, QueueSelector.simple_consumer_queue_selector(route) + ) return queue_selector.select_next_queue() except Exception as e: - logger.error(f"simple consumer select topic queue for receive message exception: {e}") + logger.error(f"simple consumer select topic queue raise exception: {e}") raise e def __receive(self, max_message_num, invisible_duration): @@ -213,9 +251,13 @@ class SimpleConsumer(Client): queue = self.__select_topic_queue(topic) req = self.__receive_req(topic, queue, max_message_num, invisible_duration) timeout = self.client_configuration.request_timeout + self.__await_duration - future = self.rpc_client.receive_message_async(queue.endpoints, req, metadata=self._sign(), timeout=timeout) - read_future = asyncio.run_coroutine_threadsafe(self.__receive_message_response(future.result()), - self._rpc_channel_io_loop()) + future = self.rpc_client.receive_message_async( + queue.endpoints, req, metadata=self._sign(), timeout=timeout + ) + read_future = asyncio.run_coroutine_threadsafe( + self.__receive_message_response(future.result()), + self._rpc_channel_io_loop(), + ) return self.__handle_receive_message_response(read_future.result()) def __receive_async(self, max_message_num, invisible_duration): @@ -225,11 +267,17 @@ class SimpleConsumer(Client): queue = self.__select_topic_queue(topic) req = self.__receive_req(topic, queue, max_message_num, invisible_duration) timeout = self.client_configuration.request_timeout + self.__await_duration - future = self.rpc_client.receive_message_async(queue.endpoints, req, metadata=self._sign(), timeout=timeout) - read_future = asyncio.run_coroutine_threadsafe(self.__receive_message_response(future.result()), - self._rpc_channel_io_loop()) + future = self.rpc_client.receive_message_async( + queue.endpoints, req, metadata=self._sign(), timeout=timeout + ) + read_future = asyncio.run_coroutine_threadsafe( + self.__receive_message_response(future.result()), + self._rpc_channel_io_loop(), + ) ret_future = Future() - handle_send_receipt_callback = functools.partial(self.__receive_message_callback, ret_future=ret_future) + handle_send_receipt_callback = functools.partial( + self.__receive_message_callback, ret_future=ret_future + ) read_future.add_done_callback(handle_send_receipt_callback) return ret_future except Exception as e: @@ -261,20 +309,28 @@ class SimpleConsumer(Client): try: responses = future.result() messages = self.__handle_receive_message_response(responses) - self._set_future_callback_result(CallbackResult.async_receive_callback_result(ret_future, messages)) + self._set_future_callback_result( + CallbackResult.async_receive_callback_result(ret_future, messages) + ) except Exception as e: - self._set_future_callback_result(CallbackResult.async_receive_callback_result(ret_future, e, False)) + self._set_future_callback_result( + CallbackResult.async_receive_callback_result(ret_future, e, False) + ) async def __receive_message_response(self, unary_stream_call): try: responses = list() async for res in unary_stream_call: if res.HasField("message") or res.HasField("status"): - logger.debug(f"consumer:{self.__consumer_group} receive response: {res}") + logger.debug( + f"consumer:{self.__consumer_group} receive response: {res}" + ) responses.append(res) return responses except Exception as e: - logger.error(f"consumer:{self.__consumer_group} receive message exception: {e}") + logger.error( + f"consumer:{self.__consumer_group} receive message exception: {e}" + ) raise e def __handle_receive_message_response(self, responses): @@ -284,7 +340,8 @@ class SimpleConsumer(Client): for res in responses: if res.HasField("status"): logger.debug( - f"simple_consumer[{self.__consumer_group}] receive_message, code:{res.status.code}, message:{res.status.message}.") + f"simple_consumer[{self.__consumer_group}] receive_message, code:{res.status.code}, message:{res.status.message}." + ) status = res.status elif res.HasField("message"): messages.append(Message().fromProtobuf(res.message)) @@ -300,7 +357,12 @@ class SimpleConsumer(Client): try: req = self.__ack_req(message) - future = self.rpc_client.ack_message_async(queue.endpoints, req, metadata=self._sign()) + future = self.rpc_client.ack_message_async( + queue.endpoints, + req, + metadata=self._sign(), + timeout=self.client_configuration.request_timeout, + ) self.__handle_ack_result(future) except Exception as e: raise e @@ -311,9 +373,16 @@ class SimpleConsumer(Client): try: req = self.__ack_req(message) - future = self.rpc_client.ack_message_async(queue.endpoints, req, metadata=self._sign()) + future = self.rpc_client.ack_message_async( + queue.endpoints, + req, + metadata=self._sign(), + timeout=self.client_configuration.request_timeout, + ) ret_future = Future() - ack_callback = functools.partial(self.__handle_ack_result, ret_future=ret_future) + ack_callback = functools.partial( + self.__handle_ack_result, ret_future=ret_future + ) future.add_done_callback(ack_callback) return ret_future except Exception as e: @@ -335,15 +404,21 @@ class SimpleConsumer(Client): def __handle_ack_result(self, future, ret_future=None): try: res = future.result() - logger.debug(f"consumer[{self.__consumer_group}] ack response, {res.status}") + logger.debug( + f"consumer[{self.__consumer_group}] ack response, {res.status}" + ) MessagingResultChecker.check(res.status) if ret_future is not None: - self._set_future_callback_result(CallbackResult.async_ack_callback_result(ret_future, None)) + self._set_future_callback_result( + CallbackResult.async_ack_callback_result(ret_future, None) + ) except Exception as e: if ret_future is None: raise e else: - self._set_future_callback_result(CallbackResult.async_ack_callback_result(ret_future, e, False)) + self._set_future_callback_result( + CallbackResult.async_ack_callback_result(ret_future, e, False) + ) # change_invisible @@ -353,20 +428,34 @@ class SimpleConsumer(Client): try: req = self.__change_invisible_req(message, invisible_duration) - future = self.rpc_client.change_invisible_duration_async(queue.endpoints, req, metadata=self._sign()) + future = self.rpc_client.change_invisible_duration_async( + queue.endpoints, + req, + metadata=self._sign(), + timeout=self.client_configuration.request_timeout, + ) self.__handle_change_invisible_result(future) except Exception as e: raise e - def __change_invisible_duration_async(self, message: Message, queue, invisible_duration): + def __change_invisible_duration_async( + self, message: Message, queue, invisible_duration + ): if self.is_running is False: raise IllegalArgumentException("consumer is not running now.") try: req = self.__change_invisible_req(message, invisible_duration) - future = self.rpc_client.change_invisible_duration_async(queue.endpoints, req, metadata=self._sign()) + future = self.rpc_client.change_invisible_duration_async( + queue.endpoints, + req, + metadata=self._sign(), + timeout=self.client_configuration.request_timeout, + ) ret_future = Future() - change_invisible_callback = functools.partial(self.__handle_change_invisible_result, ret_future=ret_future) + change_invisible_callback = functools.partial( + self.__handle_change_invisible_result, ret_future=ret_future + ) future.add_done_callback(change_invisible_callback) return ret_future except Exception as e: @@ -386,17 +475,25 @@ class SimpleConsumer(Client): def __handle_change_invisible_result(self, future, ret_future=None): try: res = future.result() - logger.debug(f"consumer[{self.__consumer_group}] change invisible response, {res.status}") + logger.debug( + f"consumer[{self.__consumer_group}] change invisible response, {res.status}" + ) MessagingResultChecker.check(res.status) if ret_future is not None: self._set_future_callback_result( - CallbackResult.async_change_invisible_duration_callback_result(ret_future, None)) + CallbackResult.async_change_invisible_duration_callback_result( + ret_future, None + ) + ) except Exception as e: if ret_future is None: raise e else: self._set_future_callback_result( - CallbackResult.async_change_invisible_duration_callback_result(ret_future, e, False)) + CallbackResult.async_change_invisible_duration_callback_result( + ret_future, e, False + ) + ) """ property """ diff --git a/python/rocketmq/v5/exception/client_exception.py b/python/rocketmq/v5/exception/client_exception.py index d4239645..8e9b4e44 100644 --- a/python/rocketmq/v5/exception/client_exception.py +++ b/python/rocketmq/v5/exception/client_exception.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. + class ClientException(Exception): def __init__(self, message, code=None): diff --git a/python/rocketmq/v5/log/log_config.py b/python/rocketmq/v5/log/log_config.py index 1de3240e..a363c650 100644 --- a/python/rocketmq/v5/log/log_config.py +++ b/python/rocketmq/v5/log/log_config.py @@ -19,35 +19,33 @@ import os __DIR = f'{os.path.expanduser("~/logs/rocketmq_python/")}' __LOG_CONFIG = { - 'version': 1.0, - 'disable_existing_loggers': False, - 'formatters': { - 'standard': { - 'format': '%(asctime)s [%(levelname)s] %(message)s' - }, + "version": 1.0, + "disable_existing_loggers": False, + "formatters": { + "standard": {"format": "%(asctime)s [%(levelname)s] %(message)s"}, }, - 'handlers': { + "handlers": { # 'console': { # 'level': 'DEBUG', # 'class': 'logging.StreamHandler', # 'formatter': 'standard' # }, - 'file': { - 'class': 'logging.handlers.RotatingFileHandler', - 'level': 'INFO', - 'formatter': 'standard', - 'filename': f'{__DIR}/rocketmq_client.log', - 'maxBytes': 1024 * 1024 * 100, # 100MB - 'backupCount': 10, + "file": { + "class": "logging.handlers.RotatingFileHandler", + "level": "INFO", + "formatter": "standard", + "filename": f"{__DIR}/rocketmq_client.log", + "maxBytes": 1024 * 1024 * 100, # 100MB + "backupCount": 10, }, }, - 'loggers': { - 'rocketmq-python-client': { - 'handlers': ['file'], - 'level': 'INFO', - 'propagate': False + "loggers": { + "rocketmq-python-client": { + "handlers": ["file"], + "level": "INFO", + "propagate": False, }, - } + }, } if not os.path.exists(__DIR): diff --git a/python/rocketmq/v5/model/callback_result.py b/python/rocketmq/v5/model/callback_result.py index 568d6c43..ad967973 100644 --- a/python/rocketmq/v5/model/callback_result.py +++ b/python/rocketmq/v5/model/callback_result.py @@ -47,25 +47,41 @@ class CallbackResult: @staticmethod def async_send_callback_result(future, result, success=True): callback_result = CallbackResult.callback_result(future, result, success) - callback_result.__result_type = CallbackResultType.ASYNC_SEND_CALLBACK_RESULT if success else CallbackResultType.ASYNC_SEND_CALLBACK_EXCEPTION + callback_result.__result_type = ( + CallbackResultType.ASYNC_SEND_CALLBACK_RESULT + if success + else CallbackResultType.ASYNC_SEND_CALLBACK_EXCEPTION + ) return callback_result @staticmethod def async_receive_callback_result(future, result, success=True): callback_result = CallbackResult.callback_result(future, result, success) - callback_result.__result_type = CallbackResultType.ASYNC_ACK_CALLBACK_RESULT if success else CallbackResultType.ASYNC_ACK_CALLBACK_EXCEPTION + callback_result.__result_type = ( + CallbackResultType.ASYNC_ACK_CALLBACK_RESULT + if success + else CallbackResultType.ASYNC_ACK_CALLBACK_EXCEPTION + ) return callback_result @staticmethod def async_ack_callback_result(future, result, success=True): callback_result = CallbackResult.callback_result(future, result, success) - callback_result.__result_type = CallbackResultType.ASYNC_RECEIVE_CALLBACK_RESULT if success else CallbackResultType.ASYNC_RECEIVE_CALLBACK_EXCEPTION + callback_result.__result_type = ( + CallbackResultType.ASYNC_RECEIVE_CALLBACK_RESULT + if success + else CallbackResultType.ASYNC_RECEIVE_CALLBACK_EXCEPTION + ) return callback_result @staticmethod def async_change_invisible_duration_callback_result(future, result, success=True): callback_result = CallbackResult.callback_result(future, result, success) - callback_result.__result_type = CallbackResultType.ASYNC_CHANGE_INVISIBLE_DURATION_RESULT if success else CallbackResultType.ASYNC_CHANGE_INVISIBLE_DURATION_EXCEPTION + callback_result.__result_type = ( + CallbackResultType.ASYNC_CHANGE_INVISIBLE_DURATION_RESULT + if success + else CallbackResultType.ASYNC_CHANGE_INVISIBLE_DURATION_EXCEPTION + ) return callback_result @staticmethod diff --git a/python/rocketmq/v5/model/filter_expression.py b/python/rocketmq/v5/model/filter_expression.py index 98e23da4..9cb0666e 100644 --- a/python/rocketmq/v5/model/filter_expression.py +++ b/python/rocketmq/v5/model/filter_expression.py @@ -19,7 +19,11 @@ from rocketmq.grpc_protocol import FilterType class FilterExpression: TAG_EXPRESSION_SUB_ALL = "*" - def __init__(self, expression=TAG_EXPRESSION_SUB_ALL, filter_type: FilterType = FilterType.TAG): + def __init__( + self, + expression=TAG_EXPRESSION_SUB_ALL, + filter_type: FilterType = FilterType.TAG, + ): self.__expression = expression self.__filter_type = filter_type diff --git a/python/rocketmq/v5/model/message.py b/python/rocketmq/v5/model/message.py index efd32085..ead7d795 100644 --- a/python/rocketmq/v5/model/message.py +++ b/python/rocketmq/v5/model/message.py @@ -37,15 +37,19 @@ class Message: self.__message_type = None def __str__(self) -> str: - return f"topic:{self.__topic}, tag:{self.__tag}, messageGroup:{self.__message_group}, " \ - f"deliveryTimestamp:{self.__delivery_timestamp}, keys:{self.__keys}, properties:{self.__properties}" + return ( + f"topic:{self.__topic}, tag:{self.__tag}, messageGroup:{self.__message_group}, " + f"deliveryTimestamp:{self.__delivery_timestamp}, keys:{self.__keys}, properties:{self.__properties}" + ) def fromProtobuf(self, message: definition_pb2.Message): # noqa try: self.__message_body_check_sum(message) self.__topic = message.topic.name self.__namespace = message.topic.resource_namespace - self.__message_id = MessageIdCodec.decode(message.system_properties.message_id) + self.__message_id = MessageIdCodec.decode( + message.system_properties.message_id + ) self.__body = self.__uncompress_body(message) self.__tag = message.system_properties.tag self.__message_group = message.system_properties.message_group @@ -71,18 +75,26 @@ class Message: if message.system_properties.body_digest.type == DigestType.CRC32: crc32_sum = Misc.crc32_checksum(message.body) if message.system_properties.body_digest.checksum != crc32_sum: - raise Exception(f"(body_check_sum exception, {message.digest.checksum} != crc32_sum {crc32_sum}") + raise Exception( + f"(body_check_sum exception, {message.digest.checksum} != crc32_sum {crc32_sum}" + ) elif message.system_properties.body_digest.type == DigestType.MD5: md5_sum = Misc.md5_checksum(message.body) if message.system_properties.body_digest.checksum != md5_sum: - raise Exception(f"(body_check_sum exception, {message.digest.checksum} != crc32_sum {md5_sum}") + raise Exception( + f"(body_check_sum exception, {message.digest.checksum} != crc32_sum {md5_sum}" + ) elif message.system_properties.body_digest.type == DigestType.SHA1: sha1_sum = Misc.sha1_checksum(message.body) if message.system_properties.body_digest.checksum != sha1_sum: - raise Exception(f"(body_check_sum exception, {message.digest.checksum} != crc32_sum {sha1_sum}") + raise Exception( + f"(body_check_sum exception, {message.digest.checksum} != crc32_sum {sha1_sum}" + ) else: - raise Exception(f"unsupported message body digest algorithm, {message.system_properties.body_digest.type}," - f" {message.topic}, {message.system_properties.message_id}") + raise Exception( + f"unsupported message body digest algorithm, {message.system_properties.body_digest.type}," + f" {message.topic}, {message.system_properties.message_id}" + ) @staticmethod def __uncompress_body(message): @@ -92,7 +104,8 @@ class Message: return message.body else: raise Exception( - f"unsupported message encoding algorithm, {message.system_properties.body_encoding}, {message.topic}, {message.system_properties.message_id}") + f"unsupported message encoding algorithm, {message.system_properties.body_encoding}, {message.topic}, {message.system_properties.message_id}" + ) """ property """ @@ -154,18 +167,20 @@ class Message: @body.setter def body(self, body): - if body is None or body.strip() == '': + if body is None or body.strip() == "": raise IllegalArgumentException("body should not be blank") self.__body = body @topic.setter def topic(self, topic): - if topic is None or topic.strip() == '': + if topic is None or topic.strip() == "": raise IllegalArgumentException("topic has not been set yet") if Misc.is_valid_topic(topic): self.__topic = topic else: - raise IllegalArgumentException(f"topic does not match the regex [regex={Misc.TOPIC_PATTERN}]") + raise IllegalArgumentException( + f"topic does not match the regex [regex={Misc.TOPIC_PATTERN}]" + ) @message_id.setter def message_id(self, message_id): @@ -173,16 +188,18 @@ class Message: @tag.setter def tag(self, tag): - if tag is None or tag.strip() == '': + if tag is None or tag.strip() == "": raise IllegalArgumentException("tag should not be blank") if "|" in tag: - raise IllegalArgumentException("tag should not contain \"|\"") + raise IllegalArgumentException('tag should not contain "|"') self.__tag = tag @message_group.setter def message_group(self, message_group): if self.__delivery_timestamp is not None: - raise IllegalArgumentException("deliveryTimestamp and messageGroup should not be set at same time") + raise IllegalArgumentException( + "deliveryTimestamp and messageGroup should not be set at same time" + ) if message_group is None or len(message_group) == 0: raise IllegalArgumentException("messageGroup should not be blank") self.__message_group = message_group @@ -190,13 +207,15 @@ class Message: @delivery_timestamp.setter def delivery_timestamp(self, delivery_timestamp): if self.__message_group is not None: - raise IllegalArgumentException("deliveryTimestamp and messageGroup should not be set at same time") + raise IllegalArgumentException( + "deliveryTimestamp and messageGroup should not be set at same time" + ) self.__delivery_timestamp = delivery_timestamp @keys.setter def keys(self, *keys): for key in keys: - if not key or key.strip() == '': + if not key or key.strip() == "": raise IllegalArgumentException("key should not be blank") self.__keys.update(set(keys)) @@ -205,8 +224,8 @@ class Message: self.__message_type = message_type def add_property(self, key, value): - if key is None or key.strip() == '': + if key is None or key.strip() == "": raise IllegalArgumentException("key should not be blank") - if value is None or value.strip() == '': + if value is None or value.strip() == "": raise IllegalArgumentException("value should not be blank") self.__properties[key] = value diff --git a/python/rocketmq/v5/model/metrics.py b/python/rocketmq/v5/model/metrics.py index 1106ae4c..31e84a4e 100644 --- a/python/rocketmq/v5/model/metrics.py +++ b/python/rocketmq/v5/model/metrics.py @@ -54,9 +54,15 @@ class MetricContext: class HistogramEnum(Enum): # a histogram that records the cost time of successful api calls of message publishing. - SEND_COST_TIME = ("rocketmq_send_cost_time", [1.0, 5.0, 10.0, 20.0, 50.0, 200.0, 500.0]) + SEND_COST_TIME = ( + "rocketmq_send_cost_time", + [1.0, 5.0, 10.0, 20.0, 50.0, 200.0, 500.0], + ) # a histogram that records the latency of message delivery from remote. - DELIVERY_LATENCY = ("rocketmq_delivery_latency", [1.0, 5.0, 10.0, 20.0, 50.0, 200.0, 500.0]) + DELIVERY_LATENCY = ( + "rocketmq_delivery_latency", + [1.0, 5.0, 10.0, 20.0, 50.0, 200.0, 500.0], + ) def __init__(self, histogram_name, buckets): self.__histogram_name = histogram_name diff --git a/python/rocketmq/v5/model/send_receipt.py b/python/rocketmq/v5/model/send_receipt.py index 90d576f5..f67997a1 100644 --- a/python/rocketmq/v5/model/send_receipt.py +++ b/python/rocketmq/v5/model/send_receipt.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. + class SendReceipt: def __init__(self, message_id, transaction_id, message_queue, offset): diff --git a/python/rocketmq/v5/model/topic_route.py b/python/rocketmq/v5/model/topic_route.py index e62b6d6f..b93728e2 100644 --- a/python/rocketmq/v5/model/topic_route.py +++ b/python/rocketmq/v5/model/topic_route.py @@ -31,10 +31,16 @@ class MessageQueue: self.__accept_message_types = set(queue.accept_message_types) def is_readable(self): - return self.__permission == Permission.READ or self.__permission == Permission.READ_WRITE + return ( + self.__permission == Permission.READ + or self.__permission == Permission.READ_WRITE + ) def is_writable(self): - return self.__permission == Permission.WRITE or self.__permission == Permission.READ_WRITE + return ( + self.__permission == Permission.WRITE + or self.__permission == Permission.READ_WRITE + ) def is_master_broker(self): return self.__broker_id == MessageQueue.MASTER_BROKER_ID @@ -42,7 +48,17 @@ class MessageQueue: def __eq__(self, other: object) -> bool: if not isinstance(other, MessageQueue): return False - ret = (self.__topic == other.__topic and self.__namespace == other.__namespace and self.__queue_id == other.__queue_id and self.__permission == other.__permission and self.__broker_name == other.__broker_name and self.__broker_id == other.__broker_id and self.__broker_endpoints == other.__broker_endpoints and sorted(self.__accept_message_types) == sorted(other.__accept_message_types)) + ret = ( + self.__topic == other.__topic + and self.__namespace == other.__namespace + and self.__queue_id == other.__queue_id + and self.__permission == other.__permission + and self.__broker_name == other.__broker_name + and self.__broker_id == other.__broker_id + and self.__broker_endpoints == other.__broker_endpoints + and sorted(self.__accept_message_types) + == sorted(other.__accept_message_types) + ) return ret def __str__(self): @@ -75,7 +91,9 @@ class MessageQueue: class TopicRouteData: def __init__(self, message_queues): - self.__message_queues = list(map(lambda queue: MessageQueue(queue), message_queues)) + self.__message_queues = list( + map(lambda queue: MessageQueue(queue), message_queues) + ) def __eq__(self, other): if self is other: @@ -88,7 +106,11 @@ class TopicRouteData: return hash(tuple(self.__message_queues)) def __str__(self): - return "message_queues:(" + ', '.join(str(queue) for queue in self.__message_queues) + ")" + return ( + "message_queues:(" + + ", ".join(str(queue) for queue in self.__message_queues) + + ")" + ) def all_endpoints(self): endpoints_map = {} diff --git a/python/rocketmq/v5/producer/producer.py b/python/rocketmq/v5/producer/producer.py index 9f4ae356..ba9e3d9b 100644 --- a/python/rocketmq/v5/producer/producer.py +++ b/python/rocketmq/v5/producer/producer.py @@ -48,21 +48,31 @@ class Transaction: def add_half_message(self, message: Message): with Transaction.__transaction_lock: if message is None: - raise IllegalArgumentException("add half message error, message is none.") + raise IllegalArgumentException( + "add half message error, message is none." + ) if self.__message is None: self.__message = message else: - raise IllegalArgumentException(f"message already existed in transaction, topic:{message.topic}") + raise IllegalArgumentException( + f"message already existed in transaction, topic:{message.topic}" + ) def add_send_receipt(self, send_receipt): with Transaction.__transaction_lock: if self.__message is None: - raise IllegalArgumentException("add send receipt error, no message in transaction.") + raise IllegalArgumentException( + "add send receipt error, no message in transaction." + ) if send_receipt is None: - raise IllegalArgumentException("add send receipt error, send receipt in none.") + raise IllegalArgumentException( + "add send receipt error, send receipt in none." + ) if self.__message.message_id != send_receipt.message_id: - raise IllegalArgumentException("can't add another send receipt to a half message.") + raise IllegalArgumentException( + "can't add another send receipt to a half message." + ) self.__send_receipt = send_receipt @@ -76,20 +86,28 @@ class Transaction: if self.__message is None: raise IllegalArgumentException("no message in transaction.") if self.__send_receipt is None or self.__send_receipt.transaction_id is None: - raise IllegalArgumentException("no transaction_id in transaction, must send half message at first.") + raise IllegalArgumentException( + "no transaction_id in transaction, must send half message at first." + ) try: - res = self.__producer.end_transaction(self.__send_receipt.message_queue.endpoints, self.__message, - self.__send_receipt.transaction_id, result, - TransactionSource.SOURCE_CLIENT) + res = self.__producer.end_transaction( + self.__send_receipt.message_queue.endpoints, + self.__message, + self.__send_receipt.transaction_id, + result, + TransactionSource.SOURCE_CLIENT, + ) if res.status.code != Code.OK: logger.error( - f"transaction commit or rollback error. topic:{self.__message.topic}, message_id:{self.__message.message_id}, transaction_id:{self.__send_receipt.transaction_id}, transactionResolution:{result}") + f"transaction commit or rollback error. topic:{self.__message.topic}, message_id:{self.__message.message_id}, transaction_id:{self.__send_receipt.transaction_id}, transactionResolution:{result}" + ) raise ClientException(res.status.message, res.status.code) return res except Exception as e: logger.error( - f"end transaction error, topic:{self.__message.topic}, message_id:{self.__send_receipt.message_id}, transaction_id:{self.__send_receipt.transaction_id}, transactionResolution:{result}: {e}") + f"end transaction error, topic:{self.__message.topic}, message_id:{self.__send_receipt.message_id}, transaction_id:{self.__send_receipt.transaction_id}, transactionResolution:{result}: {e}" + ) raise e """ property """ @@ -109,11 +127,15 @@ class TransactionChecker(metaclass=abc.ABCMeta): class Producer(Client): MAX_SEND_ATTEMPTS = 3 # max retry times when send failed - def __init__(self, client_configuration, topics=None, checker=None, tls_enable=False): + def __init__( + self, client_configuration, topics=None, checker=None, tls_enable=False + ): super().__init__(client_configuration, topics, ClientType.PRODUCER, tls_enable) # {topic, QueueSelector} self.__send_queue_selectors = ConcurrentMap() - self.__checker = checker # checker for transaction message, handle checking from server + self.__checker = ( + checker # checker for transaction message, handle checking from server + ) def __str__(self): return f"{ClientType.Name(self.client_type)} client_id:{self.client_id}" @@ -128,7 +150,8 @@ class Producer(Client): topic_queue = self.__select_send_queue(message.topic) if message.message_type not in topic_queue.accept_message_types: raise IllegalArgumentException( - f"current message type not match with queue accept message types, topic:{message.topic}, message_type:{message.message_type}, queue access type:{topic_queue.accept_message_types}") + f"current message type not match with queue accept message types, topic:{message.topic}, message_type:{message.message_type}, queue access type:{topic_queue.accept_message_types}" + ) if transaction is None: try: @@ -146,7 +169,9 @@ class Producer(Client): except IllegalArgumentException as e: raise e except Exception as e: - logger.error(f"send transaction message exception, topic: {message.topic}, e: {e}") + logger.error( + f"send transaction message exception, topic: {message.topic}, e: {e}" + ) raise e def send_async(self, message: Message): @@ -156,9 +181,11 @@ class Producer(Client): self.__wrap_sending_message(message, False) topic_queue = self.__select_send_queue(message.topic) if message.message_type not in topic_queue.accept_message_types: - raise IllegalArgumentException(f"current message type not match with queue accept message types, " - f"topic:{message.topic}, message_type:{message.message_type}, " - f"queue access type:{topic_queue.accept_message_types}") + raise IllegalArgumentException( + f"current message type not match with queue accept message types, " + f"topic:{message.topic}, message_type:{message.message_type}, " + f"queue access type:{topic_queue.accept_message_types}" + ) try: return self.__send_async(message, topic_queue) @@ -170,7 +197,9 @@ class Producer(Client): def begin_transaction(self): if self.is_running is False: - raise IllegalStateException("unable to begin transaction because producer is not running") + raise IllegalStateException( + "unable to begin transaction because producer is not running" + ) if self.__checker is None: raise IllegalArgumentException("Transaction checker should not be null.") @@ -178,22 +207,31 @@ class Producer(Client): def end_transaction(self, endpoints, message, transaction_id, result, source): if self.is_running is False: - raise IllegalStateException("unable to end transaction because producer is not running") + raise IllegalStateException( + "unable to end transaction because producer is not running" + ) if self.__checker is None: raise IllegalArgumentException("Transaction checker should not be null.") req = self.__end_transaction_req(message, transaction_id, result, source) - future = self.rpc_client.end_transaction_async(endpoints, req, metadata=self._sign(), - timeout=self.client_configuration.request_timeout) + future = self.rpc_client.end_transaction_async( + endpoints, + req, + metadata=self._sign(), + timeout=self.client_configuration.request_timeout, + ) return future.result() - async def on_recover_orphaned_transaction_command(self, endpoints, msg, transaction_id): + async def on_recover_orphaned_transaction_command( + self, endpoints, msg, transaction_id + ): # call this function from server side stream, in RpcClient._io_loop try: if self.is_running is False: raise IllegalStateException( - "unable to recover orphaned transaction command because producer is not running") + "unable to recover orphaned transaction command because producer is not running" + ) if self.__checker is None: raise IllegalArgumentException("No transaction checker registered.") @@ -201,15 +239,25 @@ class Producer(Client): result = self.__checker.check(message) if result == TransactionResolution.COMMIT: - res = await self.__commit_for_server_check(endpoints, message, transaction_id, - TransactionSource.SOURCE_SERVER_CHECK) + res = await self.__commit_for_server_check( + endpoints, + message, + transaction_id, + TransactionSource.SOURCE_SERVER_CHECK, + ) logger.debug( - f"commit message. message_id: {message.message_id}, transaction_id: {transaction_id}, res: {res}") + f"commit message. message_id: {message.message_id}, transaction_id: {transaction_id}, res: {res}" + ) elif result == TransactionResolution.ROLLBACK: - res = await self.__rollback_for_server_check(endpoints, message, transaction_id, - TransactionSource.SOURCE_SERVER_CHECK) + res = await self.__rollback_for_server_check( + endpoints, + message, + transaction_id, + TransactionSource.SOURCE_SERVER_CHECK, + ) logger.debug( - f"rollback message. message_id: {message.message_id}, transaction_id: {transaction_id}, res: {res}") + f"rollback message. message_id: {message.message_id}, transaction_id: {transaction_id}, res: {res}" + ) except Exception as e: logger.error(f"on_recover_orphaned_transaction_command exception: {e}") @@ -273,17 +321,35 @@ class Producer(Client): def __send(self, message: Message, topic_queue, attempt=1) -> SendReceipt: req = self.__send_req(message) send_context = self.client_metrics.send_before(message.topic) - send_message_future = self.rpc_client.send_message_async(topic_queue.endpoints, req, self._sign()) - return self.__handle_sync_send_receipt(send_message_future, message, topic_queue, attempt, send_context) - - def __handle_sync_send_receipt(self, send_message_future, message, topic_queue, attempt, send_metric_context=None): + send_message_future = self.rpc_client.send_message_async( + topic_queue.endpoints, + req, + self._sign(), + timeout=self.client_configuration.request_timeout, + ) + return self.__handle_sync_send_receipt( + send_message_future, message, topic_queue, attempt, send_context + ) + + def __handle_sync_send_receipt( + self, + send_message_future, + message, + topic_queue, + attempt, + send_metric_context=None, + ): try: - send_receipt = self.__process_send_message_response(send_message_future, topic_queue) + send_receipt = self.__process_send_message_response( + send_message_future, topic_queue + ) self.client_metrics.send_after(send_metric_context, True) return send_receipt except Exception as e: attempt += 1 - retry_exception_future = self.__check_send_retry_condition(message, topic_queue, attempt, e) + retry_exception_future = self.__check_send_retry_condition( + message, topic_queue, attempt, e + ) if retry_exception_future is not None: # end retry with exception self.client_metrics.send_after(send_metric_context, False) @@ -296,29 +362,55 @@ class Producer(Client): def __send_async(self, message: Message, topic_queue, attempt=1, ret_future=None): req = self.__send_req(message) send_context = self.client_metrics.send_before(message.topic) - send_message_future = self.rpc_client.send_message_async(topic_queue.endpoints, req, self._sign()) + send_message_future = self.rpc_client.send_message_async( + topic_queue.endpoints, + req, + self._sign(), + timeout=self.client_configuration.request_timeout, + ) if ret_future is None: ret_future = Future() - handle_send_receipt_callback = functools.partial(self.__handle_async_send_receipt, message=message, - topic_queue=topic_queue, attempt=attempt, - ret_future=ret_future, send_metric_context=send_context) + handle_send_receipt_callback = functools.partial( + self.__handle_async_send_receipt, + message=message, + topic_queue=topic_queue, + attempt=attempt, + ret_future=ret_future, + send_metric_context=send_context, + ) send_message_future.add_done_callback(handle_send_receipt_callback) return ret_future - def __handle_async_send_receipt(self, send_message_future, message, topic_queue, attempt, ret_future, - send_metric_context=None): + def __handle_async_send_receipt( + self, + send_message_future, + message, + topic_queue, + attempt, + ret_future, + send_metric_context=None, + ): try: - send_receipt = self.__process_send_message_response(send_message_future, topic_queue) + send_receipt = self.__process_send_message_response( + send_message_future, topic_queue + ) self.client_metrics.send_after(send_metric_context, True) - self._set_future_callback_result(CallbackResult.async_send_callback_result(ret_future, send_receipt)) + self._set_future_callback_result( + CallbackResult.async_send_callback_result(ret_future, send_receipt) + ) except Exception as e: attempt += 1 - retry_exception_future = self.__check_send_retry_condition(message, topic_queue, attempt, e) + retry_exception_future = self.__check_send_retry_condition( + message, topic_queue, attempt, e + ) if retry_exception_future is not None: # end retry with exception self.client_metrics.send_after(send_metric_context, False) self._set_future_callback_result( - CallbackResult.async_send_callback_result(ret_future, retry_exception_future.exception(), False)) + CallbackResult.async_send_callback_result( + ret_future, retry_exception_future.exception(), False + ) + ) return # resend message topic_queue = self.__select_send_queue(message.topic) @@ -328,28 +420,34 @@ class Producer(Client): res = send_message_future.result() MessagingResultChecker.check(res.status) entries = res.entries - assert len( - entries) == 1, f"entries size error, the send response entries size is {len(entries)}, {self.__str__()}" + assert ( + len(entries) == 1 + ), f"entries size error, the send response entries size is {len(entries)}, {self.__str__()}" entry = entries[0] - return SendReceipt(entry.message_id, entry.transaction_id, topic_queue, entry.offset) + return SendReceipt( + entry.message_id, entry.transaction_id, topic_queue, entry.offset + ) def __check_send_retry_condition(self, message, topic_queue, attempt, e): end_retry = False if attempt > Producer.MAX_SEND_ATTEMPTS: logger.error( - f"{self.__str__()} failed to send message to {topic_queue.endpoints.__str__()}, because of run out of attempt times, topic:{message.topic}, message_id:{message.message_id}, message_type:{message.message_type}, attempt:{attempt}") + f"{self.__str__()} failed to send message to {topic_queue.endpoints.__str__()}, because of run out of attempt times, topic:{message.topic}, message_id:{message.message_id}, message_type:{message.message_type}, attempt:{attempt}" + ) end_retry = True # no need more attempts for transactional message if message.message_type == MessageType.TRANSACTION: logger.error( - f"{self.__str__()} failed to send message to {topic_queue.endpoints.__str__()}, topic:{message.topic}, message_id:{message.message_id}, message_type:{message.message_type} ,attempt:{attempt}") + f"{self.__str__()} failed to send message to {topic_queue.endpoints.__str__()}, topic:{message.topic}, message_id:{message.message_id}, message_type:{message.message_type} ,attempt:{attempt}" + ) end_retry = True # end retry if system busy if isinstance(e, TooManyRequestsException): logger.error( - f"{self.__str__()} failed to send message to {topic_queue.endpoints.__str__()}, because of to too many requests, topic:{message.topic}, message_type:{message.message_type}, message_id:{message.message_id}, attempt:{attempt}") + f"{self.__str__()} failed to send message to {topic_queue.endpoints.__str__()}, because of to too many requests, topic:{message.topic}, message_type:{message.message_type}, message_id:{message.message_id}, attempt:{attempt}" + ) end_retry = True if end_retry: @@ -374,7 +472,8 @@ class Producer(Client): max_body_size = 4 * 1024 * 1024 # max body size is 4m if len(message.body) > max_body_size: raise IllegalArgumentException( - f"Message body size exceeds the threshold, max size={max_body_size} bytes") + f"Message body size exceeds the threshold, max size={max_body_size} bytes" + ) msg.body = message.body if message.tag is not None: @@ -391,13 +490,19 @@ class Producer(Client): if message.message_group is not None: msg.system_properties.message_group = message.message_group if message.delivery_timestamp is not None: - msg.system_properties.delivery_timestamp.seconds = message.delivery_timestamp + msg.system_properties.delivery_timestamp.seconds = ( + message.delivery_timestamp + ) return req except Exception as e: raise e def __send_message_type(self, message: Message, is_transaction=False): - if message.message_group is None and message.delivery_timestamp is None and is_transaction is False: + if ( + message.message_group is None + and message.delivery_timestamp is None + and is_transaction is False + ): return MessageType.NORMAL if message.message_group is not None and is_transaction is False: @@ -406,21 +511,30 @@ class Producer(Client): if message.delivery_timestamp is not None and is_transaction is False: return MessageType.DELAY - if message.message_group is None and message.delivery_timestamp is None and is_transaction is True: + if ( + message.message_group is None + and message.delivery_timestamp is None + and is_transaction is True + ): return MessageType.TRANSACTION # transaction semantics is conflicted with fifo/delay. - logger.error(f"{self.__str__()} set send message type exception, message: {str(message)}") - raise IllegalArgumentException("transactional message should not set messageGroup or deliveryTimestamp") + logger.error( + f"{self.__str__()} set send message type exception, message: {str(message)}" + ) + raise IllegalArgumentException( + "transactional message should not set messageGroup or deliveryTimestamp" + ) def __select_send_queue(self, topic): try: route = self._retrieve_topic_route_data(topic) - queue_selector = self.__send_queue_selectors.put_if_absent(topic, - QueueSelector.producer_queue_selector(route)) + queue_selector = self.__send_queue_selectors.put_if_absent( + topic, QueueSelector.producer_queue_selector(route) + ) return queue_selector.select_next_queue() except Exception as e: - logger.error(f"producer select topic:{topic} queue index exception, {e}") + logger.error(f"producer select topic:{topic} queue raise exception, {e}") raise e def __end_transaction_req(self, message: Message, transaction_id, result, source): @@ -433,15 +547,27 @@ class Producer(Client): req.source = source return req - def __commit_for_server_check(self, endpoints, message: Message, transaction_id, source): - return self.__end_transaction_for_server_check(endpoints, message, transaction_id, TransactionResolution.COMMIT, - source) - - def __rollback_for_server_check(self, endpoints, message: Message, transaction_id, source): - return self.__end_transaction_for_server_check(endpoints, message, transaction_id, - TransactionResolution.ROLLBACK, source) - - def __end_transaction_for_server_check(self, endpoints, message: Message, transaction_id, result, source): + def __commit_for_server_check( + self, endpoints, message: Message, transaction_id, source + ): + return self.__end_transaction_for_server_check( + endpoints, message, transaction_id, TransactionResolution.COMMIT, source + ) + + def __rollback_for_server_check( + self, endpoints, message: Message, transaction_id, source + ): + return self.__end_transaction_for_server_check( + endpoints, message, transaction_id, TransactionResolution.ROLLBACK, source + ) + + def __end_transaction_for_server_check( + self, endpoints, message: Message, transaction_id, result, source + ): req = self.__end_transaction_req(message, transaction_id, result, source) - return self.rpc_client.end_transaction_for_server_check(endpoints, req, metadata=self._sign(), - timeout=self.client_configuration.request_timeout) + return self.rpc_client.end_transaction_for_server_check( + endpoints, + req, + metadata=self._sign(), + timeout=self.client_configuration.request_timeout, + ) diff --git a/python/rocketmq/v5/test/test_base.py b/python/rocketmq/v5/test/test_base.py index aa49fafc..fd315eee 100644 --- a/python/rocketmq/v5/test/test_base.py +++ b/python/rocketmq/v5/test/test_base.py @@ -33,7 +33,7 @@ class TestBase: FAKE_CLIENT_ID = ClientId() FAKE_TOPIC_0 = "foo-bar-topic-0" FAKE_TOPIC_1 = "foo-bar-topic-1" - FAKE_MESSAGE_BODY = "foobar".encode('utf-8') + FAKE_MESSAGE_BODY = "foobar".encode("utf-8") FAKE_TAG_0 = "foo-bar-tag-0" FAKE_BROKER_NAME_0 = "foo-bar-broker-name-0" FAKE_BROKER_NAME_1 = "foo-bar-broker-name-1" @@ -49,7 +49,9 @@ class TestBase: @staticmethod def fake_client_config(): credentials = Credentials(TestBase.FAKE_AK, TestBase.FAKE_SK) - config = ClientConfiguration(TestBase.FAKE_ENDPOINTS, credentials, TestBase.FAKE_NAMESPACE) + config = ClientConfiguration( + TestBase.FAKE_ENDPOINTS, credentials, TestBase.FAKE_NAMESPACE + ) return config @staticmethod @@ -75,7 +77,9 @@ class TestBase: msg.body = TestBase.FAKE_MESSAGE_BODY msg.system_properties.born_host = TestBase.FAKE_HOST_0 msg.system_properties.born_timestamp.seconds = int(time.time() * 1000) - msg.system_properties.delivery_timestamp.seconds = msg.system_properties.born_timestamp.seconds - 10 + msg.system_properties.delivery_timestamp.seconds = ( + msg.system_properties.born_timestamp.seconds - 10 + ) msg.system_properties.message_type = 1 msg.system_properties.body_encoding = 1 return msg @@ -103,7 +107,13 @@ class TestBase: fake_queue.broker.CopyFrom(fake_broker) fake_queue.permission = Permission.READ_WRITE fake_queue.accept_message_types.extend( - (MessageType.NORMAL, MessageType.FIFO, MessageType.DELAY, MessageType.TRANSACTION)) + ( + MessageType.NORMAL, + MessageType.FIFO, + MessageType.DELAY, + MessageType.TRANSACTION, + ) + ) return MessageQueue(fake_queue) @staticmethod @@ -135,7 +145,12 @@ class TestBase: fake_response = TestBase.fake_send_success_response() fake_message_queue = TestBase.fake_queue(topic) fake_entry = fake_response.entries[0] - return SendReceipt(fake_entry.message_id, fake_entry.transaction_id, fake_message_queue, fake_entry.offset) + return SendReceipt( + fake_entry.message_id, + fake_entry.transaction_id, + fake_message_queue, + fake_entry.offset, + ) @staticmethod def fake_receive_receipt(): diff --git a/python/rocketmq/v5/test/test_consumer.py b/python/rocketmq/v5/test/test_consumer.py index 47db493a..8bd107cb 100644 --- a/python/rocketmq/v5/test/test_consumer.py +++ b/python/rocketmq/v5/test/test_consumer.py @@ -26,24 +26,40 @@ from rocketmq.v5.test import TestBase class TestNormalConsumer(unittest.TestCase): - @patch.object(Message, '_Message__message_body_check_sum') - @patch.object(SimpleConsumer, '_SimpleConsumer__receive_message_response') - @patch.object(RpcClient, 'receive_message_async') - @patch.object(SimpleConsumer, '_SimpleConsumer__select_topic_queue', - return_value=TestBase.fake_queue(TestBase.FAKE_TOPIC_0)) - @patch.object(SimpleConsumer, '_SimpleConsumer__select_topic_for_receive', return_value=TestBase.FAKE_TOPIC_0) - @patch.object(Client, '_Client__start_scheduler', return_value=None) - @patch.object(Client, '_Client__update_topic_route', return_value=None) - def test_receive(self, mock_update_topic_route, mock_start_scheduler, mock_select_topic_for_receive, - mock_select_topic_queue, mock_receive_message_async, mock_receive_message_response, - mock_message_body_check_sum): + @patch.object(Message, "_Message__message_body_check_sum") + @patch.object(SimpleConsumer, "_SimpleConsumer__receive_message_response") + @patch.object(RpcClient, "receive_message_async") + @patch.object( + SimpleConsumer, + "_SimpleConsumer__select_topic_queue", + return_value=TestBase.fake_queue(TestBase.FAKE_TOPIC_0), + ) + @patch.object( + SimpleConsumer, + "_SimpleConsumer__select_topic_for_receive", + return_value=TestBase.FAKE_TOPIC_0, + ) + @patch.object(Client, "_Client__start_scheduler", return_value=None) + @patch.object(Client, "_Client__update_topic_route", return_value=None) + def test_receive( + self, + mock_update_topic_route, + mock_start_scheduler, + mock_select_topic_for_receive, + mock_select_topic_queue, + mock_receive_message_async, + mock_receive_message_response, + mock_message_body_check_sum, + ): future = Future() future.set_result(list()) mock_receive_message_async.return_value = future mock_receive_message_response.return_value = TestBase.fake_receive_receipt() subs = {TestBase.FAKE_TOPIC_0: FilterExpression()} - consumer = SimpleConsumer(TestBase.fake_client_config(), TestBase.FAKE_CONSUMER_GROUP_0, subs) + consumer = SimpleConsumer( + TestBase.fake_client_config(), TestBase.FAKE_CONSUMER_GROUP_0, subs + ) consumer.startup() messages = consumer.receive(32, 10) self.assertIsInstance(messages[0], Message) diff --git a/python/rocketmq/v5/test/test_producer.py b/python/rocketmq/v5/test/test_producer.py index 4dc73d29..e85864c1 100644 --- a/python/rocketmq/v5/test/test_producer.py +++ b/python/rocketmq/v5/test/test_producer.py @@ -26,10 +26,16 @@ from rocketmq.v5.test import TestBase class TestNormalProducer(unittest.TestCase): - @patch.object(Producer, '_Producer__select_send_queue', return_value=TestBase.fake_queue(TestBase.FAKE_TOPIC_0)) - @patch.object(RpcClient, 'send_message_async') - @patch.object(Client, '_Client__start_scheduler', return_value=None) - def test_send(self, mock_start_scheduler, mock_send_message_async, mock_select_send_queue): + @patch.object( + Producer, + "_Producer__select_send_queue", + return_value=TestBase.fake_queue(TestBase.FAKE_TOPIC_0), + ) + @patch.object(RpcClient, "send_message_async") + @patch.object(Client, "_Client__start_scheduler", return_value=None) + def test_send( + self, mock_start_scheduler, mock_send_message_async, mock_select_send_queue + ): # mock send_message_async return future future = Future() future.set_result(TestBase.fake_send_success_response()) diff --git a/python/rocketmq/v5/util/client_id.py b/python/rocketmq/v5/util/client_id.py index 44ef02f0..04054b2c 100644 --- a/python/rocketmq/v5/util/client_id.py +++ b/python/rocketmq/v5/util/client_id.py @@ -30,7 +30,15 @@ class ClientId: host_name = gethostname() process_id = getpid() base36_time = Misc.to_base36(time_ns()) - self.__client_id = (host_name + ClientId.CLIENT_ID_SEPARATOR + str(process_id) + ClientId.CLIENT_ID_SEPARATOR + str(self.__client_index) + ClientId.CLIENT_ID_SEPARATOR + base36_time) + self.__client_id = ( + host_name + + ClientId.CLIENT_ID_SEPARATOR + + str(process_id) + + ClientId.CLIENT_ID_SEPARATOR + + str(self.__client_index) + + ClientId.CLIENT_ID_SEPARATOR + + base36_time + ) @property def client_id(self): diff --git a/python/rocketmq/v5/util/message_id_codec.py b/python/rocketmq/v5/util/message_id_codec.py index eec4f94a..a0b83b4c 100644 --- a/python/rocketmq/v5/util/message_id_codec.py +++ b/python/rocketmq/v5/util/message_id_codec.py @@ -74,17 +74,21 @@ class MessageIdCodec: def __init__(self): with MessageIdCodec._instance_lock: - if not hasattr(self, 'initialized'): + if not hasattr(self, "initialized"): buffer = bytearray(8) - mac = getnode().to_bytes(6, byteorder='big') + mac = getnode().to_bytes(6, byteorder="big") buffer[0:6] = mac pid = getpid() pid_buffer = bytearray(4) - pid_buffer[0:4] = pid.to_bytes(4, byteorder='big') + pid_buffer[0:4] = pid.to_bytes(4, byteorder="big") buffer[6:8] = pid_buffer[2:4] self.process_fixed_string_v1 = buffer.hex().upper() self.seconds_since_custom_epoch = int( - (datetime.now(timezone.utc) - datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc)).total_seconds()) + ( + datetime.now(timezone.utc) + - datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + ).total_seconds() + ) self.seconds_start_timestamp = int(time()) self.seconds = self.__delta_time() self.sequence = None @@ -95,9 +99,13 @@ class MessageIdCodec: if self.seconds != delta_seconds: self.seconds = delta_seconds buffer = bytearray(8) - buffer[0:4] = self.seconds.to_bytes(8, byteorder='big')[4:8] - buffer[4:8] = self.__sequence_id().to_bytes(4, byteorder='big') - return MessageIdCodec.MESSAGE_ID_VERSION_V1 + self.process_fixed_string_v1 + buffer.hex().upper() + buffer[0:4] = self.seconds.to_bytes(8, byteorder="big")[4:8] + buffer[4:8] = self.__sequence_id().to_bytes(4, byteorder="big") + return ( + MessageIdCodec.MESSAGE_ID_VERSION_V1 + + self.process_fixed_string_v1 + + buffer.hex().upper() + ) @staticmethod def decode(message_id): @@ -106,7 +114,9 @@ class MessageIdCodec: """ private """ def __delta_time(self): - return int(time()) - self.seconds_start_timestamp + self.seconds_since_custom_epoch + return ( + int(time()) - self.seconds_start_timestamp + self.seconds_since_custom_epoch + ) def __sequence_id(self): self.sequence = MessageIdCodec.__index.get_and_increment() diff --git a/python/rocketmq/v5/util/messaging_result_checker.py b/python/rocketmq/v5/util/messaging_result_checker.py index f8bc1ee0..4091397a 100644 --- a/python/rocketmq/v5/util/messaging_result_checker.py +++ b/python/rocketmq/v5/util/messaging_result_checker.py @@ -34,12 +34,27 @@ class MessagingResultChecker: if code == Code.OK or code == Code.MULTIPLE_RESULTS: return - elif code == Code.BAD_REQUEST or code == Code.ILLEGAL_ACCESS_POINT or code == Code.ILLEGAL_TOPIC \ - or code == Code.ILLEGAL_CONSUMER_GROUP or code == Code.ILLEGAL_MESSAGE_TAG or code == Code.ILLEGAL_MESSAGE_KEY \ - or code == Code.ILLEGAL_MESSAGE_GROUP or code == Code.ILLEGAL_MESSAGE_PROPERTY_KEY or code == Code.INVALID_TRANSACTION_ID \ - or code == Code.ILLEGAL_MESSAGE_ID or code == Code.ILLEGAL_FILTER_EXPRESSION or code == Code.ILLEGAL_INVISIBLE_TIME \ - or code == Code.ILLEGAL_DELIVERY_TIME or code == Code.INVALID_RECEIPT_HANDLE or code == Code.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE \ - or code == Code.UNRECOGNIZED_CLIENT_TYPE or code == Code.MESSAGE_CORRUPTED or code == Code.CLIENT_ID_REQUIRED or code == Code.ILLEGAL_POLLING_TIME: + elif ( + code == Code.BAD_REQUEST + or code == Code.ILLEGAL_ACCESS_POINT + or code == Code.ILLEGAL_TOPIC + or code == Code.ILLEGAL_CONSUMER_GROUP + or code == Code.ILLEGAL_MESSAGE_TAG + or code == Code.ILLEGAL_MESSAGE_KEY + or code == Code.ILLEGAL_MESSAGE_GROUP + or code == Code.ILLEGAL_MESSAGE_PROPERTY_KEY + or code == Code.INVALID_TRANSACTION_ID + or code == Code.ILLEGAL_MESSAGE_ID + or code == Code.ILLEGAL_FILTER_EXPRESSION + or code == Code.ILLEGAL_INVISIBLE_TIME + or code == Code.ILLEGAL_DELIVERY_TIME + or code == Code.INVALID_RECEIPT_HANDLE + or code == Code.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE + or code == Code.UNRECOGNIZED_CLIENT_TYPE + or code == Code.MESSAGE_CORRUPTED + or code == Code.CLIENT_ID_REQUIRED + or code == Code.ILLEGAL_POLLING_TIME + ): raise BadRequestException(message, code) elif code == Code.UNAUTHORIZED: raise UnauthorizedException(message, code) @@ -49,19 +64,38 @@ class MessagingResultChecker: raise ForbiddenException(message, code) elif code == Code.MESSAGE_NOT_FOUND: return - elif code == Code.NOT_FOUND or code == Code.TOPIC_NOT_FOUND or code == Code.CONSUMER_GROUP_NOT_FOUND: + elif ( + code == Code.NOT_FOUND + or code == Code.TOPIC_NOT_FOUND + or code == Code.CONSUMER_GROUP_NOT_FOUND + ): raise NotFoundException(message, code) elif code == Code.PAYLOAD_TOO_LARGE or code == Code.MESSAGE_BODY_TOO_LARGE: raise PayloadTooLargeException(message, code) elif code == Code.TOO_MANY_REQUESTS: raise TooManyRequestsException(message, code) - elif code == Code.REQUEST_HEADER_FIELDS_TOO_LARGE or code == Code.MESSAGE_PROPERTIES_TOO_LARGE: + elif ( + code == Code.REQUEST_HEADER_FIELDS_TOO_LARGE + or code == Code.MESSAGE_PROPERTIES_TOO_LARGE + ): raise RequestHeaderFieldsTooLargeException(message, code) - elif code == Code.INTERNAL_ERROR or code == Code.INTERNAL_SERVER_ERROR or code == Code.HA_NOT_AVAILABLE: + elif ( + code == Code.INTERNAL_ERROR + or code == Code.INTERNAL_SERVER_ERROR + or code == Code.HA_NOT_AVAILABLE + ): raise InternalErrorException(message, code) - elif code == Code.PROXY_TIMEOUT or code == Code.MASTER_PERSISTENCE_TIMEOUT or code == Code.SLAVE_PERSISTENCE_TIMEOUT: + elif ( + code == Code.PROXY_TIMEOUT + or code == Code.MASTER_PERSISTENCE_TIMEOUT + or code == Code.SLAVE_PERSISTENCE_TIMEOUT + ): raise ProxyTimeoutException(message, code) - elif code == Code.UNSUPPORTED or code == Code.VERSION_UNSUPPORTED or code == Code.VERIFY_FIFO_MESSAGE_UNSUPPORTED: + elif ( + code == Code.UNSUPPORTED + or code == Code.VERSION_UNSUPPORTED + or code == Code.VERIFY_FIFO_MESSAGE_UNSUPPORTED + ): raise UnsupportedException(message, code) else: logger.warn(f"unrecognized status code:{code}, message:{message}") diff --git a/python/rocketmq/v5/util/misc.py b/python/rocketmq/v5/util/misc.py index d371625f..b021b690 100644 --- a/python/rocketmq/v5/util/misc.py +++ b/python/rocketmq/v5/util/misc.py @@ -27,9 +27,9 @@ from rocketmq.v5.log import logger class Misc: __LOCAL_IP = None __OS_NAME = None - TOPIC_PATTERN = compile(r'^[%a-zA-Z0-9_-]+$') - CONSUMER_GROUP_PATTERN = compile(r'^[%a-zA-Z0-9_-]+$') - SDK_VERSION = "5.0.2" + TOPIC_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$") + CONSUMER_GROUP_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$") + SDK_VERSION = "5.0.3" @staticmethod def sdk_language(): @@ -41,33 +41,33 @@ class Misc: @staticmethod def to_base36(n): - chars = '0123456789abcdefghijklmnopqrstuvwxyz' + chars = "0123456789abcdefghijklmnopqrstuvwxyz" result = [] if n == 0: - return '0' + return "0" while n > 0: n, r = divmod(n, 36) result.append(chars[r]) - return ''.join(reversed(result)) + return "".join(reversed(result)) @staticmethod def get_local_ip(): if Misc.__LOCAL_IP is None: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: - s.connect(('8.8.8.8', 80)) + s.connect(("8.8.8.8", 80)) Misc.__LOCAL_IP = s.getsockname()[0] except Exception as e: logger.error(f"get local ip exception: {e}") - return '127.0.0.1' + return "127.0.0.1" finally: s.close() return Misc.__LOCAL_IP @staticmethod def crc32_checksum(array): - crc32_value = zlib.crc32(array) & 0xffffffff - return format(crc32_value, '08X') + crc32_value = zlib.crc32(array) & 0xFFFFFFFF + return format(crc32_value, "08X") @staticmethod def md5_checksum(array): @@ -83,7 +83,7 @@ class Misc: @staticmethod def uncompress_bytes_gzip(body): - if body and body[:2] == b'\x1f\x8b': + if body and body[:2] == b"\x1f\x8b": body = gzip.decompress(body) # Standard Gzip format else: body = zlib.decompress(body) # deflate zip diff --git a/python/rocketmq/v5/util/signature.py b/python/rocketmq/v5/util/signature.py index a23dd26b..f5983da7 100644 --- a/python/rocketmq/v5/util/signature.py +++ b/python/rocketmq/v5/util/signature.py @@ -30,19 +30,21 @@ class Signature: formatted_date_time = now.strftime("%Y%m%dT%H%M%SZ") request_id = str(uuid4()) sign = Signature.sign(config.credentials.sk, formatted_date_time) - authorization = "MQv2-HMAC-SHA1" \ - + " " \ - + "Credential" \ - + "=" \ - + config.credentials.ak \ - + ", " \ - + "SignedHeaders" \ - + "=" \ - + "x-mq-date-time" \ - + ", " \ - + "Signature" \ - + "=" \ - + sign + authorization = ( + "MQv2-HMAC-SHA1" + + " " + + "Credential" + + "=" + + config.credentials.ak + + ", " + + "SignedHeaders" + + "=" + + "x-mq-date-time" + + ", " + + "Signature" + + "=" + + sign + ) metadata = [ ("x-mq-language", "PYTHON"), ("x-mq-protocol", "GRPC_V2"), @@ -51,12 +53,12 @@ class Signature: ("x-mq-request-id", request_id), ("x-mq-client-id", client_id), ("x-mq-namespace", config.namespace), - ("authorization", authorization) + ("authorization", authorization), ] return metadata @staticmethod def sign(access_secret, date_time): - signing_key = access_secret.encode('utf-8') - mac = new(signing_key, date_time.encode('utf-8'), sha1) - return hexlify(mac.digest()).decode('utf-8') + signing_key = access_secret.encode("utf-8") + mac = new(signing_key, date_time.encode("utf-8"), sha1) + return hexlify(mac.digest()).decode("utf-8") diff --git a/python/setup.py b/python/setup.py index 6e8453f7..3881d6d7 100644 --- a/python/setup.py +++ b/python/setup.py @@ -17,7 +17,7 @@ from setuptools import find_packages, setup setup( name='rocketmq-python-client', - version='5.0.2', + version='5.0.3', packages=find_packages(), install_requires=[ "grpcio>=1.5.0",