This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 4362aef0 python sdk 5.0.2 (#900)
4362aef0 is described below

commit 4362aef0cbc16b1eeeb841e0aff3751530e6179c
Author: zhouli11 <04081...@163.com>
AuthorDate: Tue Dec 31 09:53:01 2024 +0800

    python sdk 5.0.2 (#900)
    
    1. Fix the bug related to message ID length
    2. Fix the bug where the subscription relationship was synced without its filter expression
    3. The subscribe method now re-raises exceptions instead of only logging them
    4. Set the default values of AK and SK to empty strings
---
 .github/workflows/python_build.yml                |  6 ++--
 python/example/async_producer_example.py          | 33 ++++++++++++-------
 python/example/async_simple_consumer_example.py   | 37 ++++++++++++++-------
 python/example/normal_producer_example.py         | 36 +++++++++++++--------
 python/example/simple_consumer_example.py         | 39 ++++++++++++++---------
 python/example/transaction_producer_example.py    | 17 +++++++---
 python/rocketmq/v5/client/client.py               | 33 +++++++++++++------
 python/rocketmq/v5/client/client_configuration.py |  6 ++--
 python/rocketmq/v5/consumer/simple_consumer.py    |  4 ++-
 python/rocketmq/v5/model/message.py               |  2 ++
 python/rocketmq/v5/util/message_id_codec.py       |  5 +--
 python/rocketmq/v5/util/misc.py                   |  2 +-
 python/rocketmq/v5/util/signature.py              |  4 +--
 python/setup.py                                   |  2 +-
 14 files changed, 143 insertions(+), 83 deletions(-)

diff --git a/.github/workflows/python_build.yml b/.github/workflows/python_build.yml
index e68b1d7a..75d61372 100644
--- a/.github/workflows/python_build.yml
+++ b/.github/workflows/python_build.yml
@@ -4,7 +4,7 @@ on:
 jobs:
   flake8:
     name: flake8
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04
     steps:
       - name: Checkout
         uses: actions/checkout@v3
@@ -17,7 +17,7 @@ jobs:
         run: |
           flake8 --ignore=E501,W503 --exclude python/rocketmq/grpc_protocol python
   isort:
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04
     steps:
       - name: Checkout
         uses: actions/checkout@v3
@@ -30,7 +30,7 @@ jobs:
         run: |
           isort --check --diff --skip python/rocketmq/grpc_protocol python
   black:
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04
     steps:
       - name: Checkout
         uses: actions/checkout@v3
diff --git a/python/example/async_producer_example.py b/python/example/async_producer_example.py
index 2c0b7948..8f3c986e 100644
--- a/python/example/async_producer_example.py
+++ b/python/example/async_producer_example.py
@@ -25,20 +25,29 @@ def handle_send_result(result_future):
 
 
 if __name__ == '__main__':
-    endpoints = "endpoints"
-    credentials = Credentials("ak", "sk")
+    endpoints = "foobar.com:8080"
+    credentials = Credentials()
+    # if auth enable
+    # credentials = Credentials("ak", "sk")
     config = ClientConfiguration(endpoints, credentials)
     topic = "topic"
+    producer = Producer(config, (topic,))
+
     try:
-        producer = Producer(config, (topic,))
         producer.startup()
-        msg = Message()
-        msg.topic = topic
-        msg.body = "hello, rocketmq.".encode('utf-8')
-        msg.tag = "rocketmq-send-message"
-        msg.keys = "send_async"
-        msg.add_property("send", "async")
-        send_result_future = producer.send_async(msg)
-        send_result_future.add_done_callback(handle_send_result)
+        try:
+            for i in range(10):
+                msg = Message()
+                msg.topic = topic
+                msg.body = "hello, rocketmq.".encode('utf-8')
+                msg.tag = "rocketmq-send-message"
+                msg.keys = "send_async"
+                msg.add_property("send", "async")
+                send_result_future = producer.send_async(msg)
+                send_result_future.add_done_callback(handle_send_result)
+        except Exception as e:
+            print(f"async producer{producer.__str__()} send message raise exception: {e}")
+            producer.shutdown()
     except Exception as e:
-        print(f"async producer example raise exception: {e}")
+        print(f"{producer.__str__()} startup raise exception: {e}")
+        producer.shutdown()
diff --git a/python/example/async_simple_consumer_example.py b/python/example/async_simple_consumer_example.py
index a518ee7e..eed0630d 100644
--- a/python/example/async_simple_consumer_example.py
+++ b/python/example/async_simple_consumer_example.py
@@ -21,27 +21,40 @@ from rocketmq import ClientConfiguration, Credentials, SimpleConsumer
 
 def receive_callback(receive_result_future, consumer):
     messages = receive_result_future.result()
+    print(f"{consumer.__str__()} receive {len(messages)} messages.")
     for msg in messages:
-        print(f"{consumer.__str__()} receive {len(messages)} messages in callback.")
         try:
             consumer.ack(msg)
-            print(f"receive and ack message:{msg.message_id} in callback.")
+            print(f"ack message:{msg.message_id}.")
         except Exception as exception:
-            print(f"receive message callback raise exception: {exception}")
+            print(f"receive message raise exception: {exception}")
 
 
 if __name__ == '__main__':
-    endpoints = "endpoints"
-    credentials = Credentials("ak", "sk")
+    endpoints = "foobar.com:8080"
+    credentials = Credentials()
+    # if auth enable
+    # credentials = Credentials("ak", "sk")
     config = ClientConfiguration(endpoints, credentials)
     topic = "topic"
+
+    simple_consumer = SimpleConsumer(config, "consumer-group")
     try:
-        simple_consumer = SimpleConsumer(config, "consumer_group")
         simple_consumer.startup()
-        simple_consumer.subscribe(topic)
-        while True:
-            time.sleep(5)
-            future = simple_consumer.receive_async(32, 15)
-            future.add_done_callback(functools.partial(receive_callback, consumer=simple_consumer))
+        try:
+            simple_consumer.subscribe(topic)
+            # use tag filter
+            # simple_consumer.subscribe(topic, FilterExpression("tag"))
+            while True:
+                try:
+                    time.sleep(1)
+                    future = simple_consumer.receive_async(32, 15)
+                    future.add_done_callback(functools.partial(receive_callback, consumer=simple_consumer))
+                except Exception as e:
+                    print(f"{simple_consumer.__str__()} receive topic:{topic} raise exception: {e}")
+        except Exception as e:
+            print(f"{simple_consumer.__str__()} subscribe topic:{topic} raise exception: {e}")
+            simple_consumer.shutdown()
     except Exception as e:
-        print(f"simple consumer example raise exception: {e}")
+        print(f"{simple_consumer.__str__()} startup raise exception: {e}")
+        simple_consumer.shutdown()
diff --git a/python/example/normal_producer_example.py b/python/example/normal_producer_example.py
index 3ffee017..16758d97 100644
--- a/python/example/normal_producer_example.py
+++ b/python/example/normal_producer_example.py
@@ -16,22 +16,30 @@
 from rocketmq import ClientConfiguration, Credentials, Message, Producer
 
 if __name__ == '__main__':
-    endpoints = "endpoints"
-    credentials = Credentials("ak", "sk")
+    endpoints = "foobar.com:8080"
+    credentials = Credentials()
+    # if auth enable
+    # credentials = Credentials("ak", "sk")
     config = ClientConfiguration(endpoints, credentials)
     topic = "topic"
+    producer = Producer(config, (topic,))
+
     try:
-        producer = Producer(config, (topic,))
         producer.startup()
-        msg = Message()
-        msg.topic = topic
-        msg.body = "hello, rocketmq.".encode('utf-8')
-        msg.tag = "rocketmq-send-message"
-        msg.keys = "send_sync"
-        msg.add_property("send", "sync")
-        res = producer.send(msg)
-        print(f"{producer.__str__()} send message success. {res}")
-        producer.shutdown()
-        print(f"{producer.__str__()} shutdown.")
+        try:
+            msg = Message()
+            msg.topic = topic
+            msg.body = "hello, rocketmq.".encode('utf-8')
+            msg.tag = "rocketmq-send-message"
+            msg.keys = "send_sync"
+            msg.add_property("send", "sync")
+            res = producer.send(msg)
+            print(f"{producer.__str__()} send message success. {res}")
+            producer.shutdown()
+            print(f"{producer.__str__()} shutdown.")
+        except Exception as e:
+            print(f"normal producer example raise exception: {e}")
+            producer.shutdown()
     except Exception as e:
-        print(f"normal producer example raise exception: {e}")
+        print(f"{producer.__str__()} startup raise exception: {e}")
+        producer.shutdown()
diff --git a/python/example/simple_consumer_example.py b/python/example/simple_consumer_example.py
index 19120ead..77526fe5 100644
--- a/python/example/simple_consumer_example.py
+++ b/python/example/simple_consumer_example.py
@@ -16,23 +16,32 @@
 from rocketmq import ClientConfiguration, Credentials, SimpleConsumer
 
 if __name__ == '__main__':
-    endpoints = "endpoints"
-    credentials = Credentials("ak", "sk")
+    endpoints = "foobar.com:8080"
+    credentials = Credentials()
+    # if auth enable
+    # credentials = Credentials("ak", "sk")
     config = ClientConfiguration(endpoints, credentials)
     topic = "topic"
+    simple_consumer = SimpleConsumer(config, "consumer-group")
     try:
-        simple_consumer = SimpleConsumer(config, "consumer-group")
         simple_consumer.startup()
-        simple_consumer.subscribe(topic)
-        while True:
-            try:
-                messages = simple_consumer.receive(32, 15)
-                if messages is not None:
-                    print(f"{simple_consumer.__str__()} receive {len(messages)} messages.")
-                    for msg in messages:
-                        simple_consumer.ack(msg)
-                        print(f"{simple_consumer.__str__()} ack message:[{msg.message_id}].")
-            except Exception as e:
-                print(f"receive or ack message raise exception: {e}")
+        try:
+            simple_consumer.subscribe(topic)
+            # use tag filter
+            # simple_consumer.subscribe(topic, FilterExpression("tag"))
+            while True:
+                try:
+                    messages = simple_consumer.receive(32, 15)
+                    if messages is not None:
+                        print(f"{simple_consumer.__str__()} receive {len(messages)} messages.")
+                        for msg in messages:
+                            simple_consumer.ack(msg)
+                            print(f"{simple_consumer.__str__()} ack message:[{msg.message_id}].")
+                except Exception as e:
+                    print(f"receive or ack message raise exception: {e}")
+        except Exception as e:
+            print(f"{simple_consumer.__str__()} subscribe topic:{topic} raise exception: {e}")
+            simple_consumer.shutdown()
     except Exception as e:
-        print(f"simple consumer example raise exception: {e}")
+        print(f"{simple_consumer.__str__()} startup raise exception: {e}")
+        simple_consumer.shutdown()
diff --git a/python/example/transaction_producer_example.py b/python/example/transaction_producer_example.py
index 7296ab11..c922f2e7 100644
--- a/python/example/transaction_producer_example.py
+++ b/python/example/transaction_producer_example.py
@@ -25,13 +25,20 @@ class TestChecker(TransactionChecker):
 
 
 if __name__ == '__main__':
-    endpoints = "endpoints"
-    credentials = Credentials("ak", "sk")
+    endpoints = "foobar.com:8080"
+    credentials = Credentials()
+    # if auth enable
+    # credentials = Credentials("ak", "sk")
     config = ClientConfiguration(endpoints, credentials)
     topic = "topic"
+    producer = Producer(config, (topic,))
+
     try:
-        producer = Producer(config, (topic,), TestChecker())
         producer.startup()
+    except Exception as e:
+        print(f"{producer.__str__()} startup raise exception: {e}")
+
+    try:
         transaction = producer.begin_transaction()
         msg = Message()
         msg.topic = topic
@@ -40,6 +47,6 @@ if __name__ == '__main__':
         msg.keys = "send_transaction"
         msg.add_property("send", "transaction")
         res = producer.send(msg, transaction)
-        print(f"{producer.__str__()} send half message. {res}")
+        print(f"transaction producer{producer.__str__()} send half message success. {res}")
     except Exception as e:
-        print(f"transaction producer example raise exception: {e}")
+        print(f"transaction producer{producer.__str__()} example raise exception: {e}")
diff --git a/python/rocketmq/v5/client/client.py b/python/rocketmq/v5/client/client.py
index 14d479c5..57cf2a0b 100644
--- a/python/rocketmq/v5/client/client.py
+++ b/python/rocketmq/v5/client/client.py
@@ -34,6 +34,8 @@ from rocketmq.v5.util import (ClientId, ConcurrentMap, MessagingResultChecker,
 
 class Client:
 
+    CALLBACK_THREADS_NUM = 5
+
     def __init__(self, client_configuration, topics, client_type: ClientType, tls_enable=False):
         if client_configuration is None:
             raise IllegalArgumentException("clientConfiguration should not be null.")
@@ -57,7 +59,7 @@ class Client:
         else:
             self.__topics = set()
         self.__callback_result_queue = Queue()
-        self.__callback_result_thread = None
+        self.__callback_threads = []
         self.__is_running = False
         self.__client_thread_task_enabled = False
         self.__had_shutdown = False
@@ -76,7 +78,7 @@ class Client:
                 logger.warn(
                     f"update topic exception when client startup, ignore it, try it again in scheduler. exception: {e}")
             self.__start_scheduler()
-            self.__start_callback_handler()
+            self.__start_async_rpc_callback_handler()
             self.__is_running = True
             self._start_success()
         except Exception as e:
@@ -240,12 +242,19 @@ class Client:
 
     """ callback handler for async method """
 
-    def __start_callback_handler(self):
+    def __start_async_rpc_callback_handler(self):
         # a thread to handle callback when using async method such as send_async(), receive_async().
         # this handler switches user's callback thread from RpcClient's _io_loop_thread to client's callback_handler_thread
-        self.__callback_result_thread = threading.Thread(name="callback_handler_thread", target=self.__handle_callback)
-        self.__callback_result_thread.daemon = True
-        self.__callback_result_thread.start()
+        try:
+            for i in range(Client.CALLBACK_THREADS_NUM):
+                th = threading.Thread(name=f"callback_handler_thread-{i}", target=self.__handle_callback)
+                th.daemon = True
+                self.__callback_threads.append(th)
+                th.start()
+                logger.info(f"{self.__str__()} start async rpc callback thread:{th} success.")
+        except Exception as e:
+            print(f"{self.__str__()} start async rpc callback raise exception: {e}")
+            raise e
 
     def __handle_callback(self):
         while True:
@@ -263,7 +272,7 @@ class Client:
                     self.__callback_result_queue.task_done()
             else:
                 break
-        logger.info(f"{self.__str__()} stop client callback result handler thread success.")
+        logger.info(f"{self.__str__()} stop client callback result handler thread:{threading.current_thread()} success.")
 
     """ protect """
 
@@ -375,6 +384,7 @@ class Client:
         req = self._sync_setting_req(endpoints)
         callback = functools.partial(self.__setting_write_callback, endpoints=endpoints)
         future = self.__rpc_client.telemetry_write_async(endpoints, req)
+        logger.debug(f"{self.__str__()} send setting to {endpoints.__str__()}, {req}")
         future.add_done_callback(callback)
 
     def __retrieve_telemetry_stream_stream_call(self, endpoints, rebuild=False):
@@ -466,9 +476,11 @@ class Client:
                 self.__clear_idle_rpc_channels_threading_event.set()
                 self.__clear_idle_rpc_channels_scheduler.join()
 
-        if self.__callback_result_thread is not None:
+        for i in range(Client.CALLBACK_THREADS_NUM):
             
self._set_future_callback_result(CallbackResult.end_callback_thread_result())
-            self.__callback_result_thread.join()
+
+        for i in range(Client.CALLBACK_THREADS_NUM):
+            self.__callback_threads[i].join()
 
         self.__topic_route_scheduler = None
         self.__topic_route_scheduler_threading_event = None
@@ -478,7 +490,8 @@ class Client:
         self.__sync_setting_scheduler_threading_event = None
         self.__clear_idle_rpc_channels_scheduler = None
         self.__clear_idle_rpc_channels_threading_event = None
-        self.__callback_result_thread = None
+        self.__callback_result_queue = None
+        self.__callback_threads = None
 
     """ property """
 
diff --git a/python/rocketmq/v5/client/client_configuration.py b/python/rocketmq/v5/client/client_configuration.py
index 565402ab..7c9c67f9 100644
--- a/python/rocketmq/v5/client/client_configuration.py
+++ b/python/rocketmq/v5/client/client_configuration.py
@@ -22,9 +22,9 @@ from rocketmq.v5.log import logger
 
 class Credentials:
 
-    def __init__(self, ak, sk):
-        self.__ak = ak
-        self.__sk = sk
+    def __init__(self, ak="", sk=""):
+        self.__ak = ak if ak is not None else ""
+        self.__sk = sk if sk is not None else ""
 
     @property
     def ak(self):
diff --git a/python/rocketmq/v5/consumer/simple_consumer.py b/python/rocketmq/v5/consumer/simple_consumer.py
index 33e936c4..11a09d1b 100644
--- a/python/rocketmq/v5/consumer/simple_consumer.py
+++ b/python/rocketmq/v5/consumer/simple_consumer.py
@@ -72,6 +72,7 @@ class SimpleConsumer(Client):
             self.__subscriptions.put(topic, filter_expression if filter_expression is not None else FilterExpression())
         except Exception as e:
             logger.error(f"subscribe exception: {e}")
+            raise e
 
     def unsubscribe(self, topic):
         if self.is_running is False:
@@ -141,6 +142,7 @@ class SimpleConsumer(Client):
             sub_entry.topic.name = topic
             sub_entry.topic.resource_namespace = self.client_configuration.namespace
             sub_entry.expression.type = expression.filter_type
+            sub_entry.expression.expression = expression.expression
 
         settings = Settings()
         settings.client_type = self.client_type
@@ -148,7 +150,7 @@ class SimpleConsumer(Client):
         settings.request_timeout.seconds = self.client_configuration.request_timeout
         settings.subscription.CopyFrom(subscription)
         settings.user_agent.language = 6
-        settings.user_agent.version = "5.0.1.1"
+        settings.user_agent.version = Misc.sdk_version()
         settings.user_agent.platform = Misc.get_os_description()
         settings.user_agent.hostname = Misc.get_local_ip()
         settings.metric.on = False
diff --git a/python/rocketmq/v5/model/message.py b/python/rocketmq/v5/model/message.py
index c333a9de..efd32085 100644
--- a/python/rocketmq/v5/model/message.py
+++ b/python/rocketmq/v5/model/message.py
@@ -160,6 +160,8 @@ class Message:
 
     @topic.setter
     def topic(self, topic):
+        if topic is None or topic.strip() == '':
+            raise IllegalArgumentException("topic has not been set yet")
         if Misc.is_valid_topic(topic):
             self.__topic = topic
         else:
diff --git a/python/rocketmq/v5/util/message_id_codec.py b/python/rocketmq/v5/util/message_id_codec.py
index 1fe26486..eec4f94a 100644
--- a/python/rocketmq/v5/util/message_id_codec.py
+++ b/python/rocketmq/v5/util/message_id_codec.py
@@ -101,10 +101,7 @@ class MessageIdCodec:
 
     @staticmethod
     def decode(message_id):
-        if len(message_id) == MessageIdCodec.MESSAGE_ID_LENGTH_FOR_V1_OR_LATER:
-            return message_id[2:]
-        else:
-            return message_id
+        return message_id
 
     """ private """
 
diff --git a/python/rocketmq/v5/util/misc.py b/python/rocketmq/v5/util/misc.py
index 3d368b17..d371625f 100644
--- a/python/rocketmq/v5/util/misc.py
+++ b/python/rocketmq/v5/util/misc.py
@@ -29,7 +29,7 @@ class Misc:
     __OS_NAME = None
     TOPIC_PATTERN = compile(r'^[%a-zA-Z0-9_-]+$')
     CONSUMER_GROUP_PATTERN = compile(r'^[%a-zA-Z0-9_-]+$')
-    SDK_VERSION = "V5_0_1_SNAPSHOT"
+    SDK_VERSION = "5.0.2"
 
     @staticmethod
     def sdk_language():
diff --git a/python/rocketmq/v5/util/signature.py b/python/rocketmq/v5/util/signature.py
index 52949a5f..a23dd26b 100644
--- a/python/rocketmq/v5/util/signature.py
+++ b/python/rocketmq/v5/util/signature.py
@@ -19,7 +19,7 @@ from hashlib import sha1
 from hmac import new
 from uuid import uuid4
 
-from rocketmq.v5.util import ClientId
+from rocketmq.v5.util import ClientId, Misc
 
 
 class Signature:
@@ -46,7 +46,7 @@ class Signature:
         metadata = [
             ("x-mq-language", "PYTHON"),
             ("x-mq-protocol", "GRPC_V2"),
-            ("x-mq-client-version", "5.0.1.1"),
+            ("x-mq-client-version", Misc.sdk_version()),
             ("x-mq-date-time", formatted_date_time),
             ("x-mq-request-id", request_id),
             ("x-mq-client-id", client_id),
diff --git a/python/setup.py b/python/setup.py
index a0450466..6e8453f7 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -17,7 +17,7 @@ from setuptools import find_packages, setup
 
 setup(
     name='rocketmq-python-client',
-    version='5.0.1',
+    version='5.0.2',
     packages=find_packages(),
     install_requires=[
         "grpcio>=1.5.0",

Reply via email to