qinyi8888 commented on issue #7187:
URL: https://github.com/apache/rocketmq/issues/7187#issuecomment-2556518772

   目前只能自己创建 Topic,临时解决方法如下:
   引入依赖(Gradle):
   implementation('org.apache.rocketmq:rocketmq-tools:5.0.0')
    简单实现代码: 
   `/**
    * @author
    * @date 2024/12/20
    */
   @Slf4j
   @Component
   public class InitRocketmqTopicRunner implements ApplicationRunner {
   
       @Resource
       private RedisService redisService;
   
       @Value("${rocketmq.name-server}")
       private String nameServerAddr;
   
       /**
        * Creates the RocketMQ topics on application start-up, guarded by a Redis flag.
        *
        * <p>Nothing happens when {@code RedisKeys.SYSTEM_INITIALIZATION_FLAG} is already
        * present. Otherwise {@link #initTopic()} is invoked and the flag is written only
        * if it reports success, so a failed attempt is retried on the next start-up
        * instead of being recorded as done.
        *
        * @param args application arguments passed in by the framework; not used here
        * @throws Exception if one of the Redis calls throws
        */
       @Override
       public void run(ApplicationArguments args) throws Exception {
           Object flag = redisService.get(RedisKeys.SYSTEM_INITIALIZATION_FLAG);
           if (flag != null) {
               return;
           }
           log.info("Init Rocketmq Topic");
           if (initTopic()) {
               redisService.setCacheObject(RedisKeys.SYSTEM_INITIALIZATION_FLAG, true);
           } else {
               // Leave the flag unset so the next start-up tries again.
               log.warn("RocketMQ topic initialization did not complete; will retry on next startup.");
           }
       }

       /**
        * Ensures that every topic named by a {@code String} field of
        * {@code TopicConstants} exists, creating the missing ones in the first cluster
        * returned by the name server.
        *
        * <p>All failures are logged rather than propagated, so a RocketMQ problem does
        * not abort application start-up.
        *
        * @return {@code true} if every topic was found or created without error;
        *         {@code false} if no cluster was reported or any step threw
        */
       private boolean initTopic() {
           DefaultMQAdminExt admin = new DefaultMQAdminExt();
           // Timestamp-based instance name; presumably avoids clashing with other
           // RocketMQ clients running in the same JVM (confirm against client docs).
           admin.setInstanceName(Long.toString(System.currentTimeMillis()));
           admin.setNamesrvAddr(nameServerAddr);
           try {
               admin.start();
               ClusterInfo clusterInfo = admin.examineBrokerClusterInfo();
               Set<String> clusterSet = clusterInfo.getClusterAddrTable().keySet();
               if (clusterSet.isEmpty()) {
                   log.error("No clusters found in RocketMQ.");
                   return false;
               }
               // Use the first cluster found
               String clusterName = clusterSet.iterator().next();
               log.info("Using cluster: {}", clusterName);
               for (Field field : TopicConstants.class.getDeclaredFields()) {
                   if (field.getType() != String.class) {
                       continue;
                   }
                   // Interface fields are static, so no instance is needed to read them.
                   String topic = (String) field.get(null);
                   if (checkIfTopicExists(admin, topic)) {
                       log.info("Topic {} already exists.", topic);
                   } else {
                       createTopic(admin, clusterName, topic);
                   }
               }
               return true;
           } catch (Exception e) {
               if (e instanceof InterruptedException) {
                   // Do not lose the interrupt request while swallowing the exception.
                   Thread.currentThread().interrupt();
               }
               log.error("Failed to initialize RocketMQ topics", e);
               return false;
           } finally {
               admin.shutdown();
           }
       }
   
       /**
        * Reports whether route information is already available for a topic.
        *
        * <p>Any exception thrown by the lookup is treated as "topic does not exist",
        * which matches how the caller uses this method to decide whether to create it.
        *
        * @param admin started admin client used for the lookup
        * @param topic name of the topic to look up
        * @return {@code true} if route data is returned with at least one queue entry;
        *         {@code false} if it is missing, empty, or the lookup throws
        */
       private boolean checkIfTopicExists(DefaultMQAdminExt admin, String topic) {
           try {
               TopicRouteData topicRouteData = admin.examineTopicRouteInfo(topic);
               return topicRouteData != null && !topicRouteData.getQueueDatas().isEmpty();
           } catch (InterruptedException e) {
               // Restore the interrupt status so callers further up can still observe it.
               Thread.currentThread().interrupt();
               log.warn("Topic {} does not exist.", topic);
               return false;
           } catch (MQClientException | RemotingException e) {
               // If the topic does not exist, an exception will be thrown
               log.warn("Topic {} does not exist.", topic);
               return false;
           }
       }
       /**
        * Creates a single topic through the admin client and logs the outcome.
        *
        * @param admin       started admin client that performs the creation
        * @param clusterName value passed as the first argument of
        *                    {@code DefaultMQAdminExt#createTopic}
        * @param topic       name of the topic to create
        * @throws Exception if the admin client fails to create the topic
        */
       private void createTopic(DefaultMQAdminExt admin, String clusterName, String topic)
               throws Exception {
           // Queue count requested for every topic created here.
           final int queueCount = 4;
           admin.createTopic(clusterName, topic, queueCount, null);
           log.info("Created topic {}", topic);
       }`
       'public interface TopicConstants {
       // Holder for RocketMQ topic names. InitRocketmqTopicRunner#initTopic reads every
       // String field declared here via reflection and makes sure a topic with that
       // value exists, so adding a String constant here adds a topic at start-up.

       // Topic name "down_invoke_service".
       String DOWN_INVOKE_SERVICE = "down_invoke_service";
       }'


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to