This is an automated email from the ASF dual-hosted git repository. lollipop pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new 4362aef0 python sdk 5.0.2 (#900) 4362aef0 is described below commit 4362aef0cbc16b1eeeb841e0aff3751530e6179c Author: zhouli11 <04081...@163.com> AuthorDate: Tue Dec 31 09:53:01 2024 +0800 python sdk 5.0.2 (#900) 1. Fix the bug related to message ID length 2. Fix the bug where the subscription relationship was synced without its filter expression 3. The subscribe method now re-raises exceptions instead of only logging them 4. Set the default values of AK and SK to empty strings --- .github/workflows/python_build.yml | 6 ++-- python/example/async_producer_example.py | 33 ++++++++++++------- python/example/async_simple_consumer_example.py | 37 ++++++++++++++------- python/example/normal_producer_example.py | 36 +++++++++++++-------- python/example/simple_consumer_example.py | 39 ++++++++++++++--------- python/example/transaction_producer_example.py | 17 +++++++--- python/rocketmq/v5/client/client.py | 33 +++++++++++++------ python/rocketmq/v5/client/client_configuration.py | 6 ++-- python/rocketmq/v5/consumer/simple_consumer.py | 4 ++- python/rocketmq/v5/model/message.py | 2 ++ python/rocketmq/v5/util/message_id_codec.py | 5 +-- python/rocketmq/v5/util/misc.py | 2 +- python/rocketmq/v5/util/signature.py | 4 +-- python/setup.py | 2 +- 14 files changed, 143 insertions(+), 83 deletions(-) diff --git a/.github/workflows/python_build.yml b/.github/workflows/python_build.yml index e68b1d7a..75d61372 100644 --- a/.github/workflows/python_build.yml +++ b/.github/workflows/python_build.yml @@ -4,7 +4,7 @@ on: jobs: flake8: name: flake8 - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 steps: - name: Checkout uses: actions/checkout@v3 @@ -17,7 +17,7 @@ jobs: run: | flake8 --ignore=E501,W503 --exclude python/rocketmq/grpc_protocol python isort: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 steps: - name: Checkout uses: actions/checkout@v3 @@ -30,7 +30,7 @@ jobs: run: | isort --check --diff --skip python/rocketmq/grpc_protocol python black: - runs-on: 
ubuntu-latest + runs-on: ubuntu-22.04 steps: - name: Checkout uses: actions/checkout@v3 diff --git a/python/example/async_producer_example.py b/python/example/async_producer_example.py index 2c0b7948..8f3c986e 100644 --- a/python/example/async_producer_example.py +++ b/python/example/async_producer_example.py @@ -25,20 +25,29 @@ def handle_send_result(result_future): if __name__ == '__main__': - endpoints = "endpoints" - credentials = Credentials("ak", "sk") + endpoints = "foobar.com:8080" + credentials = Credentials() + # if auth enable + # credentials = Credentials("ak", "sk") config = ClientConfiguration(endpoints, credentials) topic = "topic" + producer = Producer(config, (topic,)) + try: - producer = Producer(config, (topic,)) producer.startup() - msg = Message() - msg.topic = topic - msg.body = "hello, rocketmq.".encode('utf-8') - msg.tag = "rocketmq-send-message" - msg.keys = "send_async" - msg.add_property("send", "async") - send_result_future = producer.send_async(msg) - send_result_future.add_done_callback(handle_send_result) + try: + for i in range(10): + msg = Message() + msg.topic = topic + msg.body = "hello, rocketmq.".encode('utf-8') + msg.tag = "rocketmq-send-message" + msg.keys = "send_async" + msg.add_property("send", "async") + send_result_future = producer.send_async(msg) + send_result_future.add_done_callback(handle_send_result) + except Exception as e: + print(f"async producer{producer.__str__()} send message raise exception: {e}") + producer.shutdown() except Exception as e: - print(f"async producer example raise exception: {e}") + print(f"{producer.__str__()} startup raise exception: {e}") + producer.shutdown() diff --git a/python/example/async_simple_consumer_example.py b/python/example/async_simple_consumer_example.py index a518ee7e..eed0630d 100644 --- a/python/example/async_simple_consumer_example.py +++ b/python/example/async_simple_consumer_example.py @@ -21,27 +21,40 @@ from rocketmq import ClientConfiguration, Credentials, 
SimpleConsumer def receive_callback(receive_result_future, consumer): messages = receive_result_future.result() + print(f"{consumer.__str__()} receive {len(messages)} messages.") for msg in messages: - print(f"{consumer.__str__()} receive {len(messages)} messages in callback.") try: consumer.ack(msg) - print(f"receive and ack message:{msg.message_id} in callback.") + print(f"ack message:{msg.message_id}.") except Exception as exception: - print(f"receive message callback raise exception: {exception}") + print(f"receive message raise exception: {exception}") if __name__ == '__main__': - endpoints = "endpoints" - credentials = Credentials("ak", "sk") + endpoints = "foobar.com:8080" + credentials = Credentials() + # if auth enable + # credentials = Credentials("ak", "sk") config = ClientConfiguration(endpoints, credentials) topic = "topic" + + simple_consumer = SimpleConsumer(config, "consumer-group") try: - simple_consumer = SimpleConsumer(config, "consumer_group") simple_consumer.startup() - simple_consumer.subscribe(topic) - while True: - time.sleep(5) - future = simple_consumer.receive_async(32, 15) - future.add_done_callback(functools.partial(receive_callback, consumer=simple_consumer)) + try: + simple_consumer.subscribe(topic) + # use tag filter + # simple_consumer.subscribe(topic, FilterExpression("tag")) + while True: + try: + time.sleep(1) + future = simple_consumer.receive_async(32, 15) + future.add_done_callback(functools.partial(receive_callback, consumer=simple_consumer)) + except Exception as e: + print(f"{simple_consumer.__str__()} receive topic:{topic} raise exception: {e}") + except Exception as e: + print(f"{simple_consumer.__str__()} subscribe topic:{topic} raise exception: {e}") + simple_consumer.shutdown() except Exception as e: - print(f"simple consumer example raise exception: {e}") + print(f"{simple_consumer.__str__()} startup raise exception: {e}") + simple_consumer.shutdown() diff --git a/python/example/normal_producer_example.py 
b/python/example/normal_producer_example.py index 3ffee017..16758d97 100644 --- a/python/example/normal_producer_example.py +++ b/python/example/normal_producer_example.py @@ -16,22 +16,30 @@ from rocketmq import ClientConfiguration, Credentials, Message, Producer if __name__ == '__main__': - endpoints = "endpoints" - credentials = Credentials("ak", "sk") + endpoints = "foobar.com:8080" + credentials = Credentials() + # if auth enable + # credentials = Credentials("ak", "sk") config = ClientConfiguration(endpoints, credentials) topic = "topic" + producer = Producer(config, (topic,)) + try: - producer = Producer(config, (topic,)) producer.startup() - msg = Message() - msg.topic = topic - msg.body = "hello, rocketmq.".encode('utf-8') - msg.tag = "rocketmq-send-message" - msg.keys = "send_sync" - msg.add_property("send", "sync") - res = producer.send(msg) - print(f"{producer.__str__()} send message success. {res}") - producer.shutdown() - print(f"{producer.__str__()} shutdown.") + try: + msg = Message() + msg.topic = topic + msg.body = "hello, rocketmq.".encode('utf-8') + msg.tag = "rocketmq-send-message" + msg.keys = "send_sync" + msg.add_property("send", "sync") + res = producer.send(msg) + print(f"{producer.__str__()} send message success. 
{res}") + producer.shutdown() + print(f"{producer.__str__()} shutdown.") + except Exception as e: + print(f"normal producer example raise exception: {e}") + producer.shutdown() except Exception as e: - print(f"normal producer example raise exception: {e}") + print(f"{producer.__str__()} startup raise exception: {e}") + producer.shutdown() diff --git a/python/example/simple_consumer_example.py b/python/example/simple_consumer_example.py index 19120ead..77526fe5 100644 --- a/python/example/simple_consumer_example.py +++ b/python/example/simple_consumer_example.py @@ -16,23 +16,32 @@ from rocketmq import ClientConfiguration, Credentials, SimpleConsumer if __name__ == '__main__': - endpoints = "endpoints" - credentials = Credentials("ak", "sk") + endpoints = "foobar.com:8080" + credentials = Credentials() + # if auth enable + # credentials = Credentials("ak", "sk") config = ClientConfiguration(endpoints, credentials) topic = "topic" + simple_consumer = SimpleConsumer(config, "consumer-group") try: - simple_consumer = SimpleConsumer(config, "consumer-group") simple_consumer.startup() - simple_consumer.subscribe(topic) - while True: - try: - messages = simple_consumer.receive(32, 15) - if messages is not None: - print(f"{simple_consumer.__str__()} receive {len(messages)} messages.") - for msg in messages: - simple_consumer.ack(msg) - print(f"{simple_consumer.__str__()} ack message:[{msg.message_id}].") - except Exception as e: - print(f"receive or ack message raise exception: {e}") + try: + simple_consumer.subscribe(topic) + # use tag filter + # simple_consumer.subscribe(topic, FilterExpression("tag")) + while True: + try: + messages = simple_consumer.receive(32, 15) + if messages is not None: + print(f"{simple_consumer.__str__()} receive {len(messages)} messages.") + for msg in messages: + simple_consumer.ack(msg) + print(f"{simple_consumer.__str__()} ack message:[{msg.message_id}].") + except Exception as e: + print(f"receive or ack message raise exception: {e}") + 
except Exception as e: + print(f"{simple_consumer.__str__()} subscribe topic:{topic} raise exception: {e}") + simple_consumer.shutdown() except Exception as e: - print(f"simple consumer example raise exception: {e}") + print(f"{simple_consumer.__str__()} startup raise exception: {e}") + simple_consumer.shutdown() diff --git a/python/example/transaction_producer_example.py b/python/example/transaction_producer_example.py index 7296ab11..c922f2e7 100644 --- a/python/example/transaction_producer_example.py +++ b/python/example/transaction_producer_example.py @@ -25,13 +25,20 @@ class TestChecker(TransactionChecker): if __name__ == '__main__': - endpoints = "endpoints" - credentials = Credentials("ak", "sk") + endpoints = "foobar.com:8080" + credentials = Credentials() + # if auth enable + # credentials = Credentials("ak", "sk") config = ClientConfiguration(endpoints, credentials) topic = "topic" + producer = Producer(config, (topic,)) + try: - producer = Producer(config, (topic,), TestChecker()) producer.startup() + except Exception as e: + print(f"{producer.__str__()} startup raise exception: {e}") + + try: transaction = producer.begin_transaction() msg = Message() msg.topic = topic @@ -40,6 +47,6 @@ if __name__ == '__main__': msg.keys = "send_transaction" msg.add_property("send", "transaction") res = producer.send(msg, transaction) - print(f"{producer.__str__()} send half message. {res}") + print(f"transaction producer{producer.__str__()} send half message success. 
{res}") except Exception as e: - print(f"transaction producer example raise exception: {e}") + print(f"transaction producer{producer.__str__()} example raise exception: {e}") diff --git a/python/rocketmq/v5/client/client.py b/python/rocketmq/v5/client/client.py index 14d479c5..57cf2a0b 100644 --- a/python/rocketmq/v5/client/client.py +++ b/python/rocketmq/v5/client/client.py @@ -34,6 +34,8 @@ from rocketmq.v5.util import (ClientId, ConcurrentMap, MessagingResultChecker, class Client: + CALLBACK_THREADS_NUM = 5 + def __init__(self, client_configuration, topics, client_type: ClientType, tls_enable=False): if client_configuration is None: raise IllegalArgumentException("clientConfiguration should not be null.") @@ -57,7 +59,7 @@ class Client: else: self.__topics = set() self.__callback_result_queue = Queue() - self.__callback_result_thread = None + self.__callback_threads = [] self.__is_running = False self.__client_thread_task_enabled = False self.__had_shutdown = False @@ -76,7 +78,7 @@ class Client: logger.warn( f"update topic exception when client startup, ignore it, try it again in scheduler. exception: {e}") self.__start_scheduler() - self.__start_callback_handler() + self.__start_async_rpc_callback_handler() self.__is_running = True self._start_success() except Exception as e: @@ -240,12 +242,19 @@ class Client: """ callback handler for async method """ - def __start_callback_handler(self): + def __start_async_rpc_callback_handler(self): # a thread to handle callback when using async method such as send_async(), receive_async(). 
# this handler switches user's callback thread from RpcClient's _io_loop_thread to client's callback_handler_thread - self.__callback_result_thread = threading.Thread(name="callback_handler_thread", target=self.__handle_callback) - self.__callback_result_thread.daemon = True - self.__callback_result_thread.start() + try: + for i in range(Client.CALLBACK_THREADS_NUM): + th = threading.Thread(name=f"callback_handler_thread-{i}", target=self.__handle_callback) + th.daemon = True + self.__callback_threads.append(th) + th.start() + logger.info(f"{self.__str__()} start async rpc callback thread:{th} success.") + except Exception as e: + print(f"{self.__str__()} start async rpc callback raise exception: {e}") + raise e def __handle_callback(self): while True: @@ -263,7 +272,7 @@ class Client: self.__callback_result_queue.task_done() else: break - logger.info(f"{self.__str__()} stop client callback result handler thread success.") + logger.info(f"{self.__str__()} stop client callback result handler thread:{threading.current_thread()} success.") """ protect """ @@ -375,6 +384,7 @@ class Client: req = self._sync_setting_req(endpoints) callback = functools.partial(self.__setting_write_callback, endpoints=endpoints) future = self.__rpc_client.telemetry_write_async(endpoints, req) + logger.debug(f"{self.__str__()} send setting to {endpoints.__str__()}, {req}") future.add_done_callback(callback) def __retrieve_telemetry_stream_stream_call(self, endpoints, rebuild=False): @@ -466,9 +476,11 @@ class Client: self.__clear_idle_rpc_channels_threading_event.set() self.__clear_idle_rpc_channels_scheduler.join() - if self.__callback_result_thread is not None: + for i in range(Client.CALLBACK_THREADS_NUM): self._set_future_callback_result(CallbackResult.end_callback_thread_result()) - self.__callback_result_thread.join() + + for i in range(Client.CALLBACK_THREADS_NUM): + self.__callback_threads[i].join() self.__topic_route_scheduler = None self.__topic_route_scheduler_threading_event = 
None @@ -478,7 +490,8 @@ class Client: self.__sync_setting_scheduler_threading_event = None self.__clear_idle_rpc_channels_scheduler = None self.__clear_idle_rpc_channels_threading_event = None - self.__callback_result_thread = None + self.__callback_result_queue = None + self.__callback_threads = None """ property """ diff --git a/python/rocketmq/v5/client/client_configuration.py b/python/rocketmq/v5/client/client_configuration.py index 565402ab..7c9c67f9 100644 --- a/python/rocketmq/v5/client/client_configuration.py +++ b/python/rocketmq/v5/client/client_configuration.py @@ -22,9 +22,9 @@ from rocketmq.v5.log import logger class Credentials: - def __init__(self, ak, sk): - self.__ak = ak - self.__sk = sk + def __init__(self, ak="", sk=""): + self.__ak = ak if ak is not None else "" + self.__sk = sk if sk is not None else "" @property def ak(self): diff --git a/python/rocketmq/v5/consumer/simple_consumer.py b/python/rocketmq/v5/consumer/simple_consumer.py index 33e936c4..11a09d1b 100644 --- a/python/rocketmq/v5/consumer/simple_consumer.py +++ b/python/rocketmq/v5/consumer/simple_consumer.py @@ -72,6 +72,7 @@ class SimpleConsumer(Client): self.__subscriptions.put(topic, filter_expression if filter_expression is not None else FilterExpression()) except Exception as e: logger.error(f"subscribe exception: {e}") + raise e def unsubscribe(self, topic): if self.is_running is False: @@ -141,6 +142,7 @@ class SimpleConsumer(Client): sub_entry.topic.name = topic sub_entry.topic.resource_namespace = self.client_configuration.namespace sub_entry.expression.type = expression.filter_type + sub_entry.expression.expression = expression.expression settings = Settings() settings.client_type = self.client_type @@ -148,7 +150,7 @@ class SimpleConsumer(Client): settings.request_timeout.seconds = self.client_configuration.request_timeout settings.subscription.CopyFrom(subscription) settings.user_agent.language = 6 - settings.user_agent.version = "5.0.1.1" + settings.user_agent.version 
= Misc.sdk_version() settings.user_agent.platform = Misc.get_os_description() settings.user_agent.hostname = Misc.get_local_ip() settings.metric.on = False diff --git a/python/rocketmq/v5/model/message.py b/python/rocketmq/v5/model/message.py index c333a9de..efd32085 100644 --- a/python/rocketmq/v5/model/message.py +++ b/python/rocketmq/v5/model/message.py @@ -160,6 +160,8 @@ class Message: @topic.setter def topic(self, topic): + if topic is None or topic.strip() == '': + raise IllegalArgumentException("topic has not been set yet") if Misc.is_valid_topic(topic): self.__topic = topic else: diff --git a/python/rocketmq/v5/util/message_id_codec.py b/python/rocketmq/v5/util/message_id_codec.py index 1fe26486..eec4f94a 100644 --- a/python/rocketmq/v5/util/message_id_codec.py +++ b/python/rocketmq/v5/util/message_id_codec.py @@ -101,10 +101,7 @@ class MessageIdCodec: @staticmethod def decode(message_id): - if len(message_id) == MessageIdCodec.MESSAGE_ID_LENGTH_FOR_V1_OR_LATER: - return message_id[2:] - else: - return message_id + return message_id """ private """ diff --git a/python/rocketmq/v5/util/misc.py b/python/rocketmq/v5/util/misc.py index 3d368b17..d371625f 100644 --- a/python/rocketmq/v5/util/misc.py +++ b/python/rocketmq/v5/util/misc.py @@ -29,7 +29,7 @@ class Misc: __OS_NAME = None TOPIC_PATTERN = compile(r'^[%a-zA-Z0-9_-]+$') CONSUMER_GROUP_PATTERN = compile(r'^[%a-zA-Z0-9_-]+$') - SDK_VERSION = "V5_0_1_SNAPSHOT" + SDK_VERSION = "5.0.2" @staticmethod def sdk_language(): diff --git a/python/rocketmq/v5/util/signature.py b/python/rocketmq/v5/util/signature.py index 52949a5f..a23dd26b 100644 --- a/python/rocketmq/v5/util/signature.py +++ b/python/rocketmq/v5/util/signature.py @@ -19,7 +19,7 @@ from hashlib import sha1 from hmac import new from uuid import uuid4 -from rocketmq.v5.util import ClientId +from rocketmq.v5.util import ClientId, Misc class Signature: @@ -46,7 +46,7 @@ class Signature: metadata = [ ("x-mq-language", "PYTHON"), ("x-mq-protocol", 
"GRPC_V2"), - ("x-mq-client-version", "5.0.1.1"), + ("x-mq-client-version", Misc.sdk_version()), ("x-mq-date-time", formatted_date_time), ("x-mq-request-id", request_id), ("x-mq-client-id", client_id), diff --git a/python/setup.py b/python/setup.py index a0450466..6e8453f7 100644 --- a/python/setup.py +++ b/python/setup.py @@ -17,7 +17,7 @@ from setuptools import find_packages, setup setup( name='rocketmq-python-client', - version='5.0.1', + version='5.0.2', packages=find_packages(), install_requires=[ "grpcio>=1.5.0",