This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 5e7baab95 [ISSUE #4323] Optimized openmessaging example code (#4347)
5e7baab95 is described below
commit 5e7baab953731a8d09a181a7891edfcea91d5fd6
Author: 李晓双 Li Xiao Shuang <[email protected]>
AuthorDate: Tue May 24 09:31:43 2022 +0800
[ISSUE #4323] Optimized openmessaging example code (#4347)
* Optimized openmessaging example code
* fix ci
---
.../example/openmessaging/SimpleProducer.java | 31 ++++++++++++----------
.../example/openmessaging/SimplePullConsumer.java | 10 +++++--
.../example/openmessaging/SimplePushConsumer.java | 27 +++++++++----------
3 files changed, 38 insertions(+), 30 deletions(-)
diff --git
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
index 08c00be79..803faaa23 100644
---
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
+++
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Future;
-import io.openmessaging.FutureListener;
import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
@@ -26,9 +25,14 @@ import io.openmessaging.producer.SendResult;
import java.util.concurrent.CountDownLatch;
public class SimpleProducer {
+
+ public static final String URL =
"oms:rocketmq://localhost:9876/default:default";
+ public static final String QUEUE = "OMS_HELLO_TOPIC";
+
public static void main(String[] args) {
+ // You need to set the environment variable
OMS_RMQ_DIRECT_NAME_SRV=true
final MessagingAccessPoint messagingAccessPoint =
-
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
+ OMS.getMessagingAccessPoint(URL);
final Producer producer = messagingAccessPoint.createProducer();
@@ -39,7 +43,8 @@ public class SimpleProducer {
System.out.printf("Producer startup OK%n");
{
- Message message = producer.createBytesMessage("OMS_HELLO_TOPIC",
"OMS_HELLO_BODY".getBytes());
+ Message message = producer.createBytesMessage(QUEUE,
"OMS_HELLO_BODY".getBytes());
+
SendResult sendResult = producer.send(message);
//final Void aVoid = result.get(3000L);
System.out.printf("Send async message OK, msgId: %s%n",
sendResult.messageId());
@@ -47,17 +52,14 @@ public class SimpleProducer {
final CountDownLatch countDownLatch = new CountDownLatch(1);
{
- final Future<SendResult> result =
producer.sendAsync(producer.createBytesMessage("OMS_HELLO_TOPIC",
"OMS_HELLO_BODY".getBytes()));
- result.addListener(new FutureListener<SendResult>() {
- @Override
- public void operationComplete(Future<SendResult> future) {
- if (future.getThrowable() != null) {
- System.out.printf("Send async message Failed, error:
%s%n", future.getThrowable().getMessage());
- } else {
- System.out.printf("Send async message OK, msgId:
%s%n", future.get().messageId());
- }
- countDownLatch.countDown();
+ final Future<SendResult> result =
producer.sendAsync(producer.createBytesMessage(QUEUE,
"OMS_HELLO_BODY".getBytes()));
+ result.addListener(future -> {
+ if (future.getThrowable() != null) {
+ System.out.printf("Send async message Failed, error:
%s%n", future.getThrowable().getMessage());
+ } else {
+ System.out.printf("Send async message OK, msgId: %s%n",
future.get().messageId());
}
+ countDownLatch.countDown();
});
}
@@ -68,7 +70,8 @@ public class SimpleProducer {
try {
countDownLatch.await();
- Thread.sleep(500); // Wait some time for one-way delivery.
+ // Wait some time for one-way delivery.
+ Thread.sleep(500);
} catch (InterruptedException ignore) {
}
diff --git
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
index 86aba410a..2c0059ab8 100644
---
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
+++
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
@@ -25,16 +25,22 @@ import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
public class SimplePullConsumer {
+
+ public static final String URL =
"oms:rocketmq://localhost:9876/default:default";
+ public static final String QUEUE = "OMS_CONSUMER";
+
public static void main(String[] args) {
+ // You need to set the environment variable
OMS_RMQ_DIRECT_NAME_SRV=true
+
final MessagingAccessPoint messagingAccessPoint =
-
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
+ OMS.getMessagingAccessPoint(URL);
messagingAccessPoint.startup();
final Producer producer = messagingAccessPoint.createProducer();
final PullConsumer consumer = messagingAccessPoint.createPullConsumer(
- OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
+ OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, QUEUE));
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
diff --git
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
index 220c13230..92ccff1ba 100644
---
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
+++
b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
@@ -20,13 +20,18 @@ import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.OMSBuiltinKeys;
-import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.PushConsumer;
public class SimplePushConsumer {
+
+ public static final String URL =
"oms:rocketmq://localhost:9876/default:default";
+ public static final String QUEUE = "OMS_HELLO_TOPIC";
+
public static void main(String[] args) {
+ // You need to set the environment variable
OMS_RMQ_DIRECT_NAME_SRV=true
+
final MessagingAccessPoint messagingAccessPoint = OMS
-
.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
+ .getMessagingAccessPoint(URL);
final PushConsumer consumer = messagingAccessPoint.
createPushConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID,
"OMS_CONSUMER"));
@@ -34,20 +39,14 @@ public class SimplePushConsumer {
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- consumer.shutdown();
- messagingAccessPoint.shutdown();
- }
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ consumer.shutdown();
+ messagingAccessPoint.shutdown();
}));
- consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
- @Override
- public void onReceived(Message message, Context context) {
- System.out.printf("Received one message: %s%n",
message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID));
- context.ack();
- }
+ consumer.attachQueue(QUEUE, (message, context) -> {
+ System.out.printf("Received one message: %s%n",
message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID));
+ context.ack();
});
consumer.startup();