This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 515c5023abd Add more test cases for ProcessListChangedSubscriber (#32885)
515c5023abd is described below

commit 515c5023abd18a3caa01345363dbe43772a099a6
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Sep 15 20:23:23 2024 +0800

    Add more test cases for ProcessListChangedSubscriber (#32885)
    
    * Add more test cases for ProcessListChangedSubscriber
    
    * Add more test cases for ProcessListChangedSubscriber
---
 .../dispatch/ProcessListChangedSubscriberTest.java | 169 +++++++--------------
 1 file changed, 57 insertions(+), 112 deletions(-)

diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ProcessListChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ProcessListChangedSubscriberTest.java
index 64a93bc3abf..a1ce52b4977 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ProcessListChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ProcessListChangedSubscriberTest.java
@@ -17,165 +17,110 @@
 
 package 
org.apache.shardingsphere.mode.manager.cluster.event.subscriber.dispatch;
 
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
-import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
-import org.apache.shardingsphere.infra.database.core.DefaultDatabase;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.executor.sql.process.Process;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
 import 
org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
-import 
org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockReleaseStrategy;
-import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
-import 
org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import 
org.apache.shardingsphere.mode.event.dispatch.state.compute.KillLocalProcessCompletedEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.state.compute.KillLocalProcessEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.state.compute.ReportLocalProcessesCompletedEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.state.compute.ReportLocalProcessesEvent;
 import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
-import 
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
-import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
-import org.apache.shardingsphere.mode.persist.PersistServiceFacade;
-import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
-import org.apache.shardingsphere.mode.spi.PersistRepository;
-import org.awaitility.Awaitility;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.test.mock.AutoMockExtension;
+import org.apache.shardingsphere.test.mock.StaticMockSettings;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Answers;
 import org.mockito.Mock;
-import org.mockito.internal.configuration.plugins.Plugins;
-import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
 
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-@ExtendWith(MockitoExtension.class)
+@ExtendWith(AutoMockExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+@StaticMockSettings({ProcessRegistry.class, 
ProcessOperationLockRegistry.class})
 class ProcessListChangedSubscriberTest {
     
     private ProcessListChangedSubscriber subscriber;
     
-    private ContextManager contextManager;
-    
-    @Mock
-    private PersistRepository repository;
-    
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
-    private ShardingSphereDatabase database;
+    private ContextManager contextManager;
     
-    @SneakyThrows(ReflectiveOperationException.class)
     @BeforeEach
-    void setUp() throws SQLException {
-        EventBusContext eventBusContext = new EventBusContext();
-        contextManager = new 
ClusterContextManagerBuilder().build(createContextManagerBuilderParameter(), 
eventBusContext);
-        
contextManager.renewMetaDataContexts(MetaDataContextsFactory.create(contextManager.getPersistServiceFacade().getMetaDataPersistService(),
 new ShardingSphereMetaData(createDatabases(),
-                
contextManager.getMetaDataContexts().getMetaData().getGlobalResourceMetaData(), 
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(),
-                new ConfigurationProperties(new Properties()))));
-        
Plugins.getMemberAccessor().set(PersistServiceFacade.class.getDeclaredField("repository"),
 contextManager.getPersistServiceFacade(), repository);
+    void setUp() {
+        
when(contextManager.getPersistServiceFacade().getRepository()).thenReturn(mock(ClusterPersistRepository.class));
+        
when(contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId()).thenReturn("foo_instance_id");
         subscriber = new ProcessListChangedSubscriber(contextManager);
     }
     
-    private ContextManagerBuilderParameter 
createContextManagerBuilderParameter() {
-        ModeConfiguration modeConfig = new ModeConfiguration("Cluster", new 
ClusterPersistRepositoryConfiguration("FIXTURE", "", "", new Properties()));
-        InstanceMetaData instanceMetaData = new 
ProxyInstanceMetaData("foo_instance_id", 3307);
-        return new ContextManagerBuilderParameter(modeConfig, 
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyList(),
-                new Properties(), Collections.emptyList(), instanceMetaData, 
false);
+    @Test
+    void assertReportLocalProcessesWithNotCurrentInstance() {
+        subscriber.reportLocalProcesses(new 
ReportLocalProcessesEvent("bar_instance_id", "foo_task_id"));
+        verify(contextManager.getPersistServiceFacade().getRepository(), 
times(0)).delete(any());
     }
     
-    private Map<String, ShardingSphereDatabase> createDatabases() {
-        
when(database.getSchemas()).thenReturn(Collections.singletonMap("foo_schema", 
new ShardingSphereSchema(DefaultDatabase.LOGIC_NAME)));
-        
when(database.getProtocolType()).thenReturn(TypedSPILoader.getService(DatabaseType.class,
 "MySQL"));
-        
when(database.getSchema("foo_schema")).thenReturn(mock(ShardingSphereSchema.class));
-        
when(database.getRuleMetaData().getConfigurations()).thenReturn(Collections.emptyList());
-        return Collections.singletonMap("db", database);
+    @Test
+    void assertReportEmptyLocalProcesses() {
+        
when(ProcessRegistry.getInstance().listAll()).thenReturn(Collections.emptyList());
+        subscriber.reportLocalProcesses(new 
ReportLocalProcessesEvent("foo_instance_id", "foo_task_id"));
+        verify(contextManager.getPersistServiceFacade().getRepository(), 
times(0)).persist(any(), any());
+        
verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/show_process_list_trigger/foo_instance_id:foo_task_id");
     }
     
     @Test
-    void assertReportLocalProcesses() {
-        Process process = mock(Process.class);
-        String processId = "foo_id";
-        when(process.getId()).thenReturn(processId);
-        when(process.isInterrupted()).thenReturn(false);
-        when(process.isIdle()).thenReturn(false);
-        when(process.getCompletedUnitCount()).thenReturn(new AtomicInteger(0));
-        when(process.getTotalUnitCount()).thenReturn(new AtomicInteger(0));
-        ProcessRegistry.getInstance().add(process);
-        String instanceId = 
contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId();
-        subscriber.reportLocalProcesses(new 
ReportLocalProcessesEvent(instanceId, processId));
-        verify(repository).persist("/execution_nodes/foo_id/" + instanceId,
-                "processes:" + System.lineSeparator() + "- completedUnitCount: 
0" + System.lineSeparator()
-                        + "  id: foo_id" + System.lineSeparator()
-                        + "  idle: false" + System.lineSeparator()
-                        + "  interrupted: false" + System.lineSeparator()
-                        + "  startMillis: 0" + System.lineSeparator()
-                        + "  totalUnitCount: 0" + System.lineSeparator());
-        
verify(repository).delete("/nodes/compute_nodes/show_process_list_trigger/" + 
instanceId + ":foo_id");
+    void assertReportNotEmptyLocalProcesses() {
+        
when(ProcessRegistry.getInstance().listAll()).thenReturn(Collections.singleton(mock(Process.class,
 RETURNS_DEEP_STUBS)));
+        subscriber.reportLocalProcesses(new 
ReportLocalProcessesEvent("foo_instance_id", "foo_task_id"));
+        
verify(contextManager.getPersistServiceFacade().getRepository()).persist(eq("/execution_nodes/foo_task_id/foo_instance_id"),
 any());
+        
verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/show_process_list_trigger/foo_instance_id:foo_task_id");
     }
     
     @Test
     void assertCompleteToReportLocalProcesses() {
-        String taskId = "foo_id";
-        long startMillis = System.currentTimeMillis();
-        Executors.newFixedThreadPool(1).submit(() -> {
-            Awaitility.await().pollDelay(50L, TimeUnit.MILLISECONDS).until(() 
-> true);
-            subscriber.completeToReportLocalProcesses(new 
ReportLocalProcessesCompletedEvent(taskId));
-        });
-        waitUntilReleaseReady(taskId);
-        long currentMillis = System.currentTimeMillis();
-        assertThat(currentMillis, greaterThanOrEqualTo(startMillis + 50L));
-        assertThat(currentMillis, lessThanOrEqualTo(startMillis + 5000L));
+        subscriber.completeToReportLocalProcesses(new 
ReportLocalProcessesCompletedEvent("foo_task_id"));
+        
verify(ProcessOperationLockRegistry.getInstance()).notify("foo_task_id");
     }
     
     @Test
-    void assertKillLocalProcess() throws SQLException {
-        String instanceId = 
contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId();
-        String processId = "foo_id";
-        subscriber.killLocalProcess(new KillLocalProcessEvent(instanceId, 
processId));
-        verify(repository).delete("/nodes/compute_nodes/kill_process_trigger/" 
+ instanceId + ":foo_id");
+    void assertKillLocalProcessWithNotCurrentInstance() throws SQLException {
+        subscriber.killLocalProcess(new 
KillLocalProcessEvent("bar_instance_id", "foo_pid"));
+        verify(contextManager.getPersistServiceFacade().getRepository(), 
times(0)).delete(any());
     }
     
     @Test
-    void assertCompleteToKillLocalProcess() {
-        String processId = "foo_id";
-        long startMillis = System.currentTimeMillis();
-        Executors.newFixedThreadPool(1).submit(() -> {
-            Awaitility.await().pollDelay(50L, TimeUnit.MILLISECONDS).until(() 
-> true);
-            subscriber.completeToKillLocalProcess(new 
KillLocalProcessCompletedEvent(processId));
-        });
-        waitUntilReleaseReady(processId);
-        long currentMillis = System.currentTimeMillis();
-        assertThat(currentMillis, greaterThanOrEqualTo(startMillis + 50L));
-        assertThat(currentMillis, lessThanOrEqualTo(startMillis + 5000L));
+    void assertKillLocalProcessWithoutExistedProcess() throws SQLException {
+        when(ProcessRegistry.getInstance().get("foo_pid")).thenReturn(null);
+        subscriber.killLocalProcess(new 
KillLocalProcessEvent("foo_instance_id", "foo_pid"));
+        
verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/kill_process_trigger/foo_instance_id:foo_pid");
+    }
+    
+    @Test
+    void assertKillLocalProcessWithExistedProcess() throws SQLException {
+        Process process = mock(Process.class, RETURNS_DEEP_STUBS);
+        Statement statement = mock(Statement.class);
+        
when(process.getProcessStatements()).thenReturn(Collections.singletonMap(1, 
statement));
+        when(ProcessRegistry.getInstance().get("foo_pid")).thenReturn(process);
+        subscriber.killLocalProcess(new 
KillLocalProcessEvent("foo_instance_id", "foo_pid"));
+        verify(process).setInterrupted(true);
+        verify(statement).cancel();
+        
verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/kill_process_trigger/foo_instance_id:foo_pid");
     }
     
-    private void waitUntilReleaseReady(final String lockId) {
-        
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(lockId, new 
ProcessOperationLockReleaseStrategy() {
-            
-            private final AtomicBoolean firstTime = new AtomicBoolean(true);
-            
-            @Override
-            public boolean isReadyToRelease() {
-                return !firstTime.getAndSet(false);
-            }
-        });
+    @Test
+    void assertCompleteToKillLocalProcess() {
+        subscriber.completeToKillLocalProcess(new 
KillLocalProcessCompletedEvent("foo_pid"));
+        verify(ProcessOperationLockRegistry.getInstance()).notify("foo_pid");
     }
 }

Reply via email to