This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5e7baab95 [ISSUE #4323] Optimized openmessaging example code (#4347)
5e7baab95 is described below

commit 5e7baab953731a8d09a181a7891edfcea91d5fd6
Author: 李晓双 Li Xiao Shuang <[email protected]>
AuthorDate: Tue May 24 09:31:43 2022 +0800

    [ISSUE #4323] Optimized openmessaging example code (#4347)
    
    * Optimized openmessaging example code
    
    * fix ci
---
 .../example/openmessaging/SimpleProducer.java      | 31 ++++++++++++----------
 .../example/openmessaging/SimplePullConsumer.java  | 10 +++++--
 .../example/openmessaging/SimplePushConsumer.java  | 27 +++++++++----------
 3 files changed, 38 insertions(+), 30 deletions(-)

diff --git 
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
 
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
index 08c00be79..803faaa23 100644
--- 
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
+++ 
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.example.openmessaging;
 
 import io.openmessaging.Future;
-import io.openmessaging.FutureListener;
 import io.openmessaging.Message;
 import io.openmessaging.MessagingAccessPoint;
 import io.openmessaging.OMS;
@@ -26,9 +25,14 @@ import io.openmessaging.producer.SendResult;
 import java.util.concurrent.CountDownLatch;
 
 public class SimpleProducer {
+
+    public static final String URL = 
"oms:rocketmq://localhost:9876/default:default";
+    public static final String QUEUE = "OMS_HELLO_TOPIC";
+
     public static void main(String[] args) {
+        // You need to set the environment variable 
OMS_RMQ_DIRECT_NAME_SRV=true
         final MessagingAccessPoint messagingAccessPoint =
-            
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
+            OMS.getMessagingAccessPoint(URL);
 
         final Producer producer = messagingAccessPoint.createProducer();
 
@@ -39,7 +43,8 @@ public class SimpleProducer {
         System.out.printf("Producer startup OK%n");
 
         {
-            Message message = producer.createBytesMessage("OMS_HELLO_TOPIC", 
"OMS_HELLO_BODY".getBytes());
+            Message message = producer.createBytesMessage(QUEUE, 
"OMS_HELLO_BODY".getBytes());
+
             SendResult sendResult = producer.send(message);
             //final Void aVoid = result.get(3000L);
             System.out.printf("Send async message OK, msgId: %s%n", 
sendResult.messageId());
@@ -47,17 +52,14 @@ public class SimpleProducer {
 
         final CountDownLatch countDownLatch = new CountDownLatch(1);
         {
-            final Future<SendResult> result = 
producer.sendAsync(producer.createBytesMessage("OMS_HELLO_TOPIC", 
"OMS_HELLO_BODY".getBytes()));
-            result.addListener(new FutureListener<SendResult>() {
-                @Override
-                public void operationComplete(Future<SendResult> future) {
-                    if (future.getThrowable() != null) {
-                        System.out.printf("Send async message Failed, error: 
%s%n", future.getThrowable().getMessage());
-                    } else {
-                        System.out.printf("Send async message OK, msgId: 
%s%n", future.get().messageId());
-                    }
-                    countDownLatch.countDown();
+            final Future<SendResult> result = 
producer.sendAsync(producer.createBytesMessage(QUEUE, 
"OMS_HELLO_BODY".getBytes()));
+            result.addListener(future -> {
+                if (future.getThrowable() != null) {
+                    System.out.printf("Send async message Failed, error: 
%s%n", future.getThrowable().getMessage());
+                } else {
+                    System.out.printf("Send async message OK, msgId: %s%n", 
future.get().messageId());
                 }
+                countDownLatch.countDown();
             });
         }
 
@@ -68,7 +70,8 @@ public class SimpleProducer {
 
         try {
             countDownLatch.await();
-            Thread.sleep(500); // Wait some time for one-way delivery.
+            // Wait some time for one-way delivery.
+            Thread.sleep(500);
         } catch (InterruptedException ignore) {
         }
 
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
 
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
index 86aba410a..2c0059ab8 100644
--- 
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
+++ 
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
@@ -25,16 +25,22 @@ import io.openmessaging.producer.Producer;
 import io.openmessaging.producer.SendResult;
 
 public class SimplePullConsumer {
+
+    public static final String URL = 
"oms:rocketmq://localhost:9876/default:default";
+    public static final String QUEUE = "OMS_CONSUMER";
+
     public static void main(String[] args) {
+        // You need to set the environment variable 
OMS_RMQ_DIRECT_NAME_SRV=true
+
         final MessagingAccessPoint messagingAccessPoint =
-            
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
+            OMS.getMessagingAccessPoint(URL);
 
         messagingAccessPoint.startup();
 
         final Producer producer = messagingAccessPoint.createProducer();
 
         final PullConsumer consumer = messagingAccessPoint.createPullConsumer(
-            OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
+            OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, QUEUE));
 
         messagingAccessPoint.startup();
         System.out.printf("MessagingAccessPoint startup OK%n");
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
 
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
index 220c13230..92ccff1ba 100644
--- 
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
+++ 
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
@@ -20,13 +20,18 @@ import io.openmessaging.Message;
 import io.openmessaging.MessagingAccessPoint;
 import io.openmessaging.OMS;
 import io.openmessaging.OMSBuiltinKeys;
-import io.openmessaging.consumer.MessageListener;
 import io.openmessaging.consumer.PushConsumer;
 
 public class SimplePushConsumer {
+
+    public static final String URL = 
"oms:rocketmq://localhost:9876/default:default";
+    public static final String QUEUE = "OMS_HELLO_TOPIC";
+
     public static void main(String[] args) {
+        // You need to set the environment variable 
OMS_RMQ_DIRECT_NAME_SRV=true
+
         final MessagingAccessPoint messagingAccessPoint = OMS
-            
.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
+            .getMessagingAccessPoint(URL);
 
         final PushConsumer consumer = messagingAccessPoint.
             
createPushConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, 
"OMS_CONSUMER"));
@@ -34,20 +39,14 @@ public class SimplePushConsumer {
         messagingAccessPoint.startup();
         System.out.printf("MessagingAccessPoint startup OK%n");
 
-        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-            @Override
-            public void run() {
-                consumer.shutdown();
-                messagingAccessPoint.shutdown();
-            }
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            consumer.shutdown();
+            messagingAccessPoint.shutdown();
         }));
 
-        consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
-            @Override
-            public void onReceived(Message message, Context context) {
-                System.out.printf("Received one message: %s%n", 
message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID));
-                context.ack();
-            }
+        consumer.attachQueue(QUEUE, (message, context) -> {
+            System.out.printf("Received one message: %s%n", 
message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID));
+            context.ack();
         });
 
         consumer.startup();

Reply via email to