qinyi8888 commented on issue #7187: URL: https://github.com/apache/rocketmq/issues/7187#issuecomment-2556518772
目前自己创建,临时解决方法: 引入配置文件: implementation('org.apache.rocketmq:rocketmq-tools:5.0.0') 简单实现代码: `/** * @author * @date 2024/12/20 */ @Slf4j @Component public class InitRocketmqTopicRunner implements ApplicationRunner { @Resource private RedisService redisService; @Value("${rocketmq.name-server}") private String nameServerAddr; @Override public void run(ApplicationArguments args) throws Exception { Object flag = redisService.get(RedisKeys.SYSTEM_INITIALIZATION_FLAG); if (flag == null) { log.info("Init Rocketmq Topic"); initTopic(); redisService.setCacheObject(RedisKeys.SYSTEM_INITIALIZATION_FLAG, true); } } private void initTopic() { DefaultMQAdminExt admin = new DefaultMQAdminExt(); admin.setInstanceName(Long.toString(System.currentTimeMillis())); admin.setNamesrvAddr(nameServerAddr); try { admin.start(); ClusterInfo clusterInfo = admin.examineBrokerClusterInfo(); Set<String> clusterSet = clusterInfo.getClusterAddrTable().keySet(); if (clusterSet.isEmpty()) { log.error("No clusters found in RocketMQ."); return; } // Use the first cluster found String clusterName = clusterSet.iterator().next(); log.info("Using cluster: {}", clusterName); Field[] fields = TopicConstants.class.getDeclaredFields(); for (Field field : fields) { if (field.getType() == String.class) { String topic = (String) field.get(null); if (checkIfTopicExists(admin, topic)) { log.info("Topic {} already exists.", topic); } else { createTopic(admin,clusterName, topic); } } } } catch (Exception e) { log.error("Failed to initialize RocketMQ topics", e); } finally { admin.shutdown(); } } private boolean checkIfTopicExists(DefaultMQAdminExt admin, String topic) { try { TopicRouteData topicRouteData = admin.examineTopicRouteInfo(topic); return topicRouteData != null && !topicRouteData.getQueueDatas().isEmpty(); } catch (MQClientException | RemotingException | InterruptedException e) { // If the topic does not exist, an exception will be thrown log.warn("Topic {} does not exist.", topic); return false; } } private void createTopic(DefaultMQAdminExt admin, String clusterName, String topic) throws Exception{ admin.createTopic(clusterName, topic, 4,null); log.info("Created topic {}", topic); }` 'public interface TopicConstants { String DOWN_INVOKE_SERVICE = "down_invoke_service"; }' -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org