This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new eecb3a67 [ISSUE #891] Implement message recalling API in C# SDK eecb3a67 is described below commit eecb3a678abf2acc38d741fe6f23c0bc33d1d925 Author: Jack Tsai <tsunghanjackt...@outlook.com> AuthorDate: Wed Jan 22 09:56:15 2025 +0800 [ISSUE #891] Implement message recalling API in C# SDK --- csharp/rocketmq-client-csharp/ClientManager.cs | 9 +++++ csharp/rocketmq-client-csharp/IClientManager.cs | 10 ++++++ .../{ISendReceipt.cs => IRecallReceipt.cs} | 2 +- csharp/rocketmq-client-csharp/IRpcClient.cs | 2 ++ csharp/rocketmq-client-csharp/ISendReceipt.cs | 1 + csharp/rocketmq-client-csharp/Producer.cs | 27 ++++++++++++++ .../{ISendReceipt.cs => RecallReceipt.cs} | 14 ++++++-- csharp/rocketmq-client-csharp/RpcClient.cs | 9 +++++ csharp/rocketmq-client-csharp/SendReceipt.cs | 12 +++++-- csharp/tests/ClientManagerTest.cs | 9 +++++ csharp/tests/ProducerTest.cs | 42 ++++++++++++++++++++++ 11 files changed, 131 insertions(+), 6 deletions(-) diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs index e42a29da..b061b5c9 100644 --- a/csharp/rocketmq-client-csharp/ClientManager.cs +++ b/csharp/rocketmq-client-csharp/ClientManager.cs @@ -120,6 +120,15 @@ namespace Org.Apache.Rocketmq request, response, metadata); } + public async Task<RpcInvocation<Proto.RecallMessageRequest, Proto.RecallMessageResponse>> + RecallMessage(Endpoints endpoints, Proto.RecallMessageRequest request, TimeSpan timeout) + { + var metadata = _client.Sign(); + var response = await GetRpcClient(endpoints).RecallMessage(metadata, request, timeout); + return new RpcInvocation<Proto.RecallMessageRequest, Proto.RecallMessageResponse>( + request, response, metadata); + } + public async Task<RpcInvocation<Proto.SendMessageRequest, Proto.SendMessageResponse>> SendMessage( Endpoints endpoints, Proto::SendMessageRequest request, TimeSpan timeout) { diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs b/csharp/rocketmq-client-csharp/IClientManager.cs index 743df9fe..62d733e9 100644 --- a/csharp/rocketmq-client-csharp/IClientManager.cs +++ b/csharp/rocketmq-client-csharp/IClientManager.cs @@ -62,6 +62,16 @@ namespace Org.Apache.Rocketmq Task<RpcInvocation<NotifyClientTerminationRequest, NotifyClientTerminationResponse>> NotifyClientTermination( Endpoints endpoints, NotifyClientTerminationRequest request, TimeSpan timeout); + /// <summary> + /// Recall messages. + /// </summary> + /// <param name="endpoints">The target endpoints.</param> + /// <param name="request">gRPC request of recalling messages.</param> + /// <param name="timeout">Request max duration.</param> + /// <returns>Task of response.</returns> + Task<RpcInvocation<RecallMessageRequest, RecallMessageResponse>> RecallMessage( + Endpoints endpoints, RecallMessageRequest request, TimeSpan timeout); + /// <summary> /// Send message to remote endpoints. /// </summary> diff --git a/csharp/rocketmq-client-csharp/ISendReceipt.cs b/csharp/rocketmq-client-csharp/IRecallReceipt.cs similarity index 95% copy from csharp/rocketmq-client-csharp/ISendReceipt.cs copy to csharp/rocketmq-client-csharp/IRecallReceipt.cs index f1004b5b..8291cd66 100644 --- a/csharp/rocketmq-client-csharp/ISendReceipt.cs +++ b/csharp/rocketmq-client-csharp/IRecallReceipt.cs @@ -17,7 +17,7 @@ namespace Org.Apache.Rocketmq { - public interface ISendReceipt + public interface IRecallReceipt { string MessageId { get; } } diff --git a/csharp/rocketmq-client-csharp/IRpcClient.cs b/csharp/rocketmq-client-csharp/IRpcClient.cs index 8145ea18..eb369c2d 100644 --- a/csharp/rocketmq-client-csharp/IRpcClient.cs +++ b/csharp/rocketmq-client-csharp/IRpcClient.cs @@ -52,6 +52,8 @@ namespace Org.Apache.Rocketmq Task<NotifyClientTerminationResponse> NotifyClientTermination(Metadata metadata, NotifyClientTerminationRequest request, TimeSpan timeout); + Task<RecallMessageResponse> RecallMessage(Metadata metadata, RecallMessageRequest request, TimeSpan timeout); + Task Shutdown(); } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/ISendReceipt.cs b/csharp/rocketmq-client-csharp/ISendReceipt.cs index f1004b5b..eeba4e03 100644 --- a/csharp/rocketmq-client-csharp/ISendReceipt.cs +++ b/csharp/rocketmq-client-csharp/ISendReceipt.cs @@ -20,5 +20,6 @@ namespace Org.Apache.Rocketmq public interface ISendReceipt { string MessageId { get; } + string RecallHandle { get; } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index 1986182f..0d4be45c 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -342,6 +342,33 @@ namespace Org.Apache.Rocketmq StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId); } + public async Task<IRecallReceipt> RecallMessage(string topic, string recallhandle) + { + var recallReceipt = await RecallMessage0(topic, recallhandle); + return recallReceipt; + } + + private async Task<RecallReceipt> RecallMessage0(string topic, string recallhandle) + { + if (State.Running != State) + { + throw new InvalidOperationException("Producer is not running"); + } + if (recallhandle == null) + { + throw new InvalidOperationException("Recall handle is invalid"); + } + var request = new Proto.RecallMessageRequest + { + Topic = new Proto.Resource { ResourceNamespace = ClientConfig.Namespace, Name = topic }, + RecallHandle = recallhandle + }; + var invocation = + await ClientManager.RecallMessage(new Endpoints(ClientConfig.Endpoints), request, ClientConfig.RequestTimeout); + StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId); + return new RecallReceipt(invocation.Response.MessageId); + } + public class Builder { private ClientConfig _clientConfig; diff --git a/csharp/rocketmq-client-csharp/ISendReceipt.cs b/csharp/rocketmq-client-csharp/RecallReceipt.cs similarity index 72% copy from csharp/rocketmq-client-csharp/ISendReceipt.cs copy to csharp/rocketmq-client-csharp/RecallReceipt.cs index f1004b5b..80cf120c 100644 --- a/csharp/rocketmq-client-csharp/ISendReceipt.cs +++ b/csharp/rocketmq-client-csharp/RecallReceipt.cs @@ -17,8 +17,18 @@ namespace Org.Apache.Rocketmq { - public interface ISendReceipt + public sealed class RecallReceipt : IRecallReceipt { - string MessageId { get; } + public RecallReceipt(string messageId) + { + MessageId = messageId; + } + + public string MessageId { get; } + + public override string ToString() + { + return $"{nameof(MessageId)}: {MessageId}"; + } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/RpcClient.cs b/csharp/rocketmq-client-csharp/RpcClient.cs index eeff96e5..c6540c8e 100644 --- a/csharp/rocketmq-client-csharp/RpcClient.cs +++ b/csharp/rocketmq-client-csharp/RpcClient.cs @@ -189,5 +189,14 @@ namespace Org.Apache.Rocketmq var call = _stub.NotifyClientTerminationAsync(request, callOptions); return await call.ResponseAsync; } + + public async Task<Proto::RecallMessageResponse> RecallMessage(Metadata metadata, Proto.RecallMessageRequest request, TimeSpan timeout) + { + var deadline = DateTime.UtcNow.Add(timeout); + var callOptions = new CallOptions(metadata, deadline); + + var call = _stub.RecallMessageAsync(request, callOptions); + return await call.ResponseAsync; + } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/SendReceipt.cs b/csharp/rocketmq-client-csharp/SendReceipt.cs index c9fe8014..2a49a817 100644 --- a/csharp/rocketmq-client-csharp/SendReceipt.cs +++ b/csharp/rocketmq-client-csharp/SendReceipt.cs @@ -23,15 +23,21 @@ namespace Org.Apache.Rocketmq { public sealed class SendReceipt : ISendReceipt { - private SendReceipt(string messageId, string transactionId, MessageQueue messageQueue) + private SendReceipt(string messageId, string transactionId, MessageQueue messageQueue, long offset, string recallHandle) { MessageId = messageId; TransactionId = transactionId; MessageQueue = messageQueue; + Offset = offset; + RecallHandle = recallHandle; } public string MessageId { get; } + public string RecallHandle { get; } + + public long Offset { get; } + public string TransactionId { get; } private MessageQueue MessageQueue { get; } @@ -40,7 +46,7 @@ namespace Org.Apache.Rocketmq public override string ToString() { - return $"{nameof(MessageId)}: {MessageId}"; + return $"{nameof(MessageId)}: {MessageId}, {nameof(RecallHandle)}: {RecallHandle}"; } public static IEnumerable<SendReceipt> ProcessSendMessageResponse(MessageQueue mq, @@ -58,7 +64,7 @@ namespace Org.Apache.Rocketmq // May throw exception. StatusChecker.Check(status, invocation.Request, invocation.RequestId); - return invocation.Response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq)).ToList(); + return invocation.Response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq, entry.Offset, entry.RecallHandle)).ToList(); } } } \ No newline at end of file diff --git a/csharp/tests/ClientManagerTest.cs b/csharp/tests/ClientManagerTest.cs index 5e4e7eef..be3697d6 100644 --- a/csharp/tests/ClientManagerTest.cs +++ b/csharp/tests/ClientManagerTest.cs @@ -121,6 +121,15 @@ namespace tests // Expect no exception thrown. } + [TestMethod] + public void TestRecallMessage() + { + var request = new RecallMessageRequest(); + _clientManager.RecallMessage(FakeEndpoints, request, TimeSpan.FromSeconds(1)); + _clientManager.RecallMessage(null, request, TimeSpan.FromSeconds(1)); + // Expect no exception thrown. + } + private Client CreateTestClient() { return new Producer(_clientConfig, new ConcurrentDictionary<string, bool>(), 1, null); diff --git a/csharp/tests/ProducerTest.cs b/csharp/tests/ProducerTest.cs index ce0cca15..8fe53000 100644 --- a/csharp/tests/ProducerTest.cs +++ b/csharp/tests/ProducerTest.cs @@ -96,6 +96,48 @@ namespace tests It.IsAny<Proto.SendMessageRequest>(), It.IsAny<TimeSpan>()), Times.Exactly(maxAttempts)); } + [TestMethod] + public async Task TestRecall() + { + var producer = CreateTestClient(); + producer.State = State.Running; + var metadata = producer.Sign(); + var recallReceipt = new RecallReceipt(MessageIdGenerator.GetInstance().Next()); + var recallMessageResponse = new Proto.RecallMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + }, + MessageId = recallReceipt.MessageId + }; + var recallMessageInvocation = new RpcInvocation<Proto.RecallMessageRequest, Proto.RecallMessageResponse>(null, + recallMessageResponse, metadata); + var mockClientManager = new Mock<IClientManager>(); + producer.SetClientManager(mockClientManager.Object); + mockClientManager.Setup(cm => cm.RecallMessage(It.IsAny<Endpoints>(), + It.IsAny<Proto.RecallMessageRequest>(), It.IsAny<TimeSpan>())).Returns(Task.FromResult(recallMessageInvocation)); + await producer.RecallMessage("testTopic", "handle"); + mockClientManager.Verify(cm => cm.RecallMessage(It.IsAny<Endpoints>(), + It.IsAny<Proto.RecallMessageRequest>(), It.IsAny<TimeSpan>()), Times.Once); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public async Task TestRecallFailure() + { + var producer = CreateTestClient(); + producer.State = State.Running; + var mockClientManager = new Mock<IClientManager>(); + producer.SetClientManager(mockClientManager.Object); + var exception = new ArgumentException(); + mockClientManager.Setup(cm => cm.RecallMessage(It.IsAny<Endpoints>(), + It.IsAny<Proto.RecallMessageRequest>(), It.IsAny<TimeSpan>())).Throws(exception); + await producer.RecallMessage("testTopic", "handle"); + mockClientManager.Verify(cm => cm.RecallMessage(It.IsAny<Endpoints>(), + It.IsAny<Proto.RecallMessageRequest>(), It.IsAny<TimeSpan>()), Times.Once); + } + private Producer CreateTestClient() { const string host0 = "127.0.0.1";