This is an automated email from the ASF dual-hosted git repository. huangli pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 64ad790 [ISSUE #3624]Fix producer/consumer re-start may fail introduced in #3454 (#3639) 64ad790 is described below commit 64ad7908b89bec5190e14c80f044b7060ac2774d Author: panzhi <panzh...@qq.com> AuthorDate: Wed Dec 15 20:00:21 2021 +0800 [ISSUE #3624]Fix producer/consumer re-start may fail introduced in #3454 (#3639) Fix producer/consumer re-start may fail introduced in #3454 1. start a producer 2. shutdown producer 3. start the second producer, exception throws --- .../impl/producer/DefaultMQProducerImpl.java | 21 +-------- .../client/producer/RequestFutureHolder.java | 50 ++++++++++++++-------- 2 files changed, 35 insertions(+), 36 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index b27117f..cedbbdb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -223,25 +223,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); - this.startScheduledTask(); + RequestFutureHolder.getInstance().startScheduledTask(this); } - private void startScheduledTask() { - if (RequestFutureHolder.getInstance().getProducerNum().incrementAndGet() == 1) { - RequestFutureHolder.getInstance().getScheduledExecutorService().scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - RequestFutureHolder.getInstance().scanExpiredRequest(); - } catch (Throwable e) { - log.error("scan RequestFutureTable exception", e); - } - } - }, 1000 * 3, 1000, TimeUnit.MILLISECONDS); - } - } - private void checkConfig() throws MQClientException { Validators.checkGroup(this.defaultMQProducer.getProducerGroup()); @@ -269,9 +254,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { if (shutdownFactory) { this.mQClientFactory.shutdown(); } - if (RequestFutureHolder.getInstance().getProducerNum().decrementAndGet() == 0) { - RequestFutureHolder.getInstance().getScheduledExecutorService().shutdown(); - } + RequestFutureHolder.getInstance().shutdown(this); log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup()); this.serviceState = ServiceState.SHUTDOWN_ALREADY; break; diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java index 24b3a90..8fe9abc 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureHolder.java @@ -17,39 +17,36 @@ package org.apache.rocketmq.client.producer; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.common.ClientErrorCode; import org.apache.rocketmq.client.exception.RequestTimeoutException; +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.logging.InternalLogger; public class RequestFutureHolder { private static InternalLogger log = ClientLogger.getLog(); private static final RequestFutureHolder INSTANCE = new RequestFutureHolder(); private ConcurrentHashMap<String, RequestResponseFuture> requestFutureTable = new ConcurrentHashMap<String, RequestResponseFuture>(); - private final AtomicInteger producerNum = new AtomicInteger(0); - private final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "RequestHouseKeepingService"); - } - }); + private final Set<DefaultMQProducerImpl> producerSet = new HashSet<>(); + private ScheduledExecutorService scheduledExecutorService = null; public ConcurrentHashMap<String, RequestResponseFuture> getRequestFutureTable() { return requestFutureTable; } - public void scanExpiredRequest() { + private void scanExpiredRequest() { final List<RequestResponseFuture> rfList = new LinkedList<RequestResponseFuture>(); Iterator<Map.Entry<String, RequestResponseFuture>> it = requestFutureTable.entrySet().iterator(); while (it.hasNext()) { @@ -74,17 +71,36 @@ public class RequestFutureHolder { } } - private RequestFutureHolder() { - } + public synchronized void startScheduledTask(DefaultMQProducerImpl producer) { + this.producerSet.add(producer); + if (null == scheduledExecutorService) { + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RequestHouseKeepingService")); + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + RequestFutureHolder.getInstance().scanExpiredRequest(); + } catch (Throwable e) { + log.error("scan RequestFutureTable exception", e); + } + } + }, 1000 * 3, 1000, TimeUnit.MILLISECONDS); - public AtomicInteger getProducerNum() { - return producerNum; + } } - public ScheduledExecutorService getScheduledExecutorService() { - return scheduledExecutorService; + public synchronized void shutdown(DefaultMQProducerImpl producer) { + this.producerSet.remove(producer); + if (this.producerSet.size() <= 0 && null != this.scheduledExecutorService) { + ScheduledExecutorService executorService = this.scheduledExecutorService; + this.scheduledExecutorService = null; + executorService.shutdown(); + } } + private RequestFutureHolder() {} + public static RequestFutureHolder getInstance() { return INSTANCE; }