This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new d2f95889 [ISSUE #878] The java implementation of recalling API (#879)
d2f95889 is described below

commit d2f95889b4852251ba5c526ef790ea8735b42100
Author: imzs <i...@foxmail.com>
AuthorDate: Tue Dec 10 11:04:06 2024 +0800

    [ISSUE #878] The java implementation of recalling API (#879)
---
 .github/workflows/java_coverage.yml                |  1 +
 README-CN.md                                       | 19 ++++-----
 README.md                                          |  1 +
 .../rocketmq/client/apis/producer/Producer.java    | 25 ++++++++++++
 .../{SendReceipt.java => RecallReceipt.java}       |  5 +--
 .../rocketmq/client/apis/producer/SendReceipt.java |  3 ++
 .../rocketmq/client/java/impl/ClientManager.java   | 12 ++++++
 .../client/java/impl/ClientManagerImpl.java        | 17 ++++++++
 .../client/java/impl/producer/ProducerImpl.java    | 45 ++++++++++++++++++++++
 .../java/impl/producer/RecallReceiptImpl.java}     | 28 +++++++++++---
 .../client/java/impl/producer/SendReceiptImpl.java | 14 ++++++-
 .../apache/rocketmq/client/java/rpc/RpcClient.java | 14 +++++++
 .../rocketmq/client/java/rpc/RpcClientImpl.java    | 10 +++++
 .../client/java/impl/ClientManagerImplTest.java    |  9 +++++
 .../java/impl/producer/ProducerImplTest.java       | 41 ++++++++++++++++++++
 15 files changed, 223 insertions(+), 21 deletions(-)

diff --git a/.github/workflows/java_coverage.yml b/.github/workflows/java_coverage.yml
index 0ad1e25e..cd9289d7 100644
--- a/.github/workflows/java_coverage.yml
+++ b/.github/workflows/java_coverage.yml
@@ -30,4 +30,5 @@ jobs:
         with:
           flags: java
           fail_ci_if_error: true
+          token: e7eb01be-398b-4f7f-a73e-dc35c428cb50
           verbose: true
diff --git a/README-CN.md b/README-CN.md
index 25131b36..5ae40d3e 100644
--- a/README-CN.md
+++ b/README-CN.md
@@ -17,15 +17,16 @@
 * 可用 - ✅
 * 进行中 - 🚧
 
-| 特性                                           | Java  | C/C++ |  C#   | 
Golang | Rust  | Python | Node.js |  PHP  |
-| ---------------------------------------------- | :---: | :---: | :---: | 
:----: | :---: | :----: | :-----: | :---: |
-| Producer with standard messages                |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
-| Producer with FIFO messages                    |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
-| Producer with timed/delay messages             |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
-| Producer with transactional messages           |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
-| Simple consumer                                |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
-| Push consumer with concurrent message listener |   ✅   |   ✅   |   🚧   |   🚧 
   |   ✅   |   🚧    |    🚧    |   🚧   |
-| Push consumer with FIFO message listener       |   ✅   |   ✅   |   🚧   |   🚧 
   |   ✅   |   🚧    |    🚧    |   🚧   |
+| 特性                                             | Java  | C/C++  |  C#   | 
Golang | Rust | Python | Node.js |  PHP  |
+|------------------------------------------------| :---: 
|:------:|:-----:|:------:|:----:|:------:|:-------:| :---: |
+| Producer with standard messages                |   ✅   |   ✅    |   ✅   |   
✅    |  ✅   |   ✅    |    ✅    |   🚧   |
+| Producer with FIFO messages                    |   ✅   |   ✅    |   ✅   |   
✅    |  ✅   |   ✅    |    ✅    |   🚧   |
+| Producer with timed/delay messages             |   ✅   |   ✅    |   ✅   |   
✅    |  ✅   |   ✅    |    ✅    |   🚧   |
+| Producer with transactional messages           |   ✅   |   ✅    |   ✅   |   
✅    |  ✅   |   ✅    |    ✅    |   🚧   |
+| Producer with recalling timed/delay messages   |   ✅   |   🚧   |   🚧    |   
🚧   |   🚧   |    🚧    |   🚧    |   🚧   |
+| Simple consumer                                |   ✅   |   ✅    |   ✅   |   
✅    |  ✅   |   ✅    |    ✅    |   🚧   |
+| Push consumer with concurrent message listener |   ✅   |   ✅    |  🚧   |   🚧 
  |  ✅   |   🚧   |   🚧    |   🚧   |
+| Push consumer with FIFO message listener       |   ✅   |   ✅    |  🚧   |   🚧 
  |  ✅   |   🚧   |   🚧    |   🚧   |
 
 ## 先决条件和构建
 
diff --git a/README.md b/README.md
index 0772a172..937d9c31 100644
--- a/README.md
+++ b/README.md
@@ -23,6 +23,7 @@ Provide cloud-native and robust solutions for Java, C++, C#, Golang, Rust and al
 | Producer with FIFO messages                    |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
 | Producer with timed/delay messages             |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
 | Producer with transactional messages           |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
+| Producer with recalling timed/delay messages   |   ✅   |   🚧   |   🚧    |   
🚧   |   🚧   |    🚧    |   🚧    |   🚧   |
 | Simple consumer                                |   ✅   |   ✅   |   ✅   |   ✅ 
   |   ✅   |   ✅    |    ✅    |   🚧   |
 | Push consumer with concurrent message listener |   ✅   |   ✅   |   🚧   |   🚧 
   |   ✅   |   🚧    |    🚧    |   🚧   |
 | Push consumer with FIFO message listener       |   ✅   |   ✅   |   🚧   |   🚧 
   |   ✅   |   🚧    |    🚧    |   🚧   |
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/Producer.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/Producer.java
index 71a3a57b..37d12725 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/Producer.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/Producer.java
@@ -76,6 +76,31 @@ public interface Producer extends Closeable {
      */
     Transaction beginTransaction() throws ClientException;
 
+    /**
+     * Recall message synchronously, only delay message is supported for now.
+     *
+     * <pre>{@code
+     * SendReceipt receipt = producer.send(message);
+     * String recallHandle = receipt.getRecallHandle();
+     * }</pre>
+     *
+     * @param topic the topic of the operation
+     * @param recallHandle the handle to identify a message to recall
+     * @return the returned receipt, or throw exception if response status is not OK.
+     */
+    RecallReceipt recallMessage(String topic, String recallHandle) throws 
ClientException;
+
+    /**
+     * Recall message asynchronously.
+     *
+     * <p>This method returns immediately, the result is included in the {@link CompletableFuture};
+     *
+     * @param topic the topic of the operation
+     * @param recallHandle the handle to identify a message to recall
+     * @return a future that indicates the receipt
+     */
+    CompletableFuture<RecallReceipt> recallMessageAsync(String topic, String 
recallHandle);
+
     /**
      * Closes the producer and releases all related resources.
      *
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/RecallReceipt.java
similarity index 87%
copy from 
java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java
copy to 
java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/RecallReceipt.java
index dc96f55f..7ebeb54b 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/RecallReceipt.java
@@ -19,9 +19,6 @@ package org.apache.rocketmq.client.apis.producer;
 
 import org.apache.rocketmq.client.apis.message.MessageId;
 
-/**
- * A receipt from the server, which only makes sense when the message is sent successfully.
- */
-public interface SendReceipt {
+public interface RecallReceipt {
     MessageId getMessageId();
 }
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java
index dc96f55f..794da087 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java
@@ -24,4 +24,7 @@ import org.apache.rocketmq.client.apis.message.MessageId;
  */
 public interface SendReceipt {
     MessageId getMessageId();
+
+    // Unique handle to identify a message to recall, only delay message is supported for now
+    String getRecallHandle();
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
index ce7d2e1b..e9ff2076 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
@@ -33,6 +33,8 @@ import apache.rocketmq.v2.QueryAssignmentRequest;
 import apache.rocketmq.v2.QueryAssignmentResponse;
 import apache.rocketmq.v2.QueryRouteRequest;
 import apache.rocketmq.v2.QueryRouteResponse;
+import apache.rocketmq.v2.RecallMessageRequest;
+import apache.rocketmq.v2.RecallMessageResponse;
 import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.ReceiveMessageResponse;
 import apache.rocketmq.v2.SendMessageRequest;
@@ -170,6 +172,16 @@ public abstract class ClientManager extends 
AbstractIdleService {
     public abstract RpcFuture<NotifyClientTerminationRequest, 
NotifyClientTerminationResponse>
     notifyClientTermination(Endpoints endpoints, 
NotifyClientTerminationRequest request, Duration duration);
 
+    /**
+     * recall message asynchronously, the method ensures no throwable.
+     * @param endpoints request endpoints.
+     * @param request   recall message request.
+     * @param duration  request max duration.
+     * @return invocation of response future.
+     */
+    public abstract RpcFuture<RecallMessageRequest, RecallMessageResponse> 
recallMessage(Endpoints endpoints,
+        RecallMessageRequest request, Duration duration);
+
     /**
      * Establish telemetry session stream to server.
      *
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index 7630fefb..76cfb617 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -33,6 +33,8 @@ import apache.rocketmq.v2.QueryAssignmentRequest;
 import apache.rocketmq.v2.QueryAssignmentResponse;
 import apache.rocketmq.v2.QueryRouteRequest;
 import apache.rocketmq.v2.QueryRouteResponse;
+import apache.rocketmq.v2.RecallMessageRequest;
+import apache.rocketmq.v2.RecallMessageResponse;
 import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.ReceiveMessageResponse;
 import apache.rocketmq.v2.SendMessageRequest;
@@ -344,6 +346,21 @@ public class ClientManagerImpl extends ClientManager {
         }
     }
 
+    @Override
+    public RpcFuture<RecallMessageRequest, RecallMessageResponse> 
recallMessage(Endpoints endpoints,
+        RecallMessageRequest request, Duration duration) {
+        try {
+            final Metadata metadata = client.sign();
+            final Context context = new Context(endpoints, metadata);
+            final RpcClient rpcClient = getRpcClient(endpoints);
+            final ListenableFuture<RecallMessageResponse> future =
+                rpcClient.recallMessage(metadata, request, asyncWorker, 
duration);
+            return new RpcFuture<>(context, request, future);
+        } catch (Throwable t) {
+            return new RpcFuture<>(t);
+        }
+    }
+
     @Override
     public StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, 
Duration duration,
         StreamObserver<TelemetryCommand> responseObserver) throws 
ClientException {
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index a8021770..1945639b 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -23,6 +23,8 @@ import apache.rocketmq.v2.EndTransactionRequest;
 import apache.rocketmq.v2.EndTransactionResponse;
 import apache.rocketmq.v2.HeartbeatRequest;
 import apache.rocketmq.v2.NotifyClientTerminationRequest;
+import apache.rocketmq.v2.RecallMessageRequest;
+import apache.rocketmq.v2.RecallMessageResponse;
 import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
 import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
@@ -50,16 +52,19 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import net.javacrumbs.futureconverter.java8guava.FutureConverter;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.message.MessageId;
 import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.RecallReceipt;
 import org.apache.rocketmq.client.apis.producer.SendReceipt;
 import org.apache.rocketmq.client.apis.producer.Transaction;
 import org.apache.rocketmq.client.apis.producer.TransactionChecker;
 import org.apache.rocketmq.client.apis.producer.TransactionResolution;
 import org.apache.rocketmq.client.java.exception.InternalErrorException;
+import org.apache.rocketmq.client.java.exception.StatusChecker;
 import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
@@ -250,6 +255,18 @@ class ProducerImpl extends ClientImpl implements Producer {
         return new TransactionImpl(this);
     }
 
+    @Override
+    public RecallReceipt recallMessage(String topic, String recallHandle) 
throws ClientException {
+        final ListenableFuture<RecallReceipt> future = recallMessage0(topic, 
recallHandle);
+        return handleClientFuture(future);
+    }
+
+    @Override
+    public CompletableFuture<RecallReceipt> recallMessageAsync(String topic, 
String recallHandle) {
+        final ListenableFuture<RecallReceipt> future = recallMessage0(topic, 
recallHandle);
+        return FutureConverter.toCompletableFuture(future);
+    }
+
     @Override
     public void close() {
         this.stopAsync().awaitTerminated();
@@ -561,4 +578,32 @@ class ProducerImpl extends ClientImpl implements Producer {
         return Futures.transform(getRouteData(topic), topicRouteData -> 
updatePublishingLoadBalancer(topic,
             topicRouteData), MoreExecutors.directExecutor());
     }
+
+    ListenableFuture<RecallReceipt> recallMessage0(String topic, String 
recallHandle) {
+        if (!this.isRunning()) {
+            final IllegalStateException e = new 
IllegalStateException("Producer is not running now");
+            log.error("Unable to recall message because producer is not 
running, state={}, clientId={}",
+                this.state(), clientId);
+            return Futures.immediateFailedFuture(e);
+        }
+        if (StringUtils.isEmpty(recallHandle)) {
+            return Futures.immediateFailedFuture(new 
IllegalArgumentException("recall handle is invalid"));
+        }
+        final RecallMessageRequest request = RecallMessageRequest.newBuilder()
+            .setTopic(apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(clientConfiguration.getNamespace())
+                .setName(topic)
+                .build())
+            .setRecallHandle(recallHandle)
+            .build();
+        final Duration requestTimeout = 
clientConfiguration.getRequestTimeout();
+        final RpcFuture<RecallMessageRequest, RecallMessageResponse> future =
+            this.getClientManager().recallMessage(endpoints, request, 
requestTimeout);
+
+        return Futures.transformAsync(future, response -> {
+            final Status status = response.getStatus();
+            StatusChecker.check(status, future);
+            return Futures.immediateFuture(new 
RecallReceiptImpl(response.getMessageId()));
+        }, MoreExecutors.directExecutor());
+    }
 }
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/RecallReceiptImpl.java
similarity index 54%
copy from 
java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java
copy to 
java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/RecallReceiptImpl.java
index dc96f55f..b6ba0ed6 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/RecallReceiptImpl.java
@@ -15,13 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.client.apis.producer;
+package org.apache.rocketmq.client.java.impl.producer;
 
+import com.google.common.base.MoreObjects;
 import org.apache.rocketmq.client.apis.message.MessageId;
+import org.apache.rocketmq.client.apis.producer.RecallReceipt;
+import org.apache.rocketmq.client.java.message.MessageIdCodec;
 
-/**
- * A receipt from the server, which only makes sense when the message is sent 
successfully.
- */
-public interface SendReceipt {
-    MessageId getMessageId();
+public class RecallReceiptImpl implements RecallReceipt {
+    private final MessageId messageId;
+
+    public RecallReceiptImpl(String messageIdStr) {
+        messageId = MessageIdCodec.getInstance().decode(messageIdStr);
+    }
+
+    @Override
+    public MessageId getMessageId() {
+        return messageId;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("messageId", messageId)
+                .toString();
+    }
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
index 5f9f403f..ed2fbf79 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
@@ -39,14 +39,17 @@ import org.apache.rocketmq.client.java.rpc.RpcFuture;
 public class SendReceiptImpl implements SendReceipt {
     private final MessageId messageId;
     private final String transactionId;
+    private final String recallHandle;
     private final MessageQueueImpl messageQueue;
     private final long offset;
 
-    private SendReceiptImpl(MessageId messageId, String transactionId, 
MessageQueueImpl messageQueue, long offset) {
+    private SendReceiptImpl(MessageId messageId, String transactionId,
+        MessageQueueImpl messageQueue, long offset, String recallHandle) {
         this.messageId = messageId;
         this.transactionId = transactionId;
         this.messageQueue = messageQueue;
         this.offset = offset;
+        this.recallHandle = recallHandle;
     }
 
     @Override
@@ -54,6 +57,11 @@ public class SendReceiptImpl implements SendReceipt {
         return messageId;
     }
 
+    @Override
+    public String getRecallHandle() {
+        return recallHandle;
+    }
+
     public MessageQueueImpl getMessageQueue() {
         return messageQueue;
     }
@@ -87,7 +95,8 @@ public class SendReceiptImpl implements SendReceipt {
             final MessageId messageId = 
MessageIdCodec.getInstance().decode(entry.getMessageId());
             final String transactionId = entry.getTransactionId();
             final long offset = entry.getOffset();
-            final SendReceiptImpl impl = new SendReceiptImpl(messageId, 
transactionId, mq, offset);
+            final String recallHandle = entry.getRecallHandle();
+            final SendReceiptImpl impl = new SendReceiptImpl(messageId, 
transactionId, mq, offset, recallHandle);
             sendReceipts.add(impl);
         }
         return sendReceipts;
@@ -97,6 +106,7 @@ public class SendReceiptImpl implements SendReceipt {
     public String toString() {
         return MoreObjects.toStringHelper(this)
             .add("messageId", messageId)
+            .add("recallHandle", recallHandle)
             .toString();
     }
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
index 4c5e8b76..c555082f 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
@@ -33,6 +33,8 @@ import apache.rocketmq.v2.QueryAssignmentRequest;
 import apache.rocketmq.v2.QueryAssignmentResponse;
 import apache.rocketmq.v2.QueryRouteRequest;
 import apache.rocketmq.v2.QueryRouteResponse;
+import apache.rocketmq.v2.RecallMessageRequest;
+import apache.rocketmq.v2.RecallMessageResponse;
 import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.ReceiveMessageResponse;
 import apache.rocketmq.v2.SendMessageRequest;
@@ -186,6 +188,18 @@ public interface RpcClient {
     ListenableFuture<NotifyClientTerminationResponse> 
notifyClientTermination(Metadata metadata,
         NotifyClientTerminationRequest request, Executor executor, Duration 
duration);
 
+    /**
+     * Recall message asynchronously.
+     *
+     * @param metadata gRPC request header metadata.
+     * @param request  recall message request
+     * @param executor gRPC asynchronous executor.
+     * @param duration request max duration.
+     * @return invocation of response future.
+     */
+    ListenableFuture<RecallMessageResponse> recallMessage(Metadata metadata,
+        RecallMessageRequest request, Executor executor, Duration duration);
+
     /**
      * Start a streaming request and get the request observer.
      *
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
index a40cd398..77c2184e 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
@@ -34,6 +34,8 @@ import apache.rocketmq.v2.QueryAssignmentRequest;
 import apache.rocketmq.v2.QueryAssignmentResponse;
 import apache.rocketmq.v2.QueryRouteRequest;
 import apache.rocketmq.v2.QueryRouteResponse;
+import apache.rocketmq.v2.RecallMessageRequest;
+import apache.rocketmq.v2.RecallMessageResponse;
 import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.ReceiveMessageResponse;
 import apache.rocketmq.v2.SendMessageRequest;
@@ -212,6 +214,14 @@ public class RpcClientImpl implements RpcClient {
             .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).notifyClientTermination(request);
     }
 
+    @Override
+    public ListenableFuture<RecallMessageResponse> recallMessage(Metadata 
metadata,
+        RecallMessageRequest request, Executor executor, Duration duration) {
+        this.activityNanoTime = System.nanoTime();
+        return 
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+            .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).recallMessage(request);
+    }
+
     @Override
     public StreamObserver<TelemetryCommand> telemetry(Metadata metadata, 
Executor executor, Duration duration,
         StreamObserver<TelemetryCommand> responseObserver) {
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
index 46d82339..b6ac2ec3 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
@@ -25,6 +25,7 @@ import apache.rocketmq.v2.HeartbeatRequest;
 import apache.rocketmq.v2.NotifyClientTerminationRequest;
 import apache.rocketmq.v2.QueryAssignmentRequest;
 import apache.rocketmq.v2.QueryRouteRequest;
+import apache.rocketmq.v2.RecallMessageRequest;
 import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.SendMessageRequest;
 import io.grpc.Metadata;
@@ -135,4 +136,12 @@ public class ClientManagerImplTest extends TestBase {
         CLIENT_MANAGER.notifyClientTermination(null, request, 
Duration.ofSeconds(1));
         // Expect no exception thrown.
     }
+
+    @Test
+    public void testRecallMessage() {
+        RecallMessageRequest request = 
RecallMessageRequest.newBuilder().build();
+        CLIENT_MANAGER.recallMessage(fakeEndpoints(), request, 
Duration.ofSeconds(1));
+        CLIENT_MANAGER.recallMessage(null, request, Duration.ofSeconds(1));
+        // Expect no exception thrown.
+    }
 }
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
index 60bb0fef..20533a8c 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
@@ -24,10 +24,13 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import apache.rocketmq.v2.Broker;
+import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.MessageQueue;
 import apache.rocketmq.v2.MessageType;
 import apache.rocketmq.v2.Permission;
+import apache.rocketmq.v2.RecallMessageResponse;
 import apache.rocketmq.v2.Resource;
+import apache.rocketmq.v2.Status;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Service;
 import java.util.ArrayList;
@@ -38,10 +41,15 @@ import java.util.Set;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.java.exception.InternalErrorException;
+import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
+import org.apache.rocketmq.client.java.message.MessageIdCodec;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteData;
+import org.apache.rocketmq.client.java.rpc.RpcFuture;
 import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mockito;
@@ -109,4 +117,37 @@ public class ProducerImplTest extends TestBase {
         verify(producer, times(maxAttempts)).send0(any(Endpoints.class), 
anyList(), any(MessageQueueImpl.class));
         producer.close();
     }
+
+    @Test
+    public void testRecall() throws Exception {
+        final ProducerImpl producer = createProducerWithTopic(FAKE_TOPIC_0);
+        final String messageId = 
MessageIdCodec.getInstance().nextMessageId().toString();
+        final RecallReceiptImpl recallReceiptImpl = new 
RecallReceiptImpl(messageId);
+        
Mockito.doReturn(Futures.immediateFuture(recallReceiptImpl)).when(producer).recallMessage0(any(),
 any());
+        producer.recallMessage(FAKE_TOPIC_0, "handle");
+        verify(producer, times(1)).recallMessage0(any(), any());
+        producer.close();
+    }
+
+    @Test
+    public void testRecallFailure() {
+        RecallMessageResponse response = RecallMessageResponse.newBuilder()
+            .setMessageId("")
+            
.setStatus(Status.newBuilder().setCode(Code.INTERNAL_ERROR).build())
+            .build();
+        final ClientManagerImpl clientManager = mock(ClientManagerImpl.class);
+        final ProducerImpl producer = createProducerWithTopic(FAKE_TOPIC_0);
+        Mockito.doReturn(clientManager).when(producer).getClientManager();
+        Mockito.doReturn(new RpcFuture<>(fakeRpcContext(), null, 
Futures.immediateFuture(response)))
+            .when(clientManager).recallMessage(any(), any(), any());
+
+        Assert.assertThrows(InternalErrorException.class, () -> {
+            producer.recallMessage(FAKE_TOPIC_0, "handle");
+        });
+        producer.recallMessageAsync(FAKE_TOPIC_0, "handle").whenComplete((r, 
t) -> {
+            Assert.assertTrue(t instanceof InternalErrorException);
+        });
+        verify(producer, times(2)).recallMessage0(any(), any());
+        producer.close();
+    }
 }
\ No newline at end of file

Reply via email to