This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new bc32cbd4 Polish example of simple consumer (#351)
bc32cbd4 is described below

commit bc32cbd440f70cb71eb223a3b94c775852740fa9
Author: Aaron Ai <[email protected]>
AuthorDate: Mon Feb 6 11:07:53 2023 +0800

    Polish example of simple consumer (#351)
---
 .../java/example/AsyncSimpleConsumerExample.java   | 60 +++++++++++-----------
 .../client/java/example/SimpleConsumerExample.java | 29 ++++++-----
 2 files changed, 47 insertions(+), 42 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
index 7cd462e6..51f3ccf8 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.client.java.example;
 
-import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
@@ -45,7 +44,8 @@ public class AsyncSimpleConsumerExample {
     private AsyncSimpleConsumerExample() {
     }
 
-    public static void main(String[] args) throws ClientException, 
IOException, InterruptedException {
+    @SuppressWarnings({"resource", "InfiniteLoopStatement"})
+    public static void main(String[] args) throws ClientException {
         final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
 
         // Credential provider is optional for client configuration.
@@ -81,34 +81,36 @@ public class AsyncSimpleConsumerExample {
         ExecutorService receiveCallbackExecutor = 
Executors.newCachedThreadPool();
         // Set individual thread pool for ack callback.
         ExecutorService ackCallbackExecutor = Executors.newCachedThreadPool();
-        final CompletableFuture<List<MessageView>> future0 = 
consumer.receiveAsync(maxMessageNum, invisibleDuration);
-        future0.whenCompleteAsync(((messages, throwable) -> {
-            if (null != throwable) {
-                log.error("Failed to receive message from remote", throwable);
-                // Return early.
-                return;
-            }
-            log.info("Received {} message(s)", messages.size());
-            // Using messageView as key rather than message id because message 
id may be duplicated.
-            final Map<MessageView, CompletableFuture<Void>> map =
-                messages.stream().collect(Collectors.toMap(message -> message, 
consumer::ackAsync));
-            for (Map.Entry<MessageView, CompletableFuture<Void>> entry : 
map.entrySet()) {
-                final MessageId messageId = entry.getKey().getMessageId();
-                final CompletableFuture<Void> future = entry.getValue();
-                future.whenCompleteAsync((v, t) -> {
-                    if (null != t) {
-                        log.error("Message is failed to be acknowledged, 
messageId={}", messageId, t);
-                        // Return early.
-                        return;
-                    }
-                    log.info("Message is acknowledged successfully, 
messageId={}", messageId);
-                }, ackCallbackExecutor);
-            }
+        // Receive message.
+        do {
+            final CompletableFuture<List<MessageView>> future0 = 
consumer.receiveAsync(maxMessageNum,
+                invisibleDuration);
+            future0.whenCompleteAsync(((messages, throwable) -> {
+                if (null != throwable) {
+                    log.error("Failed to receive message from remote", 
throwable);
+                    // Return early.
+                    return;
+                }
+                log.info("Received {} message(s)", messages.size());
+                // Using messageView as key rather than message id because 
message id may be duplicated.
+                final Map<MessageView, CompletableFuture<Void>> map =
+                    messages.stream().collect(Collectors.toMap(message -> 
message, consumer::ackAsync));
+                for (Map.Entry<MessageView, CompletableFuture<Void>> entry : 
map.entrySet()) {
+                    final MessageId messageId = entry.getKey().getMessageId();
+                    final CompletableFuture<Void> future = entry.getValue();
+                    future.whenCompleteAsync((v, t) -> {
+                        if (null != t) {
+                            log.error("Message is failed to be acknowledged, 
messageId={}", messageId, t);
+                            // Return early.
+                            return;
+                        }
+                        log.info("Message is acknowledged successfully, 
messageId={}", messageId);
+                    }, ackCallbackExecutor);
+                }
 
-        }), receiveCallbackExecutor);
-        // Block to avoid exist of background threads.
-        Thread.sleep(Long.MAX_VALUE);
+            }), receiveCallbackExecutor);
+        } while (true);
         // Close the simple consumer when you don't need it anymore.
-        consumer.close();
+        // consumer.close();
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
index c3f06a1a..d69253c0 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.client.java.example;
 
-import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
@@ -40,7 +39,8 @@ public class SimpleConsumerExample {
     private SimpleConsumerExample() {
     }
 
-    public static void main(String[] args) throws ClientException, IOException 
{
+    @SuppressWarnings({"resource", "InfiniteLoopStatement"})
+    public static void main(String[] args) throws ClientException {
         final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
 
         // Credential provider is optional for client configuration.
@@ -72,18 +72,21 @@ public class SimpleConsumerExample {
         int maxMessageNum = 16;
         // Set message invisible duration after it is received.
         Duration invisibleDuration = Duration.ofSeconds(15);
-        final List<MessageView> messages = consumer.receive(maxMessageNum, 
invisibleDuration);
-        log.info("Received {} message(s)", messages.size());
-        for (MessageView message : messages) {
-            final MessageId messageId = message.getMessageId();
-            try {
-                consumer.ack(message);
-                log.info("Message is acknowledged successfully, messageId={}", 
messageId);
-            } catch (Throwable t) {
-                log.error("Message is failed to be acknowledged, 
messageId={}", messageId, t);
+        // Receive message, multi-threading is more recommended.
+        do {
+            final List<MessageView> messages = consumer.receive(maxMessageNum, 
invisibleDuration);
+            log.info("Received {} message(s)", messages.size());
+            for (MessageView message : messages) {
+                final MessageId messageId = message.getMessageId();
+                try {
+                    consumer.ack(message);
+                    log.info("Message is acknowledged successfully, 
messageId={}", messageId);
+                } catch (Throwable t) {
+                    log.error("Message is failed to be acknowledged, 
messageId={}", messageId, t);
+                }
             }
-        }
+        } while (true);
         // Close the simple consumer when you don't need it anymore.
-        consumer.close();
+        // consumer.close();
     }
 }

Reply via email to