你好!首先感谢回复。嗯,之后我也思考了这个问题,也感觉阿里的开发人员是为了这方面做考虑的。但是个人仍然认为,这部分代码是不安全的。可能会导致新注册上来的broker无法第一时间被scanNotActiveBroker感知到,因为执行scanNotActiveBroker任务的线程不会实时去更新工作线程中的数据,有一个内存可见性的问题。当然和你说的相同,这里的并发是很低的,且有可能在一个集群搭建起来之后后续都不会有新的broker进来,所以这种可见性问题基本不会出现。 在2018-03-15,"老胡" <[email protected]> 写道:-----原始邮件----- 发件人: "老胡" <[email protected]> 发送时间: 2018年3月15日 星期四 收件人: users <[email protected]>, users <[email protected]> 主题: 回复:rocketmq线程安全问题
你好! 这里是存在线程安全的问题。 scanNotActiveBroker 只与 unregisterBroker和 registerBroker的之间是线程不安全的。 scanNotActiveBroker 每10秒执行一次,而unregisterBroker 与 registerBroker 可能很久才会触发。甚至不会触发。 出现线程安全的几率很低, scanNotActiveBroker 锁持有时间很长,频率高 scanNotActiveBroker 报错,可以等下下次执行 每10秒执行一次,那么会哟加锁,解锁的操作,比较耗时,在上面的原有下,不加锁是一种好的方式。 仅仅是个人理解。 ------------------ 原始邮件 ------------------ 发件人: "鲁仕林"<[email protected]>; 发送时间: 2018年3月14日(星期三) 下午3:23 收件人: "users"<[email protected]>; 主题: rocketmq线程安全问题 this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256); */ public void scanNotActiveBroker() { Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, BrokerLiveInfo> next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { RemotingUtil.closeChannel(next.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); this.onChannelDestroy(/**brokerAddress*/next.getKey(), /**长连接channel*/next.getValue().getChannel()); } } }首先这是rocketmq中扫描存活broker的一段代码,我认为这段代码是存在线程安全性问题。在一些情况下,如在使用Iterator便利这个map时,有新的broker注册进来,会抛出java.util.ConcurrentModificationException。
