This is an automated email from the ASF dual-hosted git repository. zhangliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new fac2406f572 Migrates unnecessary ListeningExecutorService to ExecutorService (#17725) fac2406f572 is described below commit fac2406f572ffd4c87d73f0cc286061c2bba97dc Author: 吴伟杰 <wuwei...@apache.org> AuthorDate: Tue May 17 14:59:39 2022 +0800 Migrates unnecessary ListeningExecutorService to ExecutorService (#17725) --- .../infra/executor/kernel/ExecutorEngine.java | 14 +++++++------- .../executor/kernel/thread/ExecutorServiceManager.java | 6 ++---- .../proxy/frontend/executor/UserExecutorGroup.java | 5 +++-- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java index 45cff96196c..e29bb2a5108 100644 --- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java +++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java @@ -17,7 +17,6 @@ package org.apache.shardingsphere.infra.executor.kernel; -import com.google.common.util.concurrent.ListenableFuture; import lombok.Getter; import org.apache.shardingsphere.infra.exception.ShardingSphereException; import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup; @@ -34,6 +33,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; /** * Executor engine. @@ -127,7 +127,7 @@ public final class ExecutorEngine implements AutoCloseable { private <I, O> List<O> parallelExecute(final Iterator<ExecutionGroup<I>> executionGroups, final ExecutorCallback<I, O> firstCallback, final ExecutorCallback<I, O> callback) throws SQLException { ExecutionGroup<I> firstInputs = executionGroups.next(); - Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncExecute(executionGroups, callback); + Collection<Future<Collection<O>>> restResultFutures = asyncExecute(executionGroups, callback); return getGroupResults(syncExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures); } @@ -135,22 +135,22 @@ public final class ExecutorEngine implements AutoCloseable { return callback.execute(executionGroup.getInputs(), true, ExecutorDataMap.getValue()); } - private <I, O> Collection<ListenableFuture<Collection<O>>> asyncExecute(final Iterator<ExecutionGroup<I>> executionGroups, final ExecutorCallback<I, O> callback) { - Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>(); + private <I, O> Collection<Future<Collection<O>>> asyncExecute(final Iterator<ExecutionGroup<I>> executionGroups, final ExecutorCallback<I, O> callback) { + Collection<Future<Collection<O>>> result = new LinkedList<>(); while (executionGroups.hasNext()) { result.add(asyncExecute(executionGroups.next(), callback)); } return result; } - private <I, O> ListenableFuture<Collection<O>> asyncExecute(final ExecutionGroup<I> executionGroup, final ExecutorCallback<I, O> callback) { + private <I, O> Future<Collection<O>> asyncExecute(final ExecutionGroup<I> executionGroup, final ExecutorCallback<I, O> callback) { Map<String, Object> dataMap = ExecutorDataMap.getValue(); return executorServiceManager.getExecutorService().submit(() -> callback.execute(executionGroup.getInputs(), false, dataMap)); } - private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws SQLException { + private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<Future<Collection<O>>> restFutures) throws SQLException { List<O> result = new LinkedList<>(firstResults); - for (ListenableFuture<Collection<O>> each : restFutures) { + for (Future<Collection<O>> each : restFutures) { try { result.addAll(each.get()); } catch (final InterruptedException | ExecutionException ex) { diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/thread/ExecutorServiceManager.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/thread/ExecutorServiceManager.java index 733370bb093..0abe5b5df86 100644 --- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/thread/ExecutorServiceManager.java +++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/thread/ExecutorServiceManager.java @@ -17,8 +17,6 @@ package org.apache.shardingsphere.infra.executor.kernel.thread; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import lombok.Getter; import java.util.concurrent.ExecutorService; @@ -36,14 +34,14 @@ public final class ExecutorServiceManager { private static final ExecutorService SHUTDOWN_EXECUTOR = Executors.newSingleThreadExecutor(ExecutorThreadFactoryBuilder.build("Executor-Engine-Closer")); - private final ListeningExecutorService executorService; + private final ExecutorService executorService; public ExecutorServiceManager(final int executorSize) { this(executorSize, DEFAULT_NAME_FORMAT); } public ExecutorServiceManager(final int executorSize, final String nameFormat) { - executorService = MoreExecutors.listeningDecorator(getExecutorService(executorSize, nameFormat)); + executorService = getExecutorService(executorSize, nameFormat); } private ExecutorService getExecutorService(final int executorSize, final String nameFormat) { diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/UserExecutorGroup.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/UserExecutorGroup.java index cb97d096c4e..8ec10cc13ca 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/UserExecutorGroup.java +++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/executor/UserExecutorGroup.java @@ -17,10 +17,11 @@ package org.apache.shardingsphere.proxy.frontend.executor; -import com.google.common.util.concurrent.ListeningExecutorService; import lombok.Getter; import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorServiceManager; +import java.util.concurrent.ExecutorService; + /** * User executor group. */ @@ -31,7 +32,7 @@ public final class UserExecutorGroup { private static final UserExecutorGroup INSTANCE = new UserExecutorGroup(); @Getter - private final ListeningExecutorService executorService; + private final ExecutorService executorService; private UserExecutorGroup() { ExecutorServiceManager executorServiceManager = new ExecutorServiceManager(0, NAME_FORMAT);