This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 34049364 complete stream rpc function & rpc_client_mock (#528)
34049364 is described below
commit 34049364a1211cf1452e21f1ea4e7057959846ea
Author: Yan Chao Mei <[email protected]>
AuthorDate: Wed Jun 7 11:33:20 2023 +0800
complete stream rpc function & rpc_client_mock (#528)
* stream rpc functions
* use log
---
python/rocketmq/rpc_client.py | 34 ++++++++++++++++++++++++++++++++++
1 file changed, 34 insertions(+)
diff --git a/python/rocketmq/rpc_client.py b/python/rocketmq/rpc_client.py
index 420559c1..545419c7 100644
--- a/python/rocketmq/rpc_client.py
+++ b/python/rocketmq/rpc_client.py
@@ -67,6 +67,20 @@ class RpcClient:
):
return await self.__stub.SendMessage(request, timeout=timeout_seconds)
+ async def receive_message(
+ self, request: service_pb2.ReceiveMessageRequest, timeout_seconds: int
+ ):
+ results = self.__stub.ReceiveMessage(request, timeout=timeout_seconds)
+ response = []
+ try:
+ async for result in results:
+ if result.HasField('message'):
+ response.append(result.message)
+ except Exception as e:
+ logger.info("An error occurred: %s", e)
+ # Handle error as appropriate for your use case
+ return response
+
async def query_assignment(
self, request: service_pb2.QueryAssignmentRequest, timeout_seconds: int
):
@@ -105,6 +119,26 @@ class RpcClient:
request, timeout=timeout_seconds
)
+ async def send_requests(self, requests, stream):
+ for request in requests:
+ await stream.send_message(request)
+
+ async def telemetry(
+ self, timeout_seconds: int, requests
+ ):
+ responses = []
+ async with self.__stub.Telemetry() as stream:
+ # Create a task for sending requests
+ send_task = asyncio.create_task(self.send_requests(requests,
stream))
+ # Receiving responses
+ async for response in stream:
+ responses.append(response)
+
+ # Await the send task to ensure all requests have been sent
+ await send_task
+
+ return responses
+
async def test():
client = RpcClient("rmq-cn-72u353icd01.cn-hangzhou.rmq.aliyuncs.com:8080")