This is an automated email from the ASF dual-hosted git repository. lollipop pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new f3c288e3 5.0.4 (#950) transaction_check max workers f3c288e3 is described below commit f3c288e3816a407a62f4bc12f794bc33d56b9657 Author: zhouli11 <04081...@163.com> AuthorDate: Tue Mar 4 11:07:18 2025 +0800 5.0.4 (#950) transaction_check max workers * transaction_check max workers --- python/example/async_producer_example.py | 10 ++- python/example/async_simple_consumer_example.py | 14 +++- python/example/transaction_producer_example.py | 18 ++++- python/rocketmq/v5/client/client.py | 76 ++++++------------ .../rocketmq/v5/client/connection/rpc_channel.py | 2 +- python/rocketmq/v5/client/connection/rpc_client.py | 14 ---- python/rocketmq/v5/consumer/simple_consumer.py | 19 ++--- python/rocketmq/v5/log/log_config.py | 2 +- python/rocketmq/v5/model/message.py | 4 + python/rocketmq/v5/producer/producer.py | 91 +++++++++------------- python/rocketmq/v5/util/misc.py | 2 +- python/setup.py | 2 +- 12 files changed, 109 insertions(+), 145 deletions(-) diff --git a/python/example/async_producer_example.py b/python/example/async_producer_example.py index 8f3c986e..fd5e5a08 100644 --- a/python/example/async_producer_example.py +++ b/python/example/async_producer_example.py @@ -18,10 +18,11 @@ from rocketmq import ClientConfiguration, Credentials, Message, Producer def handle_send_result(result_future): try: + # don't write time-consuming code in the callback. if needed, use other thread res = result_future.result() - print(f"async send message success, {res}") + print(f"send message success, {res}") except Exception as exception: - print(f"async send message failed, raise exception: {exception}") + print(f"send message failed, raise exception: {exception}") if __name__ == '__main__': @@ -46,8 +47,11 @@ if __name__ == '__main__': send_result_future = producer.send_async(msg) send_result_future.add_done_callback(handle_send_result) except Exception as e: - print(f"async producer{producer.__str__()} send message raise exception: {e}") + print(f"producer{producer.__str__()} send message raise exception: {e}") producer.shutdown() except Exception as e: print(f"{producer.__str__()} startup raise exception: {e}") producer.shutdown() + + input("Please Enter to Stop the Application.") + producer.shutdown() diff --git a/python/example/async_simple_consumer_example.py b/python/example/async_simple_consumer_example.py index eed0630d..5b341d5d 100644 --- a/python/example/async_simple_consumer_example.py +++ b/python/example/async_simple_consumer_example.py @@ -15,17 +15,27 @@ import functools import time +from concurrent.futures.thread import ThreadPoolExecutor from rocketmq import ClientConfiguration, Credentials, SimpleConsumer +consume_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="consume-message") + + +def consume_message(consumer, message): + try: + consumer.ack(message) + print(f"ack message:{message.message_id}.") + except Exception as exception: + print(f"consume message raise exception: {exception}") + def receive_callback(receive_result_future, consumer): messages = receive_result_future.result() print(f"{consumer.__str__()} receive {len(messages)} messages.") for msg in messages: try: - consumer.ack(msg) - print(f"ack message:{msg.message_id}.") + consume_executor.submit(consume_message, consumer=consumer, message=msg) except Exception as exception: print(f"receive message raise exception: {exception}") diff --git a/python/example/transaction_producer_example.py b/python/example/transaction_producer_example.py index c922f2e7..fa0abdd3 100644 --- a/python/example/transaction_producer_example.py +++ b/python/example/transaction_producer_example.py @@ -20,7 +20,7 @@ from rocketmq import (ClientConfiguration, Credentials, Message, Producer, class TestChecker(TransactionChecker): def check(self, message: Message) -> TransactionResolution: - print(f"do TestChecker check. message_id: {message.message_id}, commit message.") + print(f"do TestChecker check, topic:{message.topic}, message_id: {message.message_id}, commit message.") return TransactionResolution.COMMIT @@ -31,7 +31,8 @@ if __name__ == '__main__': # credentials = Credentials("ak", "sk") config = ClientConfiguration(endpoints, credentials) topic = "topic" - producer = Producer(config, (topic,)) + check_from_server = True # commit message from server check + producer = Producer(config, (topic,), checker=TestChecker()) try: producer.startup() @@ -47,6 +48,17 @@ if __name__ == '__main__': msg.keys = "send_transaction" msg.add_property("send", "transaction") res = producer.send(msg, transaction) - print(f"transaction producer{producer.__str__()} send half message success. {res}") + print(f"send message: {res}") + if check_from_server: + input("Please Enter to Stop the Application.\r\n") + producer.shutdown() + else: + # producer directly commit or rollback + transaction.commit() + print(f"producer commit message:{transaction.message_id}") + # transaction.rollback() + # print(f"producer rollback message:{transaction.message_id}") + producer.shutdown() except Exception as e: print(f"transaction producer{producer.__str__()} example raise exception: {e}") + producer.shutdown() diff --git a/python/rocketmq/v5/client/client.py b/python/rocketmq/v5/client/client.py index b6c40f93..d039a9a0 100644 --- a/python/rocketmq/v5/client/client.py +++ b/python/rocketmq/v5/client/client.py @@ -15,9 +15,10 @@ import asyncio import functools +import os import threading from asyncio import InvalidStateError -from queue import Queue +from concurrent.futures import ThreadPoolExecutor from grpc.aio import AioRpcError from rocketmq.grpc_protocol import ClientType, Code, QueryRouteRequest @@ -26,16 +27,13 @@ from rocketmq.v5.client.metrics import ClientMetrics from rocketmq.v5.exception import (IllegalArgumentException, IllegalStateException) from rocketmq.v5.log import logger -from rocketmq.v5.model import (CallbackResult, CallbackResultType, - TopicRouteData) +from rocketmq.v5.model import TopicRouteData from rocketmq.v5.util import (ClientId, ConcurrentMap, MessagingResultChecker, Misc, Signature) class Client: - CALLBACK_THREADS_NUM = 5 - def __init__( self, client_configuration, topics, client_type: ClientType, tls_enable=False ): @@ -62,8 +60,7 @@ class Client: ) else: self.__topics = set() - self.__callback_result_queue = Queue() - self.__callback_threads = [] + self.__client_callback_executor = None self.__is_running = False self.__client_thread_task_enabled = False self.__had_shutdown = False @@ -268,45 +265,23 @@ class Client: """ callback handler for async method """ def __start_async_rpc_callback_handler(self): - # a thread to handle callback when using async method such as send_async(), receive_async(). - # this handler switches user's callback thread from RpcClient's _io_loop_thread to client's callback_handler_thread + # to handle callback when using async method such as send_async(), receive_async(). + # switches user's callback thread from RpcClient's _io_loop_thread to client's client_callback_worker_thread try: - for i in range(Client.CALLBACK_THREADS_NUM): - th = threading.Thread( - name=f"callback_handler_thread-{i}", target=self.__handle_callback - ) - th.daemon = True - self.__callback_threads.append(th) - th.start() - logger.info( - f"{self.__str__()} start async rpc callback thread:{th} success." - ) + workers = os.cpu_count() + self.__client_callback_executor = ThreadPoolExecutor(max_workers=workers, + thread_name_prefix=f"client_callback_worker-{self.__client_id}") + logger.info(f"{self.__str__()} start callback executor success. max_workers:{workers}") except Exception as e: print(f"{self.__str__()} start async rpc callback raise exception: {e}") raise e - def __handle_callback(self): - while True: - if self.__client_thread_task_enabled is True: - callback_result = self.__callback_result_queue.get() - if ( - callback_result.result_type - == CallbackResultType.END_CALLBACK_THREAD_RESULT - ): - # end infinite loop when client shutdown - self.__callback_result_queue.task_done() - break - else: - if callback_result.is_success: - callback_result.future.set_result(callback_result.result) - else: - callback_result.future.set_exception(callback_result.result) - self.__callback_result_queue.task_done() - else: - break - logger.info( - f"{self.__str__()} stop client callback result handler thread:{threading.current_thread()} success." - ) + @staticmethod + def __handle_callback(callback_result): + if callback_result.is_success: + callback_result.future.set_result(callback_result.result) + else: + callback_result.future.set_exception(callback_result.result) """ protect """ @@ -330,13 +305,12 @@ class Client: def _sign(self): return Signature.metadata(self.__client_configuration, self.__client_id) - def _set_future_callback_result(self, callback_result): - if self.__callback_result_queue is not None: - self.__callback_result_queue.put_nowait(callback_result) - def _rpc_channel_io_loop(self): return self.__rpc_client.get_channel_io_loop() + def _submit_callback(self, callback_result): + self.__client_callback_executor.submit(Client.__handle_callback, callback_result) + """ private """ # topic route # @@ -555,13 +529,8 @@ class Client: self.__clear_idle_rpc_channels_threading_event.set() self.__clear_idle_rpc_channels_scheduler.join() - for i in range(Client.CALLBACK_THREADS_NUM): - self._set_future_callback_result( - CallbackResult.end_callback_thread_result() - ) - - for i in range(Client.CALLBACK_THREADS_NUM): - self.__callback_threads[i].join() + if self.__client_callback_executor is not None: + self.__client_callback_executor.shutdown() self.__topic_route_scheduler = None self.__topic_route_scheduler_threading_event = None @@ -571,8 +540,7 @@ class Client: self.__sync_setting_scheduler_threading_event = None self.__clear_idle_rpc_channels_scheduler = None self.__clear_idle_rpc_channels_threading_event = None - self.__callback_result_queue = None - self.__callback_threads = None + self.__client_callback_executor = None """ property """ diff --git a/python/rocketmq/v5/client/connection/rpc_channel.py b/python/rocketmq/v5/client/connection/rpc_channel.py index 246e102b..1783026e 100644 --- a/python/rocketmq/v5/client/connection/rpc_channel.py +++ b/python/rocketmq/v5/client/connection/rpc_channel.py @@ -146,7 +146,7 @@ class RpcStreamStreamCall: res.recover_orphaned_transaction_command.transaction_id ) message = res.recover_orphaned_transaction_command.message - await self.__handler.on_recover_orphaned_transaction_command( + self.__handler.on_recover_orphaned_transaction_command( self.__endpoints, message, transaction_id ) except AioRpcError as e: diff --git a/python/rocketmq/v5/client/connection/rpc_client.py b/python/rocketmq/v5/client/connection/rpc_client.py index 77e6473c..92b0fafb 100644 --- a/python/rocketmq/v5/client/connection/rpc_client.py +++ b/python/rocketmq/v5/client/connection/rpc_client.py @@ -179,20 +179,6 @@ class RpcClient: ) ) - def end_transaction_for_server_check( - self, endpoints: RpcEndpoints, req: EndTransactionRequest, metadata, timeout=3 - ): - # assert asyncio.get_running_loop() == RpcClient._io_loop - try: - return self.__end_transaction_0( - endpoints, req, metadata=metadata, timeout=timeout - ) - except Exception as e: - logger.error( - f"end transaction exception, topic:{req.topic.name}, message_id:{req.message_id}, transaction_id:{req.transaction_id}: {e}" - ) - raise e - """ build stream_stream_call """ def telemetry_stream( diff --git a/python/rocketmq/v5/consumer/simple_consumer.py b/python/rocketmq/v5/consumer/simple_consumer.py index 6b1483db..638416d6 100644 --- a/python/rocketmq/v5/consumer/simple_consumer.py +++ b/python/rocketmq/v5/consumer/simple_consumer.py @@ -309,11 +309,11 @@ class SimpleConsumer(Client): try: responses = future.result() messages = self.__handle_receive_message_response(responses) - self._set_future_callback_result( + self._submit_callback( CallbackResult.async_receive_callback_result(ret_future, messages) ) except Exception as e: - self._set_future_callback_result( + self._submit_callback( CallbackResult.async_receive_callback_result(ret_future, e, False) ) @@ -409,14 +409,14 @@ class SimpleConsumer(Client): ) MessagingResultChecker.check(res.status) if ret_future is not None: - self._set_future_callback_result( + self._submit_callback( CallbackResult.async_ack_callback_result(ret_future, None) ) except Exception as e: if ret_future is None: raise e else: - self._set_future_callback_result( + self._submit_callback( CallbackResult.async_ack_callback_result(ret_future, e, False) ) @@ -434,7 +434,7 @@ class SimpleConsumer(Client): metadata=self._sign(), timeout=self.client_configuration.request_timeout, ) - self.__handle_change_invisible_result(future) + self.__handle_change_invisible_result(future, message) except Exception as e: raise e @@ -454,7 +454,7 @@ class SimpleConsumer(Client): ) ret_future = Future() change_invisible_callback = functools.partial( - self.__handle_change_invisible_result, ret_future=ret_future + self.__handle_change_invisible_result, message=message, ret_future=ret_future ) future.add_done_callback(change_invisible_callback) return ret_future @@ -472,15 +472,16 @@ class SimpleConsumer(Client): req.message_id = message.message_id return req - def __handle_change_invisible_result(self, future, ret_future=None): + def __handle_change_invisible_result(self, future, message, ret_future=None): try: res = future.result() logger.debug( f"consumer[{self.__consumer_group}] change invisible response, {res.status}" ) + message.receipt_handle = res.receipt_handle MessagingResultChecker.check(res.status) if ret_future is not None: - self._set_future_callback_result( + self._submit_callback( CallbackResult.async_change_invisible_duration_callback_result( ret_future, None ) @@ -489,7 +490,7 @@ class SimpleConsumer(Client): if ret_future is None: raise e else: - self._set_future_callback_result( + self._submit_callback( CallbackResult.async_change_invisible_duration_callback_result( ret_future, e, False ) diff --git a/python/rocketmq/v5/log/log_config.py b/python/rocketmq/v5/log/log_config.py index a363c650..f179a89f 100644 --- a/python/rocketmq/v5/log/log_config.py +++ b/python/rocketmq/v5/log/log_config.py @@ -32,7 +32,7 @@ __LOG_CONFIG = { # }, "file": { "class": "logging.handlers.RotatingFileHandler", - "level": "INFO", + "level": "DEBUG", "formatter": "standard", "filename": f"{__DIR}/rocketmq_client.log", "maxBytes": 1024 * 1024 * 100, # 100MB diff --git a/python/rocketmq/v5/model/message.py b/python/rocketmq/v5/model/message.py index ead7d795..dee7dad4 100644 --- a/python/rocketmq/v5/model/message.py +++ b/python/rocketmq/v5/model/message.py @@ -219,6 +219,10 @@ class Message: raise IllegalArgumentException("key should not be blank") self.__keys.update(set(keys)) + @receipt_handle.setter + def receipt_handle(self, receipt_handle): + self.__receipt_handle = receipt_handle + @message_type.setter def message_type(self, message_type): self.__message_type = message_type diff --git a/python/rocketmq/v5/producer/producer.py b/python/rocketmq/v5/producer/producer.py index ba9e3d9b..2a9bba70 100644 --- a/python/rocketmq/v5/producer/producer.py +++ b/python/rocketmq/v5/producer/producer.py @@ -17,7 +17,7 @@ import abc import functools import threading import time -from concurrent.futures import Future +from concurrent.futures import Future, ThreadPoolExecutor from rocketmq.grpc_protocol import (ClientType, Code, Encoding, EndTransactionRequest, HeartbeatRequest, @@ -136,6 +136,7 @@ class Producer(Client): self.__checker = ( checker # checker for transaction message, handle checking from server ) + self.__transaction_check_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="transaction_check_worker") def __str__(self): return f"{ClientType.Name(self.client_type)} client_id:{self.client_id}" @@ -223,7 +224,7 @@ class Producer(Client): ) return future.result() - async def on_recover_orphaned_transaction_command( + def on_recover_orphaned_transaction_command( self, endpoints, msg, transaction_id ): # call this function from server side stream, in RpcClient._io_loop @@ -236,28 +237,7 @@ class Producer(Client): if self.__checker is None: raise IllegalArgumentException("No transaction checker registered.") message = Message().fromProtobuf(msg) - result = self.__checker.check(message) - - if result == TransactionResolution.COMMIT: - res = await self.__commit_for_server_check( - endpoints, - message, - transaction_id, - TransactionSource.SOURCE_SERVER_CHECK, - ) - logger.debug( - f"commit message. message_id: {message.message_id}, transaction_id: {transaction_id}, res: {res}" - ) - elif result == TransactionResolution.ROLLBACK: - res = await self.__rollback_for_server_check( - endpoints, - message, - transaction_id, - TransactionSource.SOURCE_SERVER_CHECK, - ) - logger.debug( - f"rollback message. message_id: {message.message_id}, transaction_id: {transaction_id}, res: {res}" - ) + self.__transaction_check_executor.submit(self.__server_transaction_check, endpoints, message, transaction_id) except Exception as e: logger.error(f"on_recover_orphaned_transaction_command exception: {e}") @@ -313,6 +293,8 @@ class Producer(Client): def shutdown(self): logger.info(f"begin to shutdown {self.__str__()}") + self.__transaction_check_executor.shutdown() + self.__transaction_check_executor = None super().shutdown() logger.info(f"shutdown {self.__str__()} success.") @@ -395,7 +377,7 @@ class Producer(Client): send_message_future, topic_queue ) self.client_metrics.send_after(send_metric_context, True) - self._set_future_callback_result( + self._submit_callback( CallbackResult.async_send_callback_result(ret_future, send_receipt) ) except Exception as e: @@ -406,7 +388,7 @@ class Producer(Client): if retry_exception_future is not None: # end retry with exception self.client_metrics.send_after(send_metric_context, False) - self._set_future_callback_result( + self._submit_callback( CallbackResult.async_send_callback_result( ret_future, retry_exception_future.exception(), False ) @@ -436,13 +418,6 @@ class Producer(Client): ) end_retry = True - # no need more attempts for transactional message - if message.message_type == MessageType.TRANSACTION: - logger.error( - f"{self.__str__()} failed to send message to {topic_queue.endpoints.__str__()}, topic:{message.topic}, message_id:{message.message_id}, message_type:{message.message_type} ,attempt:{attempt}" - ) - end_retry = True - # end retry if system busy if isinstance(e, TooManyRequestsException): logger.error( @@ -547,27 +522,31 @@ class Producer(Client): req.source = source return req - def __commit_for_server_check( - self, endpoints, message: Message, transaction_id, source - ): - return self.__end_transaction_for_server_check( - endpoints, message, transaction_id, TransactionResolution.COMMIT, source - ) - - def __rollback_for_server_check( - self, endpoints, message: Message, transaction_id, source - ): - return self.__end_transaction_for_server_check( - endpoints, message, transaction_id, TransactionResolution.ROLLBACK, source - ) + def __server_transaction_check_callback(self, future, message, transaction_id, result): + try: + res = future.result() + if res is not None and res.status.code == Code.OK: + if result == TransactionResolution.COMMIT: + logger.debug( + f"{self.__str__()} commit message. message_id: {message.message_id}, transaction_id: {transaction_id}, res: {res}" + ) + elif result == TransactionResolution.ROLLBACK: + logger.debug( + f"{self.__str__()} rollback message. message_id: {message.message_id}, transaction_id: {transaction_id}, res: {res}" + ) + else: + if result == TransactionResolution.COMMIT: + raise Exception(f"{self.__str__()} commit message: {message.message_id} raise exception") + elif result == TransactionResolution.ROLLBACK: + raise Exception(f"{self.__str__()} rollback message: {message.message_id} raise exception") + except Exception as e: + logger.error(f"server transaction check raise exception, {e}") - def __end_transaction_for_server_check( - self, endpoints, message: Message, transaction_id, result, source - ): - req = self.__end_transaction_req(message, transaction_id, result, source) - return self.rpc_client.end_transaction_for_server_check( - endpoints, - req, - metadata=self._sign(), - timeout=self.client_configuration.request_timeout, - ) + def __server_transaction_check(self, endpoints, message, transaction_id): + try: + result = self.__checker.check(message) + req = self.__end_transaction_req(message, transaction_id, result, TransactionSource.SOURCE_SERVER_CHECK) + future = self.rpc_client.end_transaction_async(endpoints, req, metadata=self._sign(), timeout=self.client_configuration.request_timeout) + future.add_done_callback(functools.partial(self.__server_transaction_check_callback, message=message, transaction_id=transaction_id, result=result)) + except Exception as e: + raise e diff --git a/python/rocketmq/v5/util/misc.py b/python/rocketmq/v5/util/misc.py index b021b690..c0c00092 100644 --- a/python/rocketmq/v5/util/misc.py +++ b/python/rocketmq/v5/util/misc.py @@ -29,7 +29,7 @@ class Misc: __OS_NAME = None TOPIC_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$") CONSUMER_GROUP_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$") - SDK_VERSION = "5.0.3" + SDK_VERSION = "5.0.4" @staticmethod def sdk_language(): diff --git a/python/setup.py b/python/setup.py index 3881d6d7..f869f7ee 100644 --- a/python/setup.py +++ b/python/setup.py @@ -17,7 +17,7 @@ from setuptools import find_packages, setup setup( name='rocketmq-python-client', - version='5.0.3', + version='5.0.4', packages=find_packages(), install_requires=[ "grpcio>=1.5.0",