This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new c334f25e Implement a series of features (#538)
c334f25e is described below
commit c334f25e8613db953f92668deaf225ba539de3e9
Author: Aaron Ai <[email protected]>
AuthorDate: Mon Jun 12 14:49:12 2023 +0800
Implement a series of features (#538)
* Implement a series of features
1. Encoder for client id
2. Enrich the logging information
3. Impelement authentication algorithm
* Fix style issue
---
python/rocketmq/client_config.py | 33 +++++++++
python/rocketmq/client_id_encoder.py | 47 ++++++++++++
python/rocketmq/client_manager.py | 137 +++++++++++++++++++++++++++++++++++
python/rocketmq/log.py | 38 ++++++++++
python/rocketmq/message.py | 125 ++++++++++++++++++++++++++++++++
python/rocketmq/message_id.py | 20 +++++
python/rocketmq/send_receipt.py | 25 +++++++
python/rocketmq/signature.py | 88 ++++++++++++++++++++++
python/rocketmq/utils.py | 39 ++++++++++
9 files changed, 552 insertions(+)
diff --git a/python/rocketmq/client_config.py b/python/rocketmq/client_config.py
new file mode 100644
index 00000000..74f9a8ee
--- /dev/null
+++ b/python/rocketmq/client_config.py
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class ClientConfig:
+ def __init__(self, endpoints: str, session_credentials_provider,
ssl_enabled: bool):
+ self.__endpoints = endpoints
+ self.__session_credentials_provider = session_credentials_provider
+ self.__ssl_enabled = ssl_enabled
+
+ @property
+ def session_credentials_provider(self):
+ return self.__session_credentials_provider
+
+ @property
+ def endpoints(self):
+ return self.__endpoints
+
+ @property
+ def ssl_enabled(self):
+ return self.__ssl_enabled
diff --git a/python/rocketmq/client_id_encoder.py
b/python/rocketmq/client_id_encoder.py
new file mode 100644
index 00000000..138b05f0
--- /dev/null
+++ b/python/rocketmq/client_id_encoder.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import socket
+import threading
+import time
+
+import rocketmq.utils
+
+
+class ClientIdEncoder:
+ __INDEX = 0
+ __INDEX_LOCK = threading.Lock()
+ __CLIENT_ID_SEPARATOR = "@"
+
+ @staticmethod
+ def __get_and_increment_sequence():
+ with ClientIdEncoder.__INDEX_LOCK:
+ temp = ClientIdEncoder.__INDEX
+ ClientIdEncoder.__INDEX += 1
+ return temp
+
+ @staticmethod
+ def generate() -> str:
+ index = ClientIdEncoder.__get_and_increment_sequence()
+ return (
+ socket.gethostname()
+ + ClientIdEncoder.__CLIENT_ID_SEPARATOR
+ + str(os.getpid())
+ + ClientIdEncoder.__CLIENT_ID_SEPARATOR
+ + str(index)
+ + ClientIdEncoder.__CLIENT_ID_SEPARATOR
+ + str(rocketmq.utils.number_to_base(time.monotonic_ns(), 36))
+ )
diff --git a/python/rocketmq/client_manager.py
b/python/rocketmq/client_manager.py
new file mode 100644
index 00000000..dd13b2f5
--- /dev/null
+++ b/python/rocketmq/client_manager.py
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import threading
+
+from protocol import service_pb2
+from rocketmq.client import Client
+from rocketmq.rpc_client import Endpoints, RpcClient
+
+
+class ClientManager:
+ def __init__(self, client: Client):
+ self.__client = client
+ self.__rpc_clients = {}
+ self.__rpc_clients_lock = threading.Lock()
+
+ def __get_rpc_client(self, endpoints: Endpoints, ssl_enabled: bool) ->
RpcClient:
+ with self.__rpc_clients_lock:
+ rpc_client = self.__rpc_clients.get(endpoints)
+ if rpc_client:
+ return rpc_client
+ rpc_client = RpcClient(endpoints.get_target(), ssl_enabled)
+ self.__rpc_clients[endpoints] = rpc_client
+ return rpc_client
+
+ async def query_route(
+ self,
+ endpoints: Endpoints,
+ request: service_pb2.QueryRouteRequest,
+ timeout_seconds: int,
+ ):
+ rpc_client = self.__get_rpc_client(
+ endpoints, self.__client.client_config.ssl_enabled
+ )
+ return await rpc_client.query_route(request, timeout_seconds)
+
+ async def heartbeat(
+ self,
+ endpoints: Endpoints,
+ request: service_pb2.HeartbeatRequest,
+ timeout_seconds: int,
+ ):
+ rpc_client = self.__get_rpc_client(
+ endpoints, self.__client.client_config.ssl_enabled
+ )
+ return await rpc_client.heartbeat(request, timeout_seconds)
+
+ async def send_message(
+ self,
+ endpoints: Endpoints,
+ request: service_pb2.SendMessageRequest,
+ timeout_seconds: int,
+ ):
+ rpc_client = self.__get_rpc_client(
+ endpoints, self.__client.client_config.ssl_enabled
+ )
+ return await rpc_client.send_message(request, timeout_seconds)
+
+ async def query_assignment(
+ self,
+ endpoints: Endpoints,
+ request: service_pb2.QueryAssignmentRequest,
+ timeout_seconds: int,
+ ):
+ rpc_client = self.__get_rpc_client(
+ endpoints, self.__client.client_config.ssl_enabled
+ )
+ return await rpc_client.query_assignment(request, timeout_seconds)
+
+ async def ack_message(
+ self,
+ endpoints: Endpoints,
+ request: service_pb2.AckMessageRequest,
+ timeout_seconds: int,
+ ):
+ rpc_client = self.__get_rpc_client(
+ endpoints, self.__client.client_config.ssl_enabled
+ )
+ return await rpc_client.ack_message(request, timeout_seconds)
+
+ async def forward_message_to_dead_letter_queue(
+ self,
+ endpoints: Endpoints,
+ request: service_pb2.ForwardMessageToDeadLetterQueueRequest,
+ timeout_seconds: int,
+ ):
+ rpc_client = self.__get_rpc_client(
+ endpoints, self.__client.client_config.ssl_enabled
+ )
+ return await rpc_client.forward_message_to_dead_letter_queue(
+ request, timeout_seconds
+ )
+
+ async def end_transaction(
+ self,
+ endpoints: Endpoints,
+ request: service_pb2.EndTransactionRequest,
+ timeout_seconds: int,
+ ):
+ rpc_client = self.__get_rpc_client(
+ endpoints, self.__client.client_config.ssl_enabled
+ )
+ return await rpc_client.end_transaction(request, timeout_seconds)
+
+ async def notify_client_termination(
+ self,
+ endpoints: Endpoints,
+ request: service_pb2.NotifyClientTerminationRequest,
+ timeout_seconds: int,
+ ):
+ rpc_client = self.__get_rpc_client(
+ endpoints, self.__client.client_config.ssl_enabled
+ )
+ return await rpc_client.notify_client_termination(request,
timeout_seconds)
+
+ async def change_invisible_duration(
+ self,
+ endpoints: Endpoints,
+ request: service_pb2.ChangeInvisibleDurationRequest,
+ timeout_seconds: int,
+ ):
+ rpc_client = self.__get_rpc_client(
+ endpoints, self.__client.client_config.ssl_enabled
+ )
+ return await rpc_client.change_invisible_duration(request,
timeout_seconds)
diff --git a/python/rocketmq/log.py b/python/rocketmq/log.py
new file mode 100644
index 00000000..f3e4eae3
--- /dev/null
+++ b/python/rocketmq/log.py
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+
+logger = logging.getLogger("rocketmqlogger")
+logger.setLevel(logging.DEBUG)
+
+log_path = os.path.join(
+ os.path.expanduser("~"), "logs", "rocketmq", "rocketmq-client.log"
+)
+file_handler = logging.FileHandler(log_path)
+file_handler.setLevel(logging.DEBUG)
+
+console_handler = logging.StreamHandler()
+console_handler.setLevel(logging.DEBUG)
+
+formatter = logging.Formatter(
+ "%(asctime)s [%(levelname)s] [%(process)d] [%(threadName)s]
[%(filename)s#%(funcName)s:%(lineno)d] %(message)s"
+)
+file_handler.setFormatter(formatter)
+console_handler.setFormatter(formatter)
+
+logger.addHandler(file_handler)
+logger.addHandler(console_handler)
diff --git a/python/rocketmq/message.py b/python/rocketmq/message.py
new file mode 100644
index 00000000..f20e8651
--- /dev/null
+++ b/python/rocketmq/message.py
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from rocketmq.message_id import MessageId
+
+
+class Message:
+ def __init__(
+ self,
+ topic: str,
+ body: bytes,
+ properties: map = None,
+ tag: str = None,
+ keys: str = None,
+ message_group: str = None,
+ delivery_timestamp: int = None,
+ ):
+ if properties is None:
+ properties = {}
+ self.__topic = topic
+ self.__body = body
+ self.__properties = properties
+ self.__tag = tag
+ self.__keys = keys
+ self.__message_group = message_group
+ self.__delivery_timestamp = delivery_timestamp
+
+ @property
+ def topic(self):
+ return self.__topic
+
+ @property
+ def body(self):
+ return self.__body
+
+ @property
+ def properties(self):
+ return self.__properties
+
+ @property
+ def tag(self):
+ return self.__tag
+
+ @property
+ def keys(self):
+ return self.__keys
+
+ @property
+ def message_group(self):
+ return self.__message_group
+
+ @property
+ def delivery_timestamp(self):
+ return self.__delivery_timestamp
+
+
+class MessageView:
+ def __init__(
+ self,
+ message_id: MessageId,
+ topic: str,
+ body: bytes,
+ properties: map,
+ tag: str,
+ keys: str,
+ message_group: str,
+ delivery_timestamp: int,
+ born_host: str,
+ delivery_attempt: int,
+ ):
+ self.__message_id = message_id
+ self.__topic = topic
+ self.__body = body
+ self.__properties = properties
+ self.__tag = tag
+ self.__keys = keys
+ self.__message_group = message_group
+ self.__delivery_timestamp = delivery_timestamp
+ self.__born_host = born_host
+ self.__delivery_attempt = delivery_attempt
+
+ @property
+ def topic(self):
+ return self.__topic
+
+ @property
+ def message_id(self):
+ return self.__message_id
+
+ @property
+ def born_host(self):
+ return self.__born_host
+
+ @property
+ def keys(self):
+ return self.__keys
+
+ @property
+ def properties(self):
+ return self.__properties
+
+ @property
+ def tag(self):
+ return self.__tag
+
+ @property
+ def message_group(self):
+ return self.__message_group
+
+ @property
+ def delivery_timestamp(self):
+ return self.__delivery_timestamp
diff --git a/python/rocketmq/message_id.py b/python/rocketmq/message_id.py
new file mode 100644
index 00000000..9bb9176b
--- /dev/null
+++ b/python/rocketmq/message_id.py
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class MessageId:
+ def __init__(self, version: str, suffix: str):
+ self.__version = version
+ self.__suffix = suffix
diff --git a/python/rocketmq/send_receipt.py b/python/rocketmq/send_receipt.py
new file mode 100644
index 00000000..3aaa5978
--- /dev/null
+++ b/python/rocketmq/send_receipt.py
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from rocketmq.message_id import MessageId
+
+
+class SendReceipt:
+ def __init__(self, message_id: MessageId):
+ self.__message_id = message_id
+
+ @property
+ def message_id(self):
+ return self.__message_id
diff --git a/python/rocketmq/signature.py b/python/rocketmq/signature.py
new file mode 100644
index 00000000..c77a355a
--- /dev/null
+++ b/python/rocketmq/signature.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import importlib.metadata
+import uuid
+
+from rocketmq.client_config import ClientConfig
+from rocketmq.utils import sign
+
+
+class Signature:
+ __AUTHORIZATION_KEY = "authorization"
+
+ __DATE_TIME_KEY = "x-mq-date-time"
+ __DATE_TIME_FORMAT = "%Y%m%dT%H%M%SZ"
+
+ __SESSION_TOKEN_KEY = "x-mq-session-token"
+ __CLIENT_ID_KEY = "x-mq-client-id"
+ __REQUEST_ID_KEY = "x-mq-request-id"
+ __LANGUAGE_KEY = "x-mq-language"
+ __CLIENT_VERSION_KEY = "x-mq-client-version"
+ __PROTOCOL_VERSION = "x-mq-protocol"
+
+ __ALGORITHM = "MQv2-HMAC-SHA1"
+ __CREDENTIAL = "Credential"
+ __SIGNED_HEADERS = "SignedHeaders"
+ __SIGNATURE = "Signature"
+
+ @staticmethod
+ def sign(client_config: ClientConfig, client_id: str):
+ date_time =
datetime.datetime.now().strftime(Signature.__DATE_TIME_FORMAT)
+ metadata = [
+ (Signature.__LANGUAGE_KEY, "PYTHON"),
+ (Signature.__PROTOCOL_VERSION, "v2"),
+ (Signature.__CLIENT_VERSION_KEY,
importlib.metadata.version("rocketmq")),
+ (
+ Signature.__DATE_TIME_KEY,
+ date_time,
+ ),
+ (Signature.__REQUEST_ID_KEY, uuid.uuid4()),
+ (Signature.__CLIENT_ID_KEY, client_id),
+ ]
+ if not client_config.session_credentials_provider:
+ return metadata
+ session_credentials = (
+ client_config.session_credentials_provider.session_credentials()
+ )
+ if not session_credentials:
+ return metadata
+ if session_credentials.security_token:
+ metadata.append(
+ (Signature.__SESSION_TOKEN_KEY,
session_credentials.security_token)
+ )
+ if (not session_credentials.access_key) or (
+ not session_credentials.access_secret
+ ):
+ return metadata
+ signature = sign(session_credentials.access_key, date_time)
+ authorization = (
+ Signature.__ALGORITHM
+ + " "
+ + Signature.__CREDENTIAL
+ + "="
+ + session_credentials.access_key
+ + ", "
+ + Signature.__SIGNED_HEADERS
+ + "="
+ + Signature.__DATE_TIME_KEY
+ + ", "
+ + Signature.__SIGNATURE
+ + "="
+ + signature
+ )
+ metadata.append((Signature.__AUTHORIZATION_KEY, authorization))
+ return metadata
diff --git a/python/rocketmq/utils.py b/python/rocketmq/utils.py
new file mode 100644
index 00000000..adec636b
--- /dev/null
+++ b/python/rocketmq/utils.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import hashlib
+import hmac
+
+
+def number_to_base(number, base):
+ alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+ if number == 0:
+ return alphabet[0]
+
+ result = []
+ while number:
+ number, remainder = divmod(number, base)
+ result.append(alphabet[remainder])
+
+ return "".join(reversed(result))
+
+
+def sign(access_secret: str, datetime: str) -> str:
+ digester = hmac.new(
+ bytes(access_secret, encoding="UTF-8"),
+ bytes(datetime, encoding="UTF-8"),
+ hashlib.sha1,
+ )
+ return digester.hexdigest().upper()