complone commented on code in PR #526:
URL: https://github.com/apache/rocketmq-spring/pull/526#discussion_r1117857576
##########
rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java:
##########
@@ -256,17 +276,171 @@ public void testSetRocketMQMessageListener() {
assertEquals(anno.instanceName(), container.getInstanceName());
}
+ @Test
+ public void testSelectorType() throws Exception {
+ DefaultRocketMQListenerContainer listenerContainer = new
DefaultRocketMQListenerContainer();
+ listenerContainer.setConsumer(new DefaultMQPushConsumer());
+ Method initSelectorType =
DefaultRocketMQListenerContainer.class.getDeclaredMethod("initSelectorType");
+ initSelectorType.setAccessible(true);
+
+ try {
+
listenerContainer.setRocketMQMessageListener(TagClass.class.getAnnotation(RocketMQMessageListener.class));
+ initSelectorType.invoke(listenerContainer);
+
+
listenerContainer.setRocketMQMessageListener(SQL92Class.class.getAnnotation(RocketMQMessageListener.class));
+ initSelectorType.invoke(listenerContainer);
+ } catch (Exception e) {
+
+ }
+ }
+
+ @Test
+ public void testBatchGetMessages() throws Exception {
+ DefaultRocketMQListenerContainer listenerContainer = new
DefaultRocketMQListenerContainer();
+ listenerContainer.setConsumer(new DefaultMQPushConsumer());
+ Field messageType =
DefaultRocketMQListenerContainer.class.getDeclaredField("messageType");
+ messageType.setAccessible(true);
+
+ ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "TestScheduledThread");
+ }
+ });
+ Runnable r = () -> {
+ try {
+ MessageExt msg = new MessageExt();
+ msg.setMsgId("X_SVEN_AUGUSTUS_0001");
+ msg.setBody((exceptedString + "
1").getBytes(listenerContainer.getCharset()));
+ MessageExt msg2 = new MessageExt();
+ msg2.setMsgId("X_SVEN_AUGUSTUS_0002");
+ msg2.setBody((exceptedString + "
2").getBytes(listenerContainer.getCharset()));
+ List<MessageExt> messages = Arrays.asList(msg, msg2);
+
+ MessageListener l =
listenerContainer.getConsumer().getMessageListener();
+ if (l instanceof MessageListenerConcurrently) {
+ ((MessageListenerConcurrently) l).consumeMessage(messages,
new ConsumeConcurrentlyContext(new MessageQueue()));
+ }
+ if (l instanceof MessageListenerOrderly) {
+ ((MessageListenerOrderly) l).consumeMessage(messages, new
ConsumeOrderlyContext(new MessageQueue()));
+ }
+ } catch (UnsupportedEncodingException e) {
+ e.printStackTrace();
+ }
+ };
+ messageType.set(listenerContainer, String.class);
+
+ // RocketMQBatchListener IN ConsumeMode.CONCURRENTLY, AND test for
excepted
+ tryRocketMQBatchListener(listenerContainer, ConcurrentlyClass.class,
scheduledExecutorService, r, exceptedString, true);// excepted
+
+ // RocketMQBatchListener IN ConsumeMode.CONCURRENTLY, AND test for not
excepted
+ tryRocketMQBatchListener(listenerContainer, ConcurrentlyClass.class,
scheduledExecutorService, r, notExceptedString, false);// not excepted
+
+ // RocketMQBatchListener IN ConsumeMode.ORDERLY, AND test for excepted
+ tryRocketMQBatchListener(listenerContainer, OrderlyClass.class,
scheduledExecutorService, r, exceptedString, true);// excepted
+
+ // RocketMQBatchListener IN ConsumeMode.ORDERLY, AND test for not
excepted
+ tryRocketMQBatchListener(listenerContainer, OrderlyClass.class,
scheduledExecutorService, r, notExceptedString, false);// not excepted
+
+ // RocketMQListener IN ConsumeMode.CONCURRENTLY, AND test for excepted
+ tryRocketMQListener(listenerContainer, ConcurrentlyClass.class,
scheduledExecutorService, r, exceptedString, true);// excepted
+
+ // RocketMQListener IN ConsumeMode.CONCURRENTLY, AND test for not
excepted
+ tryRocketMQListener(listenerContainer, ConcurrentlyClass.class,
scheduledExecutorService, r, notExceptedString, false);// not excepted
+
+ // RocketMQListener IN ConsumeMode.ORDERLY, AND test for excepted
+ tryRocketMQListener(listenerContainer, OrderlyClass.class,
scheduledExecutorService, r, exceptedString, true);// excepted
+
+ // RocketMQListener IN ConsumeMode.ORDERLY, AND test for not excepted
+ tryRocketMQListener(listenerContainer, OrderlyClass.class,
scheduledExecutorService, r, notExceptedString, false);// not excepted
+ }
+
+ private void tryRocketMQBatchListener(DefaultRocketMQListenerContainer
listenerContainer,
+ final Class<?>
rocketMQMessageListenerClass,
+ ScheduledExecutorService
scheduledExecutorService, Runnable r, final String exceptedValue,
+ boolean exceptedTrueOrFalse) throws
InvocationTargetException, IllegalAccessException, NoSuchMethodException,
InterruptedException {
+ Method initConsumeMode =
DefaultRocketMQListenerContainer.class.getDeclaredMethod("initConsumeMode");
+ initConsumeMode.setAccessible(true);
+
+ final Boolean[] result = new Boolean[] {Boolean.FALSE};
+
+ // RocketMQBatchListener IN ConsumeMode.CONCURRENTLY, AND test for
excepted
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ listenerContainer.setRocketMQBatchListener(new
RocketMQBatchListener<String>() {
+ @Override
+ public void onMessages(List<String> messages) {
+ result[0] = messages.stream().anyMatch(m ->
m.startsWith(exceptedValue));
+ countDownLatch.countDown();
+ }
+ });
+
listenerContainer.setRocketMQMessageListener(rocketMQMessageListenerClass.getAnnotation(RocketMQMessageListener.class));
+ initConsumeMode.invoke(listenerContainer);
+ scheduledExecutorService.schedule(r, 100, TimeUnit.MILLISECONDS);
+ countDownLatch.await(1000, TimeUnit.MILLISECONDS);
+ if (exceptedTrueOrFalse) {
+ assertThat(result[0]).isTrue(); // excepted
+ } else {
+ assertThat(result[0]).isFalse(); // not excepted
+ }
+ }
+
+ private void tryRocketMQListener(DefaultRocketMQListenerContainer
listenerContainer,
+ final Class<?>
rocketMQMessageListenerClass,
+ ScheduledExecutorService
scheduledExecutorService, Runnable r, final String exceptedValue,
+ boolean exceptedTrueOrFalse) throws
InvocationTargetException, IllegalAccessException, NoSuchMethodException,
InterruptedException {
+ Method initConsumeMode =
DefaultRocketMQListenerContainer.class.getDeclaredMethod("initConsumeMode");
+ initConsumeMode.setAccessible(true);
+
+ final Boolean[] result = new Boolean[] {Boolean.FALSE};
+
+ // RocketMQBatchListener IN ConsumeMode.CONCURRENTLY, AND test for
excepted
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ listenerContainer.setRocketMQListener(new RocketMQListener<String>() {
+ @Override
+ public void onMessage(String message) {
+ result[0] = message.startsWith(exceptedValue);
+ countDownLatch.countDown();
+ }
+ });
+
listenerContainer.setRocketMQMessageListener(rocketMQMessageListenerClass.getAnnotation(RocketMQMessageListener.class));
+ initConsumeMode.invoke(listenerContainer);
+ scheduledExecutorService.schedule(r, 100, TimeUnit.MILLISECONDS);
+ countDownLatch.await(1000, TimeUnit.MILLISECONDS);
+ if (exceptedTrueOrFalse) {
+ assertThat(result[0]).isTrue(); // excepted
+ } else {
+ assertThat(result[0]).isFalse(); // not excepted
+ }
+ }
+
+ @RocketMQMessageListener(consumerGroup = "consumerGroup1", topic = "test",
selectorExpression = "*", selectorType = SelectorType.TAG)
+ static class TagClass {
+ }
+
+ @RocketMQMessageListener(consumerGroup = "consumerGroup1", topic = "test",
selectorType = SelectorType.SQL92)
+ static class SQL92Class {
+ }
+
+ @RocketMQMessageListener(consumerGroup = "consumerGroup1", topic = "test",
consumeMode = ConsumeMode.CONCURRENTLY)
+ static class ConcurrentlyClass {
+ }
+
+ @RocketMQMessageListener(consumerGroup = "consumerGroup1", topic = "test",
consumeMode = ConsumeMode.ORDERLY)
+ static class OrderlyClass {
+ }
+
+
@RocketMQMessageListener(consumerGroup = "abc1", topic = "test",
- consumeMode = ConsumeMode.ORDERLY,
- consumeThreadNumber = 3456,
- messageModel = MessageModel.BROADCASTING,
- selectorType = SelectorType.SQL92,
- selectorExpression = "selectorExpression",
- tlsEnable = "tlsEnable",
- namespace = "namespace",
- delayLevelWhenNextConsume = 1234,
- suspendCurrentQueueTimeMillis = 2345,
- instanceName = "instanceName"
+ consumeMode = ConsumeMode.ORDERLY,
+ consumeThreadNumber = 3456,
+ messageModel = MessageModel.BROADCASTING,
+ selectorType = SelectorType.SQL92,
+ selectorExpression = "selectorExpression",
+ tlsEnable = "tlsEnable",
+ namespace = "namespace",
+ delayLevelWhenNextConsume = 1234,
+ suspendCurrentQueueTimeMillis = 2345,
+ instanceName = "instanceName"
Review Comment:
add consumeMessageBatchMaxSize
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]