This is an automated email from the ASF dual-hosted git repository.

wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git


The following commit(s) were added to refs/heads/master by this push:
     new 64267beb0 Each job has its own listener event notify thread instead of 
the only one curator thread (#2092)
64267beb0 is described below

commit 64267beb079446ec4de101d111997cb85681bd4d
Author: HungChu <[email protected]>
AuthorDate: Mon Aug 1 11:31:03 2022 +0800

    Each job has its own listener event notify thread instead of the only one 
curator thread (#2092)
    
    * Each job has its own listener notify thread instead of the only one 
Curator thread
    
    * Each job has its own listener notify thread instead of the only one 
Curator thread
    (#2038)
    
    * Use lambda expression instead of anonymous class for CuratorCacheListener 
in watch method.
    
    * Add ASF license
    
    * Add unit test case
---
 .../reg/base/CoordinatorRegistryCenter.java        |  4 +-
 .../reg/zookeeper/ZookeeperRegistryCenter.java     | 12 +++-
 .../ZookeeperRegistryCenterWatchTest.java          | 30 +++++++-
 .../lite/api/registry/JobInstanceRegistry.java     |  9 ++-
 .../lite/internal/listener/ListenerManager.java    |  1 +
 .../internal/listener/ListenerNotifierManager.java | 79 ++++++++++++++++++++++
 .../lite/internal/storage/JobNodeStorage.java      |  5 +-
 .../listener/ListenerNotifierManagerTest.java      | 38 +++++++++++
 .../lite/internal/storage/JobNodeStorageTest.java  |  7 +-
 9 files changed, 176 insertions(+), 9 deletions(-)

diff --git 
a/elasticjob-infra/elasticjob-registry-center/elasticjob-registry-center-api/src/main/java/org/apache/shardingsphere/elasticjob/reg/base/CoordinatorRegistryCenter.java
 
b/elasticjob-infra/elasticjob-registry-center/elasticjob-registry-center-api/src/main/java/org/apache/shardingsphere/elasticjob/reg/base/CoordinatorRegistryCenter.java
index 20aeb281b..eefd43b49 100644
--- 
a/elasticjob-infra/elasticjob-registry-center/elasticjob-registry-center-api/src/main/java/org/apache/shardingsphere/elasticjob/reg/base/CoordinatorRegistryCenter.java
+++ 
b/elasticjob-infra/elasticjob-registry-center/elasticjob-registry-center-api/src/main/java/org/apache/shardingsphere/elasticjob/reg/base/CoordinatorRegistryCenter.java
@@ -22,6 +22,7 @@ import 
org.apache.shardingsphere.elasticjob.reg.listener.ConnectionStateChangedE
 import 
org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEventListener;
 
 import java.util.List;
+import java.util.concurrent.Executor;
 
 /**
  * Coordinator registry center.
@@ -111,8 +112,9 @@ public interface CoordinatorRegistryCenter extends 
RegistryCenter {
      *
      * @param key key to be watched
      * @param listener data listener
+     * @param executor event notify executor
      */
-    void watch(String key, DataChangedEventListener listener);
+    void watch(String key, DataChangedEventListener listener, Executor 
executor);
     
     /**
      * Add connection state changed event listener to registry center.
diff --git 
a/elasticjob-infra/elasticjob-registry-center/elasticjob-regitry-center-provider/elasticjob-registry-center-zookeeper-curator/src/main/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenter.java
 
b/elasticjob-infra/elasticjob-registry-center/elasticjob-regitry-center-provider/elasticjob-registry-center-zookeeper-curator/src/main/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenter.java
index a414dabea..aa6c03d34 100644
--- 
a/elasticjob-infra/elasticjob-registry-center/elasticjob-regitry-center-provider/elasticjob-registry-center-zookeeper-curator/src/main/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenter.java
+++ 
b/elasticjob-infra/elasticjob-registry-center/elasticjob-regitry-center-provider/elasticjob-registry-center-zookeeper-curator/src/main/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenter.java
@@ -58,6 +58,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -408,9 +409,9 @@ public final class ZookeeperRegistryCenter implements 
CoordinatorRegistryCenter
     }
     
     @Override
-    public void watch(final String key, final DataChangedEventListener 
listener) {
+    public void watch(final String key, final DataChangedEventListener 
listener, final Executor executor) {
         CuratorCache cache = caches.get(key + "/");
-        cache.listenable().addListener((curatorType, oldData, newData) -> {
+        CuratorCacheListener cacheListener = (curatorType, oldData, newData) 
-> {
             if (null == newData && null == oldData) {
                 return;
             }
@@ -421,7 +422,12 @@ public final class ZookeeperRegistryCenter implements 
CoordinatorRegistryCenter
             }
             byte[] data = Type.DELETED == type ? oldData.getData() : 
newData.getData();
             listener.onChange(new DataChangedEvent(type, path, null == data ? 
"" : new String(data, StandardCharsets.UTF_8)));
-        });
+        };
+        if (executor != null) {
+            cache.listenable().addListener(cacheListener, executor);
+        } else {
+            cache.listenable().addListener(cacheListener);
+        }
     }
     
     private Type getTypeFromCuratorType(final CuratorCacheListener.Type 
curatorType) {
diff --git 
a/elasticjob-infra/elasticjob-registry-center/elasticjob-regitry-center-provider/elasticjob-registry-center-zookeeper-curator/src/test/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenterWatchTest.java
 
b/elasticjob-infra/elasticjob-registry-center/elasticjob-regitry-center-provider/elasticjob-registry-center-zookeeper-curator/src/test/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenterWatchTest.java
index 28e28287b..9f1f3429d 100644
--- 
a/elasticjob-infra/elasticjob-registry-center/elasticjob-regitry-center-provider/elasticjob-registry-center-zookeeper-curator/src/test/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenterWatchTest.java
+++ 
b/elasticjob-infra/elasticjob-registry-center/elasticjob-regitry-center-provider/elasticjob-registry-center-zookeeper-curator/src/test/java/org/apache/shardingsphere/elasticjob/reg/zookeeper/ZookeeperRegistryCenterWatchTest.java
@@ -17,14 +17,20 @@
 
 package org.apache.shardingsphere.elasticjob.reg.zookeeper;
 
+import org.apache.curator.utils.ThreadUtils;
 import org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEvent;
 import 
org.apache.shardingsphere.elasticjob.reg.zookeeper.fixture.EmbedTestingServer;
 import 
org.apache.shardingsphere.elasticjob.reg.zookeeper.util.ZookeeperRegistryCenterTestUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.CoreMatchers.startsWith;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 public final class ZookeeperRegistryCenterWatchTest {
     
@@ -45,18 +51,38 @@ public final class ZookeeperRegistryCenterWatchTest {
     public static void tearDown() {
         zkRegCenter.close();
     }
+
+    @Test(timeout = 10000L)
+    public void assertWatchWithoutExecutor() throws InterruptedException {
+        CountDownLatch waitingForCountDownValue = new CountDownLatch(1);
+        zkRegCenter.addCacheData("/test");
+        CountDownLatch waitingForWatchReady = new CountDownLatch(1);
+        zkRegCenter.watch("/test", event -> {
+            waitingForWatchReady.countDown();
+            if (DataChangedEvent.Type.UPDATED == event.getType() && 
"countDown".equals(event.getValue())) {
+                waitingForCountDownValue.countDown();
+            }
+        }, null);
+        waitingForWatchReady.await();
+        zkRegCenter.update("/test", "countDown");
+        waitingForCountDownValue.await();
+    }
     
     @Test(timeout = 30000L)
-    public void assertWatch() throws InterruptedException {
+    public void assertWatchWithExecutor() throws InterruptedException {
         CountDownLatch waitingForCountDownValue = new CountDownLatch(1);
         zkRegCenter.addCacheData("/test");
         CountDownLatch waitingForWatchReady = new CountDownLatch(1);
+        String threadNamePreffix = "ListenerNotify";
+        ThreadFactory threadFactory = 
ThreadUtils.newGenericThreadFactory(threadNamePreffix);
+        Executor executor = Executors.newSingleThreadExecutor(threadFactory);
         zkRegCenter.watch("/test", event -> {
+            assertThat(Thread.currentThread().getName(), 
startsWith(threadNamePreffix));
             waitingForWatchReady.countDown();
             if (DataChangedEvent.Type.UPDATED == event.getType() && 
"countDown".equals(event.getValue())) {
                 waitingForCountDownValue.countDown();
             }
-        });
+        }, executor);
         waitingForWatchReady.await();
         zkRegCenter.update("/test", "countDown");
         waitingForCountDownValue.await();
diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/registry/JobInstanceRegistry.java
 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/registry/JobInstanceRegistry.java
index 9f7623678..1af396349 100644
--- 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/registry/JobInstanceRegistry.java
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/registry/JobInstanceRegistry.java
@@ -18,6 +18,8 @@
 package org.apache.shardingsphere.elasticjob.lite.api.registry;
 
 import lombok.RequiredArgsConstructor;
+
+import org.apache.curator.utils.ThreadUtils;
 import org.apache.shardingsphere.elasticjob.api.ElasticJob;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
@@ -31,6 +33,9 @@ import 
org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEvent;
 import 
org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEventListener;
 
 import java.util.Arrays;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -51,7 +56,9 @@ public final class JobInstanceRegistry {
      * Register.
      */
     public void register() {
-        regCenter.watch("/", new JobInstanceRegistryListener());
+        ThreadFactory threadFactory = 
ThreadUtils.newGenericThreadFactory("ListenerNotify-instanceRegistry");
+        Executor executor = Executors.newSingleThreadExecutor(threadFactory);
+        regCenter.watch("/", new JobInstanceRegistryListener(), executor);
     }
     
     public class JobInstanceRegistryListener implements 
DataChangedEventListener {
diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerManager.java
 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerManager.java
index 361b6fd00..987bd8ed8 100644
--- 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerManager.java
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerManager.java
@@ -58,6 +58,7 @@ public final class ListenerManager {
     
     public ListenerManager(final CoordinatorRegistryCenter regCenter, final 
String jobName, final Collection<ElasticJobListener> elasticJobListeners) {
         jobNodeStorage = new JobNodeStorage(regCenter, jobName);
+        
ListenerNotifierManager.getInstance().registerJobNotifyExecutor(jobName);
         electionListenerManager = new ElectionListenerManager(regCenter, 
jobName);
         shardingListenerManager = new ShardingListenerManager(regCenter, 
jobName);
         failoverListenerManager = new FailoverListenerManager(regCenter, 
jobName);
diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManager.java
 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManager.java
new file mode 100644
index 000000000..b6dae8654
--- /dev/null
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.listener;
+
+import org.apache.curator.utils.ThreadUtils;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Manage listener's notify executor,
+ * each job has its own listener notify executor.
+ */
+public final class ListenerNotifierManager {
+
+    private static volatile ListenerNotifierManager instance;
+
+    private final Map<String, Executor> listenerNotifyExecutors = new 
ConcurrentHashMap<>();
+
+    private ListenerNotifierManager() { }
+
+    /**
+     * Get singleton instance of ListenerNotifierManager.
+     * @return singleton instance of ListenerNotifierManager.
+     */
+    public static ListenerNotifierManager getInstance() {
+        if (null == instance) {
+            synchronized (ListenerNotifierManager.class) {
+                if (null == instance) {
+                    instance = new ListenerNotifierManager();
+                }
+            }
+        }
+        return instance;
+    }
+
+    /**
+     * Register a listener notify executor for the job specified.
+     * @param jobName The job's name.
+     */
+    public void registerJobNotifyExecutor(final String jobName) {
+        if (!listenerNotifyExecutors.containsKey(jobName)) {
+            synchronized (this) {
+                if (!listenerNotifyExecutors.containsKey(jobName)) {
+                    ThreadFactory threadFactory = 
ThreadUtils.newGenericThreadFactory("ListenerNotify-" + jobName);
+                    Executor notifyExecutor = 
Executors.newSingleThreadExecutor(threadFactory);
+                    listenerNotifyExecutors.put(jobName, notifyExecutor);
+                }
+            }
+        }
+    }
+
+    /**
+     * Get the listener notify executor for the specified job.
+     * @param jobName The job's name.
+     * @return The job listener's notify executor.
+     */
+    public Executor getJobNotifyExecutor(final String jobName) {
+        return listenerNotifyExecutors.get(jobName);
+    }
+}
diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorage.java
 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorage.java
index ffb30a0e2..1df239043 100644
--- 
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorage.java
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorage.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.elasticjob.lite.internal.storage;
 
+import 
org.apache.shardingsphere.elasticjob.lite.internal.listener.ListenerNotifierManager;
 import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
 import org.apache.shardingsphere.elasticjob.reg.base.LeaderExecutionCallback;
 import 
org.apache.shardingsphere.elasticjob.reg.base.transaction.TransactionOperation;
@@ -26,6 +27,7 @@ import 
org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEventListene
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Executor;
 
 /**
  * Job node storage.
@@ -218,7 +220,8 @@ public final class JobNodeStorage {
      * @param listener data listener
      */
     public void addDataListener(final DataChangedEventListener listener) {
-        regCenter.watch("/" + jobName, listener);
+        Executor executor = 
ListenerNotifierManager.getInstance().getJobNotifyExecutor(jobName);
+        regCenter.watch("/" + jobName, listener, executor);
     }
     
     /**
diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManagerTest.java
 
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManagerTest.java
new file mode 100644
index 000000000..8e182d8dd
--- /dev/null
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/ListenerNotifierManagerTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.lite.internal.listener;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.CoreMatchers.notNullValue;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ListenerNotifierManagerTest {
+
+    @Test
+    public void assertRegisterAndGetJobNotifyExecutor() {
+        String jobName = "test_job";
+        
ListenerNotifierManager.getInstance().registerJobNotifyExecutor(jobName);
+        
assertThat(ListenerNotifierManager.getInstance().getJobNotifyExecutor(jobName), 
notNullValue(Executor.class));
+    }
+}
diff --git 
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorageTest.java
 
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorageTest.java
index 5b6569e66..cb84a7014 100644
--- 
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorageTest.java
+++ 
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/storage/JobNodeStorageTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.elasticjob.lite.internal.storage;
 
+import 
org.apache.shardingsphere.elasticjob.lite.internal.listener.ListenerNotifierManager;
 import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils;
 import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
 import 
org.apache.shardingsphere.elasticjob.reg.base.transaction.TransactionOperation;
@@ -32,6 +33,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
@@ -176,8 +178,11 @@ public final class JobNodeStorageTest {
     @Test
     public void assertAddDataListener() {
         DataChangedEventListener listener = 
mock(DataChangedEventListener.class);
+        String jobName = "test_job";
+        
ListenerNotifierManager.getInstance().registerJobNotifyExecutor(jobName);
+        Executor executor = 
ListenerNotifierManager.getInstance().getJobNotifyExecutor(jobName);
         jobNodeStorage.addDataListener(listener);
-        verify(regCenter).watch("/test_job", listener);
+        verify(regCenter).watch("/test_job", listener, executor);
     }
     
     @Test

Reply via email to