This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new c334f25e Implement a series of features (#538)
c334f25e is described below

commit c334f25e8613db953f92668deaf225ba539de3e9
Author: Aaron Ai <[email protected]>
AuthorDate: Mon Jun 12 14:49:12 2023 +0800

    Implement a series of features (#538)
    
    * Implement a series of features
    
    1. Encoder for client id
    2. Enrich the logging information
    3. Impelement authentication algorithm
    
    * Fix style issue
---
 python/rocketmq/client_config.py     |  33 +++++++++
 python/rocketmq/client_id_encoder.py |  47 ++++++++++++
 python/rocketmq/client_manager.py    | 137 +++++++++++++++++++++++++++++++++++
 python/rocketmq/log.py               |  38 ++++++++++
 python/rocketmq/message.py           | 125 ++++++++++++++++++++++++++++++++
 python/rocketmq/message_id.py        |  20 +++++
 python/rocketmq/send_receipt.py      |  25 +++++++
 python/rocketmq/signature.py         |  88 ++++++++++++++++++++++
 python/rocketmq/utils.py             |  39 ++++++++++
 9 files changed, 552 insertions(+)

diff --git a/python/rocketmq/client_config.py b/python/rocketmq/client_config.py
new file mode 100644
index 00000000..74f9a8ee
--- /dev/null
+++ b/python/rocketmq/client_config.py
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class ClientConfig:
+    def __init__(self, endpoints: str, session_credentials_provider, 
ssl_enabled: bool):
+        self.__endpoints = endpoints
+        self.__session_credentials_provider = session_credentials_provider
+        self.__ssl_enabled = ssl_enabled
+
+    @property
+    def session_credentials_provider(self):
+        return self.__session_credentials_provider
+
+    @property
+    def endpoints(self):
+        return self.__endpoints
+
+    @property
+    def ssl_enabled(self):
+        return self.__ssl_enabled
diff --git a/python/rocketmq/client_id_encoder.py 
b/python/rocketmq/client_id_encoder.py
new file mode 100644
index 00000000..138b05f0
--- /dev/null
+++ b/python/rocketmq/client_id_encoder.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import socket
+import threading
+import time
+
+import rocketmq.utils
+
+
+class ClientIdEncoder:
+    __INDEX = 0
+    __INDEX_LOCK = threading.Lock()
+    __CLIENT_ID_SEPARATOR = "@"
+
+    @staticmethod
+    def __get_and_increment_sequence():
+        with ClientIdEncoder.__INDEX_LOCK:
+            temp = ClientIdEncoder.__INDEX
+            ClientIdEncoder.__INDEX += 1
+            return temp
+
+    @staticmethod
+    def generate() -> str:
+        index = ClientIdEncoder.__get_and_increment_sequence()
+        return (
+            socket.gethostname()
+            + ClientIdEncoder.__CLIENT_ID_SEPARATOR
+            + str(os.getpid())
+            + ClientIdEncoder.__CLIENT_ID_SEPARATOR
+            + str(index)
+            + ClientIdEncoder.__CLIENT_ID_SEPARATOR
+            + str(rocketmq.utils.number_to_base(time.monotonic_ns(), 36))
+        )
diff --git a/python/rocketmq/client_manager.py 
b/python/rocketmq/client_manager.py
new file mode 100644
index 00000000..dd13b2f5
--- /dev/null
+++ b/python/rocketmq/client_manager.py
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import threading
+
+from protocol import service_pb2
+from rocketmq.client import Client
+from rocketmq.rpc_client import Endpoints, RpcClient
+
+
+class ClientManager:
+    def __init__(self, client: Client):
+        self.__client = client
+        self.__rpc_clients = {}
+        self.__rpc_clients_lock = threading.Lock()
+
+    def __get_rpc_client(self, endpoints: Endpoints, ssl_enabled: bool) -> 
RpcClient:
+        with self.__rpc_clients_lock:
+            rpc_client = self.__rpc_clients.get(endpoints)
+            if rpc_client:
+                return rpc_client
+            rpc_client = RpcClient(endpoints.get_target(), ssl_enabled)
+            self.__rpc_clients[endpoints] = rpc_client
+            return rpc_client
+
+    async def query_route(
+        self,
+        endpoints: Endpoints,
+        request: service_pb2.QueryRouteRequest,
+        timeout_seconds: int,
+    ):
+        rpc_client = self.__get_rpc_client(
+            endpoints, self.__client.client_config.ssl_enabled
+        )
+        return await rpc_client.query_route(request, timeout_seconds)
+
+    async def heartbeat(
+        self,
+        endpoints: Endpoints,
+        request: service_pb2.HeartbeatRequest,
+        timeout_seconds: int,
+    ):
+        rpc_client = self.__get_rpc_client(
+            endpoints, self.__client.client_config.ssl_enabled
+        )
+        return await rpc_client.heartbeat(request, timeout_seconds)
+
+    async def send_message(
+        self,
+        endpoints: Endpoints,
+        request: service_pb2.SendMessageRequest,
+        timeout_seconds: int,
+    ):
+        rpc_client = self.__get_rpc_client(
+            endpoints, self.__client.client_config.ssl_enabled
+        )
+        return await rpc_client.send_message(request, timeout_seconds)
+
+    async def query_assignment(
+        self,
+        endpoints: Endpoints,
+        request: service_pb2.QueryAssignmentRequest,
+        timeout_seconds: int,
+    ):
+        rpc_client = self.__get_rpc_client(
+            endpoints, self.__client.client_config.ssl_enabled
+        )
+        return await rpc_client.query_assignment(request, timeout_seconds)
+
+    async def ack_message(
+        self,
+        endpoints: Endpoints,
+        request: service_pb2.AckMessageRequest,
+        timeout_seconds: int,
+    ):
+        rpc_client = self.__get_rpc_client(
+            endpoints, self.__client.client_config.ssl_enabled
+        )
+        return await rpc_client.ack_message(request, timeout_seconds)
+
+    async def forward_message_to_dead_letter_queue(
+        self,
+        endpoints: Endpoints,
+        request: service_pb2.ForwardMessageToDeadLetterQueueRequest,
+        timeout_seconds: int,
+    ):
+        rpc_client = self.__get_rpc_client(
+            endpoints, self.__client.client_config.ssl_enabled
+        )
+        return await rpc_client.forward_message_to_dead_letter_queue(
+            request, timeout_seconds
+        )
+
+    async def end_transaction(
+        self,
+        endpoints: Endpoints,
+        request: service_pb2.EndTransactionRequest,
+        timeout_seconds: int,
+    ):
+        rpc_client = self.__get_rpc_client(
+            endpoints, self.__client.client_config.ssl_enabled
+        )
+        return await rpc_client.end_transaction(request, timeout_seconds)
+
+    async def notify_client_termination(
+        self,
+        endpoints: Endpoints,
+        request: service_pb2.NotifyClientTerminationRequest,
+        timeout_seconds: int,
+    ):
+        rpc_client = self.__get_rpc_client(
+            endpoints, self.__client.client_config.ssl_enabled
+        )
+        return await rpc_client.notify_client_termination(request, 
timeout_seconds)
+
+    async def change_invisible_duration(
+        self,
+        endpoints: Endpoints,
+        request: service_pb2.ChangeInvisibleDurationRequest,
+        timeout_seconds: int,
+    ):
+        rpc_client = self.__get_rpc_client(
+            endpoints, self.__client.client_config.ssl_enabled
+        )
+        return await rpc_client.change_invisible_duration(request, 
timeout_seconds)
diff --git a/python/rocketmq/log.py b/python/rocketmq/log.py
new file mode 100644
index 00000000..f3e4eae3
--- /dev/null
+++ b/python/rocketmq/log.py
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+
+logger = logging.getLogger("rocketmqlogger")
+logger.setLevel(logging.DEBUG)
+
+log_path = os.path.join(
+    os.path.expanduser("~"), "logs", "rocketmq", "rocketmq-client.log"
+)
+file_handler = logging.FileHandler(log_path)
+file_handler.setLevel(logging.DEBUG)
+
+console_handler = logging.StreamHandler()
+console_handler.setLevel(logging.DEBUG)
+
+formatter = logging.Formatter(
+    "%(asctime)s [%(levelname)s] [%(process)d] [%(threadName)s] 
[%(filename)s#%(funcName)s:%(lineno)d] %(message)s"
+)
+file_handler.setFormatter(formatter)
+console_handler.setFormatter(formatter)
+
+logger.addHandler(file_handler)
+logger.addHandler(console_handler)
diff --git a/python/rocketmq/message.py b/python/rocketmq/message.py
new file mode 100644
index 00000000..f20e8651
--- /dev/null
+++ b/python/rocketmq/message.py
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from rocketmq.message_id import MessageId
+
+
+class Message:
+    def __init__(
+        self,
+        topic: str,
+        body: bytes,
+        properties: map = None,
+        tag: str = None,
+        keys: str = None,
+        message_group: str = None,
+        delivery_timestamp: int = None,
+    ):
+        if properties is None:
+            properties = {}
+        self.__topic = topic
+        self.__body = body
+        self.__properties = properties
+        self.__tag = tag
+        self.__keys = keys
+        self.__message_group = message_group
+        self.__delivery_timestamp = delivery_timestamp
+
+    @property
+    def topic(self):
+        return self.__topic
+
+    @property
+    def body(self):
+        return self.__body
+
+    @property
+    def properties(self):
+        return self.__properties
+
+    @property
+    def tag(self):
+        return self.__tag
+
+    @property
+    def keys(self):
+        return self.__keys
+
+    @property
+    def message_group(self):
+        return self.__message_group
+
+    @property
+    def delivery_timestamp(self):
+        return self.__delivery_timestamp
+
+
+class MessageView:
+    def __init__(
+        self,
+        message_id: MessageId,
+        topic: str,
+        body: bytes,
+        properties: map,
+        tag: str,
+        keys: str,
+        message_group: str,
+        delivery_timestamp: int,
+        born_host: str,
+        delivery_attempt: int,
+    ):
+        self.__message_id = message_id
+        self.__topic = topic
+        self.__body = body
+        self.__properties = properties
+        self.__tag = tag
+        self.__keys = keys
+        self.__message_group = message_group
+        self.__delivery_timestamp = delivery_timestamp
+        self.__born_host = born_host
+        self.__delivery_attempt = delivery_attempt
+
+    @property
+    def topic(self):
+        return self.__topic
+
+    @property
+    def message_id(self):
+        return self.__message_id
+
+    @property
+    def born_host(self):
+        return self.__born_host
+
+    @property
+    def keys(self):
+        return self.__keys
+
+    @property
+    def properties(self):
+        return self.__properties
+
+    @property
+    def tag(self):
+        return self.__tag
+
+    @property
+    def message_group(self):
+        return self.__message_group
+
+    @property
+    def delivery_timestamp(self):
+        return self.__delivery_timestamp
diff --git a/python/rocketmq/message_id.py b/python/rocketmq/message_id.py
new file mode 100644
index 00000000..9bb9176b
--- /dev/null
+++ b/python/rocketmq/message_id.py
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class MessageId:
+    def __init__(self, version: str, suffix: str):
+        self.__version = version
+        self.__suffix = suffix
diff --git a/python/rocketmq/send_receipt.py b/python/rocketmq/send_receipt.py
new file mode 100644
index 00000000..3aaa5978
--- /dev/null
+++ b/python/rocketmq/send_receipt.py
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from rocketmq.message_id import MessageId
+
+
+class SendReceipt:
+    def __init__(self, message_id: MessageId):
+        self.__message_id = message_id
+
+    @property
+    def message_id(self):
+        return self.__message_id
diff --git a/python/rocketmq/signature.py b/python/rocketmq/signature.py
new file mode 100644
index 00000000..c77a355a
--- /dev/null
+++ b/python/rocketmq/signature.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import importlib.metadata
+import uuid
+
+from rocketmq.client_config import ClientConfig
+from rocketmq.utils import sign
+
+
+class Signature:
+    __AUTHORIZATION_KEY = "authorization"
+
+    __DATE_TIME_KEY = "x-mq-date-time"
+    __DATE_TIME_FORMAT = "%Y%m%dT%H%M%SZ"
+
+    __SESSION_TOKEN_KEY = "x-mq-session-token"
+    __CLIENT_ID_KEY = "x-mq-client-id"
+    __REQUEST_ID_KEY = "x-mq-request-id"
+    __LANGUAGE_KEY = "x-mq-language"
+    __CLIENT_VERSION_KEY = "x-mq-client-version"
+    __PROTOCOL_VERSION = "x-mq-protocol"
+
+    __ALGORITHM = "MQv2-HMAC-SHA1"
+    __CREDENTIAL = "Credential"
+    __SIGNED_HEADERS = "SignedHeaders"
+    __SIGNATURE = "Signature"
+
+    @staticmethod
+    def sign(client_config: ClientConfig, client_id: str):
+        date_time = 
datetime.datetime.now().strftime(Signature.__DATE_TIME_FORMAT)
+        metadata = [
+            (Signature.__LANGUAGE_KEY, "PYTHON"),
+            (Signature.__PROTOCOL_VERSION, "v2"),
+            (Signature.__CLIENT_VERSION_KEY, 
importlib.metadata.version("rocketmq")),
+            (
+                Signature.__DATE_TIME_KEY,
+                date_time,
+            ),
+            (Signature.__REQUEST_ID_KEY, uuid.uuid4()),
+            (Signature.__CLIENT_ID_KEY, client_id),
+        ]
+        if not client_config.session_credentials_provider:
+            return metadata
+        session_credentials = (
+            client_config.session_credentials_provider.session_credentials()
+        )
+        if not session_credentials:
+            return metadata
+        if session_credentials.security_token:
+            metadata.append(
+                (Signature.__SESSION_TOKEN_KEY, 
session_credentials.security_token)
+            )
+        if (not session_credentials.access_key) or (
+            not session_credentials.access_secret
+        ):
+            return metadata
+        signature = sign(session_credentials.access_key, date_time)
+        authorization = (
+            Signature.__ALGORITHM
+            + " "
+            + Signature.__CREDENTIAL
+            + "="
+            + session_credentials.access_key
+            + ", "
+            + Signature.__SIGNED_HEADERS
+            + "="
+            + Signature.__DATE_TIME_KEY
+            + ", "
+            + Signature.__SIGNATURE
+            + "="
+            + signature
+        )
+        metadata.append((Signature.__AUTHORIZATION_KEY, authorization))
+        return metadata
diff --git a/python/rocketmq/utils.py b/python/rocketmq/utils.py
new file mode 100644
index 00000000..adec636b
--- /dev/null
+++ b/python/rocketmq/utils.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import hashlib
+import hmac
+
+
+def number_to_base(number, base):
+    alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+    if number == 0:
+        return alphabet[0]
+
+    result = []
+    while number:
+        number, remainder = divmod(number, base)
+        result.append(alphabet[remainder])
+
+    return "".join(reversed(result))
+
+
+def sign(access_secret: str, datetime: str) -> str:
+    digester = hmac.new(
+        bytes(access_secret, encoding="UTF-8"),
+        bytes(datetime, encoding="UTF-8"),
+        hashlib.sha1,
+    )
+    return digester.hexdigest().upper()

Reply via email to