This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 515c5023abd Add more test cases for ProcessListChangedSubscriber
(#32885)
515c5023abd is described below
commit 515c5023abd18a3caa01345363dbe43772a099a6
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Sep 15 20:23:23 2024 +0800
Add more test cases for ProcessListChangedSubscriber (#32885)
* Add more test cases for ProcessListChangedSubscriber
* Add more test cases for ProcessListChangedSubscriber
---
.../dispatch/ProcessListChangedSubscriberTest.java | 169 +++++++--------------
1 file changed, 57 insertions(+), 112 deletions(-)
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ProcessListChangedSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ProcessListChangedSubscriberTest.java
index 64a93bc3abf..a1ce52b4977 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ProcessListChangedSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/ProcessListChangedSubscriberTest.java
@@ -17,165 +17,110 @@
package
org.apache.shardingsphere.mode.manager.cluster.event.subscriber.dispatch;
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
-import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
-import org.apache.shardingsphere.infra.database.core.DefaultDatabase;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import
org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
-import
org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockReleaseStrategy;
-import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
-import
org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import
org.apache.shardingsphere.mode.event.dispatch.state.compute.KillLocalProcessCompletedEvent;
import
org.apache.shardingsphere.mode.event.dispatch.state.compute.KillLocalProcessEvent;
import
org.apache.shardingsphere.mode.event.dispatch.state.compute.ReportLocalProcessesCompletedEvent;
import
org.apache.shardingsphere.mode.event.dispatch.state.compute.ReportLocalProcessesEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
-import
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
-import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
-import org.apache.shardingsphere.mode.persist.PersistServiceFacade;
-import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
-import org.apache.shardingsphere.mode.spi.PersistRepository;
-import org.awaitility.Awaitility;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.test.mock.AutoMockExtension;
+import org.apache.shardingsphere.test.mock.StaticMockSettings;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.Mock;
-import org.mockito.internal.configuration.plugins.Plugins;
-import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-@ExtendWith(MockitoExtension.class)
+@ExtendWith(AutoMockExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+@StaticMockSettings({ProcessRegistry.class,
ProcessOperationLockRegistry.class})
class ProcessListChangedSubscriberTest {
private ProcessListChangedSubscriber subscriber;
- private ContextManager contextManager;
-
- @Mock
- private PersistRepository repository;
-
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
- private ShardingSphereDatabase database;
+ private ContextManager contextManager;
- @SneakyThrows(ReflectiveOperationException.class)
@BeforeEach
- void setUp() throws SQLException {
- EventBusContext eventBusContext = new EventBusContext();
- contextManager = new
ClusterContextManagerBuilder().build(createContextManagerBuilderParameter(),
eventBusContext);
-
contextManager.renewMetaDataContexts(MetaDataContextsFactory.create(contextManager.getPersistServiceFacade().getMetaDataPersistService(),
new ShardingSphereMetaData(createDatabases(),
-
contextManager.getMetaDataContexts().getMetaData().getGlobalResourceMetaData(),
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(),
- new ConfigurationProperties(new Properties()))));
-
Plugins.getMemberAccessor().set(PersistServiceFacade.class.getDeclaredField("repository"),
contextManager.getPersistServiceFacade(), repository);
+ void setUp() {
+
when(contextManager.getPersistServiceFacade().getRepository()).thenReturn(mock(ClusterPersistRepository.class));
+
when(contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId()).thenReturn("foo_instance_id");
subscriber = new ProcessListChangedSubscriber(contextManager);
}
- private ContextManagerBuilderParameter
createContextManagerBuilderParameter() {
- ModeConfiguration modeConfig = new ModeConfiguration("Cluster", new
ClusterPersistRepositoryConfiguration("FIXTURE", "", "", new Properties()));
- InstanceMetaData instanceMetaData = new
ProxyInstanceMetaData("foo_instance_id", 3307);
- return new ContextManagerBuilderParameter(modeConfig,
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyList(),
- new Properties(), Collections.emptyList(), instanceMetaData,
false);
+ @Test
+ void assertReportLocalProcessesWithNotCurrentInstance() {
+ subscriber.reportLocalProcesses(new
ReportLocalProcessesEvent("bar_instance_id", "foo_task_id"));
+ verify(contextManager.getPersistServiceFacade().getRepository(),
times(0)).delete(any());
}
- private Map<String, ShardingSphereDatabase> createDatabases() {
-
when(database.getSchemas()).thenReturn(Collections.singletonMap("foo_schema",
new ShardingSphereSchema(DefaultDatabase.LOGIC_NAME)));
-
when(database.getProtocolType()).thenReturn(TypedSPILoader.getService(DatabaseType.class,
"MySQL"));
-
when(database.getSchema("foo_schema")).thenReturn(mock(ShardingSphereSchema.class));
-
when(database.getRuleMetaData().getConfigurations()).thenReturn(Collections.emptyList());
- return Collections.singletonMap("db", database);
+ @Test
+ void assertReportEmptyLocalProcesses() {
+
when(ProcessRegistry.getInstance().listAll()).thenReturn(Collections.emptyList());
+ subscriber.reportLocalProcesses(new
ReportLocalProcessesEvent("foo_instance_id", "foo_task_id"));
+ verify(contextManager.getPersistServiceFacade().getRepository(),
times(0)).persist(any(), any());
+
verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/show_process_list_trigger/foo_instance_id:foo_task_id");
}
@Test
- void assertReportLocalProcesses() {
- Process process = mock(Process.class);
- String processId = "foo_id";
- when(process.getId()).thenReturn(processId);
- when(process.isInterrupted()).thenReturn(false);
- when(process.isIdle()).thenReturn(false);
- when(process.getCompletedUnitCount()).thenReturn(new AtomicInteger(0));
- when(process.getTotalUnitCount()).thenReturn(new AtomicInteger(0));
- ProcessRegistry.getInstance().add(process);
- String instanceId =
contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId();
- subscriber.reportLocalProcesses(new
ReportLocalProcessesEvent(instanceId, processId));
- verify(repository).persist("/execution_nodes/foo_id/" + instanceId,
- "processes:" + System.lineSeparator() + "- completedUnitCount:
0" + System.lineSeparator()
- + " id: foo_id" + System.lineSeparator()
- + " idle: false" + System.lineSeparator()
- + " interrupted: false" + System.lineSeparator()
- + " startMillis: 0" + System.lineSeparator()
- + " totalUnitCount: 0" + System.lineSeparator());
-
verify(repository).delete("/nodes/compute_nodes/show_process_list_trigger/" +
instanceId + ":foo_id");
+ void assertReportNotEmptyLocalProcesses() {
+
when(ProcessRegistry.getInstance().listAll()).thenReturn(Collections.singleton(mock(Process.class,
RETURNS_DEEP_STUBS)));
+ subscriber.reportLocalProcesses(new
ReportLocalProcessesEvent("foo_instance_id", "foo_task_id"));
+
verify(contextManager.getPersistServiceFacade().getRepository()).persist(eq("/execution_nodes/foo_task_id/foo_instance_id"),
any());
+
verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/show_process_list_trigger/foo_instance_id:foo_task_id");
}
@Test
void assertCompleteToReportLocalProcesses() {
- String taskId = "foo_id";
- long startMillis = System.currentTimeMillis();
- Executors.newFixedThreadPool(1).submit(() -> {
- Awaitility.await().pollDelay(50L, TimeUnit.MILLISECONDS).until(()
-> true);
- subscriber.completeToReportLocalProcesses(new
ReportLocalProcessesCompletedEvent(taskId));
- });
- waitUntilReleaseReady(taskId);
- long currentMillis = System.currentTimeMillis();
- assertThat(currentMillis, greaterThanOrEqualTo(startMillis + 50L));
- assertThat(currentMillis, lessThanOrEqualTo(startMillis + 5000L));
+ subscriber.completeToReportLocalProcesses(new
ReportLocalProcessesCompletedEvent("foo_task_id"));
+
verify(ProcessOperationLockRegistry.getInstance()).notify("foo_task_id");
}
@Test
- void assertKillLocalProcess() throws SQLException {
- String instanceId =
contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId();
- String processId = "foo_id";
- subscriber.killLocalProcess(new KillLocalProcessEvent(instanceId,
processId));
- verify(repository).delete("/nodes/compute_nodes/kill_process_trigger/"
+ instanceId + ":foo_id");
+ void assertKillLocalProcessWithNotCurrentInstance() throws SQLException {
+ subscriber.killLocalProcess(new
KillLocalProcessEvent("bar_instance_id", "foo_pid"));
+ verify(contextManager.getPersistServiceFacade().getRepository(),
times(0)).delete(any());
}
@Test
- void assertCompleteToKillLocalProcess() {
- String processId = "foo_id";
- long startMillis = System.currentTimeMillis();
- Executors.newFixedThreadPool(1).submit(() -> {
- Awaitility.await().pollDelay(50L, TimeUnit.MILLISECONDS).until(()
-> true);
- subscriber.completeToKillLocalProcess(new
KillLocalProcessCompletedEvent(processId));
- });
- waitUntilReleaseReady(processId);
- long currentMillis = System.currentTimeMillis();
- assertThat(currentMillis, greaterThanOrEqualTo(startMillis + 50L));
- assertThat(currentMillis, lessThanOrEqualTo(startMillis + 5000L));
+ void assertKillLocalProcessWithoutExistedProcess() throws SQLException {
+ when(ProcessRegistry.getInstance().get("foo_pid")).thenReturn(null);
+ subscriber.killLocalProcess(new
KillLocalProcessEvent("foo_instance_id", "foo_pid"));
+
verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/kill_process_trigger/foo_instance_id:foo_pid");
+ }
+
+ @Test
+ void assertKillLocalProcessWithExistedProcess() throws SQLException {
+ Process process = mock(Process.class, RETURNS_DEEP_STUBS);
+ Statement statement = mock(Statement.class);
+
when(process.getProcessStatements()).thenReturn(Collections.singletonMap(1,
statement));
+ when(ProcessRegistry.getInstance().get("foo_pid")).thenReturn(process);
+ subscriber.killLocalProcess(new
KillLocalProcessEvent("foo_instance_id", "foo_pid"));
+ verify(process).setInterrupted(true);
+ verify(statement).cancel();
+
verify(contextManager.getPersistServiceFacade().getRepository()).delete("/nodes/compute_nodes/kill_process_trigger/foo_instance_id:foo_pid");
}
- private void waitUntilReleaseReady(final String lockId) {
-
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(lockId, new
ProcessOperationLockReleaseStrategy() {
-
- private final AtomicBoolean firstTime = new AtomicBoolean(true);
-
- @Override
- public boolean isReadyToRelease() {
- return !firstTime.getAndSet(false);
- }
- });
+ @Test
+ void assertCompleteToKillLocalProcess() {
+ subscriber.completeToKillLocalProcess(new
KillLocalProcessCompletedEvent("foo_pid"));
+ verify(ProcessOperationLockRegistry.getInstance()).notify("foo_pid");
}
}