RongtongJin commented on code in PR #9549: URL: https://github.com/apache/rocketmq/pull/9549#discussion_r2244188570
########## broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java: ########## @@ -1610,6 +1619,37 @@ protected void shutdownBasicService() { } } + public void publishRouteEvent(String eventType) { + + Map<String, Object> eventData = new HashMap<>(); + eventData.put("eventType", eventType); + eventData.put("brokerName", this.brokerConfig.getBrokerName()); + eventData.put("brokerId", this.brokerConfig.getBrokerId()); + eventData.put("timestamp", System.currentTimeMillis()); + eventData.put("affectedTopic", this.getTopicConfigManager().getTopicConfigTable().keySet().toArray()); Review Comment: topic可能会有百万级别的数据,要看下什么事件类型,是否有必要传送,如果有必要(建议不传送这么大的数据),则是否要做切分,否则传送这么大的数据,消息体也是受限制的 ########## broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java: ########## @@ -1610,6 +1619,37 @@ protected void shutdownBasicService() { } } + public void publishRouteEvent(String eventType) { Review Comment: eventType用枚举类 ########## broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java: ########## @@ -1610,6 +1619,37 @@ protected void shutdownBasicService() { } } + public void publishRouteEvent(String eventType) { + + Map<String, Object> eventData = new HashMap<>(); + eventData.put("eventType", eventType); + eventData.put("brokerName", this.brokerConfig.getBrokerName()); + eventData.put("brokerId", this.brokerConfig.getBrokerId()); + eventData.put("timestamp", System.currentTimeMillis()); + eventData.put("affectedTopic", this.getTopicConfigManager().getTopicConfigTable().keySet().toArray()); + LOG.info("[ROUTE_UPDATE]: eventData {}", eventData); + + MessageExtBrokerInner innerMsg = new MessageExtBrokerInner(); + innerMsg.setTopic(TopicValidator.RMQ_ROUTE_EVENT_TOPIC); + innerMsg.setBody(JSON.toJSONString(eventData).getBytes(StandardCharsets.UTF_8)); + innerMsg.setTags("broker_event"); Review Comment: tags是否有实际意义,比如不同类型的事件,作为不同的tags,如果所有都是broker_event,那就没有必要设置tags ########## broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java: ########## @@ -1610,6 +1619,37 @@ protected void shutdownBasicService() { } } + public void publishRouteEvent(String eventType) { + + Map<String, Object> eventData = new HashMap<>(); + eventData.put("eventType", eventType); + eventData.put("brokerName", this.brokerConfig.getBrokerName()); + eventData.put("brokerId", this.brokerConfig.getBrokerId()); + eventData.put("timestamp", System.currentTimeMillis()); + eventData.put("affectedTopic", this.getTopicConfigManager().getTopicConfigTable().keySet().toArray()); + LOG.info("[ROUTE_UPDATE]: eventData {}", eventData); + + MessageExtBrokerInner innerMsg = new MessageExtBrokerInner(); + innerMsg.setTopic(TopicValidator.RMQ_ROUTE_EVENT_TOPIC); + innerMsg.setBody(JSON.toJSONString(eventData).getBytes(StandardCharsets.UTF_8)); + innerMsg.setTags("broker_event"); + innerMsg.setQueueId(0); + innerMsg.setBornTimestamp(System.currentTimeMillis()); + innerMsg.setBornHost(this.getStoreHost()); + innerMsg.setStoreHost(this.getStoreHost()); + innerMsg.setSysFlag(0); + LOG.info("[ROUTE_UPDATE]: innerMsg {}", innerMsg); + + PutMessageResult result = this.messageStore.putMessage(innerMsg); + this.messageStore.flush(); + + if (result.getPutMessageStatus() == PutMessageStatus.PUT_OK) { Review Comment: 这里用PutMessageResult.isOk来做判断吧 ########## proxy/src/main/java/org/apache/rocketmq/proxy/service/route/RouteEventSubscriber.java: ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.proxy.service.route; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import com.alibaba.fastjson2.JSON; + +public class RouteEventSubscriber { + private final Consumer<String> dirtyMarker; + private final DefaultMQPushConsumer consumer; + private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + + public RouteEventSubscriber(Consumer<String> dirtyMarker) { + this.dirtyMarker = dirtyMarker; + this.consumer = new DefaultMQPushConsumer("PROXY_ROUTE_EVENT_GROUP"); + } + public void start() { + try { + consumer.subscribe(TopicValidator.RMQ_ROUTE_EVENT_TOPIC, "*"); + LOGGER.warn("Subscribed to system topic: {}", TopicValidator.RMQ_ROUTE_EVENT_TOPIC); + + consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + LOGGER.warn("[ROUTE_UPDATE] Received {} events", msgs.size()); + processMessages(msgs); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + + consumer.start(); Review Comment: 这里没有用广播模式。。正常情况下proxy肯定不止一个,那么只能有一个proxy被通知到。建议可以参考下AbstractSystemMessageSyncer、 HeartbeatSyncer等类重构一下这部分 ########## proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java: ########## @@ -226,4 +246,82 @@ protected MessageQueueView buildMessageQueueView(String topic, TopicRouteData to } return MessageQueueView.WRAPPED_EMPTY_QUEUE; } + + protected void markCacheDirty(String topic) { + long currentTime = System.currentTimeMillis(); + Long previousTime = dirtyTopics.get(topic); + + if (previousTime == null) { + log.warn("[ROUTE_UPDATE]: markCacheDirty (NEW): {}", topic); + dirtyTopics.put(topic, currentTime); + pendingTopics.offer(topic); + } else { + log.info("[ROUTE_UPDATE]: markCacheDirty (EXISTING): {} lastMarked={}", + topic, new Date(previousTime)); + } + } + + private void batchRefreshRoutes() { + int pendingCount = pendingTopics.size(); + if (pendingCount == 0) { + return; + } + + log.warn("[ROUTE_UPDATE] Starting batch refresh for {} topics", pendingCount); + + List<String> refreshList = new ArrayList<>(Math.min(pendingCount, 100)); + int count = 0; + while (!pendingTopics.isEmpty() && count < 100) { + String topic = pendingTopics.poll(); + if (topic != null) { + refreshList.add(topic); + count++; + log.warn("[ROUTE_UPDATE]: Adding to refresh: {}", topic); + } + } + + log.warn("[ROUTE_UPDATE] Will refresh {} topics: {}", refreshList.size(), refreshList); + + List<CompletableFuture<Void>> futures = new ArrayList<>(); + for (String topic : refreshList) { + futures.add(CompletableFuture.runAsync(() -> { + try { + refreshSingleRoute(topic); + log.warn("[ROUTE_UPDATE]: Refresh topic route success: {}", topic); + } catch (Exception e) { + log.warn("[ROUTE_UPDATE]: Refresh topic route failed: {}", topic, e); + pendingTopics.offer(topic); + } + }, cacheRefreshExecutor)); + } + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + } + + private void refreshSingleRoute(String topic) { + try { + log.warn("[ROUTE_UPDATE]: Refreshing route for: {}", topic); + long startTime = System.currentTimeMillis(); + + TopicRouteData routeData = mqClientAPIFactory.getClient() + .getTopicRouteInfoFromNameServer(topic, 1000); + + if (routeData == null) { + log.warn("[ROUTE_UPDATE]: Null route data for topic: {}", topic); + return; + } + + MessageQueueView newView = buildMessageQueueView(topic, routeData); + topicCache.put(topic, newView); + dirtyTopics.remove(topic); + + long duration = System.currentTimeMillis() - startTime; + log.warn("[ROUTE_UPDATE]: Refresh success for {} in {}ms.", + topic, duration); + } catch (Exception e) { + log.error("[ROUTE_UPDATE]: Refresh failed for: {}", topic, e); + pendingTopics.offer(topic); + } + } Review Comment: 要看下是否相关topic直接用topicCache.invalidate就行,不需要自己去刷。 ########## proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java: ########## @@ -18,17 +18,47 @@ import java.util.List; import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; +import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.Address; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; public class ClusterTopicRouteService extends TopicRouteService { + private final RouteEventSubscriber eventSubscriber; + private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); Review Comment: 为什么只在ClusterTopicRouteService类里面有,LocalTopicRouteService不需要吗?还是建议单独抽象类出来进行处理 ########## broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java: ########## @@ -1610,6 +1619,37 @@ protected void shutdownBasicService() { } } + public void publishRouteEvent(String eventType) { + + Map<String, Object> eventData = new HashMap<>(); + eventData.put("eventType", eventType); + eventData.put("brokerName", this.brokerConfig.getBrokerName()); + eventData.put("brokerId", this.brokerConfig.getBrokerId()); + eventData.put("timestamp", System.currentTimeMillis()); + eventData.put("affectedTopic", this.getTopicConfigManager().getTopicConfigTable().keySet().toArray()); Review Comment: 这里任何硬编码的字符串都建议用常量,建议看下是否能把相关方法都收敛到一个类中 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org