This is an automated email from the ASF dual-hosted git repository. lollipop pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new 093d793e Python 5.0.6 (#1004) 093d793e is described below commit 093d793ecaf5a72afeb80c1b1eeedb7eb5b3e9c5 Author: zhouli11 <04081...@163.com> AuthorDate: Thu May 29 11:05:18 2025 +0800 Python 5.0.6 (#1004) * Python 5.0.6 --- python/rocketmq/v5/client/balancer/queue_selector.py | 1 - python/rocketmq/v5/client/client.py | 4 ++-- python/rocketmq/v5/client/metrics/client_metrics.py | 2 ++ python/rocketmq/v5/consumer/simple_consumer.py | 4 +++- python/rocketmq/v5/producer/producer.py | 1 - python/rocketmq/v5/util/misc.py | 2 +- python/setup.py | 2 +- 7 files changed, 9 insertions(+), 7 deletions(-) diff --git a/python/rocketmq/v5/client/balancer/queue_selector.py b/python/rocketmq/v5/client/balancer/queue_selector.py index bd48f21c..98780a80 100644 --- a/python/rocketmq/v5/client/balancer/queue_selector.py +++ b/python/rocketmq/v5/client/balancer/queue_selector.py @@ -68,7 +68,6 @@ class QueueSelector: def select_queue_by_hash_key(self, key): hash_object = hashlib.sha256(key.encode('utf-8')) hash_code = int.from_bytes(hash_object.digest(), byteorder='big') - print(f"hashcode: {hash_code}") return self.__message_queues[hash_code % len(self.__message_queues)] def all_queues(self): diff --git a/python/rocketmq/v5/client/client.py b/python/rocketmq/v5/client/client.py index d039a9a0..49bb6b3a 100644 --- a/python/rocketmq/v5/client/client.py +++ b/python/rocketmq/v5/client/client.py @@ -227,7 +227,6 @@ class Client: asyncio.set_event_loop(self._rpc_channel_io_loop()) while True: if self.__client_thread_task_enabled is True: - self.__sync_setting_scheduler_threading_event.wait(5) logger.debug(f"{self.__str__()} run update setting in scheduler.") try: all_endpoints = self.__get_all_endpoints().values() @@ -238,6 +237,7 @@ class Client: logger.error( f"{self.__str__()} scheduler set setting raise exception: {e}" ) + self.__sync_setting_scheduler_threading_event.wait(300) else: break logger.info(f"{self.__str__()} stop scheduler for update setting success.") @@ -273,7 +273,7 @@ class Client: thread_name_prefix=f"client_callback_worker-{self.__client_id}") logger.info(f"{self.__str__()} start callback executor success. max_workers:{workers}") except Exception as e: - print(f"{self.__str__()} start async rpc callback raise exception: {e}") + logger.error(f"{self.__str__()} start async rpc callback raise exception: {e}") raise e @staticmethod diff --git a/python/rocketmq/v5/client/metrics/client_metrics.py b/python/rocketmq/v5/client/metrics/client_metrics.py index 210631ee..22bef06f 100644 --- a/python/rocketmq/v5/client/metrics/client_metrics.py +++ b/python/rocketmq/v5/client/metrics/client_metrics.py @@ -28,6 +28,7 @@ from rocketmq.grpc_protocol import Metric from rocketmq.v5.client.connection import RpcEndpoints from rocketmq.v5.log import logger from rocketmq.v5.model import HistogramEnum, MessageMetricType, MetricContext +from rocketmq.v5.util import Signature class ClientMetrics: @@ -142,6 +143,7 @@ class ClientMetrics: endpoint=self.__endpoints.__str__(), insecure=True, timeout=ClientMetrics.METRIC_EXPORTER_RPC_TIMEOUT, + headers=Signature.metadata(self.__client_configuration, self.__client_id) ) # create a metric reader and set the export interval reader = PeriodicExportingMetricReader( diff --git a/python/rocketmq/v5/consumer/simple_consumer.py b/python/rocketmq/v5/consumer/simple_consumer.py index 7a19c903..d00bb1a7 100644 --- a/python/rocketmq/v5/consumer/simple_consumer.py +++ b/python/rocketmq/v5/consumer/simple_consumer.py @@ -42,6 +42,7 @@ class SimpleConsumer(Client): consumer_group, subscription: dict = None, await_duration=20, + tls_enable=False ): if consumer_group is None or consumer_group.strip() == "": raise IllegalArgumentException("consumerGroup should not be null") @@ -56,6 +57,7 @@ class SimpleConsumer(Client): client_configuration, None if subscription is None else subscription.keys(), ClientType.SIMPLE_CONSUMER, + tls_enable ) self.__consumer_group = consumer_group self.__await_duration = await_duration # long polling timeout, seconds @@ -98,7 +100,7 @@ class SimpleConsumer(Client): "unable to remove subscription because simple consumer is not running" ) - if topic in self.__subscriptions: + if self.__subscriptions.contains(topic): self.__subscriptions.remove(topic) self._remove_unused_topic_route_data(topic) diff --git a/python/rocketmq/v5/producer/producer.py b/python/rocketmq/v5/producer/producer.py index 5072d90a..4f5f089f 100644 --- a/python/rocketmq/v5/producer/producer.py +++ b/python/rocketmq/v5/producer/producer.py @@ -303,7 +303,6 @@ class Producer(Client): def __send(self, message: Message, topic_queue, attempt=1) -> SendReceipt: req = self.__send_req(message) send_context = self.client_metrics.send_before(message.topic) - print(f"{topic_queue}") send_message_future = self.rpc_client.send_message_async( topic_queue.endpoints, req, diff --git a/python/rocketmq/v5/util/misc.py b/python/rocketmq/v5/util/misc.py index e09faf99..79d6be34 100644 --- a/python/rocketmq/v5/util/misc.py +++ b/python/rocketmq/v5/util/misc.py @@ -29,7 +29,7 @@ class Misc: __OS_NAME = None TOPIC_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$") CONSUMER_GROUP_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$") - SDK_VERSION = "5.0.5" + SDK_VERSION = "5.0.6" @staticmethod def sdk_language(): diff --git a/python/setup.py b/python/setup.py index 75f95b68..02a125be 100644 --- a/python/setup.py +++ b/python/setup.py @@ -17,7 +17,7 @@ from setuptools import find_packages, setup setup( name='rocketmq-python-client', - version='5.0.5', + version='5.0.6', packages=find_packages(), install_requires=[ "grpcio>=1.5.0",