This is an automated email from the ASF dual-hosted git repository. lollipop pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new d2f95889 [ISSUE #878] The java implementation of recalling API (#879) d2f95889 is described below commit d2f95889b4852251ba5c526ef790ea8735b42100 Author: imzs <i...@foxmail.com> AuthorDate: Tue Dec 10 11:04:06 2024 +0800 [ISSUE #878] The java implementation of recalling API (#879) --- .github/workflows/java_coverage.yml | 1 + README-CN.md | 19 ++++----- README.md | 1 + .../rocketmq/client/apis/producer/Producer.java | 25 ++++++++++++ .../{SendReceipt.java => RecallReceipt.java} | 5 +-- .../rocketmq/client/apis/producer/SendReceipt.java | 3 ++ .../rocketmq/client/java/impl/ClientManager.java | 12 ++++++ .../client/java/impl/ClientManagerImpl.java | 17 ++++++++ .../client/java/impl/producer/ProducerImpl.java | 45 ++++++++++++++++++++++ .../java/impl/producer/RecallReceiptImpl.java} | 28 +++++++++++--- .../client/java/impl/producer/SendReceiptImpl.java | 14 ++++++- .../apache/rocketmq/client/java/rpc/RpcClient.java | 14 +++++++ .../rocketmq/client/java/rpc/RpcClientImpl.java | 10 +++++ .../client/java/impl/ClientManagerImplTest.java | 9 +++++ .../java/impl/producer/ProducerImplTest.java | 41 ++++++++++++++++++++ 15 files changed, 223 insertions(+), 21 deletions(-) diff --git a/.github/workflows/java_coverage.yml b/.github/workflows/java_coverage.yml index 0ad1e25e..cd9289d7 100644 --- a/.github/workflows/java_coverage.yml +++ b/.github/workflows/java_coverage.yml @@ -30,4 +30,5 @@ jobs: with: flags: java fail_ci_if_error: true + token: e7eb01be-398b-4f7f-a73e-dc35c428cb50 verbose: true diff --git a/README-CN.md b/README-CN.md index 25131b36..5ae40d3e 100644 --- a/README-CN.md +++ b/README-CN.md @@ -17,15 +17,16 @@ * 可用 - ✅ * 进行中 - 🚧 -| 特性 | Java | C/C++ | C# | Golang | Rust | Python | Node.js | PHP | -| ---------------------------------------------- | :---: | :---: | :---: | :----: | :---: | :----: | :-----: | :---: | -| Producer with standard messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | -| Producer with FIFO messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | -| Producer with timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | -| Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | -| Simple consumer | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | -| Push consumer with concurrent message listener | ✅ | ✅ | 🚧 | 🚧 | ✅ | 🚧 | 🚧 | 🚧 | -| Push consumer with FIFO message listener | ✅ | ✅ | 🚧 | 🚧 | ✅ | 🚧 | 🚧 | 🚧 | +| 特性 | Java | C/C++ | C# | Golang | Rust | Python | Node.js | PHP | +|------------------------------------------------| :---: |:------:|:-----:|:------:|:----:|:------:|:-------:| :---: | +| Producer with standard messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | +| Producer with FIFO messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | +| Producer with timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | +| Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | +| Producer with recalling timed/delay messages | ✅ | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | +| Simple consumer | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | +| Push consumer with concurrent message listener | ✅ | ✅ | 🚧 | 🚧 | ✅ | 🚧 | 🚧 | 🚧 | +| Push consumer with FIFO message listener | ✅ | ✅ | 🚧 | 🚧 | ✅ | 🚧 | 🚧 | 🚧 | ## 先决条件和构建 diff --git a/README.md b/README.md index 0772a172..937d9c31 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ Provide cloud-native and robust solutions for Java, C++, C#, Golang, Rust and al | Producer with FIFO messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | | Producer with timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | | Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | +| Producer with recalling timed/delay messages | ✅ | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | | Simple consumer | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | | Push consumer with concurrent message listener | ✅ | ✅ | 🚧 | 🚧 | ✅ | 🚧 | 🚧 | 🚧 | | Push consumer with FIFO message listener | ✅ | ✅ | 🚧 | 🚧 | ✅ | 🚧 | 🚧 | 🚧 | diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/Producer.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/Producer.java index 71a3a57b..37d12725 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/Producer.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/Producer.java @@ -76,6 +76,31 @@ public interface Producer extends Closeable { */ Transaction beginTransaction() throws ClientException; + /** + * Recall message synchronously, only delay message is supported for now. + * + * <pre>{@code + * SendReceipt receipt = producer.send(message); + * String recallHandle = receipt.getRecallHandle(); + * }</pre> + * + * @param topic the topic of the operation + * @param recallHandle the handle to identify a message to recall + * @return the returned receipt, or throw exception if response status is not OK. + */ + RecallReceipt recallMessage(String topic, String recallHandle) throws ClientException; + + /** + * Recall message asynchronously. + * + * <p>This method returns immediately, the result is included in the {@link CompletableFuture}; + * + * @param topic the topic of the operation + * @param recallHandle the handle to identify a message to recall + * @return a future that indicates the receipt + */ + CompletableFuture<RecallReceipt> recallMessageAsync(String topic, String recallHandle); + /** * Closes the producer and releases all related resources. * diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/RecallReceipt.java similarity index 87% copy from java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java copy to java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/RecallReceipt.java index dc96f55f..7ebeb54b 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/RecallReceipt.java @@ -19,9 +19,6 @@ package org.apache.rocketmq.client.apis.producer; import org.apache.rocketmq.client.apis.message.MessageId; -/** - * A receipt from the server, which only makes sense when the message is sent successfully. - */ -public interface SendReceipt { +public interface RecallReceipt { MessageId getMessageId(); } diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java index dc96f55f..794da087 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java @@ -24,4 +24,7 @@ import org.apache.rocketmq.client.apis.message.MessageId; */ public interface SendReceipt { MessageId getMessageId(); + + // Unique handle to identify a message to recall, only delay message is supported for now + String getRecallHandle(); } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java index ce7d2e1b..e9ff2076 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java @@ -33,6 +33,8 @@ import apache.rocketmq.v2.QueryAssignmentRequest; import apache.rocketmq.v2.QueryAssignmentResponse; import apache.rocketmq.v2.QueryRouteRequest; import apache.rocketmq.v2.QueryRouteResponse; +import apache.rocketmq.v2.RecallMessageRequest; +import apache.rocketmq.v2.RecallMessageResponse; import apache.rocketmq.v2.ReceiveMessageRequest; import apache.rocketmq.v2.ReceiveMessageResponse; import apache.rocketmq.v2.SendMessageRequest; @@ -170,6 +172,16 @@ public abstract class ClientManager extends AbstractIdleService { public abstract RpcFuture<NotifyClientTerminationRequest, NotifyClientTerminationResponse> notifyClientTermination(Endpoints endpoints, NotifyClientTerminationRequest request, Duration duration); + /** + * recall message asynchronously, the method ensures no throwable. + * @param endpoints request endpoints. + * @param request recall message request. + * @param duration request max duration. + * @return invocation of response future. + */ + public abstract RpcFuture<RecallMessageRequest, RecallMessageResponse> recallMessage(Endpoints endpoints, + RecallMessageRequest request, Duration duration); + /** * Establish telemetry session stream to server. * diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java index 7630fefb..76cfb617 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java @@ -33,6 +33,8 @@ import apache.rocketmq.v2.QueryAssignmentRequest; import apache.rocketmq.v2.QueryAssignmentResponse; import apache.rocketmq.v2.QueryRouteRequest; import apache.rocketmq.v2.QueryRouteResponse; +import apache.rocketmq.v2.RecallMessageRequest; +import apache.rocketmq.v2.RecallMessageResponse; import apache.rocketmq.v2.ReceiveMessageRequest; import apache.rocketmq.v2.ReceiveMessageResponse; import apache.rocketmq.v2.SendMessageRequest; @@ -344,6 +346,21 @@ public class ClientManagerImpl extends ClientManager { } } + @Override + public RpcFuture<RecallMessageRequest, RecallMessageResponse> recallMessage(Endpoints endpoints, + RecallMessageRequest request, Duration duration) { + try { + final Metadata metadata = client.sign(); + final Context context = new Context(endpoints, metadata); + final RpcClient rpcClient = getRpcClient(endpoints); + final ListenableFuture<RecallMessageResponse> future = + rpcClient.recallMessage(metadata, request, asyncWorker, duration); + return new RpcFuture<>(context, request, future); + } catch (Throwable t) { + return new RpcFuture<>(t); + } + } + @Override public StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, Duration duration, StreamObserver<TelemetryCommand> responseObserver) throws ClientException { diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java index a8021770..1945639b 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java @@ -23,6 +23,8 @@ import apache.rocketmq.v2.EndTransactionRequest; import apache.rocketmq.v2.EndTransactionResponse; import apache.rocketmq.v2.HeartbeatRequest; import apache.rocketmq.v2.NotifyClientTerminationRequest; +import apache.rocketmq.v2.RecallMessageRequest; +import apache.rocketmq.v2.RecallMessageResponse; import apache.rocketmq.v2.RecoverOrphanedTransactionCommand; import apache.rocketmq.v2.SendMessageRequest; import apache.rocketmq.v2.SendMessageResponse; @@ -50,16 +52,19 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import net.javacrumbs.futureconverter.java8guava.FutureConverter; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.message.MessageId; import org.apache.rocketmq.client.apis.producer.Producer; +import org.apache.rocketmq.client.apis.producer.RecallReceipt; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.apache.rocketmq.client.apis.producer.Transaction; import org.apache.rocketmq.client.apis.producer.TransactionChecker; import org.apache.rocketmq.client.apis.producer.TransactionResolution; import org.apache.rocketmq.client.java.exception.InternalErrorException; +import org.apache.rocketmq.client.java.exception.StatusChecker; import org.apache.rocketmq.client.java.exception.TooManyRequestsException; import org.apache.rocketmq.client.java.hook.MessageHookPoints; import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus; @@ -250,6 +255,18 @@ class ProducerImpl extends ClientImpl implements Producer { return new TransactionImpl(this); } + @Override + public RecallReceipt recallMessage(String topic, String recallHandle) throws ClientException { + final ListenableFuture<RecallReceipt> future = recallMessage0(topic, recallHandle); + return handleClientFuture(future); + } + + @Override + public CompletableFuture<RecallReceipt> recallMessageAsync(String topic, String recallHandle) { + final ListenableFuture<RecallReceipt> future = recallMessage0(topic, recallHandle); + return FutureConverter.toCompletableFuture(future); + } + @Override public void close() { this.stopAsync().awaitTerminated(); @@ -561,4 +578,32 @@ class ProducerImpl extends ClientImpl implements Producer { return Futures.transform(getRouteData(topic), topicRouteData -> updatePublishingLoadBalancer(topic, topicRouteData), MoreExecutors.directExecutor()); } + + ListenableFuture<RecallReceipt> recallMessage0(String topic, String recallHandle) { + if (!this.isRunning()) { + final IllegalStateException e = new IllegalStateException("Producer is not running now"); + log.error("Unable to recall message because producer is not running, state={}, clientId={}", + this.state(), clientId); + return Futures.immediateFailedFuture(e); + } + if (StringUtils.isEmpty(recallHandle)) { + return Futures.immediateFailedFuture(new IllegalArgumentException("recall handle is invalid")); + } + final RecallMessageRequest request = RecallMessageRequest.newBuilder() + .setTopic(apache.rocketmq.v2.Resource.newBuilder() + .setResourceNamespace(clientConfiguration.getNamespace()) + .setName(topic) + .build()) + .setRecallHandle(recallHandle) + .build(); + final Duration requestTimeout = clientConfiguration.getRequestTimeout(); + final RpcFuture<RecallMessageRequest, RecallMessageResponse> future = + this.getClientManager().recallMessage(endpoints, request, requestTimeout); + + return Futures.transformAsync(future, response -> { + final Status status = response.getStatus(); + StatusChecker.check(status, future); + return Futures.immediateFuture(new RecallReceiptImpl(response.getMessageId())); + }, MoreExecutors.directExecutor()); + } } diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/RecallReceiptImpl.java similarity index 54% copy from java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java copy to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/RecallReceiptImpl.java index dc96f55f..b6ba0ed6 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/producer/SendReceipt.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/RecallReceiptImpl.java @@ -15,13 +15,29 @@ * limitations under the License. */ -package org.apache.rocketmq.client.apis.producer; +package org.apache.rocketmq.client.java.impl.producer; +import com.google.common.base.MoreObjects; import org.apache.rocketmq.client.apis.message.MessageId; +import org.apache.rocketmq.client.apis.producer.RecallReceipt; +import org.apache.rocketmq.client.java.message.MessageIdCodec; -/** - * A receipt from the server, which only makes sense when the message is sent successfully. - */ -public interface SendReceipt { - MessageId getMessageId(); +public class RecallReceiptImpl implements RecallReceipt { + private final MessageId messageId; + + public RecallReceiptImpl(String messageIdStr) { + messageId = MessageIdCodec.getInstance().decode(messageIdStr); + } + + @Override + public MessageId getMessageId() { + return messageId; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("messageId", messageId) + .toString(); + } } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java index 5f9f403f..ed2fbf79 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java @@ -39,14 +39,17 @@ import org.apache.rocketmq.client.java.rpc.RpcFuture; public class SendReceiptImpl implements SendReceipt { private final MessageId messageId; private final String transactionId; + private final String recallHandle; private final MessageQueueImpl messageQueue; private final long offset; - private SendReceiptImpl(MessageId messageId, String transactionId, MessageQueueImpl messageQueue, long offset) { + private SendReceiptImpl(MessageId messageId, String transactionId, + MessageQueueImpl messageQueue, long offset, String recallHandle) { this.messageId = messageId; this.transactionId = transactionId; this.messageQueue = messageQueue; this.offset = offset; + this.recallHandle = recallHandle; } @Override @@ -54,6 +57,11 @@ public class SendReceiptImpl implements SendReceipt { return messageId; } + @Override + public String getRecallHandle() { + return recallHandle; + } + public MessageQueueImpl getMessageQueue() { return messageQueue; } @@ -87,7 +95,8 @@ public class SendReceiptImpl implements SendReceipt { final MessageId messageId = MessageIdCodec.getInstance().decode(entry.getMessageId()); final String transactionId = entry.getTransactionId(); final long offset = entry.getOffset(); - final SendReceiptImpl impl = new SendReceiptImpl(messageId, transactionId, mq, offset); + final String recallHandle = entry.getRecallHandle(); + final SendReceiptImpl impl = new SendReceiptImpl(messageId, transactionId, mq, offset, recallHandle); sendReceipts.add(impl); } return sendReceipts; @@ -97,6 +106,7 @@ public class SendReceiptImpl implements SendReceipt { public String toString() { return MoreObjects.toStringHelper(this) .add("messageId", messageId) + .add("recallHandle", recallHandle) .toString(); } } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java index 4c5e8b76..c555082f 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java @@ -33,6 +33,8 @@ import apache.rocketmq.v2.QueryAssignmentRequest; import apache.rocketmq.v2.QueryAssignmentResponse; import apache.rocketmq.v2.QueryRouteRequest; import apache.rocketmq.v2.QueryRouteResponse; +import apache.rocketmq.v2.RecallMessageRequest; +import apache.rocketmq.v2.RecallMessageResponse; import apache.rocketmq.v2.ReceiveMessageRequest; import apache.rocketmq.v2.ReceiveMessageResponse; import apache.rocketmq.v2.SendMessageRequest; @@ -186,6 +188,18 @@ public interface RpcClient { ListenableFuture<NotifyClientTerminationResponse> notifyClientTermination(Metadata metadata, NotifyClientTerminationRequest request, Executor executor, Duration duration); + /** + * Recall message asynchronously. + * + * @param metadata gRPC request header metadata. + * @param request recall message request + * @param executor gRPC asynchronous executor. + * @param duration request max duration. + * @return invocation of response future. + */ + ListenableFuture<RecallMessageResponse> recallMessage(Metadata metadata, + RecallMessageRequest request, Executor executor, Duration duration); + /** * Start a streaming request and get the request observer. * diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java index a40cd398..77c2184e 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java @@ -34,6 +34,8 @@ import apache.rocketmq.v2.QueryAssignmentRequest; import apache.rocketmq.v2.QueryAssignmentResponse; import apache.rocketmq.v2.QueryRouteRequest; import apache.rocketmq.v2.QueryRouteResponse; +import apache.rocketmq.v2.RecallMessageRequest; +import apache.rocketmq.v2.RecallMessageResponse; import apache.rocketmq.v2.ReceiveMessageRequest; import apache.rocketmq.v2.ReceiveMessageResponse; import apache.rocketmq.v2.SendMessageRequest; @@ -212,6 +214,14 @@ public class RpcClientImpl implements RpcClient { .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).notifyClientTermination(request); } + @Override + public ListenableFuture<RecallMessageResponse> recallMessage(Metadata metadata, + RecallMessageRequest request, Executor executor, Duration duration) { + this.activityNanoTime = System.nanoTime(); + return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor) + .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).recallMessage(request); + } + @Override public StreamObserver<TelemetryCommand> telemetry(Metadata metadata, Executor executor, Duration duration, StreamObserver<TelemetryCommand> responseObserver) { diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java index 46d82339..b6ac2ec3 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java @@ -25,6 +25,7 @@ import apache.rocketmq.v2.HeartbeatRequest; import apache.rocketmq.v2.NotifyClientTerminationRequest; import apache.rocketmq.v2.QueryAssignmentRequest; import apache.rocketmq.v2.QueryRouteRequest; +import apache.rocketmq.v2.RecallMessageRequest; import apache.rocketmq.v2.ReceiveMessageRequest; import apache.rocketmq.v2.SendMessageRequest; import io.grpc.Metadata; @@ -135,4 +136,12 @@ public class ClientManagerImplTest extends TestBase { CLIENT_MANAGER.notifyClientTermination(null, request, Duration.ofSeconds(1)); // Expect no exception thrown. } + + @Test + public void testRecallMessage() { + RecallMessageRequest request = RecallMessageRequest.newBuilder().build(); + CLIENT_MANAGER.recallMessage(fakeEndpoints(), request, Duration.ofSeconds(1)); + CLIENT_MANAGER.recallMessage(null, request, Duration.ofSeconds(1)); + // Expect no exception thrown. + } } \ No newline at end of file diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java index 60bb0fef..20533a8c 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java @@ -24,10 +24,13 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import apache.rocketmq.v2.Broker; +import apache.rocketmq.v2.Code; import apache.rocketmq.v2.MessageQueue; import apache.rocketmq.v2.MessageType; import apache.rocketmq.v2.Permission; +import apache.rocketmq.v2.RecallMessageResponse; import apache.rocketmq.v2.Resource; +import apache.rocketmq.v2.Status; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Service; import java.util.ArrayList; @@ -38,10 +41,15 @@ import java.util.Set; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.message.Message; +import org.apache.rocketmq.client.java.exception.InternalErrorException; +import org.apache.rocketmq.client.java.impl.ClientManagerImpl; +import org.apache.rocketmq.client.java.message.MessageIdCodec; import org.apache.rocketmq.client.java.route.Endpoints; import org.apache.rocketmq.client.java.route.MessageQueueImpl; import org.apache.rocketmq.client.java.route.TopicRouteData; +import org.apache.rocketmq.client.java.rpc.RpcFuture; import org.apache.rocketmq.client.java.tool.TestBase; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; @@ -109,4 +117,37 @@ public class ProducerImplTest extends TestBase { verify(producer, times(maxAttempts)).send0(any(Endpoints.class), anyList(), any(MessageQueueImpl.class)); producer.close(); } + + @Test + public void testRecall() throws Exception { + final ProducerImpl producer = createProducerWithTopic(FAKE_TOPIC_0); + final String messageId = MessageIdCodec.getInstance().nextMessageId().toString(); + final RecallReceiptImpl recallReceiptImpl = new RecallReceiptImpl(messageId); + Mockito.doReturn(Futures.immediateFuture(recallReceiptImpl)).when(producer).recallMessage0(any(), any()); + producer.recallMessage(FAKE_TOPIC_0, "handle"); + verify(producer, times(1)).recallMessage0(any(), any()); + producer.close(); + } + + @Test + public void testRecallFailure() { + RecallMessageResponse response = RecallMessageResponse.newBuilder() + .setMessageId("") + .setStatus(Status.newBuilder().setCode(Code.INTERNAL_ERROR).build()) + .build(); + final ClientManagerImpl clientManager = mock(ClientManagerImpl.class); + final ProducerImpl producer = createProducerWithTopic(FAKE_TOPIC_0); + Mockito.doReturn(clientManager).when(producer).getClientManager(); + Mockito.doReturn(new RpcFuture<>(fakeRpcContext(), null, Futures.immediateFuture(response))) + .when(clientManager).recallMessage(any(), any(), any()); + + Assert.assertThrows(InternalErrorException.class, () -> { + producer.recallMessage(FAKE_TOPIC_0, "handle"); + }); + producer.recallMessageAsync(FAKE_TOPIC_0, "handle").whenComplete((r, t) -> { + Assert.assertTrue(t instanceof InternalErrorException); + }); + verify(producer, times(2)).recallMessage0(any(), any()); + producer.close(); + } } \ No newline at end of file