This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 04251964 python 5.0.3 (#937)
04251964 is described below

commit 0425196498f9936013c13d6408159ae106e9096d
Author: zhouli11 <04081...@163.com>
AuthorDate: Tue Feb 11 13:58:00 2025 +0800

    python 5.0.3 (#937)
    
    * update to 5.0.3
---
 .../rocketmq/v5/client/balancer/queue_selector.py  |  44 +++-
 python/rocketmq/v5/client/client.py                | 235 ++++++++++++------
 python/rocketmq/v5/client/client_configuration.py  |  17 +-
 .../rocketmq/v5/client/connection/rpc_channel.py   |  79 ++++--
 python/rocketmq/v5/client/connection/rpc_client.py | 275 ++++++++++++++-------
 .../rocketmq/v5/client/metrics/client_metrics.py   |  60 +++--
 python/rocketmq/v5/consumer/simple_consumer.py     | 193 +++++++++++----
 python/rocketmq/v5/exception/client_exception.py   |   1 +
 python/rocketmq/v5/log/log_config.py               |  38 ++-
 python/rocketmq/v5/model/callback_result.py        |  24 +-
 python/rocketmq/v5/model/filter_expression.py      |   6 +-
 python/rocketmq/v5/model/message.py                |  57 +++--
 python/rocketmq/v5/model/metrics.py                |  10 +-
 python/rocketmq/v5/model/send_receipt.py           |   1 +
 python/rocketmq/v5/model/topic_route.py            |  32 ++-
 python/rocketmq/v5/producer/producer.py            | 270 ++++++++++++++------
 python/rocketmq/v5/test/test_base.py               |  25 +-
 python/rocketmq/v5/test/test_consumer.py           |  40 ++-
 python/rocketmq/v5/test/test_producer.py           |  14 +-
 python/rocketmq/v5/util/client_id.py               |  10 +-
 python/rocketmq/v5/util/message_id_codec.py        |  26 +-
 .../rocketmq/v5/util/messaging_result_checker.py   |  56 ++++-
 python/rocketmq/v5/util/misc.py                    |  22 +-
 python/rocketmq/v5/util/signature.py               |  36 +--
 python/setup.py                                    |   2 +-
 25 files changed, 1113 insertions(+), 460 deletions(-)

diff --git a/python/rocketmq/v5/client/balancer/queue_selector.py 
b/python/rocketmq/v5/client/balancer/queue_selector.py
index add42089..94ed97f7 100644
--- a/python/rocketmq/v5/client/balancer/queue_selector.py
+++ b/python/rocketmq/v5/client/balancer/queue_selector.py
@@ -33,18 +33,36 @@ class QueueSelector:
 
     @classmethod
     def producer_queue_selector(cls, topic_route: TopicRouteData):
-        return cls(list(filter(lambda queue: queue.is_writable() and 
queue.is_master_broker(), topic_route.message_queues)),
-                   QueueSelector.PRODUCER_QUEUE_SELECTOR)
+        return cls(
+            list(
+                filter(
+                    lambda queue: queue.is_writable() and 
queue.is_master_broker(),
+                    topic_route.message_queues,
+                )
+            ),
+            QueueSelector.PRODUCER_QUEUE_SELECTOR,
+        )
 
     @classmethod
     def simple_consumer_queue_selector(cls, topic_route: TopicRouteData):
-        return cls(list(filter(lambda queue: queue.is_readable() and 
queue.is_master_broker(), topic_route.message_queues)),
-                   QueueSelector.SIMPLE_CONSUMER_QUEUE_SELECTOR)
+        return cls(
+            list(
+                filter(
+                    lambda queue: queue.is_readable() and 
queue.is_master_broker(),
+                    topic_route.message_queues,
+                )
+            ),
+            QueueSelector.SIMPLE_CONSUMER_QUEUE_SELECTOR,
+        )
 
     def select_next_queue(self):
         if self.__selector_type == QueueSelector.NONE_TYPE_SELECTOR:
-            raise IllegalArgumentException("error type for queue selector, 
type is NONE_TYPE_SELECTOR.")
-        return self.__message_queues[self.__index.get_and_increment() % 
len(self.__message_queues)]
+            raise IllegalArgumentException(
+                "error type for queue selector, type is NONE_TYPE_SELECTOR."
+            )
+        return self.__message_queues[
+            self.__index.get_and_increment() % len(self.__message_queues)
+        ]
 
     def all_queues(self):
         index = self.__index.get_and_increment() % len(self.__message_queues)
@@ -54,6 +72,16 @@ class QueueSelector:
         if topic_route.message_queues == self.__message_queues:
             return
         if self.__selector_type == QueueSelector.PRODUCER_QUEUE_SELECTOR:
-            self.__message_queues = list(filter(lambda queue: 
queue.is_writable() and queue.is_master_broker(), topic_route.message_queues))
+            self.__message_queues = list(
+                filter(
+                    lambda queue: queue.is_writable() and 
queue.is_master_broker(),
+                    topic_route.message_queues,
+                )
+            )
         elif self.__selector_type == 
QueueSelector.SIMPLE_CONSUMER_QUEUE_SELECTOR:
-            self.__message_queues = list(filter(lambda queue: 
queue.is_readable() and queue.is_master_broker(), topic_route.message_queues))
+            self.__message_queues = list(
+                filter(
+                    lambda queue: queue.is_readable() and 
queue.is_master_broker(),
+                    topic_route.message_queues,
+                )
+            )
diff --git a/python/rocketmq/v5/client/client.py 
b/python/rocketmq/v5/client/client.py
index 57cf2a0b..b6c40f93 100644
--- a/python/rocketmq/v5/client/client.py
+++ b/python/rocketmq/v5/client/client.py
@@ -36,7 +36,9 @@ class Client:
 
     CALLBACK_THREADS_NUM = 5
 
-    def __init__(self, client_configuration, topics, client_type: ClientType, 
tls_enable=False):
+    def __init__(
+        self, client_configuration, topics, client_type: ClientType, 
tls_enable=False
+    ):
         if client_configuration is None:
             raise IllegalArgumentException("clientConfiguration should not be 
null.")
         self.__client_configuration = client_configuration
@@ -55,7 +57,9 @@ class Client:
         self.__sync_setting_scheduler_threading_event = None
         self.__clear_idle_rpc_channels_threading_event = None
         if topics is not None:
-            self.__topics = set(filter(lambda topic: 
Misc.is_valid_topic(topic), topics))
+            self.__topics = set(
+                filter(lambda topic: Misc.is_valid_topic(topic), topics)
+            )
         else:
             self.__topics = set()
         self.__callback_result_queue = Queue()
@@ -67,7 +71,9 @@ class Client:
     def startup(self):
         try:
             if self.__had_shutdown is True:
-                raise Exception(f"client:{self.__client_id} had shutdown, 
can't startup again.")
+                raise Exception(
+                    f"client:{self.__client_id} had shutdown, can't startup 
again."
+                )
 
             try:
                 # pre update topic route for producer or consumer
@@ -76,7 +82,8 @@ class Client:
             except Exception as e:
                 # ignore this exception and retrieve again when calling send 
or receive
                 logger.warn(
-                    f"update topic exception when client startup, ignore it, 
try it again in scheduler. exception: {e}")
+                    f"update topic exception when client startup, ignore it, 
try it again in scheduler. exception: {e}"
+                )
             self.__start_scheduler()
             self.__start_async_rpc_callback_handler()
             self.__is_running = True
@@ -110,27 +117,27 @@ class Client:
     """ abstract """
 
     def _start_success(self):
-        """ each subclass implements its own actions after a successful 
startup """
+        """each subclass implements its own actions after a successful 
startup"""
         pass
 
     def _start_failure(self):
-        """ each subclass implements its own actions after a startup failure 
"""
+        """each subclass implements its own actions after a startup failure"""
         pass
 
     def _sync_setting_req(self, endpoints):
-        """ each subclass implements its own telemetry settings scheme """
+        """each subclass implements its own telemetry settings scheme"""
         pass
 
     def _heartbeat_req(self):
-        """ each subclass implements its own heartbeat request """
+        """each subclass implements its own heartbeat request"""
         pass
 
     def _notify_client_termination_req(self):
-        """ each subclass implements its own client termination request """
+        """each subclass implements its own client termination request"""
         pass
 
     def _update_queue_selector(self, topic, topic_route):
-        """ each subclass implements its own queue selector """
+        """each subclass implements its own queue selector"""
         pass
 
     """ scheduler """
@@ -140,29 +147,36 @@ class Client:
         try:
             # update topic route every 30 seconds
             self.__client_thread_task_enabled = True
-            self.__topic_route_scheduler = 
threading.Thread(target=self.__schedule_update_topic_route_cache,
-                                                            
name="update_topic_route_schedule_thread")
+            self.__topic_route_scheduler = threading.Thread(
+                target=self.__schedule_update_topic_route_cache,
+                name="update_topic_route_schedule_thread",
+            )
             self.__topic_route_scheduler_threading_event = threading.Event()
             self.__topic_route_scheduler.start()
             logger.info("start topic route scheduler success.")
 
             # send heartbeat to all endpoints every 10 seconds
-            self.__heartbeat_scheduler = 
threading.Thread(target=self.__schedule_heartbeat,
-                                                          
name="heartbeat_schedule_thread")
+            self.__heartbeat_scheduler = threading.Thread(
+                target=self.__schedule_heartbeat, 
name="heartbeat_schedule_thread"
+            )
             self.__heartbeat_scheduler_threading_event = threading.Event()
             self.__heartbeat_scheduler.start()
             logger.info("start heartbeat scheduler success.")
 
             # send client setting to all endpoints every 5 seconds
-            self.__sync_setting_scheduler = 
threading.Thread(target=self.__schedule_update_setting,
-                                                             
name="sync_setting_schedule_thread")
+            self.__sync_setting_scheduler = threading.Thread(
+                target=self.__schedule_update_setting,
+                name="sync_setting_schedule_thread",
+            )
             self.__sync_setting_scheduler_threading_event = threading.Event()
             self.__sync_setting_scheduler.start()
             logger.info("start sync setting scheduler success.")
 
             # clear unused grpc channel(>30 minutes) every 60 seconds
-            self.__clear_idle_rpc_channels_scheduler = 
threading.Thread(target=self.__schedule_clear_idle_rpc_channels,
-                                                                        
name="clear_idle_rpc_channel_schedule_thread")
+            self.__clear_idle_rpc_channels_scheduler = threading.Thread(
+                target=self.__schedule_clear_idle_rpc_channels,
+                name="clear_idle_rpc_channel_schedule_thread",
+            )
             self.__clear_idle_rpc_channels_threading_event = threading.Event()
             self.__clear_idle_rpc_channels_scheduler.start()
             logger.info("start clear idle rpc channels scheduler success.")
@@ -176,7 +190,7 @@ class Client:
         while True:
             if self.__client_thread_task_enabled is True:
                 self.__topic_route_scheduler_threading_event.wait(30)
-                logger.debug(f"{self.__str__()} run scheduler for update topic 
route cache.")
+                logger.debug(f"{self.__str__()} run update topic route in 
scheduler.")
                 # update topic route for each topic in cache
                 topics = self.__topic_route_cache.keys()
                 for topic in topics:
@@ -185,24 +199,29 @@ class Client:
                             self.__update_topic_route_async(topic)
                     except Exception as e:
                         logger.error(
-                            f"{self.__str__()} run scheduler for update 
topic:{topic} route cache exception: {e}")
+                            f"{self.__str__()} scheduler update topic:{topic} 
route raise exception: {e}"
+                        )
             else:
                 break
-        logger.info(f"{self.__str__()} stop scheduler for update topic route 
cache success.")
+        logger.info(
+            f"{self.__str__()} stop scheduler for update topic route cache 
success."
+        )
 
     def __schedule_heartbeat(self):
         asyncio.set_event_loop(self._rpc_channel_io_loop())
         while True:
             if self.__client_thread_task_enabled is True:
                 self.__heartbeat_scheduler_threading_event.wait(10)
-                logger.debug(f"{self.__str__()} run scheduler for heartbeat.")
+                logger.debug(f"{self.__str__()} run send heartbeat in 
scheduler.")
                 all_endpoints = self.__get_all_endpoints().values()
                 try:
                     for endpoints in all_endpoints:
                         if self.__client_thread_task_enabled is True:
                             self.__heartbeat_async(endpoints)
                 except Exception as e:
-                    logger.error(f"{self.__str__()} run scheduler for 
heartbeat exception: {e}")
+                    logger.error(
+                        f"{self.__str__()} scheduler send heartbeat raise 
exception: {e}"
+                    )
             else:
                 break
         logger.info(f"{self.__str__()} stop scheduler for heartbeat success.")
@@ -212,16 +231,16 @@ class Client:
         while True:
             if self.__client_thread_task_enabled is True:
                 self.__sync_setting_scheduler_threading_event.wait(5)
-                logger.debug(f"{self.__str__()} run scheduler for update 
setting.")
+                logger.debug(f"{self.__str__()} run update setting in 
scheduler.")
                 try:
                     all_endpoints = self.__get_all_endpoints().values()
                     for endpoints in all_endpoints:
                         if self.__client_thread_task_enabled is True:
-                            # if stream_stream_call for grpc channel is none, 
create a new one, otherwise use the existing one
-                            
self.__retrieve_telemetry_stream_stream_call(endpoints)
                             self.__setting_write(endpoints)
                 except Exception as e:
-                    logger.error(f"{self.__str__()} run scheduler for update 
setting exception: {e}")
+                    logger.error(
+                        f"{self.__str__()} scheduler set setting raise 
exception: {e}"
+                    )
             else:
                 break
         logger.info(f"{self.__str__()} stop scheduler for update setting 
success.")
@@ -230,15 +249,21 @@ class Client:
         while True:
             if self.__client_thread_task_enabled is True:
                 self.__clear_idle_rpc_channels_threading_event.wait(60)
-                logger.debug(f"{self.__str__()} run scheduler for clear idle 
rpc channels.")
+                logger.debug(
+                    f"{self.__str__()} run scheduler for clear idle rpc 
channels."
+                )
                 try:
                     if self.__client_thread_task_enabled is True:
                         self.__rpc_client.clear_idle_rpc_channels()
                 except Exception as e:
-                    logger.error(f"{self.__str__()} run scheduler for clear 
idle rpc channels: {e}")
+                    logger.error(
+                        f"{self.__str__()} run scheduler for clear idle rpc 
channels: {e}"
+                    )
             else:
                 break
-        logger.info(f"{self.__str__()} stop scheduler for clear idle rpc 
channels success.")
+        logger.info(
+            f"{self.__str__()} stop scheduler for clear idle rpc channels 
success."
+        )
 
     """ callback handler for async method """
 
@@ -247,11 +272,15 @@ class Client:
         # this handler switches user's callback thread from RpcClient's 
_io_loop_thread to client's callback_handler_thread
         try:
             for i in range(Client.CALLBACK_THREADS_NUM):
-                th = threading.Thread(name=f"callback_handler_thread-{i}", 
target=self.__handle_callback)
+                th = threading.Thread(
+                    name=f"callback_handler_thread-{i}", 
target=self.__handle_callback
+                )
                 th.daemon = True
                 self.__callback_threads.append(th)
                 th.start()
-                logger.info(f"{self.__str__()} start async rpc callback 
thread:{th} success.")
+                logger.info(
+                    f"{self.__str__()} start async rpc callback thread:{th} 
success."
+                )
         except Exception as e:
             print(f"{self.__str__()} start async rpc callback raise exception: 
{e}")
             raise e
@@ -260,7 +289,10 @@ class Client:
         while True:
             if self.__client_thread_task_enabled is True:
                 callback_result = self.__callback_result_queue.get()
-                if callback_result.result_type == 
CallbackResultType.END_CALLBACK_THREAD_RESULT:
+                if (
+                    callback_result.result_type
+                    == CallbackResultType.END_CALLBACK_THREAD_RESULT
+                ):
                     # end infinite loop when client shutdown
                     self.__callback_result_queue.task_done()
                     break
@@ -272,7 +304,9 @@ class Client:
                     self.__callback_result_queue.task_done()
             else:
                 break
-        logger.info(f"{self.__str__()} stop client callback result handler 
thread:{threading.current_thread()} success.")
+        logger.info(
+            f"{self.__str__()} stop client callback result handler 
thread:{threading.current_thread()} success."
+        )
 
     """ protect """
 
@@ -282,10 +316,12 @@ class Client:
             return route
         else:
             route = self.__update_topic_route(topic)
-            logger.info(f"{self.__str__()} update topic:{topic} route 
success.")
             if route is not None:
+                logger.info(f"{self.__str__()} update topic:{topic} route 
success.")
                 self.__topics.add(topic)
-            return route
+                return route
+            else:
+                raise Exception(f"failed to fetch topic:{topic} route.")
 
     def _remove_unused_topic_route_data(self, topic):
         self.__topic_route_cache.remove(topic)
@@ -306,31 +342,41 @@ class Client:
     # topic route #
 
     def __update_topic_route(self, topic):
-        try:
-            future = 
self.__rpc_client.query_topic_route_async(self.__client_configuration.rpc_endpoints,
-                                                               
self.__topic_route_req(topic), metadata=self._sign(),
-                                                               
timeout=self.__client_configuration.request_timeout)
-            res = future.result()
-            route = self.__handle_topic_route_res(res, topic)
-            return route
-        except Exception as e:
-            logger.error(f"update topic route error, topic:{topic}, {e}")
-            raise e
+        event = threading.Event()
+        callback = functools.partial(
+            self.__query_topic_route_async_callback, topic=topic, event=event
+        )
+        future = self.__rpc_client.query_topic_route_async(
+            self.__client_configuration.rpc_endpoints,
+            self.__topic_route_req(topic),
+            metadata=self._sign(),
+            timeout=self.__client_configuration.request_timeout,
+        )
+        future.add_done_callback(callback)
+        event.wait()
+        return self.__topic_route_cache.get(topic)
 
     def __update_topic_route_async(self, topic):
-        callback = functools.partial(self.__query_topic_route_async_callback, 
topic=topic)
-        future = 
self.__rpc_client.query_topic_route_async(self.__client_configuration.rpc_endpoints,
-                                                           
self.__topic_route_req(topic),
-                                                           
metadata=self._sign(),
-                                                           
timeout=self.__client_configuration.request_timeout)
+        callback = functools.partial(
+            self.__query_topic_route_async_callback, topic=topic
+        )
+        future = self.__rpc_client.query_topic_route_async(
+            self.__client_configuration.rpc_endpoints,
+            self.__topic_route_req(topic),
+            metadata=self._sign(),
+            timeout=self.__client_configuration.request_timeout,
+        )
         future.add_done_callback(callback)
 
-    def __query_topic_route_async_callback(self, future, topic):
+    def __query_topic_route_async_callback(self, future, topic, event=None):
         try:
             res = future.result()
             self.__handle_topic_route_res(res, topic)
         except Exception as e:
             raise e
+        finally:
+            if event is not None:
+                event.set()
 
     def __topic_route_req(self, topic):
         req = QueryRouteRequest()
@@ -344,7 +390,9 @@ class Client:
             MessagingResultChecker.check(res.status)
             if res.status.code == Code.OK:
                 topic_route = TopicRouteData(res.message_queues)
-                logger.debug(f"{self.__str__()} update topic:{topic} route, 
route info: {topic_route.__str__()}")
+                logger.info(
+                    f"{self.__str__()} update topic:{topic} route, route info: 
{topic_route.__str__()}"
+                )
                 # if topic route has new endpoint, connect
                 self.__check_topic_route_endpoints_changed(topic, topic_route)
                 self.__topic_route_cache.put(topic, topic_route)
@@ -359,27 +407,50 @@ class Client:
     def __heartbeat_async(self, endpoints):
         req = self._heartbeat_req()
         callback = functools.partial(self.__heartbeat_callback, 
endpoints=endpoints)
-        future = self.__rpc_client.heartbeat_async(endpoints, req, 
metadata=self._sign(),
-                                                   
timeout=self.__client_configuration.request_timeout)
+        future = self.__rpc_client.heartbeat_async(
+            endpoints,
+            req,
+            metadata=self._sign(),
+            timeout=self.__client_configuration.request_timeout,
+        )
         future.add_done_callback(callback)
 
     def __heartbeat_callback(self, future, endpoints):
         try:
             res = future.result()
             if res is not None and res.status.code == Code.OK:
-                logger.info(f"{self.__str__()} send heartbeat to 
{endpoints.__str__()} success.")
+                logger.info(
+                    f"{self.__str__()} send heartbeat to {endpoints.__str__()} 
success."
+                )
             else:
                 if res is not None:
                     logger.error(
-                        f"{self.__str__()} send heartbeat to 
{endpoints.__str__()} error, code:{res.status.code}, 
message:{res.status.message}.")
+                        f"{self.__str__()} send heartbeat to 
{endpoints.__str__()} error, code:{res.status.code}, 
message:{res.status.message}."
+                    )
                 else:
-                    logger.error(f"{self.__str__()} send heartbeat to 
{endpoints.__str__()} error, response is none.")
+                    logger.error(
+                        f"{self.__str__()} send heartbeat to 
{endpoints.__str__()} error, response is none."
+                    )
         except Exception as e:
-            logger.error(f"{self.__str__()} send heartbeat to 
{endpoints.__str__()} exception, e: {e}")
+            logger.error(
+                f"{self.__str__()} send heartbeat to {endpoints.__str__()} 
exception, e: {e}"
+            )
             raise e
 
     # sync settings #
 
+    def __retrieve_telemetry_stream_stream_call(self, endpoints, 
rebuild=False):
+        try:
+            self.__rpc_client.telemetry_stream(
+                endpoints, self, self._sign(), rebuild, timeout=60 * 60 * 24 * 
365
+            )
+        except Exception as e:
+            logger.error(
+                f"{self.__str__()} rebuild stream_steam_call to 
{endpoints.__str__()} exception: {e}"
+                if rebuild
+                else f"{self.__str__()} create stream_steam_call to 
{endpoints.__str__()} exception: {e}"
+            )
+
     def __setting_write(self, endpoints):
         req = self._sync_setting_req(endpoints)
         callback = functools.partial(self.__setting_write_callback, 
endpoints=endpoints)
@@ -387,26 +458,26 @@ class Client:
         logger.debug(f"{self.__str__()} send setting to {endpoints.__str__()}, 
{req}")
         future.add_done_callback(callback)
 
-    def __retrieve_telemetry_stream_stream_call(self, endpoints, 
rebuild=False):
-        try:
-            self.__rpc_client.telemetry_stream(endpoints, self, 
metadata=self._sign(), timeout=60 * 60 * 24 * 365,
-                                               rebuild=rebuild)
-        except Exception as e:
-            logger.error(
-                f"{self.__str__()} rebuild stream_steam_call to 
{endpoints.__str__()} exception: {e}" if rebuild else f"{self.__str__()} create 
stream_steam_call to {endpoints.__str__()} exception: {e}")
-
     def __setting_write_callback(self, future, endpoints):
         try:
             future.result()
-            logger.debug(f"{self.__str__()} send setting to 
{endpoints.__str__()} success.")
+            logger.info(
+                f"{self.__str__()} send setting to {endpoints.__str__()} 
success."
+            )
         except InvalidStateError as e:
-            logger.warn(f"{self.__str__()} send setting to 
{endpoints.__str__()} occurred InvalidStateError: {e}")
+            logger.warn(
+                f"{self.__str__()} send setting to {endpoints.__str__()} 
occurred InvalidStateError: {e}"
+            )
             self.__retrieve_telemetry_stream_stream_call(endpoints, 
rebuild=True)
         except AioRpcError as e:
-            logger.warn(f"{self.__str__()} send setting to 
{endpoints.__str__()} occurred AioRpcError: {e}")
+            logger.warn(
+                f"{self.__str__()} send setting to {endpoints.__str__()} 
occurred AioRpcError: {e}"
+            )
             self.__retrieve_telemetry_stream_stream_call(endpoints, 
rebuild=True)
         except Exception as e:
-            logger.error(f"{self.__str__()} send setting to 
{endpoints.__str__()} exception: {e}")
+            logger.error(
+                f"{self.__str__()} send setting to {endpoints.__str__()} 
exception: {e}"
+            )
             self.__retrieve_telemetry_stream_stream_call(endpoints, 
rebuild=True)
 
     # metrics #
@@ -418,8 +489,12 @@ class Client:
 
     def __client_termination(self, endpoints):
         req = self._notify_client_termination_req()
-        future = self.__rpc_client.notify_client_termination(endpoints, req, 
metadata=self._sign(),
-                                                             
timeout=self.__client_configuration.request_timeout)
+        future = self.__rpc_client.notify_client_termination(
+            endpoints,
+            req,
+            metadata=self._sign(),
+            timeout=self.__client_configuration.request_timeout,
+        )
         future.result()
 
     # others ##
@@ -435,11 +510,15 @@ class Client:
         old_route = self.__topic_route_cache.get(topic)
         if old_route is None or old_route != route:
             logger.info(
-                f"topic:{topic} route changed for {self.__str__()}. old route 
is {old_route}, new route is {route}")
+                f"topic:{topic} route changed for {self.__str__()}. old route 
is {old_route}, new route is {route}"
+            )
         all_endpoints = self.__get_all_endpoints()  # the existing endpoints
-        topic_route_endpoints = route.all_endpoints()  # the latest endpoints 
for topic route
+        topic_route_endpoints = (
+            route.all_endpoints()
+        )  # the latest endpoints for topic route
         diff = set(topic_route_endpoints.keys()).difference(
-            set(all_endpoints.keys()))  # the diff between existing and latest
+            set(all_endpoints.keys())
+        )  # the diff between existing and latest
         # create grpc channel, stream_stream_call for new endpoints, send 
setting to new endpoints
         for address in diff:
             endpoints = topic_route_endpoints[address]
@@ -477,7 +556,9 @@ class Client:
                 self.__clear_idle_rpc_channels_scheduler.join()
 
         for i in range(Client.CALLBACK_THREADS_NUM):
-            
self._set_future_callback_result(CallbackResult.end_callback_thread_result())
+            self._set_future_callback_result(
+                CallbackResult.end_callback_thread_result()
+            )
 
         for i in range(Client.CALLBACK_THREADS_NUM):
             self.__callback_threads[i].join()
diff --git a/python/rocketmq/v5/client/client_configuration.py 
b/python/rocketmq/v5/client/client_configuration.py
index 7c9c67f9..922fe6be 100644
--- a/python/rocketmq/v5/client/client_configuration.py
+++ b/python/rocketmq/v5/client/client_configuration.py
@@ -37,8 +37,12 @@ class Credentials:
 
 class ClientConfiguration:
 
-    def __init__(self, endpoints: str, credentials: Credentials, namespace="", 
request_timeout=3):
-        self.__rpc_endpoints = 
RpcEndpoints(ClientConfiguration.__parse_endpoints(endpoints))
+    def __init__(
+        self, endpoints: str, credentials: Credentials, namespace="", 
request_timeout=3
+    ):
+        self.__rpc_endpoints = RpcEndpoints(
+            ClientConfiguration.__parse_endpoints(endpoints)
+        )
         self.__credentials = credentials
         self.__request_timeout = request_timeout  # seconds
         self.__namespace = namespace
@@ -52,7 +56,10 @@ class ClientConfiguration:
                 endpoints = Endpoints()
                 addresses = endpoints_str.split(";")
                 endpoints.scheme = 
ClientConfiguration.__parse_endpoints_scheme_type(
-                    
ClientConfiguration.__parse_endpoints_prefix(addresses[0].split(":")[0]))
+                    ClientConfiguration.__parse_endpoints_prefix(
+                        addresses[0].split(":")[0]
+                    )
+                )
                 for address in addresses:
                     if len(address) == 0:
                         continue
@@ -62,7 +69,9 @@ class ClientConfiguration:
                     ad.port = int(address.split(":")[1])
                 return endpoints
             except Exception as e:
-                logger.error(f"client configuration parse {endpoints_str} 
exception: {e}")
+                logger.error(
+                    f"client configuration parse {endpoints_str} exception: 
{e}"
+                )
                 return None
 
     @staticmethod
diff --git a/python/rocketmq/v5/client/connection/rpc_channel.py 
b/python/rocketmq/v5/client/connection/rpc_channel.py
index d79f5478..246e102b 100644
--- a/python/rocketmq/v5/client/connection/rpc_channel.py
+++ b/python/rocketmq/v5/client/connection/rpc_channel.py
@@ -60,9 +60,13 @@ class RpcEndpoints:
     def __init__(self, endpoints: Endpoints):
         self.__endpoints = endpoints
         self.__scheme = endpoints.scheme
-        self.__addresses = set(map(lambda address: RpcAddress(address), 
endpoints.addresses))
+        self.__addresses = set(
+            map(lambda address: RpcAddress(address), endpoints.addresses)
+        )
         if self.__scheme == AddressScheme.DOMAIN_NAME and 
len(self.__addresses) > 1:
-            raise UnsupportedException("Multiple addresses not allowed in 
domain schema")
+            raise UnsupportedException(
+                "Multiple addresses not allowed in domain schema"
+            )
         self.__facade, self.__endpoint_desc = self.__facade()
 
     def __hash__(self) -> int:
@@ -79,8 +83,11 @@ class RpcEndpoints:
     """ private """
 
     def __facade(self):
-        if self.__scheme is None or len(
-                self.__addresses) == 0 or self.__scheme == 
AddressScheme.ADDRESS_SCHEME_UNSPECIFIED:
+        if (
+            self.__scheme is None
+            or len(self.__addresses) == 0
+            or self.__scheme == AddressScheme.ADDRESS_SCHEME_UNSPECIFIED
+        ):
             return ""
 
         prefix = "dns:"
@@ -94,7 +101,7 @@ class RpcEndpoints:
         ret = ""
         for address in sorted_list:
             ret = ret + address.__str__() + ","
-        return prefix + ret[0: len(ret) - 1], ret[0: len(ret) - 1]
+        return prefix + ret[0:len(ret) - 1], ret[0:len(ret) - 1]
 
     """ property """
 
@@ -123,22 +130,33 @@ class RpcStreamStreamCall:
                     if res.HasField("settings"):
                         # read a response for send setting result
                         if res is not None and res.status.code == Code.OK:
-                            logger.debug(f"async setting success. response 
status code: {res.status.code}")
-                            if res.settings is not None and 
res.settings.metric is not None:
+                            logger.debug(
+                                f"{ self.__handler.__str__()} sync setting 
success. response status code: {res.status.code}"
+                            )
+                            if (
+                                res.settings is not None
+                                and res.settings.metric is not None
+                            ):
                                 # reset metrics if needed
                                 
self.__handler.reset_metric(res.settings.metric)
                     elif res.HasField("recover_orphaned_transaction_command"):
                         # sever check for a transaction message
                         if self.__handler is not None:
-                            transaction_id = 
res.recover_orphaned_transaction_command.transaction_id
+                            transaction_id = (
+                                
res.recover_orphaned_transaction_command.transaction_id
+                            )
                             message = 
res.recover_orphaned_transaction_command.message
-                            await 
self.__handler.on_recover_orphaned_transaction_command(self.__endpoints, 
message,
-                                                                               
          transaction_id)
+                            await 
self.__handler.on_recover_orphaned_transaction_command(
+                                self.__endpoints, message, transaction_id
+                            )
             except AioRpcError as e:
                 logger.warn(
-                    f"stream read from endpoints {self.__endpoints.__str__()} 
occurred AioRpcError. code: {e.code()}, message: {e.details()}")
+                    f"{ self.__handler.__str__()} read stream from endpoints 
{self.__endpoints.__str__()} occurred AioRpcError. code: {e.code()}, message: 
{e.details()}"
+                )
             except Exception as e:
-                logger.error(f"stream read from endpoints 
{self.__endpoints.__str__()} exception, {e}")
+                logger.error(
+                    f"{ self.__handler.__str__()} read stream from endpoints 
{self.__endpoints.__str__()} exception, {e}"
+                )
 
     async def stream_write(self, req):
         if self.__stream_stream_call is not None:
@@ -164,6 +182,7 @@ class RpcChannel:
 
     def create_channel(self, loop):
         # create grpc channel with the given loop
+        # assert loop == RpcClient._io_loop
         asyncio.set_event_loop(loop)
         self.__create_aio_channel()
 
@@ -173,7 +192,9 @@ class RpcChannel:
             if self.__telemetry_stream_stream_call is not None:
                 self.__telemetry_stream_stream_call.close()
                 self.__telemetry_stream_stream_call = None
-                logger.info(f"channel[{self.__endpoints.__str__()}] close 
stream_stream_call success.")
+                logger.info(
+                    f"channel[{self.__endpoints.__str__()}] close 
stream_stream_call success."
+                )
             if self.channel_state() is not ChannelConnectivity.SHUTDOWN:
                 # close grpc channel
                 asyncio.run_coroutine_threadsafe(self.__async_channel.close(), 
loop)
@@ -189,29 +210,43 @@ class RpcChannel:
     def register_telemetry_stream_stream_call(self, stream_stream_call, 
handler):
         if self.__telemetry_stream_stream_call is not None:
             self.__telemetry_stream_stream_call.close()
-        self.__telemetry_stream_stream_call = 
RpcStreamStreamCall(self.__endpoints, stream_stream_call, handler)
+        self.__telemetry_stream_stream_call = RpcStreamStreamCall(
+            self.__endpoints, stream_stream_call, handler
+        )
 
     """ private """
 
     def __create_aio_channel(self):
         try:
             if self.__endpoints is None:
-                raise IllegalArgumentException("create_aio_channel exception, 
endpoints is None")
+                raise IllegalArgumentException(
+                    "create_aio_channel exception, endpoints is None"
+                )
             else:
-                options = [('grpc.enable_retries', 0), 
("grpc.max_send_message_length", -1),
-                           ("grpc.max_receive_message_length", -1)]
+                options = [
+                    ("grpc.enable_retries", 0),
+                    ("grpc.max_send_message_length", -1),
+                    ("grpc.max_receive_message_length", -1),
+                ]
                 if self.__tls_enabled:
-                    self.__async_channel = 
aio.secure_channel(self.__endpoints.facade, grpc.ssl_channel_credentials(),
-                                                              options)
+                    self.__async_channel = aio.secure_channel(
+                        self.__endpoints.facade, 
grpc.ssl_channel_credentials(), options
+                    )
                 else:
-                    self.__async_channel = 
aio.insecure_channel(self.__endpoints.facade, options)
+                    self.__async_channel = aio.insecure_channel(
+                        self.__endpoints.facade, options
+                    )
                 self.__async_stub = MessagingServiceStub(self.__async_channel)
                 logger.debug(
-                    f"create_aio_channel to [{self.__endpoints.__str__()}] 
success. channel state:{self.__async_channel.get_state()}")
+                    f"create_aio_channel to [{self.__endpoints.__str__()}] 
success. channel state:{self.__async_channel.get_state()}"
+                )
         except Exception as e:
-            logger.error(f"create_aio_channel to 
[{self.__endpoints.__str__()}] exception: {e}")
+            logger.error(
+                f"create_aio_channel to [{self.__endpoints.__str__()}] 
exception: {e}"
+            )
             raise e
 
+    #
     """ property """
 
     @property
diff --git a/python/rocketmq/v5/client/connection/rpc_client.py 
b/python/rocketmq/v5/client/connection/rpc_client.py
index 3bcd3e16..77e6473c 100644
--- a/python/rocketmq/v5/client/connection/rpc_client.py
+++ b/python/rocketmq/v5/client/connection/rpc_client.py
@@ -42,8 +42,11 @@ class RpcClient:
             # start an event loop for async io
             if RpcClient._io_loop is None:
                 initialized_event = threading.Event()
-                RpcClient._io_loop_thread = 
threading.Thread(target=RpcClient.__init_io_loop, args=(initialized_event,),
-                                                             
name="channel_io_loop_thread")
+                RpcClient._io_loop_thread = threading.Thread(
+                    target=RpcClient.__init_io_loop,
+                    args=(initialized_event,),
+                    name="channel_io_loop_thread",
+                )
                 RpcClient._io_loop_thread.daemon = True
                 RpcClient._io_loop_thread.start()
                 # waiting for thread start success
@@ -57,18 +60,17 @@ class RpcClient:
     def retrieve_or_create_channel(self, endpoints: RpcEndpoints):
         if self.__enable_retrieve_channel is False:
             raise Exception("RpcClient is not running.")
-
         try:
             # get or create a new grpc channel
-            with RpcClient._channel_lock:
-                channel = self.__get_channel(endpoints)
-                if channel is not None:
-                    channel.update_time = int(time.time())
-                else:
+            channel = self.__get_channel(endpoints)
+            if channel is not None:
+                channel.update_time = int(time.time())
+            else:
+                with RpcClient._channel_lock:
                     channel = RpcChannel(endpoints, self.__tls_enable)
                     channel.create_channel(RpcClient.get_channel_io_loop())
                     self.__put_channel(endpoints, channel)
-                return channel
+            return channel
         except Exception as e:
             logger.error(f"retrieve or create channel exception: {e}")
             raise e
@@ -80,11 +82,12 @@ class RpcClient:
         for endpoints, channel in items:
             if now - channel.update_time > 
RpcClient.RPC_CLIENT_MAX_IDLE_SECONDS:
                 idle_endpoints.append(endpoints)
-        with RpcClient._channel_lock:
-            for endpoints in idle_endpoints:
-                logger.info(f"remove idle channel {endpoints.__str__()}")
-                self.__close_rpc_channel(endpoints)
-                self.channels.remove(endpoints)
+        if len(idle_endpoints) > 0:
+            with RpcClient._channel_lock:
+                for endpoints in idle_endpoints:
+                    logger.info(f"remove idle channel {endpoints.__str__()}")
+                    self.__close_rpc_channel(endpoints)
+                    self.channels.remove(endpoints)
 
     def stop(self):
         with RpcClient._channel_lock:
@@ -99,103 +102,190 @@ class RpcClient:
 
     """ grpc MessageService """
 
-    def query_topic_route_async(self, endpoints: RpcEndpoints, req: 
QueryRouteRequest, metadata, timeout=3):
+    def query_topic_route_async(
+        self, endpoints: RpcEndpoints, req: QueryRouteRequest, metadata, 
timeout=3
+    ):
         return RpcClient.__run_message_service_async(
-            self.__query_route_async_0(endpoints, req, metadata=metadata, 
timeout=timeout))
-
-    def send_message_async(self, endpoints: RpcEndpoints, req: 
SendMessageRequest, metadata, timeout=3):
+            self.__query_route_async_0(
+                endpoints, req, metadata=metadata, timeout=timeout
+            )
+        )
+
+    def send_message_async(
+        self, endpoints: RpcEndpoints, req: SendMessageRequest, metadata, 
timeout=3
+    ):
         return RpcClient.__run_message_service_async(
-            self.__send_message_0(endpoints, req, metadata=metadata, 
timeout=timeout))
+            self.__send_message_0(endpoints, req, metadata=metadata, 
timeout=timeout)
+        )
 
-    def receive_message_async(self, endpoints: RpcEndpoints, req: 
ReceiveMessageRequest, metadata, timeout=3):
+    def receive_message_async(
+        self, endpoints: RpcEndpoints, req: ReceiveMessageRequest, metadata, 
timeout=3
+    ):
         return RpcClient.__run_message_service_async(
-            self.__receive_message_0(endpoints, req, metadata=metadata, 
timeout=timeout))
+            self.__receive_message_0(endpoints, req, metadata=metadata, 
timeout=timeout)
+        )
 
-    def ack_message_async(self, endpoints: RpcEndpoints, req: 
AckMessageRequest, metadata, timeout=3):
+    def ack_message_async(
+        self, endpoints: RpcEndpoints, req: AckMessageRequest, metadata, 
timeout=3
+    ):
         return RpcClient.__run_message_service_async(
-            self.__ack_message_0(endpoints, req, metadata=metadata, 
timeout=timeout))
-
-    def change_invisible_duration_async(self, endpoints: RpcEndpoints, req: 
ChangeInvisibleDurationRequest, metadata,
-                                        timeout=3):
+            self.__ack_message_0(endpoints, req, metadata=metadata, 
timeout=timeout)
+        )
+
+    def change_invisible_duration_async(
+        self,
+        endpoints: RpcEndpoints,
+        req: ChangeInvisibleDurationRequest,
+        metadata,
+        timeout=3,
+    ):
         return RpcClient.__run_message_service_async(
-            self.__change_invisible_duration_0(endpoints, req, 
metadata=metadata, timeout=timeout))
-
-    def heartbeat_async(self, endpoints: RpcEndpoints, req: HeartbeatRequest, 
metadata, timeout=3):
+            self.__change_invisible_duration_0(
+                endpoints, req, metadata=metadata, timeout=timeout
+            )
+        )
+
+    def heartbeat_async(
+        self, endpoints: RpcEndpoints, req: HeartbeatRequest, metadata, 
timeout=3
+    ):
         return RpcClient.__run_message_service_async(
-            self.__heartbeat_async_0(endpoints, req, metadata=metadata, 
timeout=timeout))
+            self.__heartbeat_async_0(endpoints, req, metadata=metadata, 
timeout=timeout)
+        )
 
     def telemetry_write_async(self, endpoints: RpcEndpoints, req: 
TelemetryCommand):
         return RpcClient.__run_message_service_async(
-            
self.retrieve_or_create_channel(endpoints).telemetry_stream_stream_call.stream_write(req))
-
-    def end_transaction_async(self, endpoints: RpcEndpoints, req: 
EndTransactionRequest, metadata, timeout=3):
+            self.retrieve_or_create_channel(
+                endpoints
+            ).telemetry_stream_stream_call.stream_write(req)
+        )
+
+    def end_transaction_async(
+        self, endpoints: RpcEndpoints, req: EndTransactionRequest, metadata, 
timeout=3
+    ):
         return RpcClient.__run_message_service_async(
-            self.__end_transaction_0(endpoints, req, metadata=metadata, 
timeout=timeout))
-
-    def notify_client_termination(self, endpoints: RpcEndpoints, req: 
NotifyClientTerminationRequest, metadata,
-                                  timeout=3):
+            self.__end_transaction_0(endpoints, req, metadata=metadata, 
timeout=timeout)
+        )
+
+    def notify_client_termination(
+        self,
+        endpoints: RpcEndpoints,
+        req: NotifyClientTerminationRequest,
+        metadata,
+        timeout=3,
+    ):
         return RpcClient.__run_message_service_async(
-            self.__notify_client_termination_0(endpoints, req, 
metadata=metadata, timeout=timeout))
-
-    def telemetry_stream(self, endpoints: RpcEndpoints, client, metadata, 
timeout=3000, rebuild=False):
+            self.__notify_client_termination_0(
+                endpoints, req, metadata=metadata, timeout=timeout
+            )
+        )
+
+    def end_transaction_for_server_check(
+        self, endpoints: RpcEndpoints, req: EndTransactionRequest, metadata, 
timeout=3
+    ):
+        # assert asyncio.get_running_loop() == RpcClient._io_loop
         try:
-            channel = self.retrieve_or_create_channel(endpoints)
-            if channel.telemetry_stream_stream_call is None or rebuild is True:
-                stream = channel.async_stub.Telemetry(metadata=metadata, 
timeout=timeout)
-                channel.register_telemetry_stream_stream_call(stream, client)
-                
asyncio.run_coroutine_threadsafe(channel.telemetry_stream_stream_call.start_stream_read(),
-                                                 
RpcClient.get_channel_io_loop())
-                logger.info(
-                    f"{client.__str__()} rebuild stream_steam_call to 
{endpoints.__str__()} success." if rebuild else f"{client.__str__()} create 
stream_steam_call to {endpoints.__str__()} success.")
+            return self.__end_transaction_0(
+                endpoints, req, metadata=metadata, timeout=timeout
+            )
         except Exception as e:
+            logger.error(
+                f"end transaction exception, topic:{req.topic.name}, 
message_id:{req.message_id}, transaction_id:{req.transaction_id}: {e}"
+            )
             raise e
 
-    def end_transaction_for_server_check(self, endpoints: RpcEndpoints, req: 
EndTransactionRequest, metadata,
-                                         timeout=3):
+    """ build stream_stream_call """
+
+    def telemetry_stream(
+        self, endpoints: RpcEndpoints, client, metadata, rebuild, timeout=3000
+    ):
+        # assert asyncio.get_running_loop() == RpcClient._io_loop
         try:
-            return self.__end_transaction_0(endpoints, req, metadata=metadata, 
timeout=timeout)
+            channel = self.retrieve_or_create_channel(endpoints)
+            stream = channel.async_stub.Telemetry(
+                metadata=metadata, timeout=timeout, wait_for_ready=True
+            )
+            channel.register_telemetry_stream_stream_call(stream, client)
+            asyncio.run_coroutine_threadsafe(
+                channel.telemetry_stream_stream_call.start_stream_read(),
+                RpcClient.get_channel_io_loop(),
+            )
+            logger.info(
+                f"{client.__str__()} rebuild stream_steam_call to 
{endpoints.__str__()}."
+                if rebuild
+                else f"{client.__str__()} create stream_steam_call to 
{endpoints.__str__()}."
+            )
+            return channel
         except Exception as e:
-            logger.error(
-                f"end transaction exception, topic:{req.topic.name}, 
message_id:{req.message_id}, transaction_id:{req.transaction_id}: {e}")
             raise e
 
     """ MessageService.stub impl """
 
-    async def __query_route_async_0(self, endpoints: RpcEndpoints, req: 
QueryRouteRequest, metadata, timeout=3):
-        return await 
self.retrieve_or_create_channel(endpoints).async_stub.QueryRoute(req, 
metadata=metadata,
-                                                                               
       timeout=timeout)
-
-    async def __send_message_0(self, endpoints: RpcEndpoints, req: 
SendMessageRequest, metadata, timeout=3):
-        return await 
self.retrieve_or_create_channel(endpoints).async_stub.SendMessage(req, 
metadata=metadata,
-                                                                               
        timeout=timeout)
-
-    async def __receive_message_0(self, endpoints: RpcEndpoints, req: 
ReceiveMessageRequest, metadata, timeout=3):
-        return 
self.retrieve_or_create_channel(endpoints).async_stub.ReceiveMessage(req, 
metadata=metadata,
-                                                                               
     timeout=timeout)
-
-    async def __ack_message_0(self, endpoints: RpcEndpoints, req: 
AckMessageRequest, metadata, timeout=3):
-        return await 
self.retrieve_or_create_channel(endpoints).async_stub.AckMessage(req, 
metadata=metadata,
-                                                                               
       timeout=timeout)
-
-    async def __heartbeat_async_0(self, endpoints: RpcEndpoints, req: 
HeartbeatRequest, metadata, timeout=3):
-        return await 
self.retrieve_or_create_channel(endpoints).async_stub.Heartbeat(req, 
metadata=metadata,
-                                                                               
      timeout=timeout)
-
-    async def __change_invisible_duration_0(self, endpoints: RpcEndpoints, 
req: ChangeInvisibleDurationRequest,
-                                            metadata, timeout=3):
-        return await 
self.retrieve_or_create_channel(endpoints).async_stub.ChangeInvisibleDuration(req,
-                                                                               
                    metadata=metadata,
-                                                                               
                    timeout=timeout)
-
-    async def __end_transaction_0(self, endpoints: RpcEndpoints, req: 
EndTransactionRequest, metadata, timeout=3):
-        return await 
self.retrieve_or_create_channel(endpoints).async_stub.EndTransaction(req, 
metadata=metadata,
-                                                                               
           timeout=timeout)
-
-    async def __notify_client_termination_0(self, endpoints: RpcEndpoints, 
req: NotifyClientTerminationRequest,
-                                            metadata, timeout=3):
-        return await 
self.retrieve_or_create_channel(endpoints).async_stub.NotifyClientTermination(req,
-                                                                               
                    metadata=metadata,
-                                                                               
                    timeout=timeout)
+    async def __query_route_async_0(
+        self, endpoints: RpcEndpoints, req: QueryRouteRequest, metadata, 
timeout=3
+    ):
+        return await 
self.retrieve_or_create_channel(endpoints).async_stub.QueryRoute(
+            req, metadata=metadata, timeout=timeout
+        )
+
+    async def __send_message_0(
+        self, endpoints: RpcEndpoints, req: SendMessageRequest, metadata, 
timeout=3
+    ):
+        return await 
self.retrieve_or_create_channel(endpoints).async_stub.SendMessage(
+            req, metadata=metadata, timeout=timeout
+        )
+
+    async def __receive_message_0(
+        self, endpoints: RpcEndpoints, req: ReceiveMessageRequest, metadata, 
timeout=3
+    ):
+        return 
self.retrieve_or_create_channel(endpoints).async_stub.ReceiveMessage(
+            req, metadata=metadata, timeout=timeout
+        )
+
+    async def __ack_message_0(
+        self, endpoints: RpcEndpoints, req: AckMessageRequest, metadata, 
timeout=3
+    ):
+        return await 
self.retrieve_or_create_channel(endpoints).async_stub.AckMessage(
+            req, metadata=metadata, timeout=timeout
+        )
+
+    async def __heartbeat_async_0(
+        self, endpoints: RpcEndpoints, req: HeartbeatRequest, metadata, 
timeout=3
+    ):
+        return await 
self.retrieve_or_create_channel(endpoints).async_stub.Heartbeat(
+            req, metadata=metadata, timeout=timeout
+        )
+
+    async def __change_invisible_duration_0(
+        self,
+        endpoints: RpcEndpoints,
+        req: ChangeInvisibleDurationRequest,
+        metadata,
+        timeout=3,
+    ):
+        return await self.retrieve_or_create_channel(
+            endpoints
+        ).async_stub.ChangeInvisibleDuration(req, metadata=metadata, 
timeout=timeout)
+
+    async def __end_transaction_0(
+        self, endpoints: RpcEndpoints, req: EndTransactionRequest, metadata, 
timeout=3
+    ):
+        return await self.retrieve_or_create_channel(
+            endpoints
+        ).async_stub.EndTransaction(req, metadata=metadata, timeout=timeout)
+
+    async def __notify_client_termination_0(
+        self,
+        endpoints: RpcEndpoints,
+        req: NotifyClientTerminationRequest,
+        metadata,
+        timeout=3,
+    ):
+        return await self.retrieve_or_create_channel(
+            endpoints
+        ).async_stub.NotifyClientTermination(req, metadata=metadata, 
timeout=timeout)
+
+    async def __create_channel_async(self, endpoints: RpcEndpoints):
+        return self.retrieve_or_create_channel(endpoints)
 
     """ private """
 
@@ -207,7 +297,10 @@ class RpcClient:
 
     def __close_rpc_channel(self, endpoints: RpcEndpoints):
         channel = self.__get_channel(endpoints)
-        if channel is not None and channel.channel_state() is not 
ChannelConnectivity.SHUTDOWN:
+        if (
+            channel is not None
+            and channel.channel_state() is not ChannelConnectivity.SHUTDOWN
+        ):
             try:
                 channel.close_channel(RpcClient.get_channel_io_loop())
                 self.channels.remove(endpoints)
@@ -234,7 +327,9 @@ class RpcClient:
     def __run_message_service_async(func):
         try:
             # execute grpc call in RpcClient._io_loop
-            return asyncio.run_coroutine_threadsafe(func, 
RpcClient.get_channel_io_loop())
+            return asyncio.run_coroutine_threadsafe(
+                func, RpcClient.get_channel_io_loop()
+            )
         except Exception as e:
             future = Future()
             future.set_exception(e)
diff --git a/python/rocketmq/v5/client/metrics/client_metrics.py 
b/python/rocketmq/v5/client/metrics/client_metrics.py
index 7aa94433..210631ee 100644
--- a/python/rocketmq/v5/client/metrics/client_metrics.py
+++ b/python/rocketmq/v5/client/metrics/client_metrics.py
@@ -76,16 +76,21 @@ class ClientMetrics:
 
     def send_after(self, send_context: MetricContext, success: bool):
         if send_context is None:
-            logger.warn("metrics do send after exception. send_context must 
not be none.")
+            logger.warn(
+                "metrics do send after exception. send_context must not be 
none."
+            )
             return
 
         if send_context.metric_type != MessageMetricType.SEND:
             logger.warn(
-                f"metric type must be MessageMetricType.SEND. current 
send_context type is {send_context.metric_type}")
+                f"metric type must be MessageMetricType.SEND. current 
send_context type is {send_context.metric_type}"
+            )
             return
 
         if send_context.get_attr("send_stopwatch") is None:
-            logger.warn("metrics do send after exception. send_stopwatch must 
not be none.")
+            logger.warn(
+                "metrics do send after exception. send_stopwatch must not be 
none."
+            )
             return
 
         if send_context.get_attr("topic") is None:
@@ -108,7 +113,11 @@ class ClientMetrics:
         if metric.endpoints is None:
             return True
         # if metrics endpoints changed, return False
-        if self.__enabled and metric.on and self.__endpoints == 
RpcEndpoints(metric.endpoints):
+        if (
+            self.__enabled
+            and metric.on
+            and self.__endpoints == RpcEndpoints(metric.endpoints)
+        ):
             return True
         return not self.__enabled and not metric.on
 
@@ -122,36 +131,51 @@ class ClientMetrics:
 
     def __meter_provider_start(self):
         if self.__endpoints is None:
-            logger.warn(f"client:{self.__client_id} can't create meter 
provider, because endpoints is none.")
+            logger.warn(
+                f"client:{self.__client_id} can't create meter provider, 
because endpoints is none."
+            )
             return
 
         try:
             # setup OTLP exporter
-            exporter = OTLPMetricExporter(endpoint=self.__endpoints.__str__(), 
insecure=True,
-                                          
timeout=ClientMetrics.METRIC_EXPORTER_RPC_TIMEOUT)
+            exporter = OTLPMetricExporter(
+                endpoint=self.__endpoints.__str__(),
+                insecure=True,
+                timeout=ClientMetrics.METRIC_EXPORTER_RPC_TIMEOUT,
+            )
             # create a metric reader and set the export interval
-            reader = PeriodicExportingMetricReader(exporter,
-                                                   
export_interval_millis=ClientMetrics.METRIC_READER_INTERVAL)
+            reader = PeriodicExportingMetricReader(
+                exporter, 
export_interval_millis=ClientMetrics.METRIC_READER_INTERVAL
+            )
             # create an empty resource
             resource = Resource.get_empty()
             # create view
-            send_cost_time_view = View(instrument_type=Histogram,
-                                       
instrument_name=HistogramEnum.SEND_COST_TIME.histogram_name,
-                                       
aggregation=ExplicitBucketHistogramAggregation(
-                                           
HistogramEnum.SEND_COST_TIME.buckets))
+            send_cost_time_view = View(
+                instrument_type=Histogram,
+                instrument_name=HistogramEnum.SEND_COST_TIME.histogram_name,
+                aggregation=ExplicitBucketHistogramAggregation(
+                    HistogramEnum.SEND_COST_TIME.buckets
+                ),
+            )
             # create MeterProvider
-            self.__meter_provider = MeterProvider(metric_readers=[reader], 
resource=resource,
-                                                  views=[send_cost_time_view])
+            self.__meter_provider = MeterProvider(
+                metric_readers=[reader], resource=resource, 
views=[send_cost_time_view]
+            )
             # define the histogram instruments
             self.__send_success_cost_time_instrument = 
self.__meter_provider.get_meter(
-                
ClientMetrics.METRIC_INSTRUMENTATION_NAME).create_histogram(HistogramEnum.SEND_COST_TIME.histogram_name)
+                ClientMetrics.METRIC_INSTRUMENTATION_NAME
+            ).create_histogram(HistogramEnum.SEND_COST_TIME.histogram_name)
         except Exception as e:
-            logger.error(f"client:{self.__client_id} start meter provider 
exception: {e}")
+            logger.error(
+                f"client:{self.__client_id} start meter provider exception: 
{e}"
+            )
 
     def __record_send_success_cost_time(self, context, amount):
         if self.__enabled:
             try:
                 # record send message cost time and result
-                self.__send_success_cost_time_instrument.record(amount, 
context.attributes)
+                self.__send_success_cost_time_instrument.record(
+                    amount, context.attributes
+                )
             except Exception as e:
                 logger.error(f"record send message cost time exception, e:{e}")
diff --git a/python/rocketmq/v5/consumer/simple_consumer.py 
b/python/rocketmq/v5/consumer/simple_consumer.py
index 11a09d1b..6b1483db 100644
--- a/python/rocketmq/v5/consumer/simple_consumer.py
+++ b/python/rocketmq/v5/consumer/simple_consumer.py
@@ -36,18 +36,27 @@ from rocketmq.v5.util import (AtomicInteger, ConcurrentMap,
 
 class SimpleConsumer(Client):
 
-    def __init__(self, client_configuration: ClientConfiguration, 
consumer_group, subscription: dict = None,
-                 await_duration=20):
-        if consumer_group is None or consumer_group.strip() == '':
+    def __init__(
+        self,
+        client_configuration: ClientConfiguration,
+        consumer_group,
+        subscription: dict = None,
+        await_duration=20,
+    ):
+        if consumer_group is None or consumer_group.strip() == "":
             raise IllegalArgumentException("consumerGroup should not be null")
         if Misc.is_valid_consumer_group(consumer_group) is False:
             raise IllegalArgumentException(
-                f"consumerGroup does not match the regex 
[regex={Misc.CONSUMER_GROUP_PATTERN}]")
+                f"consumerGroup does not match the regex 
[regex={Misc.CONSUMER_GROUP_PATTERN}]"
+            )
         if await_duration is None:
             raise IllegalArgumentException("awaitDuration should not be null")
 
-        super().__init__(client_configuration, None if subscription is None 
else subscription.keys(),
-                         ClientType.SIMPLE_CONSUMER)
+        super().__init__(
+            client_configuration,
+            None if subscription is None else subscription.keys(),
+            ClientType.SIMPLE_CONSUMER,
+        )
         self.__consumer_group = consumer_group
         self.__await_duration = await_duration  # long polling timeout, seconds
         # <String /* topic */, FilterExpression>
@@ -64,19 +73,30 @@ class SimpleConsumer(Client):
 
     def subscribe(self, topic, filter_expression: FilterExpression = None):
         if self.is_running is False:
-            raise IllegalStateException("unable to add subscription because 
simple consumer is not running")
+            raise IllegalStateException(
+                "unable to add subscription because simple consumer is not 
running"
+            )
 
         try:
             if not self.__subscriptions.contains(topic):
                 self._retrieve_topic_route_data(topic)
-            self.__subscriptions.put(topic, filter_expression if 
filter_expression is not None else FilterExpression())
+            self.__subscriptions.put(
+                topic,
+                (
+                    filter_expression
+                    if filter_expression is not None
+                    else FilterExpression()
+                ),
+            )
         except Exception as e:
-            logger.error(f"subscribe exception: {e}")
+            logger.error(f"subscribe raise exception: {e}")
             raise e
 
     def unsubscribe(self, topic):
         if self.is_running is False:
-            raise IllegalStateException("unable to remove subscription because 
simple consumer is not running")
+            raise IllegalStateException(
+                "unable to remove subscription because simple consumer is not 
running"
+            )
 
         if topic in self.__subscriptions:
             self.__subscriptions.remove(topic)
@@ -84,43 +104,57 @@ class SimpleConsumer(Client):
 
     def receive(self, max_message_num, invisible_duration):
         if self.is_running is False:
-            raise IllegalStateException("unable to receive messages because 
simple consumer is not running")
+            raise IllegalStateException(
+                "unable to receive messages because simple consumer is not 
running"
+            )
 
         return self.__receive(max_message_num, invisible_duration)
 
     def receive_async(self, max_message_num, invisible_duration):
         if self.is_running is False:
-            raise IllegalStateException("unable to receive messages because 
simple consumer is not running")
+            raise IllegalStateException(
+                "unable to receive messages because simple consumer is not 
running"
+            )
 
         return self.__receive_async(max_message_num, invisible_duration)
 
     def ack(self, message: Message):
         if self.is_running is False:
-            raise IllegalStateException("unable to ack message because simple 
consumer is not running")
+            raise IllegalStateException(
+                "unable to ack message because simple consumer is not running"
+            )
 
         queue = self.__select_topic_queue(message.topic)
         return self.__ack(message, queue)
 
     def ack_async(self, message: Message):
         if self.is_running is False:
-            raise IllegalStateException("unable to ack message because simple 
consumer is not running")
+            raise IllegalStateException(
+                "unable to ack message because simple consumer is not running"
+            )
 
         queue = self.__select_topic_queue(message.topic)
         return self.__ack_async(message, queue)
 
     def change_invisible_duration(self, message: Message, invisible_duration):
         if self.is_running is False:
-            raise IllegalStateException("unable to change invisible duration 
because simple consumer is not running")
+            raise IllegalStateException(
+                "unable to change invisible duration because simple consumer 
is not running"
+            )
 
         queue = self.__select_topic_queue(message.topic)
         return self.__change_invisible_duration(message, queue, 
invisible_duration)
 
     def change_invisible_duration_async(self, message: Message, 
invisible_duration):
         if self.is_running is False:
-            raise IllegalStateException("unable to change invisible duration 
because simple consumer is not running")
+            raise IllegalStateException(
+                "unable to change invisible duration because simple consumer 
is not running"
+            )
 
         queue = self.__select_topic_queue(message.topic)
-        return self.__change_invisible_duration_async(message, queue, 
invisible_duration)
+        return self.__change_invisible_duration_async(
+            message, queue, invisible_duration
+        )
 
     """ override """
 
@@ -190,21 +224,25 @@ class SimpleConsumer(Client):
     def __select_topic_for_receive(self):
         try:
             # select the next topic for receive
-            mod_index = self.__topic_index.get_and_increment() % 
len(self.__subscriptions.keys())
+            mod_index = self.__topic_index.get_and_increment() % len(
+                self.__subscriptions.keys()
+            )
             return list(self.__subscriptions.keys())[mod_index]
         except Exception as e:
-            logger.error(f"simple consumer select topic for receive message 
exception: {e}")
+            logger.error(
+                f"simple consumer select topic for receive message exception: 
{e}"
+            )
             raise e
 
     def __select_topic_queue(self, topic):
         try:
             route = self._retrieve_topic_route_data(topic)
-            queue_selector = 
self.__receive_queue_selectors.put_if_absent(topic,
-                                                                          
QueueSelector.simple_consumer_queue_selector(
-                                                                              
route))
+            queue_selector = self.__receive_queue_selectors.put_if_absent(
+                topic, QueueSelector.simple_consumer_queue_selector(route)
+            )
             return queue_selector.select_next_queue()
         except Exception as e:
-            logger.error(f"simple consumer select topic queue for receive 
message exception: {e}")
+            logger.error(f"simple consumer select topic queue raise exception: 
{e}")
             raise e
 
     def __receive(self, max_message_num, invisible_duration):
@@ -213,9 +251,13 @@ class SimpleConsumer(Client):
         queue = self.__select_topic_queue(topic)
         req = self.__receive_req(topic, queue, max_message_num, 
invisible_duration)
         timeout = self.client_configuration.request_timeout + 
self.__await_duration
-        future = self.rpc_client.receive_message_async(queue.endpoints, req, 
metadata=self._sign(), timeout=timeout)
-        read_future = 
asyncio.run_coroutine_threadsafe(self.__receive_message_response(future.result()),
-                                                       
self._rpc_channel_io_loop())
+        future = self.rpc_client.receive_message_async(
+            queue.endpoints, req, metadata=self._sign(), timeout=timeout
+        )
+        read_future = asyncio.run_coroutine_threadsafe(
+            self.__receive_message_response(future.result()),
+            self._rpc_channel_io_loop(),
+        )
         return self.__handle_receive_message_response(read_future.result())
 
     def __receive_async(self, max_message_num, invisible_duration):
@@ -225,11 +267,17 @@ class SimpleConsumer(Client):
             queue = self.__select_topic_queue(topic)
             req = self.__receive_req(topic, queue, max_message_num, 
invisible_duration)
             timeout = self.client_configuration.request_timeout + 
self.__await_duration
-            future = self.rpc_client.receive_message_async(queue.endpoints, 
req, metadata=self._sign(), timeout=timeout)
-            read_future = 
asyncio.run_coroutine_threadsafe(self.__receive_message_response(future.result()),
-                                                           
self._rpc_channel_io_loop())
+            future = self.rpc_client.receive_message_async(
+                queue.endpoints, req, metadata=self._sign(), timeout=timeout
+            )
+            read_future = asyncio.run_coroutine_threadsafe(
+                self.__receive_message_response(future.result()),
+                self._rpc_channel_io_loop(),
+            )
             ret_future = Future()
-            handle_send_receipt_callback = 
functools.partial(self.__receive_message_callback, ret_future=ret_future)
+            handle_send_receipt_callback = functools.partial(
+                self.__receive_message_callback, ret_future=ret_future
+            )
             read_future.add_done_callback(handle_send_receipt_callback)
             return ret_future
         except Exception as e:
@@ -261,20 +309,28 @@ class SimpleConsumer(Client):
         try:
             responses = future.result()
             messages = self.__handle_receive_message_response(responses)
-            
self._set_future_callback_result(CallbackResult.async_receive_callback_result(ret_future,
 messages))
+            self._set_future_callback_result(
+                CallbackResult.async_receive_callback_result(ret_future, 
messages)
+            )
         except Exception as e:
-            
self._set_future_callback_result(CallbackResult.async_receive_callback_result(ret_future,
 e, False))
+            self._set_future_callback_result(
+                CallbackResult.async_receive_callback_result(ret_future, e, 
False)
+            )
 
     async def __receive_message_response(self, unary_stream_call):
         try:
             responses = list()
             async for res in unary_stream_call:
                 if res.HasField("message") or res.HasField("status"):
-                    logger.debug(f"consumer:{self.__consumer_group} receive 
response: {res}")
+                    logger.debug(
+                        f"consumer:{self.__consumer_group} receive response: 
{res}"
+                    )
                     responses.append(res)
             return responses
         except Exception as e:
-            logger.error(f"consumer:{self.__consumer_group} receive message 
exception: {e}")
+            logger.error(
+                f"consumer:{self.__consumer_group} receive message exception: 
{e}"
+            )
             raise e
 
     def __handle_receive_message_response(self, responses):
@@ -284,7 +340,8 @@ class SimpleConsumer(Client):
         for res in responses:
             if res.HasField("status"):
                 logger.debug(
-                    f"simple_consumer[{self.__consumer_group}] 
receive_message, code:{res.status.code}, message:{res.status.message}.")
+                    f"simple_consumer[{self.__consumer_group}] 
receive_message, code:{res.status.code}, message:{res.status.message}."
+                )
                 status = res.status
             elif res.HasField("message"):
                 messages.append(Message().fromProtobuf(res.message))
@@ -300,7 +357,12 @@ class SimpleConsumer(Client):
 
         try:
             req = self.__ack_req(message)
-            future = self.rpc_client.ack_message_async(queue.endpoints, req, 
metadata=self._sign())
+            future = self.rpc_client.ack_message_async(
+                queue.endpoints,
+                req,
+                metadata=self._sign(),
+                timeout=self.client_configuration.request_timeout,
+            )
             self.__handle_ack_result(future)
         except Exception as e:
             raise e
@@ -311,9 +373,16 @@ class SimpleConsumer(Client):
 
         try:
             req = self.__ack_req(message)
-            future = self.rpc_client.ack_message_async(queue.endpoints, req, 
metadata=self._sign())
+            future = self.rpc_client.ack_message_async(
+                queue.endpoints,
+                req,
+                metadata=self._sign(),
+                timeout=self.client_configuration.request_timeout,
+            )
             ret_future = Future()
-            ack_callback = functools.partial(self.__handle_ack_result, 
ret_future=ret_future)
+            ack_callback = functools.partial(
+                self.__handle_ack_result, ret_future=ret_future
+            )
             future.add_done_callback(ack_callback)
             return ret_future
         except Exception as e:
@@ -335,15 +404,21 @@ class SimpleConsumer(Client):
     def __handle_ack_result(self, future, ret_future=None):
         try:
             res = future.result()
-            logger.debug(f"consumer[{self.__consumer_group}] ack response, 
{res.status}")
+            logger.debug(
+                f"consumer[{self.__consumer_group}] ack response, {res.status}"
+            )
             MessagingResultChecker.check(res.status)
             if ret_future is not None:
-                
self._set_future_callback_result(CallbackResult.async_ack_callback_result(ret_future,
 None))
+                self._set_future_callback_result(
+                    CallbackResult.async_ack_callback_result(ret_future, None)
+                )
         except Exception as e:
             if ret_future is None:
                 raise e
             else:
-                
self._set_future_callback_result(CallbackResult.async_ack_callback_result(ret_future,
 e, False))
+                self._set_future_callback_result(
+                    CallbackResult.async_ack_callback_result(ret_future, e, 
False)
+                )
 
     # change_invisible
 
@@ -353,20 +428,34 @@ class SimpleConsumer(Client):
 
         try:
             req = self.__change_invisible_req(message, invisible_duration)
-            future = 
self.rpc_client.change_invisible_duration_async(queue.endpoints, req, 
metadata=self._sign())
+            future = self.rpc_client.change_invisible_duration_async(
+                queue.endpoints,
+                req,
+                metadata=self._sign(),
+                timeout=self.client_configuration.request_timeout,
+            )
             self.__handle_change_invisible_result(future)
         except Exception as e:
             raise e
 
-    def __change_invisible_duration_async(self, message: Message, queue, 
invisible_duration):
+    def __change_invisible_duration_async(
+        self, message: Message, queue, invisible_duration
+    ):
         if self.is_running is False:
             raise IllegalArgumentException("consumer is not running now.")
 
         try:
             req = self.__change_invisible_req(message, invisible_duration)
-            future = 
self.rpc_client.change_invisible_duration_async(queue.endpoints, req, 
metadata=self._sign())
+            future = self.rpc_client.change_invisible_duration_async(
+                queue.endpoints,
+                req,
+                metadata=self._sign(),
+                timeout=self.client_configuration.request_timeout,
+            )
             ret_future = Future()
-            change_invisible_callback = 
functools.partial(self.__handle_change_invisible_result, ret_future=ret_future)
+            change_invisible_callback = functools.partial(
+                self.__handle_change_invisible_result, ret_future=ret_future
+            )
             future.add_done_callback(change_invisible_callback)
             return ret_future
         except Exception as e:
@@ -386,17 +475,25 @@ class SimpleConsumer(Client):
     def __handle_change_invisible_result(self, future, ret_future=None):
         try:
             res = future.result()
-            logger.debug(f"consumer[{self.__consumer_group}] change invisible 
response, {res.status}")
+            logger.debug(
+                f"consumer[{self.__consumer_group}] change invisible response, 
{res.status}"
+            )
             MessagingResultChecker.check(res.status)
             if ret_future is not None:
                 self._set_future_callback_result(
-                    
CallbackResult.async_change_invisible_duration_callback_result(ret_future, 
None))
+                    
CallbackResult.async_change_invisible_duration_callback_result(
+                        ret_future, None
+                    )
+                )
         except Exception as e:
             if ret_future is None:
                 raise e
             else:
                 self._set_future_callback_result(
-                    
CallbackResult.async_change_invisible_duration_callback_result(ret_future, e, 
False))
+                    
CallbackResult.async_change_invisible_duration_callback_result(
+                        ret_future, e, False
+                    )
+                )
 
     """ property """
 
diff --git a/python/rocketmq/v5/exception/client_exception.py 
b/python/rocketmq/v5/exception/client_exception.py
index d4239645..8e9b4e44 100644
--- a/python/rocketmq/v5/exception/client_exception.py
+++ b/python/rocketmq/v5/exception/client_exception.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+
 class ClientException(Exception):
 
     def __init__(self, message, code=None):
diff --git a/python/rocketmq/v5/log/log_config.py 
b/python/rocketmq/v5/log/log_config.py
index 1de3240e..a363c650 100644
--- a/python/rocketmq/v5/log/log_config.py
+++ b/python/rocketmq/v5/log/log_config.py
@@ -19,35 +19,33 @@ import os
 __DIR = f'{os.path.expanduser("~/logs/rocketmq_python/")}'
 
 __LOG_CONFIG = {
-    'version': 1.0,
-    'disable_existing_loggers': False,
-    'formatters': {
-        'standard': {
-            'format': '%(asctime)s [%(levelname)s] %(message)s'
-        },
+    "version": 1.0,
+    "disable_existing_loggers": False,
+    "formatters": {
+        "standard": {"format": "%(asctime)s [%(levelname)s] %(message)s"},
     },
-    'handlers': {
+    "handlers": {
         # 'console': {
         #     'level': 'DEBUG',
         #     'class': 'logging.StreamHandler',
         #     'formatter': 'standard'
         # },
-        'file': {
-            'class': 'logging.handlers.RotatingFileHandler',
-            'level': 'INFO',
-            'formatter': 'standard',
-            'filename': f'{__DIR}/rocketmq_client.log',
-            'maxBytes': 1024 * 1024 * 100,  # 100MB
-            'backupCount': 10,
+        "file": {
+            "class": "logging.handlers.RotatingFileHandler",
+            "level": "INFO",
+            "formatter": "standard",
+            "filename": f"{__DIR}/rocketmq_client.log",
+            "maxBytes": 1024 * 1024 * 100,  # 100MB
+            "backupCount": 10,
         },
     },
-    'loggers': {
-        'rocketmq-python-client': {
-            'handlers': ['file'],
-            'level': 'INFO',
-            'propagate': False
+    "loggers": {
+        "rocketmq-python-client": {
+            "handlers": ["file"],
+            "level": "INFO",
+            "propagate": False,
         },
-    }
+    },
 }
 
 if not os.path.exists(__DIR):
diff --git a/python/rocketmq/v5/model/callback_result.py 
b/python/rocketmq/v5/model/callback_result.py
index 568d6c43..ad967973 100644
--- a/python/rocketmq/v5/model/callback_result.py
+++ b/python/rocketmq/v5/model/callback_result.py
@@ -47,25 +47,41 @@ class CallbackResult:
     @staticmethod
     def async_send_callback_result(future, result, success=True):
         callback_result = CallbackResult.callback_result(future, result, 
success)
-        callback_result.__result_type = 
CallbackResultType.ASYNC_SEND_CALLBACK_RESULT if success else 
CallbackResultType.ASYNC_SEND_CALLBACK_EXCEPTION
+        callback_result.__result_type = (
+            CallbackResultType.ASYNC_SEND_CALLBACK_RESULT
+            if success
+            else CallbackResultType.ASYNC_SEND_CALLBACK_EXCEPTION
+        )
         return callback_result
 
     @staticmethod
     def async_receive_callback_result(future, result, success=True):
         callback_result = CallbackResult.callback_result(future, result, 
success)
-        callback_result.__result_type = 
CallbackResultType.ASYNC_ACK_CALLBACK_RESULT if success else 
CallbackResultType.ASYNC_ACK_CALLBACK_EXCEPTION
+        callback_result.__result_type = (
+            CallbackResultType.ASYNC_ACK_CALLBACK_RESULT
+            if success
+            else CallbackResultType.ASYNC_ACK_CALLBACK_EXCEPTION
+        )
         return callback_result
 
     @staticmethod
     def async_ack_callback_result(future, result, success=True):
         callback_result = CallbackResult.callback_result(future, result, 
success)
-        callback_result.__result_type = 
CallbackResultType.ASYNC_RECEIVE_CALLBACK_RESULT if success else 
CallbackResultType.ASYNC_RECEIVE_CALLBACK_EXCEPTION
+        callback_result.__result_type = (
+            CallbackResultType.ASYNC_RECEIVE_CALLBACK_RESULT
+            if success
+            else CallbackResultType.ASYNC_RECEIVE_CALLBACK_EXCEPTION
+        )
         return callback_result
 
     @staticmethod
     def async_change_invisible_duration_callback_result(future, result, 
success=True):
         callback_result = CallbackResult.callback_result(future, result, 
success)
-        callback_result.__result_type = 
CallbackResultType.ASYNC_CHANGE_INVISIBLE_DURATION_RESULT if success else 
CallbackResultType.ASYNC_CHANGE_INVISIBLE_DURATION_EXCEPTION
+        callback_result.__result_type = (
+            CallbackResultType.ASYNC_CHANGE_INVISIBLE_DURATION_RESULT
+            if success
+            else CallbackResultType.ASYNC_CHANGE_INVISIBLE_DURATION_EXCEPTION
+        )
         return callback_result
 
     @staticmethod
diff --git a/python/rocketmq/v5/model/filter_expression.py 
b/python/rocketmq/v5/model/filter_expression.py
index 98e23da4..9cb0666e 100644
--- a/python/rocketmq/v5/model/filter_expression.py
+++ b/python/rocketmq/v5/model/filter_expression.py
@@ -19,7 +19,11 @@ from rocketmq.grpc_protocol import FilterType
 class FilterExpression:
     TAG_EXPRESSION_SUB_ALL = "*"
 
-    def __init__(self, expression=TAG_EXPRESSION_SUB_ALL, filter_type: 
FilterType = FilterType.TAG):
+    def __init__(
+        self,
+        expression=TAG_EXPRESSION_SUB_ALL,
+        filter_type: FilterType = FilterType.TAG,
+    ):
         self.__expression = expression
         self.__filter_type = filter_type
 
diff --git a/python/rocketmq/v5/model/message.py 
b/python/rocketmq/v5/model/message.py
index efd32085..ead7d795 100644
--- a/python/rocketmq/v5/model/message.py
+++ b/python/rocketmq/v5/model/message.py
@@ -37,15 +37,19 @@ class Message:
         self.__message_type = None
 
     def __str__(self) -> str:
-        return f"topic:{self.__topic}, tag:{self.__tag}, 
messageGroup:{self.__message_group}, " \
-               f"deliveryTimestamp:{self.__delivery_timestamp}, 
keys:{self.__keys}, properties:{self.__properties}"
+        return (
+            f"topic:{self.__topic}, tag:{self.__tag}, 
messageGroup:{self.__message_group}, "
+            f"deliveryTimestamp:{self.__delivery_timestamp}, 
keys:{self.__keys}, properties:{self.__properties}"
+        )
 
     def fromProtobuf(self, message: definition_pb2.Message):  # noqa
         try:
             self.__message_body_check_sum(message)
             self.__topic = message.topic.name
             self.__namespace = message.topic.resource_namespace
-            self.__message_id = 
MessageIdCodec.decode(message.system_properties.message_id)
+            self.__message_id = MessageIdCodec.decode(
+                message.system_properties.message_id
+            )
             self.__body = self.__uncompress_body(message)
             self.__tag = message.system_properties.tag
             self.__message_group = message.system_properties.message_group
@@ -71,18 +75,26 @@ class Message:
         if message.system_properties.body_digest.type == DigestType.CRC32:
             crc32_sum = Misc.crc32_checksum(message.body)
             if message.system_properties.body_digest.checksum != crc32_sum:
-                raise Exception(f"(body_check_sum exception, 
{message.digest.checksum} != crc32_sum {crc32_sum}")
+                raise Exception(
+                    f"(body_check_sum exception, {message.digest.checksum} != 
crc32_sum {crc32_sum}"
+                )
         elif message.system_properties.body_digest.type == DigestType.MD5:
             md5_sum = Misc.md5_checksum(message.body)
             if message.system_properties.body_digest.checksum != md5_sum:
-                raise Exception(f"(body_check_sum exception, 
{message.digest.checksum} != crc32_sum {md5_sum}")
+                raise Exception(
+                    f"(body_check_sum exception, {message.digest.checksum} != 
crc32_sum {md5_sum}"
+                )
         elif message.system_properties.body_digest.type == DigestType.SHA1:
             sha1_sum = Misc.sha1_checksum(message.body)
             if message.system_properties.body_digest.checksum != sha1_sum:
-                raise Exception(f"(body_check_sum exception, 
{message.digest.checksum} != crc32_sum {sha1_sum}")
+                raise Exception(
+                    f"(body_check_sum exception, {message.digest.checksum} != 
crc32_sum {sha1_sum}"
+                )
         else:
-            raise Exception(f"unsupported message body digest algorithm, 
{message.system_properties.body_digest.type},"
-                            f" {message.topic}, 
{message.system_properties.message_id}")
+            raise Exception(
+                f"unsupported message body digest algorithm, 
{message.system_properties.body_digest.type},"
+                f" {message.topic}, {message.system_properties.message_id}"
+            )
 
     @staticmethod
     def __uncompress_body(message):
@@ -92,7 +104,8 @@ class Message:
             return message.body
         else:
             raise Exception(
-                f"unsupported message encoding algorithm, 
{message.system_properties.body_encoding}, {message.topic}, 
{message.system_properties.message_id}")
+                f"unsupported message encoding algorithm, 
{message.system_properties.body_encoding}, {message.topic}, 
{message.system_properties.message_id}"
+            )
 
     """ property """
 
@@ -154,18 +167,20 @@ class Message:
 
     @body.setter
     def body(self, body):
-        if body is None or body.strip() == '':
+        if body is None or body.strip() == "":
             raise IllegalArgumentException("body should not be blank")
         self.__body = body
 
     @topic.setter
     def topic(self, topic):
-        if topic is None or topic.strip() == '':
+        if topic is None or topic.strip() == "":
             raise IllegalArgumentException("topic has not been set yet")
         if Misc.is_valid_topic(topic):
             self.__topic = topic
         else:
-            raise IllegalArgumentException(f"topic does not match the regex 
[regex={Misc.TOPIC_PATTERN}]")
+            raise IllegalArgumentException(
+                f"topic does not match the regex [regex={Misc.TOPIC_PATTERN}]"
+            )
 
     @message_id.setter
     def message_id(self, message_id):
@@ -173,16 +188,18 @@ class Message:
 
     @tag.setter
     def tag(self, tag):
-        if tag is None or tag.strip() == '':
+        if tag is None or tag.strip() == "":
             raise IllegalArgumentException("tag should not be blank")
         if "|" in tag:
-            raise IllegalArgumentException("tag should not contain \"|\"")
+            raise IllegalArgumentException('tag should not contain "|"')
         self.__tag = tag
 
     @message_group.setter
     def message_group(self, message_group):
         if self.__delivery_timestamp is not None:
-            raise IllegalArgumentException("deliveryTimestamp and messageGroup 
should not be set at same time")
+            raise IllegalArgumentException(
+                "deliveryTimestamp and messageGroup should not be set at same 
time"
+            )
         if message_group is None or len(message_group) == 0:
             raise IllegalArgumentException("messageGroup should not be blank")
         self.__message_group = message_group
@@ -190,13 +207,15 @@ class Message:
     @delivery_timestamp.setter
     def delivery_timestamp(self, delivery_timestamp):
         if self.__message_group is not None:
-            raise IllegalArgumentException("deliveryTimestamp and messageGroup 
should not be set at same time")
+            raise IllegalArgumentException(
+                "deliveryTimestamp and messageGroup should not be set at same 
time"
+            )
         self.__delivery_timestamp = delivery_timestamp
 
     @keys.setter
     def keys(self, *keys):
         for key in keys:
-            if not key or key.strip() == '':
+            if not key or key.strip() == "":
                 raise IllegalArgumentException("key should not be blank")
         self.__keys.update(set(keys))
 
@@ -205,8 +224,8 @@ class Message:
         self.__message_type = message_type
 
     def add_property(self, key, value):
-        if key is None or key.strip() == '':
+        if key is None or key.strip() == "":
             raise IllegalArgumentException("key should not be blank")
-        if value is None or value.strip() == '':
+        if value is None or value.strip() == "":
             raise IllegalArgumentException("value should not be blank")
         self.__properties[key] = value
diff --git a/python/rocketmq/v5/model/metrics.py 
b/python/rocketmq/v5/model/metrics.py
index 1106ae4c..31e84a4e 100644
--- a/python/rocketmq/v5/model/metrics.py
+++ b/python/rocketmq/v5/model/metrics.py
@@ -54,9 +54,15 @@ class MetricContext:
 
 class HistogramEnum(Enum):
     # a histogram that records the cost time of successful api calls of 
message publishing.
-    SEND_COST_TIME = ("rocketmq_send_cost_time", [1.0, 5.0, 10.0, 20.0, 50.0, 
200.0, 500.0])
+    SEND_COST_TIME = (
+        "rocketmq_send_cost_time",
+        [1.0, 5.0, 10.0, 20.0, 50.0, 200.0, 500.0],
+    )
     # a histogram that records the latency of message delivery from remote.
-    DELIVERY_LATENCY = ("rocketmq_delivery_latency", [1.0, 5.0, 10.0, 20.0, 
50.0, 200.0, 500.0])
+    DELIVERY_LATENCY = (
+        "rocketmq_delivery_latency",
+        [1.0, 5.0, 10.0, 20.0, 50.0, 200.0, 500.0],
+    )
 
     def __init__(self, histogram_name, buckets):
         self.__histogram_name = histogram_name
diff --git a/python/rocketmq/v5/model/send_receipt.py 
b/python/rocketmq/v5/model/send_receipt.py
index 90d576f5..f67997a1 100644
--- a/python/rocketmq/v5/model/send_receipt.py
+++ b/python/rocketmq/v5/model/send_receipt.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+
 class SendReceipt:
 
     def __init__(self, message_id, transaction_id, message_queue, offset):
diff --git a/python/rocketmq/v5/model/topic_route.py 
b/python/rocketmq/v5/model/topic_route.py
index e62b6d6f..b93728e2 100644
--- a/python/rocketmq/v5/model/topic_route.py
+++ b/python/rocketmq/v5/model/topic_route.py
@@ -31,10 +31,16 @@ class MessageQueue:
         self.__accept_message_types = set(queue.accept_message_types)
 
     def is_readable(self):
-        return self.__permission == Permission.READ or self.__permission == 
Permission.READ_WRITE
+        return (
+            self.__permission == Permission.READ
+            or self.__permission == Permission.READ_WRITE
+        )
 
     def is_writable(self):
-        return self.__permission == Permission.WRITE or self.__permission == 
Permission.READ_WRITE
+        return (
+            self.__permission == Permission.WRITE
+            or self.__permission == Permission.READ_WRITE
+        )
 
     def is_master_broker(self):
         return self.__broker_id == MessageQueue.MASTER_BROKER_ID
@@ -42,7 +48,17 @@ class MessageQueue:
     def __eq__(self, other: object) -> bool:
         if not isinstance(other, MessageQueue):
             return False
-        ret = (self.__topic == other.__topic and self.__namespace == 
other.__namespace and self.__queue_id == other.__queue_id and self.__permission 
== other.__permission and self.__broker_name == other.__broker_name and 
self.__broker_id == other.__broker_id and self.__broker_endpoints == 
other.__broker_endpoints and sorted(self.__accept_message_types) == 
sorted(other.__accept_message_types))
+        ret = (
+            self.__topic == other.__topic
+            and self.__namespace == other.__namespace
+            and self.__queue_id == other.__queue_id
+            and self.__permission == other.__permission
+            and self.__broker_name == other.__broker_name
+            and self.__broker_id == other.__broker_id
+            and self.__broker_endpoints == other.__broker_endpoints
+            and sorted(self.__accept_message_types)
+            == sorted(other.__accept_message_types)
+        )
         return ret
 
     def __str__(self):
@@ -75,7 +91,9 @@ class MessageQueue:
 class TopicRouteData:
 
     def __init__(self, message_queues):
-        self.__message_queues = list(map(lambda queue: MessageQueue(queue), 
message_queues))
+        self.__message_queues = list(
+            map(lambda queue: MessageQueue(queue), message_queues)
+        )
 
     def __eq__(self, other):
         if self is other:
@@ -88,7 +106,11 @@ class TopicRouteData:
         return hash(tuple(self.__message_queues))
 
     def __str__(self):
-        return "message_queues:(" + ', '.join(str(queue) for queue in 
self.__message_queues) + ")"
+        return (
+            "message_queues:("
+            + ", ".join(str(queue) for queue in self.__message_queues)
+            + ")"
+        )
 
     def all_endpoints(self):
         endpoints_map = {}
diff --git a/python/rocketmq/v5/producer/producer.py 
b/python/rocketmq/v5/producer/producer.py
index 9f4ae356..ba9e3d9b 100644
--- a/python/rocketmq/v5/producer/producer.py
+++ b/python/rocketmq/v5/producer/producer.py
@@ -48,21 +48,31 @@ class Transaction:
     def add_half_message(self, message: Message):
         with Transaction.__transaction_lock:
             if message is None:
-                raise IllegalArgumentException("add half message error, 
message is none.")
+                raise IllegalArgumentException(
+                    "add half message error, message is none."
+                )
 
             if self.__message is None:
                 self.__message = message
             else:
-                raise IllegalArgumentException(f"message already existed in 
transaction, topic:{message.topic}")
+                raise IllegalArgumentException(
+                    f"message already existed in transaction, 
topic:{message.topic}"
+                )
 
     def add_send_receipt(self, send_receipt):
         with Transaction.__transaction_lock:
             if self.__message is None:
-                raise IllegalArgumentException("add send receipt error, no 
message in transaction.")
+                raise IllegalArgumentException(
+                    "add send receipt error, no message in transaction."
+                )
             if send_receipt is None:
-                raise IllegalArgumentException("add send receipt error, send 
receipt in none.")
+                raise IllegalArgumentException(
+                    "add send receipt error, send receipt in none."
+                )
             if self.__message.message_id != send_receipt.message_id:
-                raise IllegalArgumentException("can't add another send receipt 
to a half message.")
+                raise IllegalArgumentException(
+                    "can't add another send receipt to a half message."
+                )
 
             self.__send_receipt = send_receipt
 
@@ -76,20 +86,28 @@ class Transaction:
         if self.__message is None:
             raise IllegalArgumentException("no message in transaction.")
         if self.__send_receipt is None or self.__send_receipt.transaction_id 
is None:
-            raise IllegalArgumentException("no transaction_id in transaction, 
must send half message at first.")
+            raise IllegalArgumentException(
+                "no transaction_id in transaction, must send half message at 
first."
+            )
 
         try:
-            res = 
self.__producer.end_transaction(self.__send_receipt.message_queue.endpoints, 
self.__message,
-                                                  
self.__send_receipt.transaction_id, result,
-                                                  
TransactionSource.SOURCE_CLIENT)
+            res = self.__producer.end_transaction(
+                self.__send_receipt.message_queue.endpoints,
+                self.__message,
+                self.__send_receipt.transaction_id,
+                result,
+                TransactionSource.SOURCE_CLIENT,
+            )
             if res.status.code != Code.OK:
                 logger.error(
-                    f"transaction commit or rollback error. 
topic:{self.__message.topic}, message_id:{self.__message.message_id}, 
transaction_id:{self.__send_receipt.transaction_id}, 
transactionResolution:{result}")
+                    f"transaction commit or rollback error. 
topic:{self.__message.topic}, message_id:{self.__message.message_id}, 
transaction_id:{self.__send_receipt.transaction_id}, 
transactionResolution:{result}"
+                )
                 raise ClientException(res.status.message, res.status.code)
             return res
         except Exception as e:
             logger.error(
-                f"end transaction error, topic:{self.__message.topic}, 
message_id:{self.__send_receipt.message_id}, 
transaction_id:{self.__send_receipt.transaction_id}, 
transactionResolution:{result}: {e}")
+                f"end transaction error, topic:{self.__message.topic}, 
message_id:{self.__send_receipt.message_id}, 
transaction_id:{self.__send_receipt.transaction_id}, 
transactionResolution:{result}: {e}"
+            )
             raise e
 
     """ property """
@@ -109,11 +127,15 @@ class TransactionChecker(metaclass=abc.ABCMeta):
 class Producer(Client):
     MAX_SEND_ATTEMPTS = 3  # max retry times when send failed
 
-    def __init__(self, client_configuration, topics=None, checker=None, 
tls_enable=False):
+    def __init__(
+        self, client_configuration, topics=None, checker=None, tls_enable=False
+    ):
         super().__init__(client_configuration, topics, ClientType.PRODUCER, 
tls_enable)
         # {topic, QueueSelector}
         self.__send_queue_selectors = ConcurrentMap()
-        self.__checker = checker  # checker for transaction message, handle 
checking from server
+        self.__checker = (
+            checker  # checker for transaction message, handle checking from 
server
+        )
 
     def __str__(self):
         return f"{ClientType.Name(self.client_type)} 
client_id:{self.client_id}"
@@ -128,7 +150,8 @@ class Producer(Client):
         topic_queue = self.__select_send_queue(message.topic)
         if message.message_type not in topic_queue.accept_message_types:
             raise IllegalArgumentException(
-                f"current message type not match with queue accept message 
types, topic:{message.topic}, message_type:{message.message_type}, queue access 
type:{topic_queue.accept_message_types}")
+                f"current message type not match with queue accept message 
types, topic:{message.topic}, message_type:{message.message_type}, queue access 
type:{topic_queue.accept_message_types}"
+            )
 
         if transaction is None:
             try:
@@ -146,7 +169,9 @@ class Producer(Client):
             except IllegalArgumentException as e:
                 raise e
             except Exception as e:
-                logger.error(f"send transaction message exception, topic: 
{message.topic}, e: {e}")
+                logger.error(
+                    f"send transaction message exception, topic: 
{message.topic}, e: {e}"
+                )
                 raise e
 
     def send_async(self, message: Message):
@@ -156,9 +181,11 @@ class Producer(Client):
         self.__wrap_sending_message(message, False)
         topic_queue = self.__select_send_queue(message.topic)
         if message.message_type not in topic_queue.accept_message_types:
-            raise IllegalArgumentException(f"current message type not match 
with queue accept message types, "
-                                           f"topic:{message.topic}, 
message_type:{message.message_type}, "
-                                           f"queue access 
type:{topic_queue.accept_message_types}")
+            raise IllegalArgumentException(
+                f"current message type not match with queue accept message 
types, "
+                f"topic:{message.topic}, message_type:{message.message_type}, "
+                f"queue access type:{topic_queue.accept_message_types}"
+            )
 
         try:
             return self.__send_async(message, topic_queue)
@@ -170,7 +197,9 @@ class Producer(Client):
 
     def begin_transaction(self):
         if self.is_running is False:
-            raise IllegalStateException("unable to begin transaction because 
producer is not running")
+            raise IllegalStateException(
+                "unable to begin transaction because producer is not running"
+            )
 
         if self.__checker is None:
             raise IllegalArgumentException("Transaction checker should not be 
null.")
@@ -178,22 +207,31 @@ class Producer(Client):
 
     def end_transaction(self, endpoints, message, transaction_id, result, 
source):
         if self.is_running is False:
-            raise IllegalStateException("unable to end transaction because 
producer is not running")
+            raise IllegalStateException(
+                "unable to end transaction because producer is not running"
+            )
 
         if self.__checker is None:
             raise IllegalArgumentException("Transaction checker should not be 
null.")
 
         req = self.__end_transaction_req(message, transaction_id, result, 
source)
-        future = self.rpc_client.end_transaction_async(endpoints, req, 
metadata=self._sign(),
-                                                       
timeout=self.client_configuration.request_timeout)
+        future = self.rpc_client.end_transaction_async(
+            endpoints,
+            req,
+            metadata=self._sign(),
+            timeout=self.client_configuration.request_timeout,
+        )
         return future.result()
 
-    async def on_recover_orphaned_transaction_command(self, endpoints, msg, 
transaction_id):
+    async def on_recover_orphaned_transaction_command(
+        self, endpoints, msg, transaction_id
+    ):
         # call this function from server side stream, in RpcClient._io_loop
         try:
             if self.is_running is False:
                 raise IllegalStateException(
-                    "unable to recover orphaned transaction command because 
producer is not running")
+                    "unable to recover orphaned transaction command because 
producer is not running"
+                )
 
             if self.__checker is None:
                 raise IllegalArgumentException("No transaction checker 
registered.")
@@ -201,15 +239,25 @@ class Producer(Client):
             result = self.__checker.check(message)
 
             if result == TransactionResolution.COMMIT:
-                res = await self.__commit_for_server_check(endpoints, message, 
transaction_id,
-                                                           
TransactionSource.SOURCE_SERVER_CHECK)
+                res = await self.__commit_for_server_check(
+                    endpoints,
+                    message,
+                    transaction_id,
+                    TransactionSource.SOURCE_SERVER_CHECK,
+                )
                 logger.debug(
-                    f"commit message. message_id: {message.message_id}, 
transaction_id: {transaction_id}, res: {res}")
+                    f"commit message. message_id: {message.message_id}, 
transaction_id: {transaction_id}, res: {res}"
+                )
             elif result == TransactionResolution.ROLLBACK:
-                res = await self.__rollback_for_server_check(endpoints, 
message, transaction_id,
-                                                             
TransactionSource.SOURCE_SERVER_CHECK)
+                res = await self.__rollback_for_server_check(
+                    endpoints,
+                    message,
+                    transaction_id,
+                    TransactionSource.SOURCE_SERVER_CHECK,
+                )
                 logger.debug(
-                    f"rollback message. message_id: {message.message_id}, 
transaction_id: {transaction_id}, res: {res}")
+                    f"rollback message. message_id: {message.message_id}, 
transaction_id: {transaction_id}, res: {res}"
+                )
         except Exception as e:
             logger.error(f"on_recover_orphaned_transaction_command exception: 
{e}")
 
@@ -273,17 +321,35 @@ class Producer(Client):
     def __send(self, message: Message, topic_queue, attempt=1) -> SendReceipt:
         req = self.__send_req(message)
         send_context = self.client_metrics.send_before(message.topic)
-        send_message_future = 
self.rpc_client.send_message_async(topic_queue.endpoints, req, self._sign())
-        return self.__handle_sync_send_receipt(send_message_future, message, 
topic_queue, attempt, send_context)
-
-    def __handle_sync_send_receipt(self, send_message_future, message, 
topic_queue, attempt, send_metric_context=None):
+        send_message_future = self.rpc_client.send_message_async(
+            topic_queue.endpoints,
+            req,
+            self._sign(),
+            timeout=self.client_configuration.request_timeout,
+        )
+        return self.__handle_sync_send_receipt(
+            send_message_future, message, topic_queue, attempt, send_context
+        )
+
+    def __handle_sync_send_receipt(
+        self,
+        send_message_future,
+        message,
+        topic_queue,
+        attempt,
+        send_metric_context=None,
+    ):
         try:
-            send_receipt = 
self.__process_send_message_response(send_message_future, topic_queue)
+            send_receipt = self.__process_send_message_response(
+                send_message_future, topic_queue
+            )
             self.client_metrics.send_after(send_metric_context, True)
             return send_receipt
         except Exception as e:
             attempt += 1
-            retry_exception_future = 
self.__check_send_retry_condition(message, topic_queue, attempt, e)
+            retry_exception_future = self.__check_send_retry_condition(
+                message, topic_queue, attempt, e
+            )
             if retry_exception_future is not None:
                 # end retry with exception
                 self.client_metrics.send_after(send_metric_context, False)
@@ -296,29 +362,55 @@ class Producer(Client):
     def __send_async(self, message: Message, topic_queue, attempt=1, 
ret_future=None):
         req = self.__send_req(message)
         send_context = self.client_metrics.send_before(message.topic)
-        send_message_future = 
self.rpc_client.send_message_async(topic_queue.endpoints, req, self._sign())
+        send_message_future = self.rpc_client.send_message_async(
+            topic_queue.endpoints,
+            req,
+            self._sign(),
+            timeout=self.client_configuration.request_timeout,
+        )
         if ret_future is None:
             ret_future = Future()
-        handle_send_receipt_callback = 
functools.partial(self.__handle_async_send_receipt, message=message,
-                                                         
topic_queue=topic_queue, attempt=attempt,
-                                                         
ret_future=ret_future, send_metric_context=send_context)
+        handle_send_receipt_callback = functools.partial(
+            self.__handle_async_send_receipt,
+            message=message,
+            topic_queue=topic_queue,
+            attempt=attempt,
+            ret_future=ret_future,
+            send_metric_context=send_context,
+        )
         send_message_future.add_done_callback(handle_send_receipt_callback)
         return ret_future
 
-    def __handle_async_send_receipt(self, send_message_future, message, 
topic_queue, attempt, ret_future,
-                                    send_metric_context=None):
+    def __handle_async_send_receipt(
+        self,
+        send_message_future,
+        message,
+        topic_queue,
+        attempt,
+        ret_future,
+        send_metric_context=None,
+    ):
         try:
-            send_receipt = 
self.__process_send_message_response(send_message_future, topic_queue)
+            send_receipt = self.__process_send_message_response(
+                send_message_future, topic_queue
+            )
             self.client_metrics.send_after(send_metric_context, True)
-            
self._set_future_callback_result(CallbackResult.async_send_callback_result(ret_future,
 send_receipt))
+            self._set_future_callback_result(
+                CallbackResult.async_send_callback_result(ret_future, 
send_receipt)
+            )
         except Exception as e:
             attempt += 1
-            retry_exception_future = 
self.__check_send_retry_condition(message, topic_queue, attempt, e)
+            retry_exception_future = self.__check_send_retry_condition(
+                message, topic_queue, attempt, e
+            )
             if retry_exception_future is not None:
                 # end retry with exception
                 self.client_metrics.send_after(send_metric_context, False)
                 self._set_future_callback_result(
-                    CallbackResult.async_send_callback_result(ret_future, 
retry_exception_future.exception(), False))
+                    CallbackResult.async_send_callback_result(
+                        ret_future, retry_exception_future.exception(), False
+                    )
+                )
                 return
             # resend message
             topic_queue = self.__select_send_queue(message.topic)
@@ -328,28 +420,34 @@ class Producer(Client):
         res = send_message_future.result()
         MessagingResultChecker.check(res.status)
         entries = res.entries
-        assert len(
-            entries) == 1, f"entries size error, the send response entries 
size is {len(entries)}, {self.__str__()}"
+        assert (
+            len(entries) == 1
+        ), f"entries size error, the send response entries size is 
{len(entries)}, {self.__str__()}"
         entry = entries[0]
-        return SendReceipt(entry.message_id, entry.transaction_id, 
topic_queue, entry.offset)
+        return SendReceipt(
+            entry.message_id, entry.transaction_id, topic_queue, entry.offset
+        )
 
     def __check_send_retry_condition(self, message, topic_queue, attempt, e):
         end_retry = False
         if attempt > Producer.MAX_SEND_ATTEMPTS:
             logger.error(
-                f"{self.__str__()} failed to send message to 
{topic_queue.endpoints.__str__()}, because of run out of attempt times, 
topic:{message.topic}, message_id:{message.message_id},  
message_type:{message.message_type}, attempt:{attempt}")
+                f"{self.__str__()} failed to send message to 
{topic_queue.endpoints.__str__()}, because of run out of attempt times, 
topic:{message.topic}, message_id:{message.message_id},  
message_type:{message.message_type}, attempt:{attempt}"
+            )
             end_retry = True
 
         # no need more attempts for transactional message
         if message.message_type == MessageType.TRANSACTION:
             logger.error(
-                f"{self.__str__()} failed to send message to 
{topic_queue.endpoints.__str__()}, topic:{message.topic}, 
message_id:{message.message_id}, message_type:{message.message_type} 
,attempt:{attempt}")
+                f"{self.__str__()} failed to send message to 
{topic_queue.endpoints.__str__()}, topic:{message.topic}, 
message_id:{message.message_id}, message_type:{message.message_type} 
,attempt:{attempt}"
+            )
             end_retry = True
 
         # end retry if system busy
         if isinstance(e, TooManyRequestsException):
             logger.error(
-                f"{self.__str__()} failed to send message to 
{topic_queue.endpoints.__str__()}, because of to too many requests, 
topic:{message.topic},  message_type:{message.message_type}, 
message_id:{message.message_id}, attempt:{attempt}")
+                f"{self.__str__()} failed to send message to 
{topic_queue.endpoints.__str__()}, because of to too many requests, 
topic:{message.topic},  message_type:{message.message_type}, 
message_id:{message.message_id}, attempt:{attempt}"
+            )
             end_retry = True
 
         if end_retry:
@@ -374,7 +472,8 @@ class Producer(Client):
             max_body_size = 4 * 1024 * 1024  # max body size is 4m
             if len(message.body) > max_body_size:
                 raise IllegalArgumentException(
-                    f"Message body size exceeds the threshold, max 
size={max_body_size} bytes")
+                    f"Message body size exceeds the threshold, max 
size={max_body_size} bytes"
+                )
 
             msg.body = message.body
             if message.tag is not None:
@@ -391,13 +490,19 @@ class Producer(Client):
             if message.message_group is not None:
                 msg.system_properties.message_group = message.message_group
             if message.delivery_timestamp is not None:
-                msg.system_properties.delivery_timestamp.seconds = 
message.delivery_timestamp
+                msg.system_properties.delivery_timestamp.seconds = (
+                    message.delivery_timestamp
+                )
             return req
         except Exception as e:
             raise e
 
     def __send_message_type(self, message: Message, is_transaction=False):
-        if message.message_group is None and message.delivery_timestamp is 
None and is_transaction is False:
+        if (
+            message.message_group is None
+            and message.delivery_timestamp is None
+            and is_transaction is False
+        ):
             return MessageType.NORMAL
 
         if message.message_group is not None and is_transaction is False:
@@ -406,21 +511,30 @@ class Producer(Client):
         if message.delivery_timestamp is not None and is_transaction is False:
             return MessageType.DELAY
 
-        if message.message_group is None and message.delivery_timestamp is 
None and is_transaction is True:
+        if (
+            message.message_group is None
+            and message.delivery_timestamp is None
+            and is_transaction is True
+        ):
             return MessageType.TRANSACTION
 
         # transaction semantics is conflicted with fifo/delay.
-        logger.error(f"{self.__str__()} set send message type exception, 
message: {str(message)}")
-        raise IllegalArgumentException("transactional message should not set 
messageGroup or deliveryTimestamp")
+        logger.error(
+            f"{self.__str__()} set send message type exception, message: 
{str(message)}"
+        )
+        raise IllegalArgumentException(
+            "transactional message should not set messageGroup or 
deliveryTimestamp"
+        )
 
     def __select_send_queue(self, topic):
         try:
             route = self._retrieve_topic_route_data(topic)
-            queue_selector = self.__send_queue_selectors.put_if_absent(topic,
-                                                                       
QueueSelector.producer_queue_selector(route))
+            queue_selector = self.__send_queue_selectors.put_if_absent(
+                topic, QueueSelector.producer_queue_selector(route)
+            )
             return queue_selector.select_next_queue()
         except Exception as e:
-            logger.error(f"producer select topic:{topic} queue index 
exception, {e}")
+            logger.error(f"producer select topic:{topic} queue raise 
exception, {e}")
             raise e
 
     def __end_transaction_req(self, message: Message, transaction_id, result, 
source):
@@ -433,15 +547,27 @@ class Producer(Client):
         req.source = source
         return req
 
-    def __commit_for_server_check(self, endpoints, message: Message, 
transaction_id, source):
-        return self.__end_transaction_for_server_check(endpoints, message, 
transaction_id, TransactionResolution.COMMIT,
-                                                       source)
-
-    def __rollback_for_server_check(self, endpoints, message: Message, 
transaction_id, source):
-        return self.__end_transaction_for_server_check(endpoints, message, 
transaction_id,
-                                                       
TransactionResolution.ROLLBACK, source)
-
-    def __end_transaction_for_server_check(self, endpoints, message: Message, 
transaction_id, result, source):
+    def __commit_for_server_check(
+        self, endpoints, message: Message, transaction_id, source
+    ):
+        return self.__end_transaction_for_server_check(
+            endpoints, message, transaction_id, TransactionResolution.COMMIT, 
source
+        )
+
+    def __rollback_for_server_check(
+        self, endpoints, message: Message, transaction_id, source
+    ):
+        return self.__end_transaction_for_server_check(
+            endpoints, message, transaction_id, 
TransactionResolution.ROLLBACK, source
+        )
+
+    def __end_transaction_for_server_check(
+        self, endpoints, message: Message, transaction_id, result, source
+    ):
         req = self.__end_transaction_req(message, transaction_id, result, 
source)
-        return self.rpc_client.end_transaction_for_server_check(endpoints, 
req, metadata=self._sign(),
-                                                                
timeout=self.client_configuration.request_timeout)
+        return self.rpc_client.end_transaction_for_server_check(
+            endpoints,
+            req,
+            metadata=self._sign(),
+            timeout=self.client_configuration.request_timeout,
+        )
diff --git a/python/rocketmq/v5/test/test_base.py 
b/python/rocketmq/v5/test/test_base.py
index aa49fafc..fd315eee 100644
--- a/python/rocketmq/v5/test/test_base.py
+++ b/python/rocketmq/v5/test/test_base.py
@@ -33,7 +33,7 @@ class TestBase:
     FAKE_CLIENT_ID = ClientId()
     FAKE_TOPIC_0 = "foo-bar-topic-0"
     FAKE_TOPIC_1 = "foo-bar-topic-1"
-    FAKE_MESSAGE_BODY = "foobar".encode('utf-8')
+    FAKE_MESSAGE_BODY = "foobar".encode("utf-8")
     FAKE_TAG_0 = "foo-bar-tag-0"
     FAKE_BROKER_NAME_0 = "foo-bar-broker-name-0"
     FAKE_BROKER_NAME_1 = "foo-bar-broker-name-1"
@@ -49,7 +49,9 @@ class TestBase:
     @staticmethod
     def fake_client_config():
         credentials = Credentials(TestBase.FAKE_AK, TestBase.FAKE_SK)
-        config = ClientConfiguration(TestBase.FAKE_ENDPOINTS, credentials, 
TestBase.FAKE_NAMESPACE)
+        config = ClientConfiguration(
+            TestBase.FAKE_ENDPOINTS, credentials, TestBase.FAKE_NAMESPACE
+        )
         return config
 
     @staticmethod
@@ -75,7 +77,9 @@ class TestBase:
         msg.body = TestBase.FAKE_MESSAGE_BODY
         msg.system_properties.born_host = TestBase.FAKE_HOST_0
         msg.system_properties.born_timestamp.seconds = int(time.time() * 1000)
-        msg.system_properties.delivery_timestamp.seconds = 
msg.system_properties.born_timestamp.seconds - 10
+        msg.system_properties.delivery_timestamp.seconds = (
+            msg.system_properties.born_timestamp.seconds - 10
+        )
         msg.system_properties.message_type = 1
         msg.system_properties.body_encoding = 1
         return msg
@@ -103,7 +107,13 @@ class TestBase:
         fake_queue.broker.CopyFrom(fake_broker)
         fake_queue.permission = Permission.READ_WRITE
         fake_queue.accept_message_types.extend(
-            (MessageType.NORMAL, MessageType.FIFO, MessageType.DELAY, 
MessageType.TRANSACTION))
+            (
+                MessageType.NORMAL,
+                MessageType.FIFO,
+                MessageType.DELAY,
+                MessageType.TRANSACTION,
+            )
+        )
         return MessageQueue(fake_queue)
 
     @staticmethod
@@ -135,7 +145,12 @@ class TestBase:
         fake_response = TestBase.fake_send_success_response()
         fake_message_queue = TestBase.fake_queue(topic)
         fake_entry = fake_response.entries[0]
-        return SendReceipt(fake_entry.message_id, fake_entry.transaction_id, 
fake_message_queue, fake_entry.offset)
+        return SendReceipt(
+            fake_entry.message_id,
+            fake_entry.transaction_id,
+            fake_message_queue,
+            fake_entry.offset,
+        )
 
     @staticmethod
     def fake_receive_receipt():
diff --git a/python/rocketmq/v5/test/test_consumer.py 
b/python/rocketmq/v5/test/test_consumer.py
index 47db493a..8bd107cb 100644
--- a/python/rocketmq/v5/test/test_consumer.py
+++ b/python/rocketmq/v5/test/test_consumer.py
@@ -26,24 +26,40 @@ from rocketmq.v5.test import TestBase
 
 class TestNormalConsumer(unittest.TestCase):
 
-    @patch.object(Message, '_Message__message_body_check_sum')
-    @patch.object(SimpleConsumer, '_SimpleConsumer__receive_message_response')
-    @patch.object(RpcClient, 'receive_message_async')
-    @patch.object(SimpleConsumer, '_SimpleConsumer__select_topic_queue',
-                  return_value=TestBase.fake_queue(TestBase.FAKE_TOPIC_0))
-    @patch.object(SimpleConsumer, '_SimpleConsumer__select_topic_for_receive', 
return_value=TestBase.FAKE_TOPIC_0)
-    @patch.object(Client, '_Client__start_scheduler', return_value=None)
-    @patch.object(Client, '_Client__update_topic_route', return_value=None)
-    def test_receive(self, mock_update_topic_route, mock_start_scheduler, 
mock_select_topic_for_receive,
-                     mock_select_topic_queue, mock_receive_message_async, 
mock_receive_message_response,
-                     mock_message_body_check_sum):
+    @patch.object(Message, "_Message__message_body_check_sum")
+    @patch.object(SimpleConsumer, "_SimpleConsumer__receive_message_response")
+    @patch.object(RpcClient, "receive_message_async")
+    @patch.object(
+        SimpleConsumer,
+        "_SimpleConsumer__select_topic_queue",
+        return_value=TestBase.fake_queue(TestBase.FAKE_TOPIC_0),
+    )
+    @patch.object(
+        SimpleConsumer,
+        "_SimpleConsumer__select_topic_for_receive",
+        return_value=TestBase.FAKE_TOPIC_0,
+    )
+    @patch.object(Client, "_Client__start_scheduler", return_value=None)
+    @patch.object(Client, "_Client__update_topic_route", return_value=None)
+    def test_receive(
+        self,
+        mock_update_topic_route,
+        mock_start_scheduler,
+        mock_select_topic_for_receive,
+        mock_select_topic_queue,
+        mock_receive_message_async,
+        mock_receive_message_response,
+        mock_message_body_check_sum,
+    ):
         future = Future()
         future.set_result(list())
         mock_receive_message_async.return_value = future
         mock_receive_message_response.return_value = 
TestBase.fake_receive_receipt()
 
         subs = {TestBase.FAKE_TOPIC_0: FilterExpression()}
-        consumer = SimpleConsumer(TestBase.fake_client_config(), 
TestBase.FAKE_CONSUMER_GROUP_0, subs)
+        consumer = SimpleConsumer(
+            TestBase.fake_client_config(), TestBase.FAKE_CONSUMER_GROUP_0, subs
+        )
         consumer.startup()
         messages = consumer.receive(32, 10)
         self.assertIsInstance(messages[0], Message)
diff --git a/python/rocketmq/v5/test/test_producer.py 
b/python/rocketmq/v5/test/test_producer.py
index 4dc73d29..e85864c1 100644
--- a/python/rocketmq/v5/test/test_producer.py
+++ b/python/rocketmq/v5/test/test_producer.py
@@ -26,10 +26,16 @@ from rocketmq.v5.test import TestBase
 
 class TestNormalProducer(unittest.TestCase):
 
-    @patch.object(Producer, '_Producer__select_send_queue', 
return_value=TestBase.fake_queue(TestBase.FAKE_TOPIC_0))
-    @patch.object(RpcClient, 'send_message_async')
-    @patch.object(Client, '_Client__start_scheduler', return_value=None)
-    def test_send(self, mock_start_scheduler, mock_send_message_async, 
mock_select_send_queue):
+    @patch.object(
+        Producer,
+        "_Producer__select_send_queue",
+        return_value=TestBase.fake_queue(TestBase.FAKE_TOPIC_0),
+    )
+    @patch.object(RpcClient, "send_message_async")
+    @patch.object(Client, "_Client__start_scheduler", return_value=None)
+    def test_send(
+        self, mock_start_scheduler, mock_send_message_async, 
mock_select_send_queue
+    ):
         # mock send_message_async return future
         future = Future()
         future.set_result(TestBase.fake_send_success_response())
diff --git a/python/rocketmq/v5/util/client_id.py 
b/python/rocketmq/v5/util/client_id.py
index 44ef02f0..04054b2c 100644
--- a/python/rocketmq/v5/util/client_id.py
+++ b/python/rocketmq/v5/util/client_id.py
@@ -30,7 +30,15 @@ class ClientId:
         host_name = gethostname()
         process_id = getpid()
         base36_time = Misc.to_base36(time_ns())
-        self.__client_id = (host_name + ClientId.CLIENT_ID_SEPARATOR + 
str(process_id) + ClientId.CLIENT_ID_SEPARATOR + str(self.__client_index) + 
ClientId.CLIENT_ID_SEPARATOR + base36_time)
+        self.__client_id = (
+            host_name
+            + ClientId.CLIENT_ID_SEPARATOR
+            + str(process_id)
+            + ClientId.CLIENT_ID_SEPARATOR
+            + str(self.__client_index)
+            + ClientId.CLIENT_ID_SEPARATOR
+            + base36_time
+        )
 
     @property
     def client_id(self):
diff --git a/python/rocketmq/v5/util/message_id_codec.py 
b/python/rocketmq/v5/util/message_id_codec.py
index eec4f94a..a0b83b4c 100644
--- a/python/rocketmq/v5/util/message_id_codec.py
+++ b/python/rocketmq/v5/util/message_id_codec.py
@@ -74,17 +74,21 @@ class MessageIdCodec:
 
     def __init__(self):
         with MessageIdCodec._instance_lock:
-            if not hasattr(self, 'initialized'):
+            if not hasattr(self, "initialized"):
                 buffer = bytearray(8)
-                mac = getnode().to_bytes(6, byteorder='big')
+                mac = getnode().to_bytes(6, byteorder="big")
                 buffer[0:6] = mac
                 pid = getpid()
                 pid_buffer = bytearray(4)
-                pid_buffer[0:4] = pid.to_bytes(4, byteorder='big')
+                pid_buffer[0:4] = pid.to_bytes(4, byteorder="big")
                 buffer[6:8] = pid_buffer[2:4]
                 self.process_fixed_string_v1 = buffer.hex().upper()
                 self.seconds_since_custom_epoch = int(
-                    (datetime.now(timezone.utc) - datetime(2021, 1, 1, 0, 0, 
0, tzinfo=timezone.utc)).total_seconds())
+                    (
+                        datetime.now(timezone.utc)
+                        - datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
+                    ).total_seconds()
+                )
                 self.seconds_start_timestamp = int(time())
                 self.seconds = self.__delta_time()
                 self.sequence = None
@@ -95,9 +99,13 @@ class MessageIdCodec:
         if self.seconds != delta_seconds:
             self.seconds = delta_seconds
         buffer = bytearray(8)
-        buffer[0:4] = self.seconds.to_bytes(8, byteorder='big')[4:8]
-        buffer[4:8] = self.__sequence_id().to_bytes(4, byteorder='big')
-        return MessageIdCodec.MESSAGE_ID_VERSION_V1 + 
self.process_fixed_string_v1 + buffer.hex().upper()
+        buffer[0:4] = self.seconds.to_bytes(8, byteorder="big")[4:8]
+        buffer[4:8] = self.__sequence_id().to_bytes(4, byteorder="big")
+        return (
+            MessageIdCodec.MESSAGE_ID_VERSION_V1
+            + self.process_fixed_string_v1
+            + buffer.hex().upper()
+        )
 
     @staticmethod
     def decode(message_id):
@@ -106,7 +114,9 @@ class MessageIdCodec:
     """ private """
 
     def __delta_time(self):
-        return int(time()) - self.seconds_start_timestamp + 
self.seconds_since_custom_epoch
+        return (
+            int(time()) - self.seconds_start_timestamp + 
self.seconds_since_custom_epoch
+        )
 
     def __sequence_id(self):
         self.sequence = MessageIdCodec.__index.get_and_increment()
diff --git a/python/rocketmq/v5/util/messaging_result_checker.py 
b/python/rocketmq/v5/util/messaging_result_checker.py
index f8bc1ee0..4091397a 100644
--- a/python/rocketmq/v5/util/messaging_result_checker.py
+++ b/python/rocketmq/v5/util/messaging_result_checker.py
@@ -34,12 +34,27 @@ class MessagingResultChecker:
 
         if code == Code.OK or code == Code.MULTIPLE_RESULTS:
             return
-        elif code == Code.BAD_REQUEST or code == Code.ILLEGAL_ACCESS_POINT or 
code == Code.ILLEGAL_TOPIC \
-                or code == Code.ILLEGAL_CONSUMER_GROUP or code == 
Code.ILLEGAL_MESSAGE_TAG or code == Code.ILLEGAL_MESSAGE_KEY \
-                or code == Code.ILLEGAL_MESSAGE_GROUP or code == 
Code.ILLEGAL_MESSAGE_PROPERTY_KEY or code == Code.INVALID_TRANSACTION_ID \
-                or code == Code.ILLEGAL_MESSAGE_ID or code == 
Code.ILLEGAL_FILTER_EXPRESSION or code == Code.ILLEGAL_INVISIBLE_TIME \
-                or code == Code.ILLEGAL_DELIVERY_TIME or code == 
Code.INVALID_RECEIPT_HANDLE or code == Code.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE 
\
-                or code == Code.UNRECOGNIZED_CLIENT_TYPE or code == 
Code.MESSAGE_CORRUPTED or code == Code.CLIENT_ID_REQUIRED or code == 
Code.ILLEGAL_POLLING_TIME:
+        elif (
+            code == Code.BAD_REQUEST
+            or code == Code.ILLEGAL_ACCESS_POINT
+            or code == Code.ILLEGAL_TOPIC
+            or code == Code.ILLEGAL_CONSUMER_GROUP
+            or code == Code.ILLEGAL_MESSAGE_TAG
+            or code == Code.ILLEGAL_MESSAGE_KEY
+            or code == Code.ILLEGAL_MESSAGE_GROUP
+            or code == Code.ILLEGAL_MESSAGE_PROPERTY_KEY
+            or code == Code.INVALID_TRANSACTION_ID
+            or code == Code.ILLEGAL_MESSAGE_ID
+            or code == Code.ILLEGAL_FILTER_EXPRESSION
+            or code == Code.ILLEGAL_INVISIBLE_TIME
+            or code == Code.ILLEGAL_DELIVERY_TIME
+            or code == Code.INVALID_RECEIPT_HANDLE
+            or code == Code.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE
+            or code == Code.UNRECOGNIZED_CLIENT_TYPE
+            or code == Code.MESSAGE_CORRUPTED
+            or code == Code.CLIENT_ID_REQUIRED
+            or code == Code.ILLEGAL_POLLING_TIME
+        ):
             raise BadRequestException(message, code)
         elif code == Code.UNAUTHORIZED:
             raise UnauthorizedException(message, code)
@@ -49,19 +64,38 @@ class MessagingResultChecker:
             raise ForbiddenException(message, code)
         elif code == Code.MESSAGE_NOT_FOUND:
             return
-        elif code == Code.NOT_FOUND or code == Code.TOPIC_NOT_FOUND or code == 
Code.CONSUMER_GROUP_NOT_FOUND:
+        elif (
+            code == Code.NOT_FOUND
+            or code == Code.TOPIC_NOT_FOUND
+            or code == Code.CONSUMER_GROUP_NOT_FOUND
+        ):
             raise NotFoundException(message, code)
         elif code == Code.PAYLOAD_TOO_LARGE or code == 
Code.MESSAGE_BODY_TOO_LARGE:
             raise PayloadTooLargeException(message, code)
         elif code == Code.TOO_MANY_REQUESTS:
             raise TooManyRequestsException(message, code)
-        elif code == Code.REQUEST_HEADER_FIELDS_TOO_LARGE or code == 
Code.MESSAGE_PROPERTIES_TOO_LARGE:
+        elif (
+            code == Code.REQUEST_HEADER_FIELDS_TOO_LARGE
+            or code == Code.MESSAGE_PROPERTIES_TOO_LARGE
+        ):
             raise RequestHeaderFieldsTooLargeException(message, code)
-        elif code == Code.INTERNAL_ERROR or code == Code.INTERNAL_SERVER_ERROR 
or code == Code.HA_NOT_AVAILABLE:
+        elif (
+            code == Code.INTERNAL_ERROR
+            or code == Code.INTERNAL_SERVER_ERROR
+            or code == Code.HA_NOT_AVAILABLE
+        ):
             raise InternalErrorException(message, code)
-        elif code == Code.PROXY_TIMEOUT or code == 
Code.MASTER_PERSISTENCE_TIMEOUT or code == Code.SLAVE_PERSISTENCE_TIMEOUT:
+        elif (
+            code == Code.PROXY_TIMEOUT
+            or code == Code.MASTER_PERSISTENCE_TIMEOUT
+            or code == Code.SLAVE_PERSISTENCE_TIMEOUT
+        ):
             raise ProxyTimeoutException(message, code)
-        elif code == Code.UNSUPPORTED or code == Code.VERSION_UNSUPPORTED or 
code == Code.VERIFY_FIFO_MESSAGE_UNSUPPORTED:
+        elif (
+            code == Code.UNSUPPORTED
+            or code == Code.VERSION_UNSUPPORTED
+            or code == Code.VERIFY_FIFO_MESSAGE_UNSUPPORTED
+        ):
             raise UnsupportedException(message, code)
         else:
             logger.warn(f"unrecognized status code:{code}, message:{message}")
diff --git a/python/rocketmq/v5/util/misc.py b/python/rocketmq/v5/util/misc.py
index d371625f..b021b690 100644
--- a/python/rocketmq/v5/util/misc.py
+++ b/python/rocketmq/v5/util/misc.py
@@ -27,9 +27,9 @@ from rocketmq.v5.log import logger
 class Misc:
     __LOCAL_IP = None
     __OS_NAME = None
-    TOPIC_PATTERN = compile(r'^[%a-zA-Z0-9_-]+$')
-    CONSUMER_GROUP_PATTERN = compile(r'^[%a-zA-Z0-9_-]+$')
-    SDK_VERSION = "5.0.2"
+    TOPIC_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$")
+    CONSUMER_GROUP_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$")
+    SDK_VERSION = "5.0.3"
 
     @staticmethod
     def sdk_language():
@@ -41,33 +41,33 @@ class Misc:
 
     @staticmethod
     def to_base36(n):
-        chars = '0123456789abcdefghijklmnopqrstuvwxyz'
+        chars = "0123456789abcdefghijklmnopqrstuvwxyz"
         result = []
         if n == 0:
-            return '0'
+            return "0"
         while n > 0:
             n, r = divmod(n, 36)
             result.append(chars[r])
-        return ''.join(reversed(result))
+        return "".join(reversed(result))
 
     @staticmethod
     def get_local_ip():
         if Misc.__LOCAL_IP is None:
             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
             try:
-                s.connect(('8.8.8.8', 80))
+                s.connect(("8.8.8.8", 80))
                 Misc.__LOCAL_IP = s.getsockname()[0]
             except Exception as e:
                 logger.error(f"get local ip exception: {e}")
-                return '127.0.0.1'
+                return "127.0.0.1"
             finally:
                 s.close()
         return Misc.__LOCAL_IP
 
     @staticmethod
     def crc32_checksum(array):
-        crc32_value = zlib.crc32(array) & 0xffffffff
-        return format(crc32_value, '08X')
+        crc32_value = zlib.crc32(array) & 0xFFFFFFFF
+        return format(crc32_value, "08X")
 
     @staticmethod
     def md5_checksum(array):
@@ -83,7 +83,7 @@ class Misc:
 
     @staticmethod
     def uncompress_bytes_gzip(body):
-        if body and body[:2] == b'\x1f\x8b':
+        if body and body[:2] == b"\x1f\x8b":
             body = gzip.decompress(body)  # Standard Gzip format
         else:
             body = zlib.decompress(body)  # deflate zip
diff --git a/python/rocketmq/v5/util/signature.py 
b/python/rocketmq/v5/util/signature.py
index a23dd26b..f5983da7 100644
--- a/python/rocketmq/v5/util/signature.py
+++ b/python/rocketmq/v5/util/signature.py
@@ -30,19 +30,21 @@ class Signature:
         formatted_date_time = now.strftime("%Y%m%dT%H%M%SZ")
         request_id = str(uuid4())
         sign = Signature.sign(config.credentials.sk, formatted_date_time)
-        authorization = "MQv2-HMAC-SHA1" \
-                        + " " \
-                        + "Credential" \
-                        + "=" \
-                        + config.credentials.ak \
-                        + ", " \
-                        + "SignedHeaders" \
-                        + "=" \
-                        + "x-mq-date-time" \
-                        + ", " \
-                        + "Signature" \
-                        + "=" \
-                        + sign
+        authorization = (
+            "MQv2-HMAC-SHA1"
+            + " "
+            + "Credential"
+            + "="
+            + config.credentials.ak
+            + ", "
+            + "SignedHeaders"
+            + "="
+            + "x-mq-date-time"
+            + ", "
+            + "Signature"
+            + "="
+            + sign
+        )
         metadata = [
             ("x-mq-language", "PYTHON"),
             ("x-mq-protocol", "GRPC_V2"),
@@ -51,12 +53,12 @@ class Signature:
             ("x-mq-request-id", request_id),
             ("x-mq-client-id", client_id),
             ("x-mq-namespace", config.namespace),
-            ("authorization", authorization)
+            ("authorization", authorization),
         ]
         return metadata
 
     @staticmethod
     def sign(access_secret, date_time):
-        signing_key = access_secret.encode('utf-8')
-        mac = new(signing_key, date_time.encode('utf-8'), sha1)
-        return hexlify(mac.digest()).decode('utf-8')
+        signing_key = access_secret.encode("utf-8")
+        mac = new(signing_key, date_time.encode("utf-8"), sha1)
+        return hexlify(mac.digest()).decode("utf-8")
diff --git a/python/setup.py b/python/setup.py
index 6e8453f7..3881d6d7 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -17,7 +17,7 @@ from setuptools import find_packages, setup
 
 setup(
     name='rocketmq-python-client',
-    version='5.0.2',
+    version='5.0.3',
     packages=find_packages(),
     install_requires=[
         "grpcio>=1.5.0",


Reply via email to