This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d0abb75896c Use ProcessPersistService instead of ProcessListEvent and remove ProcessSubscriber (#31411)
d0abb75896c is described below
commit d0abb75896cce63f8309d1992aa4f4aae5224895
Author: Haoran Meng <[email protected]>
AuthorDate: Mon May 27 13:41:49 2024 +0800
Use ProcessPersistService instead of ProcessListEvent and remove ProcessSubscriber (#31411)
---
.../mode/process/ProcessSubscriber.java | 44 --------
.../process/event/KillProcessRequestEvent.java | 31 ------
.../process/event/ShowProcessListRequestEvent.java | 24 -----
.../event/ShowProcessListResponseEvent.java | 34 -------
.../cluster/ClusterContextManagerBuilder.java | 2 -
.../subscriber/ClusterProcessSubscriber.java | 113 ---------------------
.../ClusterProcessPersistServiceTest.java} | 24 +++--
.../StandaloneEventSubscriberRegistry.java | 4 +-
.../subscriber/StandaloneProcessSubscriber.java | 59 -----------
.../StandaloneProcessPersistServiceTest.java} | 39 +++++--
.../admin/executor/KillProcessExecutor.java | 7 +-
.../admin/executor/ShowProcessListExecutor.java | 18 +---
.../executor/ShowProcessListExecutorTest.java | 11 +-
13 files changed, 58 insertions(+), 352 deletions(-)
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/ProcessSubscriber.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/process/ProcessSubscriber.java
deleted file mode 100644
index 96fb755510c..00000000000
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/ProcessSubscriber.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.process;
-
-import org.apache.shardingsphere.mode.process.event.KillProcessRequestEvent;
-import
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
-
-import java.sql.SQLException;
-
-/**
- * Process subscriber.
- */
-public interface ProcessSubscriber {
-
- /**
- * Post show process list data.
- *
- * @param event show process list request event
- */
- void postShowProcessListData(ShowProcessListRequestEvent event);
-
- /**
- * Kill process.
- *
- * @param event kill process request event
- * @throws SQLException SQL exception
- */
- void killProcess(KillProcessRequestEvent event) throws SQLException;
-}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/KillProcessRequestEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/KillProcessRequestEvent.java
deleted file mode 100644
index e69590c04cd..00000000000
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/KillProcessRequestEvent.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.process.event;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-/**
- * Kill process request event.
- */
-@RequiredArgsConstructor
-@Getter
-public final class KillProcessRequestEvent {
-
- private final String id;
-}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListRequestEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListRequestEvent.java
deleted file mode 100644
index 289964b6287..00000000000
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListRequestEvent.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.process.event;
-
-/**
- * Show process list request event.
- */
-public final class ShowProcessListRequestEvent {
-}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListResponseEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListResponseEvent.java
deleted file mode 100644
index 4b83a572516..00000000000
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/event/ShowProcessListResponseEvent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.process.event;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.executor.sql.process.Process;
-
-import java.util.Collection;
-
-/**
- * Show process list response event.
- */
-@RequiredArgsConstructor
-@Getter
-public final class ShowProcessListResponseEvent {
-
- private final Collection<Process> processes;
-}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 72f4874996c..f6409ee689d 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -34,7 +34,6 @@ import
org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ClusterProcessSubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.QualifiedDataSourceStatusSubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ClusterEventSubscriberRegistry;
@@ -97,7 +96,6 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
// TODO remove the method, only keep ZooKeeper's events, remove all
decouple events
private void createSubscribers(final EventBusContext eventBusContext,
final ClusterPersistRepository repository) {
eventBusContext.register(new
QualifiedDataSourceStatusSubscriber(repository));
- eventBusContext.register(new ClusterProcessSubscriber(repository,
eventBusContext));
}
private void registerOnline(final EventBusContext eventBusContext, final
ComputeNodeInstanceContext computeNodeInstanceContext,
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java
deleted file mode 100644
index 440d8e22dc9..00000000000
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber;
-
-import com.google.common.eventbus.Subscribe;
-import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
-import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
-import
org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
-import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
-import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
-import org.apache.shardingsphere.mode.process.ProcessSubscriber;
-import org.apache.shardingsphere.mode.process.event.KillProcessRequestEvent;
-import
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
-import
org.apache.shardingsphere.mode.process.event.ShowProcessListResponseEvent;
-import org.apache.shardingsphere.mode.spi.PersistRepository;
-
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * Cluster process subscriber.
- */
-@RequiredArgsConstructor
-public final class ClusterProcessSubscriber implements ProcessSubscriber,
EventSubscriber {
-
- private final PersistRepository repository;
-
- private final EventBusContext eventBusContext;
-
- private final YamlProcessListSwapper swapper = new
YamlProcessListSwapper();
-
- @Override
- @Subscribe
- public void postShowProcessListData(final ShowProcessListRequestEvent
event) {
- String taskId = new UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
- Collection<String> triggerPaths =
getShowProcessListTriggerPaths(taskId);
- boolean isCompleted = false;
- try {
- triggerPaths.forEach(each -> repository.persist(each, ""));
- isCompleted =
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(taskId, () ->
isReady(triggerPaths));
- postShowProcessListData(taskId);
- } finally {
- repository.delete(ProcessNode.getProcessIdPath(taskId));
- if (!isCompleted) {
- triggerPaths.forEach(repository::delete);
- }
- }
- }
-
- private void postShowProcessListData(final String taskId) {
- YamlProcessList yamlProcessList = new YamlProcessList();
- for (String each :
repository.getChildrenKeys(ProcessNode.getProcessIdPath(taskId)).stream()
- .map(each ->
repository.query(ProcessNode.getProcessListInstancePath(taskId,
each))).collect(Collectors.toList())) {
- yamlProcessList.getProcesses().addAll(YamlEngine.unmarshal(each,
YamlProcessList.class).getProcesses());
- }
- eventBusContext.post(new
ShowProcessListResponseEvent(swapper.swapToObject(yamlProcessList)));
- }
-
- private Collection<String> getShowProcessListTriggerPaths(final String
taskId) {
- return Stream.of(InstanceType.values())
- .flatMap(each ->
repository.getChildrenKeys(ComputeNode.getOnlineNodePath(each)).stream().map(onlinePath
-> ComputeNode.getProcessTriggerInstanceNodePath(onlinePath, taskId)))
- .collect(Collectors.toList());
- }
-
- private boolean isReady(final Collection<String> paths) {
- return paths.stream().noneMatch(each -> null !=
repository.query(each));
- }
-
- @Override
- @Subscribe
- public void killProcess(final KillProcessRequestEvent event) {
- String processId = event.getId();
- Collection<String> triggerPaths =
getKillProcessTriggerPaths(processId);
- boolean isCompleted = false;
- try {
- triggerPaths.forEach(each -> repository.persist(each, ""));
- isCompleted =
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(processId, ()
-> isReady(triggerPaths));
- } finally {
- if (!isCompleted) {
- triggerPaths.forEach(repository::delete);
- }
- }
- }
-
- private Collection<String> getKillProcessTriggerPaths(final String
processId) {
- return Stream.of(InstanceType.values())
- .flatMap(each ->
repository.getChildrenKeys(ComputeNode.getOnlineNodePath(each)).stream().map(onlinePath
-> ComputeNode.getProcessKillInstanceIdNodePath(onlinePath, processId)))
- .collect(Collectors.toList());
- }
-}
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterProcessPersistServiceTest.java
similarity index 72%
rename from mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriberTest.java
rename to mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterProcessPersistServiceTest.java
index 5fc60d3e805..d7f2ce4969e 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterProcessPersistServiceTest.java
@@ -15,12 +15,10 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber;
+package org.apache.shardingsphere.mode.manager.cluster.service;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
-import
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -35,26 +33,32 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
-class ClusterProcessSubscriberTest {
+class ClusterProcessPersistServiceTest {
@Mock
private ClusterPersistRepository repository;
- private final EventBusContext eventBusContext = new EventBusContext();
-
- private ClusterProcessSubscriber clusterProcessListSubscriber;
+ private ClusterProcessPersistService processPersistService;
@BeforeEach
void setUp() {
- clusterProcessListSubscriber = new
ClusterProcessSubscriber(repository, eventBusContext);
+ processPersistService = new ClusterProcessPersistService(repository);
}
@Test
- void assertPostShowProcessListData() {
+ void getProcessList() {
when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.JDBC))).thenReturn(Collections.emptyList());
when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.PROXY))).thenReturn(Collections.singletonList("abc"));
when(repository.query(any())).thenReturn(null);
- clusterProcessListSubscriber.postShowProcessListData(new
ShowProcessListRequestEvent());
+ processPersistService.getProcessList();
+ verify(repository).persist(any(), any());
+ }
+
+ @Test
+ void killProcess() {
+
when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.JDBC))).thenReturn(Collections.emptyList());
+
when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.PROXY))).thenReturn(Collections.singletonList("abc"));
+ processPersistService.killProcess("foo_process_id");
verify(repository).persist(any(), any());
}
}
diff --git a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneEventSubscriberRegistry.java b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneEventSubscriberRegistry.java
index b0a4278a9a7..040973879dc 100644
--- a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneEventSubscriberRegistry.java
+++ b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneEventSubscriberRegistry.java
@@ -27,8 +27,6 @@ import
org.apache.shardingsphere.mode.subsciber.RuleItemChangedSubscriber;
public final class StandaloneEventSubscriberRegistry extends
EventSubscriberRegistry {
public StandaloneEventSubscriberRegistry(final ContextManager
contextManager) {
- super(contextManager,
- new
StandaloneProcessSubscriber(contextManager.getComputeNodeInstanceContext().getEventBusContext()),
- new RuleItemChangedSubscriber(contextManager));
+ super(contextManager, new RuleItemChangedSubscriber(contextManager));
}
}
diff --git a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java
deleted file mode 100644
index 8180b6334d6..00000000000
--- a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.standalone.subscriber;
-
-import com.google.common.eventbus.Subscribe;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.executor.sql.process.Process;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import org.apache.shardingsphere.mode.process.ProcessSubscriber;
-import org.apache.shardingsphere.mode.process.event.KillProcessRequestEvent;
-import
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
-import
org.apache.shardingsphere.mode.process.event.ShowProcessListResponseEvent;
-
-import java.sql.SQLException;
-import java.sql.Statement;
-
-/**
- * Standalone process subscriber.
- */
-@RequiredArgsConstructor
-public final class StandaloneProcessSubscriber implements ProcessSubscriber,
EventSubscriber {
-
- private final EventBusContext eventBusContext;
-
- @Override
- @Subscribe
- public void postShowProcessListData(final ShowProcessListRequestEvent
event) {
- eventBusContext.post(new
ShowProcessListResponseEvent(ProcessRegistry.getInstance().listAll()));
- }
-
- @Override
- @Subscribe
- public void killProcess(final KillProcessRequestEvent event) throws
SQLException {
- Process process = ProcessRegistry.getInstance().get(event.getId());
- if (null == process) {
- return;
- }
- for (Statement each : process.getProcessStatements().values()) {
- each.cancel();
- }
- }
-}
diff --git a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriberTest.java b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/service/StandaloneProcessPersistServiceTest.java
similarity index 53%
rename from mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriberTest.java
rename to mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/service/StandaloneProcessPersistServiceTest.java
index 83e1dae112c..8c9e53d028a 100644
--- a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriberTest.java
+++ b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/service/StandaloneProcessPersistServiceTest.java
@@ -15,29 +15,56 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.standalone.subscriber;
+package org.apache.shardingsphere.mode.manager.standalone.service;
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
import org.apache.shardingsphere.test.mock.StaticMockSettings;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(AutoMockExtension.class)
@StaticMockSettings(ProcessRegistry.class)
-class StandaloneProcessSubscriberTest {
+class StandaloneProcessPersistServiceTest {
+
+ private StandaloneProcessPersistService processPersistService;
+
+ @BeforeEach
+ void setUp() {
+ processPersistService = new StandaloneProcessPersistService();
+ }
@Test
- void assertPostShowProcessListData() {
+ void getProcessList() {
ProcessRegistry processRegistry = mock(ProcessRegistry.class);
when(ProcessRegistry.getInstance()).thenReturn(processRegistry);
- new StandaloneProcessSubscriber(new
EventBusContext()).postShowProcessListData(new ShowProcessListRequestEvent());
+ processPersistService.getProcessList();
verify(processRegistry).listAll();
}
+
+ @Test
+ void killProcess() throws SQLException {
+ ProcessRegistry processRegistry = mock(ProcessRegistry.class);
+ when(ProcessRegistry.getInstance()).thenReturn(processRegistry);
+ Process process = mock(Process.class);
+ Statement statement = mock(Statement.class);
+ Map<Integer, Statement> processStatements = new ConcurrentHashMap<>();
+ processStatements.put(1, statement);
+ when(process.getProcessStatements()).thenReturn(processStatements);
+ when(processRegistry.get(any())).thenReturn(process);
+ processPersistService.killProcess("foo_process_id");
+ verify(statement).cancel();
+ }
}
diff --git a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/KillProcessExecutor.java b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/KillProcessExecutor.java
index 87eebb9e610..ddea96d89a1 100644
--- a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/KillProcessExecutor.java
+++ b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/KillProcessExecutor.java
@@ -18,12 +18,13 @@
package org.apache.shardingsphere.proxy.backend.mysql.handler.admin.executor;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.mode.process.event.KillProcessRequestEvent;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import
org.apache.shardingsphere.proxy.backend.handler.admin.executor.DatabaseAdminExecutor;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dal.MySQLKillStatement;
+import java.sql.SQLException;
+
/**
* Kill process executor.
*/
@@ -38,8 +39,8 @@ public final class KillProcessExecutor implements
DatabaseAdminExecutor {
* @param connectionSession connection session
*/
@Override
- public void execute(final ConnectionSession connectionSession) {
+ public void execute(final ConnectionSession connectionSession) throws
SQLException {
String processId = killStatement.getProcessId();
-
ProxyContext.getInstance().getContextManager().getComputeNodeInstanceContext().getEventBusContext().post(new
KillProcessRequestEvent(processId));
+
ProxyContext.getInstance().getContextManager().getPersistServiceFacade().getProcessPersistService().killProcess(processId);
}
}
diff --git a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
index 8aa4aabf976..d4907a68162 100644
--- a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
+++ b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.proxy.backend.mysql.handler.admin.executor;
-import com.google.common.eventbus.Subscribe;
import lombok.Getter;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResultMetaData;
@@ -29,8 +28,6 @@ import
org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import
org.apache.shardingsphere.infra.merge.result.impl.transparent.TransparentMergedResult;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
-import
org.apache.shardingsphere.mode.process.event.ShowProcessListResponseEvent;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import
org.apache.shardingsphere.proxy.backend.handler.admin.executor.DatabaseAdminQueryExecutor;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
@@ -50,8 +47,6 @@ public final class ShowProcessListExecutor implements
DatabaseAdminQueryExecutor
private final boolean showFullProcesslist;
- private Collection<Process> processes;
-
@Getter
private QueryResultMetaData queryResultMetaData;
@@ -63,17 +58,6 @@ public final class ShowProcessListExecutor implements
DatabaseAdminQueryExecutor
ProxyContext.getInstance().getContextManager().getComputeNodeInstanceContext().getEventBusContext().register(this);
}
- /**
- * Receive and handle response event.
- *
- * @param event show process list response event
- */
- @SuppressWarnings("unused")
- @Subscribe
- public void receiveProcessListData(final ShowProcessListResponseEvent
event) {
- processes = event.getProcesses();
- }
-
@Override
public void execute(final ConnectionSession connectionSession) {
queryResultMetaData = createQueryResultMetaData();
@@ -81,7 +65,7 @@ public final class ShowProcessListExecutor implements
DatabaseAdminQueryExecutor
}
private QueryResult getQueryResult() {
-
ProxyContext.getInstance().getContextManager().getComputeNodeInstanceContext().getEventBusContext().post(new
ShowProcessListRequestEvent());
+ Collection<Process> processes =
ProxyContext.getInstance().getContextManager().getPersistServiceFacade().getProcessPersistService().getProcessList();
if (null == processes || processes.isEmpty()) {
return new RawMemoryQueryResult(queryResultMetaData,
Collections.emptyList());
}
diff --git a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
index 5bd4d57c12c..16ad4602956 100644
--- a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
+++ b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
@@ -28,9 +28,9 @@ import org.apache.shardingsphere.test.mock.AutoMockExtension;
import org.apache.shardingsphere.test.mock.StaticMockSettings;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.internal.configuration.plugins.Plugins;
import java.sql.SQLException;
+import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -46,11 +46,11 @@ import static org.mockito.Mockito.when;
class ShowProcessListExecutorTest {
@Test
- void assertExecute() throws SQLException, ReflectiveOperationException {
+ void assertExecute() throws SQLException {
ContextManager contextManager = mock(ContextManager.class,
RETURNS_DEEP_STUBS);
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
+
when(contextManager.getPersistServiceFacade().getProcessPersistService().getProcessList()).thenReturn(mockProcessList());
ShowProcessListExecutor showProcessListExecutor = new
ShowProcessListExecutor(false);
- setupProcesses(showProcessListExecutor);
showProcessListExecutor.execute(new
ConnectionSession(mock(MySQLDatabaseType.class), new DefaultAttributeMap()));
assertThat(showProcessListExecutor.getQueryResultMetaData().getColumnCount(),
is(8));
MergedResult mergedResult = showProcessListExecutor.getMergedResult();
@@ -64,11 +64,10 @@ class ShowProcessListExecutorTest {
}
}
- private void setupProcesses(final ShowProcessListExecutor
showProcessListExecutor) throws ReflectiveOperationException {
+ private Collection<Process> mockProcessList() {
Process process = new Process("f6c2336a-63ba-41bf-941e-2e3504eb2c80",
1617939785160L,
"ALTER TABLE t_order ADD COLUMN a varchar(64) AFTER order_id",
"foo_db", "root", "127.0.0.1", new AtomicInteger(2), new AtomicInteger(1), new
AtomicBoolean(false),
new AtomicBoolean());
- Plugins.getMemberAccessor().set(
-
showProcessListExecutor.getClass().getDeclaredField("processes"),
showProcessListExecutor, Collections.singleton(process));
+ return Collections.singleton(process);
}
}