onejimmyboy opened a new issue, #7251:
URL: https://github.com/apache/rocketmq/issues/7251

   ### Before Creating the Bug Report
   
   - [X] I found a bug, not just asking a question, which should be created in 
[GitHub Discussions](https://github.com/apache/rocketmq/discussions).
   
   - [X] I have searched the [GitHub 
Issues](https://github.com/apache/rocketmq/issues) and [GitHub 
Discussions](https://github.com/apache/rocketmq/discussions)  of this 
repository and believe that this is not a duplicate.
   
   - [X] I have confirmed that this bug belongs to the current repository, not 
other repositories of RocketMQ.
   
   
   ### Runtime platform environment
   
   centos7
   
   ### RocketMQ version
   
   RocketMQ 5.1.3
   集群模式:单机版+Local Proxy
   
   ### JDK Version
   
   1.8
   
   ### Describe the Bug
   
   rocketmq开启Proxy,消费者连接8081Endpoints消费时,不会自动生成以消费者组的重试队列:%RETRY%+consumerGroup。
   
而在查询消费者组消费状态examineConsumeStats和消费者组的连接信息examineConsumerConnectionInfo的时候都会使用到%RETRY%+consumerGroup。如下:
    @Override
       public ConsumeStats examineConsumeStats(String consumerGroup,
           String topic) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException {
           TopicRouteData topicRouteData = null;
           List<String> routeTopics = new ArrayList<>();
           routeTopics.add(MixAll.getRetryTopic(consumerGroup));
   .........
   
   
    @Override
       public ConsumerConnection examineConsumerConnectionInfo(
           String consumerGroup) throws InterruptedException, MQBrokerException,
           RemotingException, MQClientException {
           ConsumerConnection result = new ConsumerConnection();
           String topic = MixAll.getRetryTopic(consumerGroup);
           List<BrokerData> brokers = 
this.examineTopicRouteInfo(topic).getBrokerDatas();
   ............
   
   
   public static String getRetryTopic(final String consumerGroup) {
           return RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
       }
   
   另外:使用endpoints消费消息时,如果消费失败,会生成MixAll.RETRY_GROUP_TOPIC_PREFIX + cid + "_" + 
topic;这个topic。
   
   
   ### Steps to Reproduce
   
   1:启动broker
   2:启动消费者
   3:生产消息
   5:使用rocketmq-console查看消费者组信息
   
   
   ### What Did You Expect to See?
   
   
消费者启动时,生成%RETRY%+consumerGroup的重试topic,examineConsumeStats和消费者组的连接信息examineConsumerConnectionInfo接口查询有返回值
   
   ### What Did You See Instead?
   
   
消费者启动时,没有生成%RETRY%+consumerGroup的重试topic,examineConsumeStats和消费者组的连接信息examineConsumerConnectionInfo接口查询没有返回值
   
   ### Additional Context
   
   消费者代码:
    @Bean(name = "MyConsumer")
       public void MyConsumer(){
           log.info("MyConsumer start ...");
           ClientServiceProvider provider = ClientServiceProvider.loadService();
           ClientConfigurationBuilder builder = new 
ClientConfigurationBuilder();
           if (aclEnable){
               SessionCredentialsProvider sessionCredentialsProvider = new 
StaticSessionCredentialsProvider(accessKey,secretKey);
               builder = ClientConfiguration.newBuilder()
                       .setCredentialProvider(sessionCredentialsProvider)
                       .setEndpoints(endPoints);
           }else {
               builder = ClientConfiguration.newBuilder()
                       .setEndpoints(endPoints);
           }
           ClientConfiguration configuration = builder.build();
           try {
               log.info("构建5.0消费者 
endPoints:{},topic:{},consumerGroup:{}",endPoints,topic,consumerGroup);
               String tag = "*";
               FilterExpression filterExpression = new FilterExpression(tag, 
FilterExpressionType.TAG);
               provider.newPushConsumerBuilder()
                       .setClientConfiguration(configuration)
                       .setConsumerGroup(consumerGroup)
                       
.setSubscriptionExpressions(Collections.singletonMap(topic,filterExpression))
                       .setMessageListener(messageView -> {
                           log.info("接受到的完整消息:{}",messageView);
                           String str = 
StandardCharsets.UTF_8.decode(messageView.getBody()).toString();
                           log.info("消息内容为:{}",str);
                           return ConsumeResult.FAILURE;
                       }).build();
               log.info("构建5.0消费者成功");
   
           }catch (Exception e){
               log.error("构建5.0消费者异常",e);
           }
       }


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to