This is an automated email from the ASF dual-hosted git repository. dinglei pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new f1b411cec Remove filter server module (#6749) f1b411cec is described below commit f1b411cecc3a9c441fdec2caf5867601419f3fc0 Author: rongtong <jinrongto...@163.com> AuthorDate: Mon May 15 14:04:01 2023 +0800 Remove filter server module (#6749) * Remove filter server module * Pass the check style * Remove filterServerNum config * Remove more related code --- .../apache/rocketmq/broker/BrokerController.java | 19 +-- .../broker/client/ClientHousekeepingService.java | 4 - .../broker/filtersrv/FilterServerManager.java | 169 --------------------- .../broker/filtersrv/FilterServerUtil.java | 42 ----- .../broker/processor/AdminBrokerProcessor.java | 21 --- .../broker/filtersrv/FilterServerManagerTest.java | 88 ----------- .../rocketmq/client/impl/MQClientAPIImpl.java | 29 ---- .../client/impl/factory/MQClientInstance.java | 69 --------- .../org/apache/rocketmq/common/BrokerConfig.java | 10 -- .../RegisterFilterServerRequestHeader.java | 39 ----- .../RegisterFilterServerResponseHeader.java | 49 ------ .../RegisterMessageFilterClassRequestHeader.java | 69 --------- 12 files changed, 2 insertions(+), 606 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 22c403eaf..329bd86c0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.broker; +import com.google.common.collect.Lists; import java.io.IOException; import java.net.InetSocketAddress; import java.util.AbstractMap; @@ -55,7 +56,6 @@ import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler; import org.apache.rocketmq.broker.failover.EscapeBridge; import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap; import org.apache.rocketmq.broker.filter.ConsumerFilterManager; -import org.apache.rocketmq.broker.filtersrv.FilterServerManager; import org.apache.rocketmq.broker.latency.BrokerFastFailure; import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor; import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService; @@ -212,7 +212,6 @@ public class BrokerController { protected final BlockingQueue<Runnable> endTransactionThreadPoolQueue; protected final BlockingQueue<Runnable> adminBrokerThreadPoolQueue; protected final BlockingQueue<Runnable> loadBalanceThreadPoolQueue; - protected final FilterServerManager filterServerManager; protected final BrokerStatsManager brokerStatsManager; protected final List<SendMessageHook> sendMessageHookList = new ArrayList<>(); protected final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>(); @@ -327,8 +326,6 @@ public class BrokerController { this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig); } - this.filterServerManager = new FilterServerManager(this); - this.queryAssignmentProcessor = new QueryAssignmentProcessor(this); this.clientManageProcessor = new ClientManageProcessor(this); this.slaveSynchronize = new SlaveSynchronize(this); @@ -1353,10 +1350,6 @@ public class BrokerController { this.consumerOffsetManager.persist(); - if (this.filterServerManager != null) { - this.filterServerManager.shutdown(); - } - if (this.brokerFastFailure != null) { this.brokerFastFailure.shutdown(); } @@ -1530,10 +1523,6 @@ public class BrokerController { this.clientHousekeepingService.start(); } - if (this.filterServerManager != null) { - this.filterServerManager.start(); - } - if (this.brokerStatsManager != null) { this.brokerStatsManager.start(); } @@ -1730,7 +1719,7 @@ public class BrokerController { this.brokerConfig.getBrokerId(), this.getHAServerAddr(), topicConfigWrapper, - this.filterServerManager.buildNewFilterServerList(), + Lists.newArrayList(), oneway, this.brokerConfig.getRegisterBrokerTimeoutMills(), this.brokerConfig.isEnableSlaveActingMaster(), @@ -2087,10 +2076,6 @@ public class BrokerController { return ackThreadPoolQueue; } - public FilterServerManager getFilterServerManager() { - return filterServerManager; - } - public BrokerStatsManager getBrokerStatsManager() { return brokerStatsManager; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java index dafb50d36..98e5f450f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java @@ -56,7 +56,6 @@ public class ClientHousekeepingService implements ChannelEventListener { private void scanExceptionChannel() { this.brokerController.getProducerManager().scanNotActiveChannel(); this.brokerController.getConsumerManager().scanNotActiveChannel(); - this.brokerController.getFilterServerManager().scanNotActiveChannel(); } public void shutdown() { @@ -72,7 +71,6 @@ public class ClientHousekeepingService implements ChannelEventListener { public void onChannelClose(String remoteAddr, Channel channel) { this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); - this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getBrokerStatsManager().incChannelCloseNum(); } @@ -80,7 +78,6 @@ public class ClientHousekeepingService implements ChannelEventListener { public void onChannelException(String remoteAddr, Channel channel) { this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); - this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getBrokerStatsManager().incChannelExceptionNum(); } @@ -88,7 +85,6 @@ public class ClientHousekeepingService implements ChannelEventListener { public void onChannelIdle(String remoteAddr, Channel channel) { this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); - this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); this.brokerController.getBrokerStatsManager().incChannelIdleNum(); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java deleted file mode 100644 index 57497f904..000000000 --- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.broker.filtersrv; - -import io.netty.channel.Channel; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.broker.BrokerStartup; -import org.apache.rocketmq.common.AbstractBrokerRunnable; -import org.apache.rocketmq.common.ThreadFactoryImpl; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.utils.NetworkUtil; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.remoting.common.RemotingHelper; - -public class FilterServerManager { - - public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000; - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private final ConcurrentMap<Channel, FilterServerInfo> filterServerTable = - new ConcurrentHashMap<>(16); - private final BrokerController brokerController; - - private ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread")); - - public FilterServerManager(final BrokerController brokerController) { - this.brokerController = brokerController; - } - - public void start() { - - this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(brokerController.getBrokerConfig()) { - @Override - public void run0() { - try { - FilterServerManager.this.createFilterServer(); - } catch (Exception e) { - log.error("", e); - } - } - }, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS); - } - - public void createFilterServer() { - int more = - this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size(); - String cmd = this.buildStartCommand(); - for (int i = 0; i < more; i++) { - FilterServerUtil.callShell(cmd, log); - } - } - - private String buildStartCommand() { - String config = ""; - if (BrokerStartup.CONFIG_FILE_HELPER.getFile() != null) { - config = String.format("-c %s", BrokerStartup.CONFIG_FILE_HELPER.getFile()); - } - - if (this.brokerController.getBrokerConfig().getNamesrvAddr() != null) { - config += String.format(" -n %s", this.brokerController.getBrokerConfig().getNamesrvAddr()); - } - - if (NetworkUtil.isWindowsPlatform()) { - return String.format("start /b %s\\bin\\mqfiltersrv.exe %s", - this.brokerController.getBrokerConfig().getRocketmqHome(), - config); - } else { - return String.format("sh %s/bin/startfsrv.sh %s", - this.brokerController.getBrokerConfig().getRocketmqHome(), - config); - } - } - - public void shutdown() { - this.scheduledExecutorService.shutdown(); - } - - public void registerFilterServer(final Channel channel, final String filterServerAddr) { - FilterServerInfo filterServerInfo = this.filterServerTable.get(channel); - if (filterServerInfo != null) { - filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis()); - } else { - filterServerInfo = new FilterServerInfo(); - filterServerInfo.setFilterServerAddr(filterServerAddr); - filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis()); - this.filterServerTable.put(channel, filterServerInfo); - log.info("Receive a New Filter Server<{}>", filterServerAddr); - } - } - - public void scanNotActiveChannel() { - - Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<Channel, FilterServerInfo> next = it.next(); - long timestamp = next.getValue().getLastUpdateTimestamp(); - Channel channel = next.getKey(); - if ((System.currentTimeMillis() - timestamp) > FILTER_SERVER_MAX_IDLE_TIME_MILLS) { - log.info("The Filter Server<{}> expired, remove it", next.getKey()); - it.remove(); - RemotingHelper.closeChannel(channel); - } - } - } - - public void doChannelCloseEvent(final String remoteAddr, final Channel channel) { - FilterServerInfo old = this.filterServerTable.remove(channel); - if (old != null) { - log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(), - remoteAddr); - } - } - - public List<String> buildNewFilterServerList() { - List<String> addr = new ArrayList<>(); - Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<Channel, FilterServerInfo> next = it.next(); - addr.add(next.getValue().getFilterServerAddr()); - } - return addr; - } - - static class FilterServerInfo { - private String filterServerAddr; - private long lastUpdateTimestamp; - - public String getFilterServerAddr() { - return filterServerAddr; - } - - public void setFilterServerAddr(String filterServerAddr) { - this.filterServerAddr = filterServerAddr; - } - - public long getLastUpdateTimestamp() { - return lastUpdateTimestamp; - } - - public void setLastUpdateTimestamp(long lastUpdateTimestamp) { - this.lastUpdateTimestamp = lastUpdateTimestamp; - } - } -} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java deleted file mode 100644 index dc1a5f850..000000000 --- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.broker.filtersrv; - - -import org.apache.rocketmq.logging.org.slf4j.Logger; - -public class FilterServerUtil { - public static void callShell(final String shellString, final Logger log) { - Process process = null; - try { - String[] cmdArray = splitShellString(shellString); - process = Runtime.getRuntime().exec(cmdArray); - process.waitFor(); - log.info("CallShell: <{}> OK", shellString); - } catch (Throwable e) { - log.error("CallShell: readLine IOException, {}", shellString, e); - } finally { - if (null != process) - process.destroy(); - } - } - - private static String[] splitShellString(final String shellString) { - return shellString.split(" "); - } -} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index be673b916..0a05239e7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -161,8 +161,6 @@ import org.apache.rocketmq.remoting.protocol.header.SearchOffsetResponseHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ViewBrokerStatsDataRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.filtersrv.RegisterFilterServerRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.filtersrv.RegisterFilterServerResponseHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.protocol.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping; @@ -263,8 +261,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return this.queryTopicsByConsumer(ctx, request); case RequestCode.QUERY_SUBSCRIPTION_BY_CONSUMER: return this.querySubscriptionByConsumer(ctx, request); - case RequestCode.REGISTER_FILTER_SERVER: - return this.registerFilterServer(ctx, request); case RequestCode.QUERY_CONSUME_TIME_SPAN: return this.queryConsumeTimeSpan(ctx, request); case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER: @@ -1862,23 +1858,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { } - private RemotingCommand registerFilterServer(ChannelHandlerContext ctx, - RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterFilterServerResponseHeader.class); - final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader) response.readCustomHeader(); - final RegisterFilterServerRequestHeader requestHeader = - (RegisterFilterServerRequestHeader) request.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class); - - this.brokerController.getFilterServerManager().registerFilterServer(ctx.channel(), requestHeader.getFilterServerAddr()); - - responseHeader.setBrokerId(this.brokerController.getBrokerConfig().getBrokerId()); - responseHeader.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); - - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); - return response; - } - private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filtersrv/FilterServerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filtersrv/FilterServerManagerTest.java deleted file mode 100644 index 46cd460d3..000000000 --- a/broker/src/test/java/org/apache/rocketmq/broker/filtersrv/FilterServerManagerTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.broker.filtersrv; - -import io.netty.channel.Channel; -import java.util.List; -import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.common.BrokerConfig; -import org.assertj.core.api.Assertions; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -import static org.mockito.Mockito.when; - -@RunWith(MockitoJUnitRunner.class) -public class FilterServerManagerTest { - - @Mock - private BrokerController brokerController; - - private FilterServerManager filterServerManager; - - private BrokerConfig brokerConfig = new BrokerConfig(); - - @Mock - private Channel channel; - - private static final String FILTER_SERVER_ADDR = "192.168.1.1"; - - @Before - public void before() throws InterruptedException { - when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); - filterServerManager = new FilterServerManager(brokerController); - filterServerManager.start(); - filterServerManager.registerFilterServer(channel, FILTER_SERVER_ADDR); - } - - @After - public void after() { - filterServerManager.shutdown(); - brokerController.shutdown(); - } - - @Test - public void createFilterServerTest() { - Assertions.assertThatCode(() -> filterServerManager.createFilterServer()).doesNotThrowAnyException(); - } - - @Test - public void registerFilterServerTest() { - Assertions.assertThatCode(() -> filterServerManager.registerFilterServer(channel, FILTER_SERVER_ADDR)).doesNotThrowAnyException(); - } - - @Test - public void scanNotActiveChannelTest() { - Assertions.assertThatCode(() -> filterServerManager.scanNotActiveChannel()).doesNotThrowAnyException(); - } - - @Test - public void doChannelCloseEventTest() { - Assertions.assertThatCode(() -> filterServerManager.doChannelCloseEvent(FILTER_SERVER_ADDR, channel)).doesNotThrowAnyException(); - } - - @Test - public void buildNewFilterServerListTest() { - final List<String> filterServerList = filterServerManager.buildNewFilterServerList(); - assert !filterServerList.isEmpty(); - } -} diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 192111415..2c7a988ee 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -193,7 +193,6 @@ import org.apache.rocketmq.remoting.protocol.header.UpdateGlobalWhiteAddrsConfig import org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ViewBrokerStatsDataRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader; import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader; @@ -2258,34 +2257,6 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { throw new MQClientException(response.getCode(), response.getRemark()); } - public void registerMessageFilterClass(final String addr, - final String consumerGroup, - final String topic, - final String className, - final int classCRC, - final byte[] classBody, - final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, - InterruptedException, MQBrokerException { - RegisterMessageFilterClassRequestHeader requestHeader = new RegisterMessageFilterClassRequestHeader(); - requestHeader.setConsumerGroup(consumerGroup); - requestHeader.setClassName(className); - requestHeader.setTopic(topic); - requestHeader.setClassCRC(classCRC); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_MESSAGE_FILTER_CLASS, requestHeader); - request.setBody(classBody); - RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return; - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark(), addr); - } - public TopicList getSystemTopicList( final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS, null); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index dedfa09ce..703bec4a2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -57,7 +57,6 @@ import org.apache.rocketmq.client.stat.ConsumerStatsManager; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; -import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.message.MessageExt; @@ -71,7 +70,6 @@ import org.apache.rocketmq.remoting.protocol.NamespaceUtil; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; -import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData; import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; @@ -462,7 +460,6 @@ public class MQClientInstance { if (this.lockHeartbeat.tryLock()) { try { this.sendHeartbeatToAllBroker(); - this.uploadFilterClassSource(); } catch (final Exception e) { log.error("sendHeartbeatToAllBroker exception", e); } finally { @@ -566,29 +563,6 @@ public class MQClientInstance { } } - private void uploadFilterClassSource() { - for (Entry<String, MQConsumerInner> next : this.consumerTable.entrySet()) { - MQConsumerInner consumer = next.getValue(); - if (ConsumeType.CONSUME_PASSIVELY != consumer.consumeType()) { - continue; - } - Set<SubscriptionData> subscriptions = consumer.subscriptions(); - for (SubscriptionData sub : subscriptions) { - if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) { - final String consumerGroup = consumer.groupName(); - final String className = sub.getSubString(); - final String topic = sub.getTopic(); - final String filterClassSource = sub.getFilterClassSource(); - try { - this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource); - } catch (Exception e) { - log.error("uploadFilterClassToAllFilterServer Exception", e); - } - } - } - } - } - public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { try { @@ -729,49 +703,6 @@ public class MQClientInstance { return false; } - /** - * This method will be removed in the version 5.0.0,because filterServer was removed,and method - * <code>subscribe(final String topic, final MessageSelector messageSelector)</code> is recommended. - */ - @Deprecated - private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName, - final String topic, - final String filterClassSource) { - byte[] classBody = null; - int classCRC = 0; - try { - classBody = filterClassSource.getBytes(MixAll.DEFAULT_CHARSET); - classCRC = UtilAll.crc32(classBody); - } catch (Exception e1) { - log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}", - fullClassName, - UtilAll.exceptionSimpleDesc(e1)); - } - - TopicRouteData topicRouteData = this.topicRouteTable.get(topic); - if (topicRouteData != null - && topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) { - for (Entry<String, List<String>> next : topicRouteData.getFilterServerTable().entrySet()) { - List<String> value = next.getValue(); - for (final String fsAddr : value) { - try { - this.mQClientAPIImpl.registerMessageFilterClass(fsAddr, consumerGroup, topic, fullClassName, classCRC, classBody, - 5000); - - log.info("register message class filter to {} OK, ConsumerGroup: {} Topic: {} ClassName: {}", fsAddr, consumerGroup, - topic, fullClassName); - - } catch (Exception e) { - log.error("uploadFilterClassToAllFilterServer Exception", e); - } - } - } - } else { - log.warn("register message class filter failed, because no filter server, ConsumerGroup: {} Topic: {} ClassName: {}", - consumerGroup, topic, fullClassName); - } - } - private boolean isNeedUpdateTopicRouteInfo(final String topic) { boolean result = false; Iterator<Entry<String, MQProducerInner>> producerIterator = this.producerTable.entrySet().iterator(); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 08fbcb521..07640232f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -112,8 +112,6 @@ public class BrokerConfig extends BrokerIdentity { private int adminBrokerThreadPoolQueueCapacity = 10000; private int loadBalanceThreadPoolQueueCapacity = 100000; - private int filterServerNums = 0; - private boolean longPollingEnable = true; private long shortPollingTimeMills = 1000; @@ -925,14 +923,6 @@ public class BrokerConfig extends BrokerIdentity { this.brokerTopicEnable = brokerTopicEnable; } - public int getFilterServerNums() { - return filterServerNums; - } - - public void setFilterServerNums(int filterServerNums) { - this.filterServerNums = filterServerNums; - } - public boolean isLongPollingEnable() { return longPollingEnable; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterFilterServerRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterFilterServerRequestHeader.java deleted file mode 100644 index 14dacf6ec..000000000 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterFilterServerRequestHeader.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.protocol.header.filtersrv; - -import org.apache.rocketmq.remoting.CommandCustomHeader; -import org.apache.rocketmq.remoting.annotation.CFNotNull; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; - -public class RegisterFilterServerRequestHeader implements CommandCustomHeader { - @CFNotNull - private String filterServerAddr; - - @Override - public void checkFields() throws RemotingCommandException { - } - - public String getFilterServerAddr() { - return filterServerAddr; - } - - public void setFilterServerAddr(String filterServerAddr) { - this.filterServerAddr = filterServerAddr; - } -} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterFilterServerResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterFilterServerResponseHeader.java deleted file mode 100644 index a618a4f30..000000000 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterFilterServerResponseHeader.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.protocol.header.filtersrv; - -import org.apache.rocketmq.remoting.CommandCustomHeader; -import org.apache.rocketmq.remoting.annotation.CFNotNull; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; - -public class RegisterFilterServerResponseHeader implements CommandCustomHeader { - @CFNotNull - private String brokerName; - @CFNotNull - private long brokerId; - - @Override - public void checkFields() throws RemotingCommandException { - } - - public long getBrokerId() { - return brokerId; - } - - public void setBrokerId(long brokerId) { - this.brokerId = brokerId; - } - - public String getBrokerName() { - return brokerName; - } - - public void setBrokerName(String brokerName) { - this.brokerName = brokerName; - } -} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterMessageFilterClassRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterMessageFilterClassRequestHeader.java deleted file mode 100644 index b214ee5cd..000000000 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/filtersrv/RegisterMessageFilterClassRequestHeader.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.protocol.header.filtersrv; - -import org.apache.rocketmq.remoting.CommandCustomHeader; -import org.apache.rocketmq.remoting.annotation.CFNotNull; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; - -public class RegisterMessageFilterClassRequestHeader implements CommandCustomHeader { - @CFNotNull - private String consumerGroup; - @CFNotNull - private String topic; - @CFNotNull - private String className; - @CFNotNull - private Integer classCRC; - - @Override - public void checkFields() throws RemotingCommandException { - } - - public String getConsumerGroup() { - return consumerGroup; - } - - public void setConsumerGroup(String consumerGroup) { - this.consumerGroup = consumerGroup; - } - - public String getTopic() { - return topic; - } - - public void setTopic(String topic) { - this.topic = topic; - } - - public String getClassName() { - return className; - } - - public void setClassName(String className) { - this.className = className; - } - - public Integer getClassCRC() { - return classCRC; - } - - public void setClassCRC(Integer classCRC) { - this.classCRC = classCRC; - } -}