This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new f3c288e3 5.0.4 (#950) transaction_check max workers
f3c288e3 is described below

commit f3c288e3816a407a62f4bc12f794bc33d56b9657
Author: zhouli11 <04081...@163.com>
AuthorDate: Tue Mar 4 11:07:18 2025 +0800

    5.0.4 (#950) transaction_check max workers
    
    * transaction_check max workers
---
 python/example/async_producer_example.py           | 10 ++-
 python/example/async_simple_consumer_example.py    | 14 +++-
 python/example/transaction_producer_example.py     | 18 ++++-
 python/rocketmq/v5/client/client.py                | 76 ++++++------------
 .../rocketmq/v5/client/connection/rpc_channel.py   |  2 +-
 python/rocketmq/v5/client/connection/rpc_client.py | 14 ----
 python/rocketmq/v5/consumer/simple_consumer.py     | 19 ++---
 python/rocketmq/v5/log/log_config.py               |  2 +-
 python/rocketmq/v5/model/message.py                |  4 +
 python/rocketmq/v5/producer/producer.py            | 91 +++++++++-------------
 python/rocketmq/v5/util/misc.py                    |  2 +-
 python/setup.py                                    |  2 +-
 12 files changed, 109 insertions(+), 145 deletions(-)

diff --git a/python/example/async_producer_example.py 
b/python/example/async_producer_example.py
index 8f3c986e..fd5e5a08 100644
--- a/python/example/async_producer_example.py
+++ b/python/example/async_producer_example.py
@@ -18,10 +18,11 @@ from rocketmq import ClientConfiguration, Credentials, 
Message, Producer
 
 def handle_send_result(result_future):
     try:
+        # don't write time-consuming code in the callback. if needed, use 
other thread
         res = result_future.result()
-        print(f"async send message success, {res}")
+        print(f"send message success, {res}")
     except Exception as exception:
-        print(f"async send message failed, raise exception: {exception}")
+        print(f"send message failed, raise exception: {exception}")
 
 
 if __name__ == '__main__':
@@ -46,8 +47,11 @@ if __name__ == '__main__':
                 send_result_future = producer.send_async(msg)
                 send_result_future.add_done_callback(handle_send_result)
         except Exception as e:
-            print(f"async producer{producer.__str__()} send message raise 
exception: {e}")
+            print(f"producer{producer.__str__()} send message raise exception: 
{e}")
             producer.shutdown()
     except Exception as e:
         print(f"{producer.__str__()} startup raise exception: {e}")
         producer.shutdown()
+
+    input("Please Enter to Stop the Application.")
+    producer.shutdown()
diff --git a/python/example/async_simple_consumer_example.py 
b/python/example/async_simple_consumer_example.py
index eed0630d..5b341d5d 100644
--- a/python/example/async_simple_consumer_example.py
+++ b/python/example/async_simple_consumer_example.py
@@ -15,17 +15,27 @@
 
 import functools
 import time
+from concurrent.futures.thread import ThreadPoolExecutor
 
 from rocketmq import ClientConfiguration, Credentials, SimpleConsumer
 
+consume_executor = ThreadPoolExecutor(max_workers=2, 
thread_name_prefix="consume-message")
+
+
+def consume_message(consumer, message):
+    try:
+        consumer.ack(message)
+        print(f"ack message:{message.message_id}.")
+    except Exception as exception:
+        print(f"consume message raise exception: {exception}")
+
 
 def receive_callback(receive_result_future, consumer):
     messages = receive_result_future.result()
     print(f"{consumer.__str__()} receive {len(messages)} messages.")
     for msg in messages:
         try:
-            consumer.ack(msg)
-            print(f"ack message:{msg.message_id}.")
+            consume_executor.submit(consume_message, consumer=consumer, 
message=msg)
         except Exception as exception:
             print(f"receive message raise exception: {exception}")
 
diff --git a/python/example/transaction_producer_example.py 
b/python/example/transaction_producer_example.py
index c922f2e7..fa0abdd3 100644
--- a/python/example/transaction_producer_example.py
+++ b/python/example/transaction_producer_example.py
@@ -20,7 +20,7 @@ from rocketmq import (ClientConfiguration, Credentials, 
Message, Producer,
 class TestChecker(TransactionChecker):
 
     def check(self, message: Message) -> TransactionResolution:
-        print(f"do TestChecker check. message_id: {message.message_id}, commit 
message.")
+        print(f"do TestChecker check, topic:{message.topic}, message_id: 
{message.message_id}, commit message.")
         return TransactionResolution.COMMIT
 
 
@@ -31,7 +31,8 @@ if __name__ == '__main__':
     # credentials = Credentials("ak", "sk")
     config = ClientConfiguration(endpoints, credentials)
     topic = "topic"
-    producer = Producer(config, (topic,))
+    check_from_server = True  # commit message from server check
+    producer = Producer(config, (topic,), checker=TestChecker())
 
     try:
         producer.startup()
@@ -47,6 +48,17 @@ if __name__ == '__main__':
         msg.keys = "send_transaction"
         msg.add_property("send", "transaction")
         res = producer.send(msg, transaction)
-        print(f"transaction producer{producer.__str__()} send half message 
success. {res}")
+        print(f"send message: {res}")
+        if check_from_server:
+            input("Please Enter to Stop the Application.\r\n")
+            producer.shutdown()
+        else:
+            # producer directly commit or rollback
+            transaction.commit()
+            print(f"producer commit message:{transaction.message_id}")
+            # transaction.rollback()
+            # print(f"producer rollback message:{transaction.message_id}")
+            producer.shutdown()
     except Exception as e:
         print(f"transaction producer{producer.__str__()} example raise 
exception: {e}")
+        producer.shutdown()
diff --git a/python/rocketmq/v5/client/client.py 
b/python/rocketmq/v5/client/client.py
index b6c40f93..d039a9a0 100644
--- a/python/rocketmq/v5/client/client.py
+++ b/python/rocketmq/v5/client/client.py
@@ -15,9 +15,10 @@
 
 import asyncio
 import functools
+import os
 import threading
 from asyncio import InvalidStateError
-from queue import Queue
+from concurrent.futures import ThreadPoolExecutor
 
 from grpc.aio import AioRpcError
 from rocketmq.grpc_protocol import ClientType, Code, QueryRouteRequest
@@ -26,16 +27,13 @@ from rocketmq.v5.client.metrics import ClientMetrics
 from rocketmq.v5.exception import (IllegalArgumentException,
                                    IllegalStateException)
 from rocketmq.v5.log import logger
-from rocketmq.v5.model import (CallbackResult, CallbackResultType,
-                               TopicRouteData)
+from rocketmq.v5.model import TopicRouteData
 from rocketmq.v5.util import (ClientId, ConcurrentMap, MessagingResultChecker,
                               Misc, Signature)
 
 
 class Client:
 
-    CALLBACK_THREADS_NUM = 5
-
     def __init__(
         self, client_configuration, topics, client_type: ClientType, 
tls_enable=False
     ):
@@ -62,8 +60,7 @@ class Client:
             )
         else:
             self.__topics = set()
-        self.__callback_result_queue = Queue()
-        self.__callback_threads = []
+        self.__client_callback_executor = None
         self.__is_running = False
         self.__client_thread_task_enabled = False
         self.__had_shutdown = False
@@ -268,45 +265,23 @@ class Client:
     """ callback handler for async method """
 
     def __start_async_rpc_callback_handler(self):
-        # a thread to handle callback when using async method such as 
send_async(), receive_async().
-        # this handler switches user's callback thread from RpcClient's 
_io_loop_thread to client's callback_handler_thread
+        # to handle callback when using async method such as send_async(), 
receive_async().
+        # switches user's callback thread from RpcClient's _io_loop_thread to 
client's client_callback_worker_thread
         try:
-            for i in range(Client.CALLBACK_THREADS_NUM):
-                th = threading.Thread(
-                    name=f"callback_handler_thread-{i}", 
target=self.__handle_callback
-                )
-                th.daemon = True
-                self.__callback_threads.append(th)
-                th.start()
-                logger.info(
-                    f"{self.__str__()} start async rpc callback thread:{th} 
success."
-                )
+            workers = os.cpu_count()
+            self.__client_callback_executor = 
ThreadPoolExecutor(max_workers=workers,
+                                                                 
thread_name_prefix=f"client_callback_worker-{self.__client_id}")
+            logger.info(f"{self.__str__()} start callback executor success. 
max_workers:{workers}")
         except Exception as e:
             print(f"{self.__str__()} start async rpc callback raise exception: 
{e}")
             raise e
 
-    def __handle_callback(self):
-        while True:
-            if self.__client_thread_task_enabled is True:
-                callback_result = self.__callback_result_queue.get()
-                if (
-                    callback_result.result_type
-                    == CallbackResultType.END_CALLBACK_THREAD_RESULT
-                ):
-                    # end infinite loop when client shutdown
-                    self.__callback_result_queue.task_done()
-                    break
-                else:
-                    if callback_result.is_success:
-                        
callback_result.future.set_result(callback_result.result)
-                    else:
-                        
callback_result.future.set_exception(callback_result.result)
-                    self.__callback_result_queue.task_done()
-            else:
-                break
-        logger.info(
-            f"{self.__str__()} stop client callback result handler 
thread:{threading.current_thread()} success."
-        )
+    @staticmethod
+    def __handle_callback(callback_result):
+        if callback_result.is_success:
+            callback_result.future.set_result(callback_result.result)
+        else:
+            callback_result.future.set_exception(callback_result.result)
 
     """ protect """
 
@@ -330,13 +305,12 @@ class Client:
     def _sign(self):
         return Signature.metadata(self.__client_configuration, 
self.__client_id)
 
-    def _set_future_callback_result(self, callback_result):
-        if self.__callback_result_queue is not None:
-            self.__callback_result_queue.put_nowait(callback_result)
-
     def _rpc_channel_io_loop(self):
         return self.__rpc_client.get_channel_io_loop()
 
+    def _submit_callback(self, callback_result):
+        self.__client_callback_executor.submit(Client.__handle_callback, 
callback_result)
+
     """ private """
 
     # topic route #
@@ -555,13 +529,8 @@ class Client:
                 self.__clear_idle_rpc_channels_threading_event.set()
                 self.__clear_idle_rpc_channels_scheduler.join()
 
-        for i in range(Client.CALLBACK_THREADS_NUM):
-            self._set_future_callback_result(
-                CallbackResult.end_callback_thread_result()
-            )
-
-        for i in range(Client.CALLBACK_THREADS_NUM):
-            self.__callback_threads[i].join()
+        if self.__client_callback_executor is not None:
+            self.__client_callback_executor.shutdown()
 
         self.__topic_route_scheduler = None
         self.__topic_route_scheduler_threading_event = None
@@ -571,8 +540,7 @@ class Client:
         self.__sync_setting_scheduler_threading_event = None
         self.__clear_idle_rpc_channels_scheduler = None
         self.__clear_idle_rpc_channels_threading_event = None
-        self.__callback_result_queue = None
-        self.__callback_threads = None
+        self.__client_callback_executor = None
 
     """ property """
 
diff --git a/python/rocketmq/v5/client/connection/rpc_channel.py 
b/python/rocketmq/v5/client/connection/rpc_channel.py
index 246e102b..1783026e 100644
--- a/python/rocketmq/v5/client/connection/rpc_channel.py
+++ b/python/rocketmq/v5/client/connection/rpc_channel.py
@@ -146,7 +146,7 @@ class RpcStreamStreamCall:
                                 
res.recover_orphaned_transaction_command.transaction_id
                             )
                             message = 
res.recover_orphaned_transaction_command.message
-                            await 
self.__handler.on_recover_orphaned_transaction_command(
+                            
self.__handler.on_recover_orphaned_transaction_command(
                                 self.__endpoints, message, transaction_id
                             )
             except AioRpcError as e:
diff --git a/python/rocketmq/v5/client/connection/rpc_client.py 
b/python/rocketmq/v5/client/connection/rpc_client.py
index 77e6473c..92b0fafb 100644
--- a/python/rocketmq/v5/client/connection/rpc_client.py
+++ b/python/rocketmq/v5/client/connection/rpc_client.py
@@ -179,20 +179,6 @@ class RpcClient:
             )
         )
 
-    def end_transaction_for_server_check(
-        self, endpoints: RpcEndpoints, req: EndTransactionRequest, metadata, 
timeout=3
-    ):
-        # assert asyncio.get_running_loop() == RpcClient._io_loop
-        try:
-            return self.__end_transaction_0(
-                endpoints, req, metadata=metadata, timeout=timeout
-            )
-        except Exception as e:
-            logger.error(
-                f"end transaction exception, topic:{req.topic.name}, 
message_id:{req.message_id}, transaction_id:{req.transaction_id}: {e}"
-            )
-            raise e
-
     """ build stream_stream_call """
 
     def telemetry_stream(
diff --git a/python/rocketmq/v5/consumer/simple_consumer.py 
b/python/rocketmq/v5/consumer/simple_consumer.py
index 6b1483db..638416d6 100644
--- a/python/rocketmq/v5/consumer/simple_consumer.py
+++ b/python/rocketmq/v5/consumer/simple_consumer.py
@@ -309,11 +309,11 @@ class SimpleConsumer(Client):
         try:
             responses = future.result()
             messages = self.__handle_receive_message_response(responses)
-            self._set_future_callback_result(
+            self._submit_callback(
                 CallbackResult.async_receive_callback_result(ret_future, 
messages)
             )
         except Exception as e:
-            self._set_future_callback_result(
+            self._submit_callback(
                 CallbackResult.async_receive_callback_result(ret_future, e, 
False)
             )
 
@@ -409,14 +409,14 @@ class SimpleConsumer(Client):
             )
             MessagingResultChecker.check(res.status)
             if ret_future is not None:
-                self._set_future_callback_result(
+                self._submit_callback(
                     CallbackResult.async_ack_callback_result(ret_future, None)
                 )
         except Exception as e:
             if ret_future is None:
                 raise e
             else:
-                self._set_future_callback_result(
+                self._submit_callback(
                     CallbackResult.async_ack_callback_result(ret_future, e, 
False)
                 )
 
@@ -434,7 +434,7 @@ class SimpleConsumer(Client):
                 metadata=self._sign(),
                 timeout=self.client_configuration.request_timeout,
             )
-            self.__handle_change_invisible_result(future)
+            self.__handle_change_invisible_result(future, message)
         except Exception as e:
             raise e
 
@@ -454,7 +454,7 @@ class SimpleConsumer(Client):
             )
             ret_future = Future()
             change_invisible_callback = functools.partial(
-                self.__handle_change_invisible_result, ret_future=ret_future
+                self.__handle_change_invisible_result, message=message, 
ret_future=ret_future
             )
             future.add_done_callback(change_invisible_callback)
             return ret_future
@@ -472,15 +472,16 @@ class SimpleConsumer(Client):
         req.message_id = message.message_id
         return req
 
-    def __handle_change_invisible_result(self, future, ret_future=None):
+    def __handle_change_invisible_result(self, future, message, 
ret_future=None):
         try:
             res = future.result()
             logger.debug(
                 f"consumer[{self.__consumer_group}] change invisible response, 
{res.status}"
             )
+            message.receipt_handle = res.receipt_handle
             MessagingResultChecker.check(res.status)
             if ret_future is not None:
-                self._set_future_callback_result(
+                self._submit_callback(
                     
CallbackResult.async_change_invisible_duration_callback_result(
                         ret_future, None
                     )
@@ -489,7 +490,7 @@ class SimpleConsumer(Client):
             if ret_future is None:
                 raise e
             else:
-                self._set_future_callback_result(
+                self._submit_callback(
                     
CallbackResult.async_change_invisible_duration_callback_result(
                         ret_future, e, False
                     )
diff --git a/python/rocketmq/v5/log/log_config.py 
b/python/rocketmq/v5/log/log_config.py
index a363c650..f179a89f 100644
--- a/python/rocketmq/v5/log/log_config.py
+++ b/python/rocketmq/v5/log/log_config.py
@@ -32,7 +32,7 @@ __LOG_CONFIG = {
         # },
         "file": {
             "class": "logging.handlers.RotatingFileHandler",
-            "level": "INFO",
+            "level": "DEBUG",
             "formatter": "standard",
             "filename": f"{__DIR}/rocketmq_client.log",
             "maxBytes": 1024 * 1024 * 100,  # 100MB
diff --git a/python/rocketmq/v5/model/message.py 
b/python/rocketmq/v5/model/message.py
index ead7d795..dee7dad4 100644
--- a/python/rocketmq/v5/model/message.py
+++ b/python/rocketmq/v5/model/message.py
@@ -219,6 +219,10 @@ class Message:
                 raise IllegalArgumentException("key should not be blank")
         self.__keys.update(set(keys))
 
+    @receipt_handle.setter
+    def receipt_handle(self, receipt_handle):
+        self.__receipt_handle = receipt_handle
+
     @message_type.setter
     def message_type(self, message_type):
         self.__message_type = message_type
diff --git a/python/rocketmq/v5/producer/producer.py 
b/python/rocketmq/v5/producer/producer.py
index ba9e3d9b..2a9bba70 100644
--- a/python/rocketmq/v5/producer/producer.py
+++ b/python/rocketmq/v5/producer/producer.py
@@ -17,7 +17,7 @@ import abc
 import functools
 import threading
 import time
-from concurrent.futures import Future
+from concurrent.futures import Future, ThreadPoolExecutor
 
 from rocketmq.grpc_protocol import (ClientType, Code, Encoding,
                                     EndTransactionRequest, HeartbeatRequest,
@@ -136,6 +136,7 @@ class Producer(Client):
         self.__checker = (
             checker  # checker for transaction message, handle checking from 
server
         )
+        self.__transaction_check_executor = ThreadPoolExecutor(max_workers=1, 
thread_name_prefix="transaction_check_worker")
 
     def __str__(self):
         return f"{ClientType.Name(self.client_type)} 
client_id:{self.client_id}"
@@ -223,7 +224,7 @@ class Producer(Client):
         )
         return future.result()
 
-    async def on_recover_orphaned_transaction_command(
+    def on_recover_orphaned_transaction_command(
         self, endpoints, msg, transaction_id
     ):
         # call this function from server side stream, in RpcClient._io_loop
@@ -236,28 +237,7 @@ class Producer(Client):
             if self.__checker is None:
                 raise IllegalArgumentException("No transaction checker 
registered.")
             message = Message().fromProtobuf(msg)
-            result = self.__checker.check(message)
-
-            if result == TransactionResolution.COMMIT:
-                res = await self.__commit_for_server_check(
-                    endpoints,
-                    message,
-                    transaction_id,
-                    TransactionSource.SOURCE_SERVER_CHECK,
-                )
-                logger.debug(
-                    f"commit message. message_id: {message.message_id}, 
transaction_id: {transaction_id}, res: {res}"
-                )
-            elif result == TransactionResolution.ROLLBACK:
-                res = await self.__rollback_for_server_check(
-                    endpoints,
-                    message,
-                    transaction_id,
-                    TransactionSource.SOURCE_SERVER_CHECK,
-                )
-                logger.debug(
-                    f"rollback message. message_id: {message.message_id}, 
transaction_id: {transaction_id}, res: {res}"
-                )
+            
self.__transaction_check_executor.submit(self.__server_transaction_check, 
endpoints, message, transaction_id)
         except Exception as e:
             logger.error(f"on_recover_orphaned_transaction_command exception: 
{e}")
 
@@ -313,6 +293,8 @@ class Producer(Client):
 
     def shutdown(self):
         logger.info(f"begin to shutdown {self.__str__()}")
+        self.__transaction_check_executor.shutdown()
+        self.__transaction_check_executor = None
         super().shutdown()
         logger.info(f"shutdown {self.__str__()} success.")
 
@@ -395,7 +377,7 @@ class Producer(Client):
                 send_message_future, topic_queue
             )
             self.client_metrics.send_after(send_metric_context, True)
-            self._set_future_callback_result(
+            self._submit_callback(
                 CallbackResult.async_send_callback_result(ret_future, 
send_receipt)
             )
         except Exception as e:
@@ -406,7 +388,7 @@ class Producer(Client):
             if retry_exception_future is not None:
                 # end retry with exception
                 self.client_metrics.send_after(send_metric_context, False)
-                self._set_future_callback_result(
+                self._submit_callback(
                     CallbackResult.async_send_callback_result(
                         ret_future, retry_exception_future.exception(), False
                     )
@@ -436,13 +418,6 @@ class Producer(Client):
             )
             end_retry = True
 
-        # no need more attempts for transactional message
-        if message.message_type == MessageType.TRANSACTION:
-            logger.error(
-                f"{self.__str__()} failed to send message to 
{topic_queue.endpoints.__str__()}, topic:{message.topic}, 
message_id:{message.message_id}, message_type:{message.message_type} 
,attempt:{attempt}"
-            )
-            end_retry = True
-
         # end retry if system busy
         if isinstance(e, TooManyRequestsException):
             logger.error(
@@ -547,27 +522,31 @@ class Producer(Client):
         req.source = source
         return req
 
-    def __commit_for_server_check(
-        self, endpoints, message: Message, transaction_id, source
-    ):
-        return self.__end_transaction_for_server_check(
-            endpoints, message, transaction_id, TransactionResolution.COMMIT, 
source
-        )
-
-    def __rollback_for_server_check(
-        self, endpoints, message: Message, transaction_id, source
-    ):
-        return self.__end_transaction_for_server_check(
-            endpoints, message, transaction_id, 
TransactionResolution.ROLLBACK, source
-        )
+    def __server_transaction_check_callback(self, future, message, 
transaction_id, result):
+        try:
+            res = future.result()
+            if res is not None and res.status.code == Code.OK:
+                if result == TransactionResolution.COMMIT:
+                    logger.debug(
+                        f"{self.__str__()} commit message. message_id: 
{message.message_id}, transaction_id: {transaction_id}, res: {res}"
+                    )
+                elif result == TransactionResolution.ROLLBACK:
+                    logger.debug(
+                        f"{self.__str__()} rollback message. message_id: 
{message.message_id}, transaction_id: {transaction_id}, res: {res}"
+                    )
+            else:
+                if result == TransactionResolution.COMMIT:
+                    raise Exception(f"{self.__str__()} commit message: 
{message.message_id} raise exception")
+                elif result == TransactionResolution.ROLLBACK:
+                    raise Exception(f"{self.__str__()} rollback message: 
{message.message_id} raise exception")
+        except Exception as e:
+            logger.error(f"server transaction check raise exception, {e}")
 
-    def __end_transaction_for_server_check(
-        self, endpoints, message: Message, transaction_id, result, source
-    ):
-        req = self.__end_transaction_req(message, transaction_id, result, 
source)
-        return self.rpc_client.end_transaction_for_server_check(
-            endpoints,
-            req,
-            metadata=self._sign(),
-            timeout=self.client_configuration.request_timeout,
-        )
+    def __server_transaction_check(self, endpoints, message, transaction_id):
+        try:
+            result = self.__checker.check(message)
+            req = self.__end_transaction_req(message, transaction_id, result, 
TransactionSource.SOURCE_SERVER_CHECK)
+            future = self.rpc_client.end_transaction_async(endpoints, req, 
metadata=self._sign(), timeout=self.client_configuration.request_timeout)
+            
future.add_done_callback(functools.partial(self.__server_transaction_check_callback,
 message=message, transaction_id=transaction_id, result=result))
+        except Exception as e:
+            raise e
diff --git a/python/rocketmq/v5/util/misc.py b/python/rocketmq/v5/util/misc.py
index b021b690..c0c00092 100644
--- a/python/rocketmq/v5/util/misc.py
+++ b/python/rocketmq/v5/util/misc.py
@@ -29,7 +29,7 @@ class Misc:
     __OS_NAME = None
     TOPIC_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$")
     CONSUMER_GROUP_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$")
-    SDK_VERSION = "5.0.3"
+    SDK_VERSION = "5.0.4"
 
     @staticmethod
     def sdk_language():
diff --git a/python/setup.py b/python/setup.py
index 3881d6d7..f869f7ee 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -17,7 +17,7 @@ from setuptools import find_packages, setup
 
 setup(
     name='rocketmq-python-client',
-    version='5.0.3',
+    version='5.0.4',
     packages=find_packages(),
     install_requires=[
         "grpcio>=1.5.0",

Reply via email to