This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new eecb3a67 [ISSUE #891] Implement message recalling API in C# SDK
eecb3a67 is described below

commit eecb3a678abf2acc38d741fe6f23c0bc33d1d925
Author: Jack Tsai <tsunghanjackt...@outlook.com>
AuthorDate: Wed Jan 22 09:56:15 2025 +0800

    [ISSUE #891] Implement message recalling API in C# SDK
---
 csharp/rocketmq-client-csharp/ClientManager.cs     |  9 +++++
 csharp/rocketmq-client-csharp/IClientManager.cs    | 10 ++++++
 .../{ISendReceipt.cs => IRecallReceipt.cs}         |  2 +-
 csharp/rocketmq-client-csharp/IRpcClient.cs        |  2 ++
 csharp/rocketmq-client-csharp/ISendReceipt.cs      |  1 +
 csharp/rocketmq-client-csharp/Producer.cs          | 27 ++++++++++++++
 .../{ISendReceipt.cs => RecallReceipt.cs}          | 14 ++++++--
 csharp/rocketmq-client-csharp/RpcClient.cs         |  9 +++++
 csharp/rocketmq-client-csharp/SendReceipt.cs       | 12 +++++--
 csharp/tests/ClientManagerTest.cs                  |  9 +++++
 csharp/tests/ProducerTest.cs                       | 42 ++++++++++++++++++++++
 11 files changed, 131 insertions(+), 6 deletions(-)

diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs 
b/csharp/rocketmq-client-csharp/ClientManager.cs
index e42a29da..b061b5c9 100644
--- a/csharp/rocketmq-client-csharp/ClientManager.cs
+++ b/csharp/rocketmq-client-csharp/ClientManager.cs
@@ -120,6 +120,15 @@ namespace Org.Apache.Rocketmq
                 request, response, metadata);
         }
 
+        public async Task<RpcInvocation<Proto.RecallMessageRequest, 
Proto.RecallMessageResponse>>
+            RecallMessage(Endpoints endpoints, Proto.RecallMessageRequest 
request, TimeSpan timeout)
+        {
+            var metadata = _client.Sign();
+            var response = await 
GetRpcClient(endpoints).RecallMessage(metadata, request, timeout);
+            return new RpcInvocation<Proto.RecallMessageRequest, 
Proto.RecallMessageResponse>(
+                request, response, metadata);
+        }
+
         public async Task<RpcInvocation<Proto.SendMessageRequest, 
Proto.SendMessageResponse>> SendMessage(
             Endpoints endpoints, Proto::SendMessageRequest request, TimeSpan 
timeout)
         {
diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs 
b/csharp/rocketmq-client-csharp/IClientManager.cs
index 743df9fe..62d733e9 100644
--- a/csharp/rocketmq-client-csharp/IClientManager.cs
+++ b/csharp/rocketmq-client-csharp/IClientManager.cs
@@ -62,6 +62,16 @@ namespace Org.Apache.Rocketmq
         Task<RpcInvocation<NotifyClientTerminationRequest, 
NotifyClientTerminationResponse>> NotifyClientTermination(
             Endpoints endpoints, NotifyClientTerminationRequest request, 
TimeSpan timeout);
 
+        /// <summary>
+        /// Recall messages.
+        /// </summary>
+        /// <param name="endpoints">The target endpoints.</param>
+        /// <param name="request">gRPC request of recalling messages.</param>
+        /// <param name="timeout">Request max duration.</param>
+        /// <returns>Task of response.</returns>
+        Task<RpcInvocation<RecallMessageRequest, RecallMessageResponse>> 
RecallMessage(
+            Endpoints endpoints, RecallMessageRequest request, TimeSpan 
timeout);
+
         /// <summary>
         /// Send message to remote endpoints.
         /// </summary>
diff --git a/csharp/rocketmq-client-csharp/ISendReceipt.cs 
b/csharp/rocketmq-client-csharp/IRecallReceipt.cs
similarity index 95%
copy from csharp/rocketmq-client-csharp/ISendReceipt.cs
copy to csharp/rocketmq-client-csharp/IRecallReceipt.cs
index f1004b5b..8291cd66 100644
--- a/csharp/rocketmq-client-csharp/ISendReceipt.cs
+++ b/csharp/rocketmq-client-csharp/IRecallReceipt.cs
@@ -17,7 +17,7 @@
 
 namespace Org.Apache.Rocketmq
 {
-    public interface ISendReceipt
+    public interface IRecallReceipt
     {
         string MessageId { get; }
     }
diff --git a/csharp/rocketmq-client-csharp/IRpcClient.cs 
b/csharp/rocketmq-client-csharp/IRpcClient.cs
index 8145ea18..eb369c2d 100644
--- a/csharp/rocketmq-client-csharp/IRpcClient.cs
+++ b/csharp/rocketmq-client-csharp/IRpcClient.cs
@@ -52,6 +52,8 @@ namespace Org.Apache.Rocketmq
         Task<NotifyClientTerminationResponse> NotifyClientTermination(Metadata 
metadata,
             NotifyClientTerminationRequest request, TimeSpan timeout);
 
+        Task<RecallMessageResponse> RecallMessage(Metadata metadata, 
RecallMessageRequest request, TimeSpan timeout);
+
         Task Shutdown();
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ISendReceipt.cs 
b/csharp/rocketmq-client-csharp/ISendReceipt.cs
index f1004b5b..eeba4e03 100644
--- a/csharp/rocketmq-client-csharp/ISendReceipt.cs
+++ b/csharp/rocketmq-client-csharp/ISendReceipt.cs
@@ -20,5 +20,6 @@ namespace Org.Apache.Rocketmq
     public interface ISendReceipt
     {
         string MessageId { get; }
+        string RecallHandle { get; }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Producer.cs 
b/csharp/rocketmq-client-csharp/Producer.cs
index 1986182f..0d4be45c 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -342,6 +342,33 @@ namespace Org.Apache.Rocketmq
             StatusChecker.Check(invocation.Response.Status, request, 
invocation.RequestId);
         }
 
+        public async Task<IRecallReceipt> RecallMessage(string topic, string 
recallhandle)
+        {
+            var recallReceipt = await RecallMessage0(topic, recallhandle);
+            return recallReceipt;
+        }
+
+        private async Task<RecallReceipt> RecallMessage0(string topic, string 
recallhandle)
+        {
+            if (State.Running != State)
+            {
+                throw new InvalidOperationException("Producer is not running");
+            }
+            if (recallhandle == null)
+            {
+                throw new InvalidOperationException("Recall handle is 
invalid");
+            }
+            var request = new Proto.RecallMessageRequest
+            {
+                Topic = new Proto.Resource { ResourceNamespace = 
ClientConfig.Namespace, Name = topic },
+                RecallHandle = recallhandle
+            };
+            var invocation =
+                await ClientManager.RecallMessage(new 
Endpoints(ClientConfig.Endpoints), request, ClientConfig.RequestTimeout);
+            StatusChecker.Check(invocation.Response.Status, request, 
invocation.RequestId);
+            return new RecallReceipt(invocation.Response.MessageId);
+        }
+
         public class Builder
         {
             private ClientConfig _clientConfig;
diff --git a/csharp/rocketmq-client-csharp/ISendReceipt.cs 
b/csharp/rocketmq-client-csharp/RecallReceipt.cs
similarity index 72%
copy from csharp/rocketmq-client-csharp/ISendReceipt.cs
copy to csharp/rocketmq-client-csharp/RecallReceipt.cs
index f1004b5b..80cf120c 100644
--- a/csharp/rocketmq-client-csharp/ISendReceipt.cs
+++ b/csharp/rocketmq-client-csharp/RecallReceipt.cs
@@ -17,8 +17,18 @@
 
 namespace Org.Apache.Rocketmq
 {
-    public interface ISendReceipt
+    public sealed class RecallReceipt : IRecallReceipt
     {
-        string MessageId { get; }
+        public RecallReceipt(string messageId)
+        {
+            MessageId = messageId;
+        }
+
+        public string MessageId { get; }
+
+        public override string ToString()
+        {
+            return $"{nameof(MessageId)}: {MessageId}";
+        }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/RpcClient.cs 
b/csharp/rocketmq-client-csharp/RpcClient.cs
index eeff96e5..c6540c8e 100644
--- a/csharp/rocketmq-client-csharp/RpcClient.cs
+++ b/csharp/rocketmq-client-csharp/RpcClient.cs
@@ -189,5 +189,14 @@ namespace Org.Apache.Rocketmq
             var call = _stub.NotifyClientTerminationAsync(request, 
callOptions);
             return await call.ResponseAsync;
         }
+
+        public async Task<Proto::RecallMessageResponse> RecallMessage(Metadata 
metadata, Proto.RecallMessageRequest request, TimeSpan timeout)
+        {
+            var deadline = DateTime.UtcNow.Add(timeout);
+            var callOptions = new CallOptions(metadata, deadline);
+
+            var call = _stub.RecallMessageAsync(request, callOptions);
+            return await call.ResponseAsync;
+        }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/SendReceipt.cs 
b/csharp/rocketmq-client-csharp/SendReceipt.cs
index c9fe8014..2a49a817 100644
--- a/csharp/rocketmq-client-csharp/SendReceipt.cs
+++ b/csharp/rocketmq-client-csharp/SendReceipt.cs
@@ -23,15 +23,21 @@ namespace Org.Apache.Rocketmq
 {
     public sealed class SendReceipt : ISendReceipt
     {
-        private SendReceipt(string messageId, string transactionId, 
MessageQueue messageQueue)
+        private SendReceipt(string messageId, string transactionId, 
MessageQueue messageQueue, long offset, string recallHandle)
         {
             MessageId = messageId;
             TransactionId = transactionId;
             MessageQueue = messageQueue;
+            Offset = offset;
+            RecallHandle = recallHandle;
         }
 
         public string MessageId { get; }
 
+        public string RecallHandle { get; }
+
+        public long Offset { get; }
+
         public string TransactionId { get; }
 
         private MessageQueue MessageQueue { get; }
@@ -40,7 +46,7 @@ namespace Org.Apache.Rocketmq
 
         public override string ToString()
         {
-            return $"{nameof(MessageId)}: {MessageId}";
+            return $"{nameof(MessageId)}: {MessageId}, {nameof(RecallHandle)}: 
{RecallHandle}";
         }
 
         public static IEnumerable<SendReceipt> 
ProcessSendMessageResponse(MessageQueue mq,
@@ -58,7 +64,7 @@ namespace Org.Apache.Rocketmq
 
             // May throw exception.
             StatusChecker.Check(status, invocation.Request, 
invocation.RequestId);
-            return invocation.Response.Entries.Select(entry => new 
SendReceipt(entry.MessageId, entry.TransactionId, mq)).ToList();
+            return invocation.Response.Entries.Select(entry => new 
SendReceipt(entry.MessageId, entry.TransactionId, mq, entry.Offset, 
entry.RecallHandle)).ToList();
         }
     }
 }
\ No newline at end of file
diff --git a/csharp/tests/ClientManagerTest.cs 
b/csharp/tests/ClientManagerTest.cs
index 5e4e7eef..be3697d6 100644
--- a/csharp/tests/ClientManagerTest.cs
+++ b/csharp/tests/ClientManagerTest.cs
@@ -121,6 +121,15 @@ namespace tests
             // Expect no exception thrown.
         }
 
+        [TestMethod]
+        public void TestRecallMessage()
+        {
+            var request = new RecallMessageRequest();
+            _clientManager.RecallMessage(FakeEndpoints, request, 
TimeSpan.FromSeconds(1));
+            _clientManager.RecallMessage(null, request, 
TimeSpan.FromSeconds(1));
+            // Expect no exception thrown.
+        }
+
         private Client CreateTestClient()
         {
             return new Producer(_clientConfig, new 
ConcurrentDictionary<string, bool>(), 1, null);
diff --git a/csharp/tests/ProducerTest.cs b/csharp/tests/ProducerTest.cs
index ce0cca15..8fe53000 100644
--- a/csharp/tests/ProducerTest.cs
+++ b/csharp/tests/ProducerTest.cs
@@ -96,6 +96,48 @@ namespace tests
                 It.IsAny<Proto.SendMessageRequest>(), It.IsAny<TimeSpan>()), 
Times.Exactly(maxAttempts));
         }
 
+        [TestMethod]
+        public async Task TestRecall()
+        {
+            var producer = CreateTestClient();
+            producer.State = State.Running;
+            var metadata = producer.Sign();
+            var recallReceipt = new 
RecallReceipt(MessageIdGenerator.GetInstance().Next());
+            var recallMessageResponse = new Proto.RecallMessageResponse
+            {
+                Status = new Proto.Status
+                {
+                    Code = Proto.Code.Ok
+                },
+                MessageId = recallReceipt.MessageId
+            };
+            var recallMessageInvocation = new 
RpcInvocation<Proto.RecallMessageRequest, Proto.RecallMessageResponse>(null,
+                recallMessageResponse, metadata);
+            var mockClientManager = new Mock<IClientManager>();
+            producer.SetClientManager(mockClientManager.Object);
+            mockClientManager.Setup(cm => 
cm.RecallMessage(It.IsAny<Endpoints>(),
+                It.IsAny<Proto.RecallMessageRequest>(), 
It.IsAny<TimeSpan>())).Returns(Task.FromResult(recallMessageInvocation));
+            await producer.RecallMessage("testTopic", "handle");
+            mockClientManager.Verify(cm => 
cm.RecallMessage(It.IsAny<Endpoints>(),
+                It.IsAny<Proto.RecallMessageRequest>(), It.IsAny<TimeSpan>()), 
Times.Once);
+        }
+
+        [TestMethod]
+        [ExpectedException(typeof(ArgumentException))]
+        public async Task TestRecallFailure()
+        {
+            var producer = CreateTestClient();
+            producer.State = State.Running;
+            var mockClientManager = new Mock<IClientManager>();
+            producer.SetClientManager(mockClientManager.Object);
+            var exception = new ArgumentException();
+            mockClientManager.Setup(cm => 
cm.RecallMessage(It.IsAny<Endpoints>(),
+                It.IsAny<Proto.RecallMessageRequest>(), 
It.IsAny<TimeSpan>())).Throws(exception);
+            await producer.RecallMessage("testTopic", "handle");
+            mockClientManager.Verify(cm => 
cm.RecallMessage(It.IsAny<Endpoints>(),
+                It.IsAny<Proto.RecallMessageRequest>(), It.IsAny<TimeSpan>()), 
Times.Once);
+        }
+
         private Producer CreateTestClient()
         {
             const string host0 = "127.0.0.1";

Reply via email to