This is an automated email from the ASF dual-hosted git repository.

sureshanaparti pushed a commit to branch 4.19
in repository https://gitbox.apache.org/repos/asf/cloudstack.git


The following commit(s) were added to refs/heads/4.19 by this push:
     new 2ca1b474bd6 PowerFlex/ScaleIO SDC client connection improvements 
(#9268)
2ca1b474bd6 is described below

commit 2ca1b474bd6ccaafb91c90cf91dc7d3e71519148
Author: Suresh Kumar Anaparti <sureshkumar.anapa...@gmail.com>
AuthorDate: Sat Jun 29 10:01:50 2024 +0530

    PowerFlex/ScaleIO SDC client connection improvements (#9268)
    
    * Mitigation for non-scalable PowerFlex/ScaleIO clients
    - Added ScaleIOSDCManager to manage SDC connections, check the client 
limit, and prepare and unprepare the SDC on the hosts.
    - Added prepare and unprepare storage client commands to prepare/start 
and stop the SDC service, respectively, on the hosts.
    - Introduced config 'storage.pool.connected.clients.limit' at storage pool 
level to limit connected clients; currently supported for PowerFlex only.
    
    * tests issue fixed
    
    * refactor / improvements
    
    * lock with PowerFlex system ID while checking the connections limit
    
    * updated PowerFlex system ID lock to be held until SDC preparation
    
    * Added custom stats support for storage pool, through listStoragePools API
    
    * code improvements, and unit tests
    
    * unit tests fixes
    
    * Update config 'storage.pool.connected.clients.limit' to dynamic, and some 
improvements
    
    * Stop SDC on host after migration if no volumes mapped to host
    
    * Wait for SDC to connect after scini service start, and some log 
improvements
    
    * Do not throw an exception (log it) when the SDC is not connected while 
revoking access for the PowerFlex volume
    
    * some log improvements
---
 .../org/apache/cloudstack/api/ApiConstants.java    |   1 +
 .../command/admin/storage/ListStoragePoolsCmd.java |   7 +-
 .../api/response/StoragePoolResponse.java          |  12 +
 .../agent/api/PrepareStorageClientAnswer.java      |  43 +++
 .../agent/api/PrepareStorageClientCommand.java     |  56 ++++
 .../agent/api/UnprepareStorageClientAnswer.java    |  34 ++
 .../agent/api/UnprepareStorageClientCommand.java   |  48 +++
 .../api/storage/PrimaryDataStoreDriver.java        |  26 ++
 .../java/com/cloud/storage/StorageManager.java     |  21 +-
 .../LibvirtPrepareStorageClientCommandWrapper.java |  52 ++++
 ...ibvirtUnprepareStorageClientCommandWrapper.java |  49 +++
 .../kvm/storage/KVMStoragePoolManager.java         |  11 +
 .../kvm/storage/ScaleIOStorageAdaptor.java         |  64 ++++
 .../hypervisor/kvm/storage/StorageAdaptor.java     |  24 ++
 ...virtPrepareStorageClientCommandWrapperTest.java |  87 ++++++
 ...rtUnprepareStorageClientCommandWrapperTest.java |  73 +++++
 .../kvm/storage/ScaleIOStorageAdaptorTest.java     | 191 ++++++++++++
 .../datastore/client/ScaleIOGatewayClient.java     |   2 +
 .../datastore/client/ScaleIOGatewayClientImpl.java |  26 ++
 .../driver/ScaleIOPrimaryDataStoreDriver.java      | 102 +++---
 .../ScaleIOPrimaryDataStoreLifeCycle.java          |  30 +-
 .../datastore/manager/ScaleIOSDCManager.java       |  47 +++
 .../datastore/manager/ScaleIOSDCManagerImpl.java   | 346 +++++++++++++++++++++
 .../datastore/provider/ScaleIOHostListener.java    |  66 ++--
 .../storage/datastore/util/ScaleIOUtil.java        |  45 +++
 .../spring-storage-volume-scaleio-context.xml      |   2 +
 .../ScaleIOPrimaryDataStoreLifeCycleTest.java      |   5 +-
 server/src/main/java/com/cloud/api/ApiDBUtils.java |   4 +-
 .../main/java/com/cloud/api/ApiResponseHelper.java |   2 +-
 .../java/com/cloud/api/query/QueryManagerImpl.java |   6 +-
 .../com/cloud/api/query/ViewResponseHelper.java    |   4 +-
 .../cloud/api/query/dao/StoragePoolJoinDao.java    |   2 +-
 .../api/query/dao/StoragePoolJoinDaoImpl.java      |  11 +-
 .../deploy/DeploymentPlanningManagerImpl.java      |   9 +
 .../java/com/cloud/storage/StorageManagerImpl.java |  41 +++
 35 files changed, 1431 insertions(+), 118 deletions(-)

diff --git a/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java 
b/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
index 050464a13a6..2324b861830 100644
--- a/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
+++ b/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
@@ -439,6 +439,7 @@ public class ApiConstants {
     public static final String STORAGE_POLICY = "storagepolicy";
     public static final String STORAGE_MOTION_ENABLED = "storagemotionenabled";
     public static final String STORAGE_CAPABILITIES = "storagecapabilities";
+    public static final String STORAGE_CUSTOM_STATS = "storagecustomstats";
     public static final String SUBNET = "subnet";
     public static final String OWNER = "owner";
     public static final String SWAP_OWNER = "swapowner";
diff --git 
a/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ListStoragePoolsCmd.java
 
b/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ListStoragePoolsCmd.java
index 6923353b3bf..3da99de050b 100644
--- 
a/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ListStoragePoolsCmd.java
+++ 
b/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ListStoragePoolsCmd.java
@@ -74,7 +74,8 @@ public class ListStoragePoolsCmd extends BaseListCmd {
     @Parameter(name = ApiConstants.HOST_ID, type = CommandType.UUID, 
entityType = HostResponse.class, description = "host ID of the storage pools")
     private Long hostId;
 
-
+    @Parameter(name = ApiConstants.STORAGE_CUSTOM_STATS, type = 
CommandType.BOOLEAN, description = "If true, lists the custom stats of the 
storage pool", since = "4.18.1")
+    private Boolean customStats;
     /////////////////////////////////////////////////////
     /////////////////// Accessors ///////////////////////
     /////////////////////////////////////////////////////
@@ -131,6 +132,10 @@ public class ListStoragePoolsCmd extends BaseListCmd {
         this.scope = scope;
     }
 
+    public Boolean getCustomStats() {
+        return customStats != null && customStats;
+    }
+
     /////////////////////////////////////////////////////
     /////////////// API Implementation///////////////////
     /////////////////////////////////////////////////////
diff --git 
a/api/src/main/java/org/apache/cloudstack/api/response/StoragePoolResponse.java 
b/api/src/main/java/org/apache/cloudstack/api/response/StoragePoolResponse.java
index f514c8167ac..9e7f5159e0e 100644
--- 
a/api/src/main/java/org/apache/cloudstack/api/response/StoragePoolResponse.java
+++ 
b/api/src/main/java/org/apache/cloudstack/api/response/StoragePoolResponse.java
@@ -97,6 +97,10 @@ public class StoragePoolResponse extends 
BaseResponseWithAnnotations {
     @Param(description = "total min IOPS currently in use by volumes")
     private Long allocatedIops;
 
+    @SerializedName(ApiConstants.STORAGE_CUSTOM_STATS)
+    @Param(description = "the storage pool custom stats", since = "4.18.1")
+    private Map<String, String> customStats;
+
     @SerializedName("tags")
     @Param(description = "the tags for the storage pool")
     private String tags;
@@ -304,6 +308,14 @@ public class StoragePoolResponse extends 
BaseResponseWithAnnotations {
        this.allocatedIops = allocatedIops;
     }
 
+    public Map<String, String> getCustomStats() {
+        return customStats;
+    }
+
+    public void setCustomStats(Map<String, String> customStats) {
+        this.customStats = customStats;
+    }
+
     public String getTags() {
         return tags;
     }
diff --git 
a/core/src/main/java/com/cloud/agent/api/PrepareStorageClientAnswer.java 
b/core/src/main/java/com/cloud/agent/api/PrepareStorageClientAnswer.java
new file mode 100644
index 00000000000..85afb925646
--- /dev/null
+++ b/core/src/main/java/com/cloud/agent/api/PrepareStorageClientAnswer.java
@@ -0,0 +1,43 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package com.cloud.agent.api;
+
+import java.util.Map;
+
+public class PrepareStorageClientAnswer extends Answer {
+    Map<String, String> detailsMap;
+
+    public PrepareStorageClientAnswer() {
+        super();
+    }
+
+    public PrepareStorageClientAnswer(Command command, boolean success, 
Map<String, String> detailsMap) {
+        super(command, success, "");
+        this.detailsMap = detailsMap;
+    }
+
+    public PrepareStorageClientAnswer(Command command, boolean success, String 
details) {
+        super(command, success, details);
+    }
+
+    public Map<String, String> getDetailsMap() {
+        return detailsMap;
+    }
+}
diff --git 
a/core/src/main/java/com/cloud/agent/api/PrepareStorageClientCommand.java 
b/core/src/main/java/com/cloud/agent/api/PrepareStorageClientCommand.java
new file mode 100644
index 00000000000..8dea9c11c53
--- /dev/null
+++ b/core/src/main/java/com/cloud/agent/api/PrepareStorageClientCommand.java
@@ -0,0 +1,56 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package com.cloud.agent.api;
+
+import java.util.Map;
+
+import com.cloud.storage.Storage.StoragePoolType;
+
+public class PrepareStorageClientCommand extends Command {
+    private StoragePoolType poolType;
+    private String poolUuid;
+    private Map<String, String> details;
+
+    public PrepareStorageClientCommand() {
+    }
+
+    public PrepareStorageClientCommand(StoragePoolType poolType, String 
poolUuid, Map<String, String> details) {
+        this.poolType = poolType;
+        this.poolUuid = poolUuid;
+        this.details = details;
+    }
+
+    @Override
+    public boolean executeInSequence() {
+        return false;
+    }
+
+    public StoragePoolType getPoolType() {
+        return poolType;
+    }
+
+    public String getPoolUuid() {
+        return poolUuid;
+    }
+
+    public Map<String, String> getDetails() {
+        return details;
+    }
+}
diff --git 
a/core/src/main/java/com/cloud/agent/api/UnprepareStorageClientAnswer.java 
b/core/src/main/java/com/cloud/agent/api/UnprepareStorageClientAnswer.java
new file mode 100644
index 00000000000..1280293db0d
--- /dev/null
+++ b/core/src/main/java/com/cloud/agent/api/UnprepareStorageClientAnswer.java
@@ -0,0 +1,34 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package com.cloud.agent.api;
+
+public class UnprepareStorageClientAnswer extends Answer {
+    public UnprepareStorageClientAnswer() {
+        super();
+    }
+
+    public UnprepareStorageClientAnswer(Command command, boolean success) {
+        super(command, success, "");
+    }
+
+    public UnprepareStorageClientAnswer(Command command, boolean success, 
String details) {
+        super(command, success, details);
+    }
+}
diff --git 
a/core/src/main/java/com/cloud/agent/api/UnprepareStorageClientCommand.java 
b/core/src/main/java/com/cloud/agent/api/UnprepareStorageClientCommand.java
new file mode 100644
index 00000000000..bebd30ca519
--- /dev/null
+++ b/core/src/main/java/com/cloud/agent/api/UnprepareStorageClientCommand.java
@@ -0,0 +1,48 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package com.cloud.agent.api;
+
+import com.cloud.storage.Storage.StoragePoolType;
+
+public class UnprepareStorageClientCommand extends Command {
+    private StoragePoolType poolType;
+    private String poolUuid;
+
+    public UnprepareStorageClientCommand() {
+    }
+
+    public UnprepareStorageClientCommand(StoragePoolType poolType, String 
poolUuid) {
+        this.poolType = poolType;
+        this.poolUuid = poolUuid;
+    }
+
+    @Override
+    public boolean executeInSequence() {
+        return false;
+    }
+
+    public StoragePoolType getPoolType() {
+        return poolType;
+    }
+
+    public String getPoolUuid() {
+        return poolUuid;
+    }
+}
diff --git 
a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/PrimaryDataStoreDriver.java
 
b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/PrimaryDataStoreDriver.java
index 2c7d3c60278..0e70c7b528d 100644
--- 
a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/PrimaryDataStoreDriver.java
+++ 
b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/PrimaryDataStoreDriver.java
@@ -18,6 +18,8 @@
  */
 package org.apache.cloudstack.engine.subsystem.api.storage;
 
+import java.util.Map;
+
 import org.apache.cloudstack.framework.async.AsyncCompletionCallback;
 import org.apache.cloudstack.storage.command.CommandResult;
 
@@ -86,6 +88,22 @@ public interface PrimaryDataStoreDriver extends 
DataStoreDriver {
      */
     boolean canProvideStorageStats();
 
+    /**
+     * intended for managed storage
+     * returns true if the storage can provide its custom stats
+     */
+    default boolean poolProvidesCustomStorageStats() {
+        return false;
+    }
+
+    /**
+     * intended for managed storage
+     * returns the custom stats if the storage can provide them
+     */
+    default Map<String, String> getCustomStorageStats(StoragePool pool) {
+        return null;
+    }
+
     /**
      * intended for managed storage
      * returns the total capacity and used size in bytes
@@ -110,6 +128,14 @@ public interface PrimaryDataStoreDriver extends 
DataStoreDriver {
      */
     boolean canHostAccessStoragePool(Host host, StoragePool pool);
 
+    /**
+     * intended for managed storage
+     * returns true if the host can prepare storage client to provide access 
to the storage pool
+     */
+    default boolean canHostPrepareStoragePoolAccess(Host host, StoragePool 
pool) {
+        return false;
+    }
+
     /**
      * Used by storage pools which want to keep VMs' information
      * @return true if additional VM info is needed (intended for storage 
pools).
diff --git 
a/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java 
b/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java
index 5e97cc9edfe..86ef02bb9bc 100644
--- a/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java
+++ b/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java
@@ -118,7 +118,7 @@ public interface StorageManager extends StorageService {
             "storage.pool.disk.wait",
             "Storage",
             "60",
-            "Timeout (in secs) for the storage pool disk (of managed pool) to 
become available in the host. Currently only supported for PowerFlex.",
+            "Timeout (in secs) for the storage pool disk (of managed pool) to 
become available in the host. Currently supported for PowerFlex only.",
             true,
             ConfigKey.Scope.StoragePool,
             null);
@@ -127,7 +127,7 @@ public interface StorageManager extends StorageService {
             "storage.pool.client.timeout",
             "Storage",
             "60",
-            "Timeout (in secs) for the storage pool client connection timeout 
(for managed pools). Currently only supported for PowerFlex.",
+            "Timeout (in secs) for the API client connection timeout of 
storage pool (for managed pools). Currently supported for PowerFlex only.",
             false,
             ConfigKey.Scope.StoragePool,
             null);
@@ -136,11 +136,20 @@ public interface StorageManager extends StorageService {
             "storage.pool.client.max.connections",
             "Storage",
             "100",
-            "Maximum connections for the storage pool client (for managed 
pools). Currently only supported for PowerFlex.",
+            "Maximum connections for the API client of storage pool (for 
managed pools). Currently supported for PowerFlex only.",
             false,
             ConfigKey.Scope.StoragePool,
             null);
 
+    ConfigKey<Integer> STORAGE_POOL_CONNECTED_CLIENTS_LIMIT = new 
ConfigKey<>(Integer.class,
+            "storage.pool.connected.clients.limit",
+            "Storage",
+            "-1",
+            "Maximum connected storage pool clients supported for the storage 
(for managed pools), <= 0 for unlimited (default: -1). Currently supported for 
PowerFlex only.",
+            true,
+            ConfigKey.Scope.StoragePool,
+            null);
+
     ConfigKey<String> STORAGE_POOL_IO_POLICY = new ConfigKey<>(String.class,
             "kvm.storage.pool.io.policy",
             "Storage",
@@ -252,6 +261,10 @@ public interface StorageManager extends StorageService {
 
     boolean canPoolProvideStorageStats(StoragePool pool);
 
+    boolean poolProvidesCustomStorageStats(StoragePool pool);
+
+    Map<String, String> getCustomStorageStats(StoragePool pool);
+
     /**
      * Checks if a host has running VMs that are using its local storage pool.
      * @return true if local storage is active on the host
@@ -286,6 +299,8 @@ public interface StorageManager extends StorageService {
 
     boolean canHostAccessStoragePool(Host host, StoragePool pool);
 
+    boolean canHostPrepareStoragePoolAccess(Host host, StoragePool pool);
+
     Host getHost(long hostId);
 
     Host updateSecondaryStorage(long secStorageId, String newUrl);
diff --git 
a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtPrepareStorageClientCommandWrapper.java
 
b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtPrepareStorageClientCommandWrapper.java
new file mode 100644
index 00000000000..79afd4696b0
--- /dev/null
+++ 
b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtPrepareStorageClientCommandWrapper.java
@@ -0,0 +1,52 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package com.cloud.hypervisor.kvm.resource.wrapper;
+
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.PrepareStorageClientAnswer;
+import com.cloud.agent.api.PrepareStorageClientCommand;
+import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
+import com.cloud.hypervisor.kvm.storage.KVMStoragePoolManager;
+import com.cloud.resource.CommandWrapper;
+import com.cloud.resource.ResourceWrapper;
+import com.cloud.utils.Ternary;
+
+@ResourceWrapper(handles = PrepareStorageClientCommand.class)
+public class LibvirtPrepareStorageClientCommandWrapper extends 
CommandWrapper<PrepareStorageClientCommand, Answer, LibvirtComputingResource> {
+
+    private static final Logger s_logger = 
Logger.getLogger(LibvirtPrepareStorageClientCommandWrapper.class);
+
+    @Override
+    public Answer execute(PrepareStorageClientCommand cmd, 
LibvirtComputingResource libvirtComputingResource) {
+        final KVMStoragePoolManager storagePoolMgr = 
libvirtComputingResource.getStoragePoolMgr();
+        Ternary<Boolean, Map<String, String>, String> 
prepareStorageClientResult = 
storagePoolMgr.prepareStorageClient(cmd.getPoolType(), cmd.getPoolUuid(), 
cmd.getDetails());
+        if (!prepareStorageClientResult.first()) {
+            String msg = prepareStorageClientResult.third();
+            s_logger.debug("Unable to prepare storage client, due to: " + msg);
+            return new PrepareStorageClientAnswer(cmd, false, msg);
+        }
+        Map<String, String> details = prepareStorageClientResult.second();
+        return new PrepareStorageClientAnswer(cmd, true, details);
+    }
+}
diff --git 
a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtUnprepareStorageClientCommandWrapper.java
 
b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtUnprepareStorageClientCommandWrapper.java
new file mode 100644
index 00000000000..f98782fe748
--- /dev/null
+++ 
b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtUnprepareStorageClientCommandWrapper.java
@@ -0,0 +1,49 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package com.cloud.hypervisor.kvm.resource.wrapper;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.UnprepareStorageClientAnswer;
+import com.cloud.agent.api.UnprepareStorageClientCommand;
+import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
+import com.cloud.hypervisor.kvm.storage.KVMStoragePoolManager;
+import com.cloud.resource.CommandWrapper;
+import com.cloud.resource.ResourceWrapper;
+import com.cloud.utils.Pair;
+
+@ResourceWrapper(handles = UnprepareStorageClientCommand.class)
+public class LibvirtUnprepareStorageClientCommandWrapper extends 
CommandWrapper<UnprepareStorageClientCommand, Answer, LibvirtComputingResource> 
{
+
+    private static final Logger s_logger = 
Logger.getLogger(LibvirtUnprepareStorageClientCommandWrapper.class);
+
+    @Override
+    public Answer execute(UnprepareStorageClientCommand cmd, 
LibvirtComputingResource libvirtComputingResource) {
+        final KVMStoragePoolManager storagePoolMgr = 
libvirtComputingResource.getStoragePoolMgr();
+        Pair<Boolean, String> unprepareStorageClientResult = 
storagePoolMgr.unprepareStorageClient(cmd.getPoolType(), cmd.getPoolUuid());
+        if (!unprepareStorageClientResult.first()) {
+            String msg = unprepareStorageClientResult.second();
+            s_logger.debug("Couldn't unprepare storage client, due to: " + 
msg);
+            return new UnprepareStorageClientAnswer(cmd, false, msg);
+        }
+        return new UnprepareStorageClientAnswer(cmd, true);
+    }
+}
diff --git 
a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/KVMStoragePoolManager.java
 
b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/KVMStoragePoolManager.java
index b1842f38da2..4f25cfa08d5 100644
--- 
a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/KVMStoragePoolManager.java
+++ 
b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/KVMStoragePoolManager.java
@@ -42,6 +42,8 @@ import com.cloud.storage.Storage;
 import com.cloud.storage.Storage.StoragePoolType;
 import com.cloud.storage.StorageLayer;
 import com.cloud.storage.Volume;
+import com.cloud.utils.Pair;
+import com.cloud.utils.Ternary;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.vm.VirtualMachine;
 
@@ -447,4 +449,13 @@ public class KVMStoragePoolManager {
         return adaptor.createTemplateFromDirectDownloadFile(templateFilePath, 
destTemplatePath, destPool, format, timeout);
     }
 
+    public Ternary<Boolean, Map<String, String>, String> 
prepareStorageClient(StoragePoolType type, String uuid, Map<String, String> 
details) {
+        StorageAdaptor adaptor = getStorageAdaptor(type);
+        return adaptor.prepareStorageClient(type, uuid, details);
+    }
+
+    public Pair<Boolean, String> unprepareStorageClient(StoragePoolType type, 
String uuid) {
+        StorageAdaptor adaptor = getStorageAdaptor(type);
+        return adaptor.unprepareStorageClient(type, uuid);
+    }
 }
diff --git 
a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/ScaleIOStorageAdaptor.java
 
b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/ScaleIOStorageAdaptor.java
index 7a98e3fb11f..60986f198a8 100644
--- 
a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/ScaleIOStorageAdaptor.java
+++ 
b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/ScaleIOStorageAdaptor.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.cloudstack.storage.datastore.client.ScaleIOGatewayClient;
 import org.apache.cloudstack.storage.datastore.util.ScaleIOUtil;
 import org.apache.cloudstack.utils.cryptsetup.CryptSetup;
 import org.apache.cloudstack.utils.cryptsetup.CryptSetupException;
@@ -43,6 +44,8 @@ import org.libvirt.LibvirtException;
 import com.cloud.storage.Storage;
 import com.cloud.storage.StorageLayer;
 import com.cloud.storage.StorageManager;
+import com.cloud.utils.Pair;
+import com.cloud.utils.Ternary;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.utils.script.OutputInterpreter;
 import com.cloud.utils.script.Script;
@@ -561,6 +564,67 @@ public class ScaleIOStorageAdaptor implements 
StorageAdaptor {
         qemu.resize(options, objects, usableSizeBytes);
     }
 
+    public Ternary<Boolean, Map<String, String>, String> 
prepareStorageClient(Storage.StoragePoolType type, String uuid, Map<String, 
String> details) {
+        if (!ScaleIOUtil.isSDCServiceInstalled()) {
+            LOGGER.debug("SDC service not installed on host, preparing the SDC 
client not possible");
+            return new Ternary<>(false, null, "SDC service not installed on 
host");
+        }
+
+        if (!ScaleIOUtil.isSDCServiceEnabled()) {
+            LOGGER.debug("SDC service not enabled on host, enabling it");
+            if (!ScaleIOUtil.enableSDCService()) {
+                return new Ternary<>(false, null, "SDC service not enabled on 
host");
+            }
+        }
+
+        if (!ScaleIOUtil.isSDCServiceActive()) {
+            if (!ScaleIOUtil.startSDCService()) {
+                return new Ternary<>(false, null, "Couldn't start SDC service 
on host");
+            }
+        } else if (!ScaleIOUtil.restartSDCService()) {
+            return new Ternary<>(false, null, "Couldn't restart SDC service on 
host");
+        }
+
+        return new Ternary<>( true, getSDCDetails(details), "Prepared client 
successfully");
+    }
+
+    public Pair<Boolean, String> 
unprepareStorageClient(Storage.StoragePoolType type, String uuid) {
+        if (!ScaleIOUtil.isSDCServiceInstalled()) {
+            LOGGER.debug("SDC service not installed on host, no need to 
unprepare the SDC client");
+            return new Pair<>(true, "SDC service not installed on host, no 
need to unprepare the SDC client");
+        }
+
+        if (!ScaleIOUtil.isSDCServiceEnabled()) {
+            LOGGER.debug("SDC service not enabled on host, no need to 
unprepare the SDC client");
+            return new Pair<>(true, "SDC service not enabled on host, no need 
to unprepare the SDC client");
+        }
+
+        if (!ScaleIOUtil.stopSDCService()) {
+            return new Pair<>(false, "Couldn't stop SDC service on host");
+        }
+
+        return new Pair<>(true, "Unprepared SDC client successfully");
+    }
+
+    private Map<String, String> getSDCDetails(Map<String, String> details) {
+        Map<String, String> sdcDetails = new HashMap<String, String>();
+        if (details == null || 
!details.containsKey(ScaleIOGatewayClient.STORAGE_POOL_SYSTEM_ID))  {
+            return sdcDetails;
+        }
+
+        String storageSystemId = 
details.get(ScaleIOGatewayClient.STORAGE_POOL_SYSTEM_ID);
+        String sdcId = ScaleIOUtil.getSdcId(storageSystemId);
+        if (sdcId != null) {
+            sdcDetails.put(ScaleIOGatewayClient.SDC_ID, sdcId);
+        } else {
+            String sdcGuId = ScaleIOUtil.getSdcGuid();
+            if (sdcGuId != null) {
+                sdcDetails.put(ScaleIOGatewayClient.SDC_GUID, sdcGuId);
+            }
+        }
+        return sdcDetails;
+    }
+
     /**
      * Calculates usable size from raw size, assuming qcow2 requires 192k/1GB 
for metadata
      * We also remove 128MiB for encryption/fragmentation/safety factor.
diff --git 
a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/StorageAdaptor.java
 
b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/StorageAdaptor.java
index 5cfdf6d1a8b..80e73e01a86 100644
--- 
a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/StorageAdaptor.java
+++ 
b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/StorageAdaptor.java
@@ -16,6 +16,7 @@
 // under the License.
 package com.cloud.hypervisor.kvm.storage;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -23,6 +24,8 @@ import 
org.apache.cloudstack.utils.qemu.QemuImg.PhysicalDiskFormat;
 
 import com.cloud.storage.Storage;
 import com.cloud.storage.Storage.StoragePoolType;
+import com.cloud.utils.Pair;
+import com.cloud.utils.Ternary;
 
 public interface StorageAdaptor {
 
@@ -105,4 +108,25 @@ public interface StorageAdaptor {
      * @param timeout
      */
     KVMPhysicalDisk createTemplateFromDirectDownloadFile(String 
templateFilePath, String destTemplatePath, KVMStoragePool destPool, 
Storage.ImageFormat format, int timeout);
+
+    /**
+     * Prepares the storage client for the given storage pool.
+     * This default implementation does nothing and reports success with an empty
+     * details map and an empty message.
+     * @param type type of the storage pool
+     * @param uuid uuid of the storage pool
+     * @param details any details of the storage pool that are required for client preparation
+     * @return status, client details, and message in case of failure
+     */
+    default Ternary<Boolean, Map<String, String>, String> 
prepareStorageClient(StoragePoolType type, String uuid, Map<String, String> 
details) {
+        return new Ternary<>(true, new HashMap<>(), "");
+    }
+
+    /**
+     * Unprepares the storage client for the given storage pool.
+     * This default implementation does nothing and reports success with an empty message.
+     * @param type type of the storage pool
+     * @param uuid uuid of the storage pool
+     * @return status, and message in case of failure
+     */
+    default Pair<Boolean, String> unprepareStorageClient(StoragePoolType type, 
String uuid) {
+        return new Pair<>(true, "");
+    }
 }
diff --git 
a/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtPrepareStorageClientCommandWrapperTest.java
 
b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtPrepareStorageClientCommandWrapperTest.java
new file mode 100644
index 00000000000..e7dffeece71
--- /dev/null
+++ 
b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtPrepareStorageClientCommandWrapperTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloud.hypervisor.kvm.resource.wrapper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cloudstack.storage.datastore.client.ScaleIOGatewayClient;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import com.cloud.agent.api.PrepareStorageClientAnswer;
+import com.cloud.agent.api.PrepareStorageClientCommand;
+import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
+import com.cloud.hypervisor.kvm.storage.KVMStoragePoolManager;
+import com.cloud.storage.Storage;
+import com.cloud.utils.Ternary;
+
+@RunWith(MockitoJUnitRunner.class)
+public class LibvirtPrepareStorageClientCommandWrapperTest {
+
+    private static final String poolUuid = "345fc603-2d7e-47d2-b719-a0110b3732e6";
+    private static final String systemId = "218ce1797566a00f";
+    private static final String sdcId = "301b852c00000003";
+
+    @Spy
+    LibvirtPrepareStorageClientCommandWrapper libvirtPrepareStorageClientCommandWrapperSpy = Mockito.spy(LibvirtPrepareStorageClientCommandWrapper.class);
+
+    @Mock
+    LibvirtComputingResource libvirtComputingResourceMock;
+
+    /**
+     * A successful result from the storage pool manager must produce a successful answer
+     * whose details map carries the SDC id.
+     */
+    @Test
+    public void testPrepareStorageClientSuccess() {
+        Map<String, String> poolDetails = newPoolDetails();
+        PrepareStorageClientCommand command = mockPowerFlexCommand(poolDetails);
+        KVMStoragePoolManager poolManager = mockStoragePoolManager();
+
+        // The stubbed manager hands the same map back, now including the SDC id.
+        poolDetails.put(ScaleIOGatewayClient.SDC_ID, sdcId);
+        Mockito.when(poolManager.prepareStorageClient(command.getPoolType(), command.getPoolUuid(), command.getDetails()))
+                .thenReturn(new Ternary<>(true, poolDetails, ""));
+
+        PrepareStorageClientAnswer answer = (PrepareStorageClientAnswer) libvirtPrepareStorageClientCommandWrapperSpy.execute(command, libvirtComputingResourceMock);
+
+        Assert.assertTrue(answer.getResult());
+        Assert.assertEquals(sdcId, answer.getDetailsMap().get(ScaleIOGatewayClient.SDC_ID));
+    }
+
+    /**
+     * A failed result from the storage pool manager must produce a failed answer
+     * that carries the manager's message.
+     */
+    @Test
+    public void testPrepareStorageClientFailure() {
+        PrepareStorageClientCommand command = mockPowerFlexCommand(newPoolDetails());
+        KVMStoragePoolManager poolManager = mockStoragePoolManager();
+        Mockito.when(poolManager.prepareStorageClient(command.getPoolType(), command.getPoolUuid(), command.getDetails()))
+                .thenReturn(new Ternary<>(false, new HashMap<>(), "Prepare storage client failed"));
+
+        PrepareStorageClientAnswer answer = (PrepareStorageClientAnswer) libvirtPrepareStorageClientCommandWrapperSpy.execute(command, libvirtComputingResourceMock);
+
+        Assert.assertFalse(answer.getResult());
+        Assert.assertEquals("Prepare storage client failed", answer.getDetails());
+    }
+
+    /** Creates pool details that contain only the storage system id. */
+    private static Map<String, String> newPoolDetails() {
+        Map<String, String> poolDetails = new HashMap<>();
+        poolDetails.put(ScaleIOGatewayClient.STORAGE_POOL_SYSTEM_ID, systemId);
+        return poolDetails;
+    }
+
+    /** Creates a mocked prepare command for a PowerFlex pool that returns the given details. */
+    private static PrepareStorageClientCommand mockPowerFlexCommand(Map<String, String> poolDetails) {
+        PrepareStorageClientCommand command = Mockito.mock(PrepareStorageClientCommand.class);
+        Mockito.when(command.getPoolType()).thenReturn(Storage.StoragePoolType.PowerFlex);
+        Mockito.when(command.getPoolUuid()).thenReturn(poolUuid);
+        Mockito.when(command.getDetails()).thenReturn(poolDetails);
+        return command;
+    }
+
+    /** Creates a mocked storage pool manager and makes the mocked resource return it. */
+    private KVMStoragePoolManager mockStoragePoolManager() {
+        KVMStoragePoolManager poolManager = Mockito.mock(KVMStoragePoolManager.class);
+        Mockito.when(libvirtComputingResourceMock.getStoragePoolMgr()).thenReturn(poolManager);
+        return poolManager;
+    }
+}
diff --git 
a/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtUnprepareStorageClientCommandWrapperTest.java
 
b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtUnprepareStorageClientCommandWrapperTest.java
new file mode 100644
index 00000000000..7409b286f32
--- /dev/null
+++ 
b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtUnprepareStorageClientCommandWrapperTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloud.hypervisor.kvm.resource.wrapper;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import com.cloud.agent.api.UnprepareStorageClientAnswer;
+import com.cloud.agent.api.UnprepareStorageClientCommand;
+import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
+import com.cloud.hypervisor.kvm.storage.KVMStoragePoolManager;
+import com.cloud.storage.Storage;
+import com.cloud.utils.Pair;
+
+@RunWith(MockitoJUnitRunner.class)
+public class LibvirtUnprepareStorageClientCommandWrapperTest {
+
+    private static final String poolUuid = "345fc603-2d7e-47d2-b719-a0110b3732e6";
+
+    @Spy
+    LibvirtUnprepareStorageClientCommandWrapper libvirtUnprepareStorageClientCommandWrapperSpy = Mockito.spy(LibvirtUnprepareStorageClientCommandWrapper.class);
+
+    @Mock
+    LibvirtComputingResource libvirtComputingResourceMock;
+
+    /** A successful result from the storage pool manager must produce a successful answer. */
+    @Test
+    public void testUnprepareStorageClientSuccess() {
+        UnprepareStorageClientCommand command = mockPowerFlexCommand();
+        KVMStoragePoolManager poolManager = mockStoragePoolManager();
+        Mockito.when(poolManager.unprepareStorageClient(command.getPoolType(), command.getPoolUuid()))
+                .thenReturn(new Pair<>(true, ""));
+
+        UnprepareStorageClientAnswer answer = (UnprepareStorageClientAnswer) libvirtUnprepareStorageClientCommandWrapperSpy.execute(command, libvirtComputingResourceMock);
+
+        Assert.assertTrue(answer.getResult());
+    }
+
+    /**
+     * A failed result from the storage pool manager must produce a failed answer
+     * that carries the manager's message.
+     */
+    @Test
+    public void testUnprepareStorageClientFailure() {
+        UnprepareStorageClientCommand command = mockPowerFlexCommand();
+        KVMStoragePoolManager poolManager = mockStoragePoolManager();
+        Mockito.when(poolManager.unprepareStorageClient(command.getPoolType(), command.getPoolUuid()))
+                .thenReturn(new Pair<>(false, "Unprepare storage client failed"));
+
+        UnprepareStorageClientAnswer answer = (UnprepareStorageClientAnswer) libvirtUnprepareStorageClientCommandWrapperSpy.execute(command, libvirtComputingResourceMock);
+
+        Assert.assertFalse(answer.getResult());
+        Assert.assertEquals("Unprepare storage client failed", answer.getDetails());
+    }
+
+    /** Creates a mocked unprepare command for a PowerFlex pool with the test pool uuid. */
+    private static UnprepareStorageClientCommand mockPowerFlexCommand() {
+        UnprepareStorageClientCommand command = Mockito.mock(UnprepareStorageClientCommand.class);
+        Mockito.when(command.getPoolType()).thenReturn(Storage.StoragePoolType.PowerFlex);
+        Mockito.when(command.getPoolUuid()).thenReturn(poolUuid);
+        return command;
+    }
+
+    /** Creates a mocked storage pool manager and makes the mocked resource return it. */
+    private KVMStoragePoolManager mockStoragePoolManager() {
+        KVMStoragePoolManager poolManager = Mockito.mock(KVMStoragePoolManager.class);
+        Mockito.when(libvirtComputingResourceMock.getStoragePoolMgr()).thenReturn(poolManager);
+        return poolManager;
+    }
+}
diff --git 
a/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/storage/ScaleIOStorageAdaptorTest.java
 
b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/storage/ScaleIOStorageAdaptorTest.java
index 25fab1a6ff8..7db4f114e8c 100644
--- 
a/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/storage/ScaleIOStorageAdaptorTest.java
+++ 
b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/storage/ScaleIOStorageAdaptorTest.java
@@ -17,13 +17,50 @@
 
 package com.cloud.hypervisor.kvm.storage;
 
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cloudstack.storage.datastore.client.ScaleIOGatewayClient;
+import org.apache.cloudstack.storage.datastore.util.ScaleIOUtil;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import com.cloud.storage.Storage;
+import com.cloud.storage.StorageLayer;
+import com.cloud.utils.Pair;
+import com.cloud.utils.Ternary;
+import com.cloud.utils.script.Script;
+
 @RunWith(MockitoJUnitRunner.class)
 public class ScaleIOStorageAdaptorTest {
+
+    @Mock
+    StorageLayer storageLayer;
+    ScaleIOStorageAdaptor scaleIOStorageAdaptor;
+
+    private final static String poolUuid = 
"345fc603-2d7e-47d2-b719-a0110b3732e6";
+    private static MockedStatic<Script> mockedScript;
+
+    /**
+     * Opens the static mock of {@link Script} and then builds a spied adaptor
+     * around the mocked storage layer.
+     */
+    @Before
+    public void setUp() {
+        mockedScript = Mockito.mockStatic(Script.class);
+        final ScaleIOStorageAdaptor adaptorUnderTest = new ScaleIOStorageAdaptor(storageLayer);
+        scaleIOStorageAdaptor = Mockito.spy(adaptorUnderTest);
+    }
+
+    @After
+    public void tearDown() {
+        mockedScript.close(); // closes the static Script mock opened in setUp()
+    }
+
     @Test
     public void getUsableBytesFromRawBytesTest() {
         Assert.assertEquals("Overhead calculated for 8Gi size", 8454111232L, 
ScaleIOStorageAdaptor.getUsableBytesFromRawBytes(8L << 30));
@@ -31,4 +68,158 @@ public class ScaleIOStorageAdaptorTest {
         Assert.assertEquals("Overhead calculated for 500Gi size", 
536636342272L, ScaleIOStorageAdaptor.getUsableBytesFromRawBytes(500L << 30));
         Assert.assertEquals("Unsupported small size", 0, 
ScaleIOStorageAdaptor.getUsableBytesFromRawBytes(1L));
     }
+
+    /** Stubs the exit value of {@code systemctl <action> scini} on the statically mocked {@link Script}. */
+    private static void stubSciniSystemctl(String action, int exitValue) {
+        when(Script.runSimpleBashScriptForExitValue(Mockito.eq("systemctl " + action + " scini"))).thenReturn(exitValue);
+    }
+
+    /** Stubs {@link ScaleIOUtil} so the SDC service is installed, enabled, inactive and starts successfully. */
+    private static void stubSdcServiceStarts() {
+        when(ScaleIOUtil.isSDCServiceInstalled()).thenReturn(true);
+        when(ScaleIOUtil.isSDCServiceEnabled()).thenReturn(true);
+        when(ScaleIOUtil.isSDCServiceActive()).thenReturn(false);
+        when(ScaleIOUtil.startSDCService()).thenReturn(true);
+    }
+
+    /** Creates pool details that contain only the given storage system id. */
+    private static Map<String, String> detailsWithSystemId(String systemId) {
+        Map<String, String> details = new HashMap<>();
+        details.put(ScaleIOGatewayClient.STORAGE_POOL_SYSTEM_ID, systemId);
+        return details;
+    }
+
+    /** Calls prepareStorageClient on the adaptor under test for a PowerFlex pool. */
+    private Ternary<Boolean, Map<String, String>, String> preparePowerFlexClient(Map<String, String> details) {
+        return scaleIOStorageAdaptor.prepareStorageClient(Storage.StoragePoolType.PowerFlex, poolUuid, details);
+    }
+
+    /** Calls unprepareStorageClient on the adaptor under test for a PowerFlex pool. */
+    private Pair<Boolean, String> unpreparePowerFlexClient() {
+        return scaleIOStorageAdaptor.unprepareStorageClient(Storage.StoragePoolType.PowerFlex, poolUuid);
+    }
+
+    /** Asserts a failed prepare result: false status, null details and the expected message. */
+    private static void assertPrepareFailed(Ternary<Boolean, Map<String, String>, String> result, String expectedMessage) {
+        Assert.assertFalse(result.first());
+        Assert.assertNull(result.second());
+        Assert.assertEquals(expectedMessage, result.third());
+    }
+
+    @Test
+    public void testPrepareStorageClient_SDCServiceNotInstalled() {
+        stubSciniSystemctl("status", 4);
+
+        Ternary<Boolean, Map<String, String>, String> result = preparePowerFlexClient(new HashMap<>());
+
+        assertPrepareFailed(result, "SDC service not installed on host");
+    }
+
+    @Test
+    public void testPrepareStorageClient_SDCServiceNotEnabled() {
+        stubSciniSystemctl("status", 3);
+        stubSciniSystemctl("is-enabled", 1);
+        stubSciniSystemctl("enable", 1);
+
+        Ternary<Boolean, Map<String, String>, String> result = preparePowerFlexClient(new HashMap<>());
+
+        assertPrepareFailed(result, "SDC service not enabled on host");
+    }
+
+    @Test
+    public void testPrepareStorageClient_SDCServiceNotRestarted() {
+        stubSciniSystemctl("status", 3);
+        stubSciniSystemctl("is-enabled", 0);
+        stubSciniSystemctl("is-active", 0);
+        stubSciniSystemctl("restart", 1);
+
+        Ternary<Boolean, Map<String, String>, String> result = preparePowerFlexClient(new HashMap<>());
+
+        assertPrepareFailed(result, "Couldn't restart SDC service on host");
+    }
+
+    @Test
+    public void testPrepareStorageClient_SDCServiceRestarted() {
+        stubSciniSystemctl("status", 3);
+        stubSciniSystemctl("is-enabled", 0);
+        stubSciniSystemctl("is-active", 0);
+        stubSciniSystemctl("restart", 0);
+
+        Ternary<Boolean, Map<String, String>, String> result = preparePowerFlexClient(new HashMap<>());
+
+        Assert.assertTrue(result.first());
+        Assert.assertNotNull(result.second());
+        Assert.assertTrue(result.second().isEmpty());
+    }
+
+    @Test
+    public void testPrepareStorageClient_SDCServiceNotStarted() {
+        stubSciniSystemctl("status", 3);
+        stubSciniSystemctl("is-enabled", 0);
+        stubSciniSystemctl("is-active", 1);
+        stubSciniSystemctl("start", 1);
+
+        Ternary<Boolean, Map<String, String>, String> result = preparePowerFlexClient(new HashMap<>());
+
+        assertPrepareFailed(result, "Couldn't start SDC service on host");
+    }
+
+    @Test
+    public void testPrepareStorageClient_SDCServiceStartedReturnSDCId() {
+        String systemId = "218ce1797566a00f";
+        Map<String, String> details = detailsWithSystemId(systemId);
+
+        try (MockedStatic<ScaleIOUtil> ignored = Mockito.mockStatic(ScaleIOUtil.class)) {
+            stubSdcServiceStarts();
+            String sdcId = "301b852c00000003";
+            when(ScaleIOUtil.getSdcId(systemId)).thenReturn(sdcId);
+
+            Ternary<Boolean, Map<String, String>, String> result = preparePowerFlexClient(details);
+
+            Assert.assertTrue(result.first());
+            Assert.assertNotNull(result.second());
+            Assert.assertEquals(sdcId, result.second().get(ScaleIOGatewayClient.SDC_ID));
+        }
+    }
+
+    @Test
+    public void testPrepareStorageClient_SDCServiceStartedReturnSDCGuid() {
+        String systemId = "218ce1797566a00f";
+        Map<String, String> details = detailsWithSystemId(systemId);
+
+        String sdcGuid = "B0E3BFB8-C20B-43BF-93C8-13339E85AA50";
+        try (MockedStatic<ScaleIOUtil> ignored = Mockito.mockStatic(ScaleIOUtil.class)) {
+            stubSdcServiceStarts();
+            // No SDC id for the system, so the adaptor is expected to report the GUID instead.
+            when(ScaleIOUtil.getSdcId(systemId)).thenReturn(null);
+            when(ScaleIOUtil.getSdcGuid()).thenReturn(sdcGuid);
+
+            Ternary<Boolean, Map<String, String>, String> result = preparePowerFlexClient(details);
+
+            Assert.assertTrue(result.first());
+            Assert.assertNotNull(result.second());
+            Assert.assertEquals(sdcGuid, result.second().get(ScaleIOGatewayClient.SDC_GUID));
+        }
+    }
+
+    @Test
+    public void testUnprepareStorageClient_SDCServiceNotInstalled() {
+        stubSciniSystemctl("status", 4);
+
+        Pair<Boolean, String> result = unpreparePowerFlexClient();
+
+        Assert.assertTrue(result.first());
+        Assert.assertEquals("SDC service not installed on host, no need to unprepare the SDC client", result.second());
+    }
+
+    @Test
+    public void testUnprepareStorageClient_SDCServiceNotEnabled() {
+        stubSciniSystemctl("status", 3);
+        stubSciniSystemctl("is-enabled", 1);
+
+        Pair<Boolean, String> result = unpreparePowerFlexClient();
+
+        Assert.assertTrue(result.first());
+        Assert.assertEquals("SDC service not enabled on host, no need to unprepare the SDC client", result.second());
+    }
+
+    @Test
+    public void testUnprepareStorageClient_SDCServiceNotStopped() {
+        stubSciniSystemctl("status", 3);
+        stubSciniSystemctl("is-enabled", 0);
+        stubSciniSystemctl("stop", 1);
+
+        Pair<Boolean, String> result = unpreparePowerFlexClient();
+
+        Assert.assertFalse(result.first());
+        Assert.assertEquals("Couldn't stop SDC service on host", result.second());
+    }
+
+    @Test
+    public void testUnprepareStorageClient_SDCServiceStopped() {
+        stubSciniSystemctl("status", 3);
+        stubSciniSystemctl("is-enabled", 0);
+        stubSciniSystemctl("stop", 0);
+
+        Pair<Boolean, String> result = unpreparePowerFlexClient();
+
+        Assert.assertTrue(result.first());
+        // Pin the success message too, as the other unprepare tests do.
+        Assert.assertEquals("Unprepared SDC client successfully", result.second());
+    }
 }
diff --git 
a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/client/ScaleIOGatewayClient.java
 
b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/client/ScaleIOGatewayClient.java
index 73b69bdef4f..fd2b93bc674 100644
--- 
a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/client/ScaleIOGatewayClient.java
+++ 
b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/client/ScaleIOGatewayClient.java
@@ -79,6 +79,7 @@ public interface ScaleIOGatewayClient {
     VolumeStatistics getVolumeStatistics(String volumeId);
     String getSystemId(String protectionDomainId);
     List<Volume> listVolumesInStoragePool(String poolId);
+    List<Volume> listVolumesMappedToSdc(String sdcId);
 
     // SDC APIs
     List<Sdc> listSdcs();
@@ -86,6 +87,7 @@ public interface ScaleIOGatewayClient {
     String getSdcIdByGuid(String sdcGuid);
     Sdc getSdcByIp(String ipAddress);
     Sdc getConnectedSdcByIp(String ipAddress);
+    int getConnectedSdcsCount();
     boolean haveConnectedSdcs();
     boolean isSdcConnected(String sdcId);
     boolean isSdcConnectedByIP(String ipAddress);
diff --git 
a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/client/ScaleIOGatewayClientImpl.java
 
b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/client/ScaleIOGatewayClientImpl.java
index fa428313943..2c044d8a0ce 100644
--- 
a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/client/ScaleIOGatewayClientImpl.java
+++ 
b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/client/ScaleIOGatewayClientImpl.java
@@ -1003,6 +1003,17 @@ public class ScaleIOGatewayClientImpl implements 
ScaleIOGatewayClient {
         return new ArrayList<>();
     }
 
+    /**
+     * Lists the volumes related to the given SDC.
+     *
+     * @param sdcId id of the SDC, must not be empty
+     * @return the volumes returned for the SDC, or an empty list when the request yields nothing
+     */
+    @Override
+    public List<Volume> listVolumesMappedToSdc(String sdcId) {
+        Preconditions.checkArgument(StringUtils.isNotEmpty(sdcId), "SDC id cannot be null");
+
+        final String mappedVolumesPath = "/instances/Sdc::" + sdcId + "/relationships/Volume";
+        final Volume[] mappedVolumes = get(mappedVolumesPath, Volume[].class);
+        if (mappedVolumes == null) {
+            return new ArrayList<>();
+        }
+        return Arrays.asList(mappedVolumes);
+    }
+
     ///////////////////////////////////////////////
     //////////////// SDC APIs /////////////////////
     ///////////////////////////////////////////////
@@ -1061,6 +1072,21 @@ public class ScaleIOGatewayClientImpl implements 
ScaleIOGatewayClient {
         return null;
     }
 
+    /**
+     * Counts the SDCs whose MDM connection state matches the connected state (case-insensitive).
+     *
+     * @return number of connected SDCs; 0 when the SDC list is unavailable
+     */
+    @Override
+    public int getConnectedSdcsCount() {
+        final List<Sdc> allSdcs = listSdcs();
+        if (allSdcs == null) {
+            return 0;
+        }
+
+        return (int) allSdcs.stream()
+                .filter(sdc -> MDM_CONNECTED_STATE.equalsIgnoreCase(sdc.getMdmConnectionState()))
+                .count();
+    }
+
     @Override
     public boolean haveConnectedSdcs() {
         List<Sdc> sdcs = listSdcs();
diff --git 
a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/driver/ScaleIOPrimaryDataStoreDriver.java
 
b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/driver/ScaleIOPrimaryDataStoreDriver.java
index 529bd25ec8b..dec6ca00ab0 100644
--- 
a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/driver/ScaleIOPrimaryDataStoreDriver.java
+++ 
b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/driver/ScaleIOPrimaryDataStoreDriver.java
@@ -56,6 +56,8 @@ import 
org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailVO;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailsDao;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+import org.apache.cloudstack.storage.datastore.manager.ScaleIOSDCManager;
+import org.apache.cloudstack.storage.datastore.manager.ScaleIOSDCManagerImpl;
 import org.apache.cloudstack.storage.datastore.util.ScaleIOUtil;
 import org.apache.cloudstack.storage.to.SnapshotObjectTO;
 import org.apache.cloudstack.storage.to.VolumeObjectTO;
@@ -99,6 +101,7 @@ import com.cloud.storage.dao.VolumeDao;
 import com.cloud.storage.dao.VolumeDetailsDao;
 import com.cloud.utils.NumbersUtil;
 import com.cloud.utils.Pair;
+import com.cloud.utils.component.ComponentContext;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.vm.VMInstanceVO;
 import com.cloud.vm.VirtualMachine;
@@ -141,9 +144,10 @@ public class ScaleIOPrimaryDataStoreDriver implements 
PrimaryDataStoreDriver {
     private VolumeService volumeService;
     @Inject
     private VolumeOrchestrationService volumeMgr;
+    private ScaleIOSDCManager sdcManager;
 
     public ScaleIOPrimaryDataStoreDriver() {
-
+        sdcManager = new ScaleIOSDCManagerImpl();
     }
 
     public ScaleIOGatewayClient getScaleIOClient(final Long storagePoolId) 
throws Exception {
@@ -151,7 +155,8 @@ public class ScaleIOPrimaryDataStoreDriver implements 
PrimaryDataStoreDriver {
     }
 
     private boolean setVolumeLimitsOnSDC(VolumeVO volume, Host host, DataStore 
dataStore, Long iopsLimit, Long bandwidthLimitInKbps) throws Exception {
-        final String sdcId = getConnectedSdc(dataStore.getId(), host.getId());
+        sdcManager = ComponentContext.inject(sdcManager);
+        final String sdcId = sdcManager.prepareSDC(host, dataStore);
         if (StringUtils.isBlank(sdcId)) {
             alertHostSdcDisconnection(host);
             throw new CloudRuntimeException("Unable to grant access to volume: 
" + volume.getId() + ", no Sdc connected with host ip: " + 
host.getPrivateIpAddress());
@@ -187,6 +192,13 @@ public class ScaleIOPrimaryDataStoreDriver implements 
PrimaryDataStoreDriver {
     @Override
     public boolean grantAccess(DataObject dataObject, Host host, DataStore 
dataStore) {
         try {
+            sdcManager = ComponentContext.inject(sdcManager);
+            final String sdcId = sdcManager.prepareSDC(host, dataStore);
+            if (StringUtils.isBlank(sdcId)) {
+                alertHostSdcDisconnection(host);
+                throw new CloudRuntimeException(String.format("Unable to grant 
access to %s: %s, no Sdc connected with host ip: %s", dataObject.getType(), 
dataObject.getId(), host.getPrivateIpAddress()));
+            }
+
             if (DataObjectType.VOLUME.equals(dataObject.getType())) {
                 final VolumeVO volume = volumeDao.findById(dataObject.getId());
                 LOGGER.debug("Granting access for PowerFlex volume: " + 
volume.getPath());
@@ -194,25 +206,11 @@ public class ScaleIOPrimaryDataStoreDriver implements 
PrimaryDataStoreDriver {
             } else if (DataObjectType.TEMPLATE.equals(dataObject.getType())) {
                 final VMTemplateStoragePoolVO templatePoolRef = 
vmTemplatePoolDao.findByPoolTemplate(dataStore.getId(), dataObject.getId(), 
null);
                 LOGGER.debug("Granting access for PowerFlex template volume: " 
+ templatePoolRef.getInstallPath());
-
-                final String sdcId = getConnectedSdc(dataStore.getId(), 
host.getId());
-                if (StringUtils.isBlank(sdcId)) {
-                    alertHostSdcDisconnection(host);
-                    throw new CloudRuntimeException("Unable to grant access to 
template: " + dataObject.getId() + ", no Sdc connected with host ip: " + 
host.getPrivateIpAddress());
-                }
-
                 final ScaleIOGatewayClient client = 
getScaleIOClient(dataStore.getId());
                 return 
client.mapVolumeToSdc(ScaleIOUtil.getVolumePath(templatePoolRef.getInstallPath()),
 sdcId);
             } else if (DataObjectType.SNAPSHOT.equals(dataObject.getType())) {
                 SnapshotInfo snapshot = (SnapshotInfo) dataObject;
                 LOGGER.debug("Granting access for PowerFlex volume snapshot: " 
+ snapshot.getPath());
-
-                final String sdcId = getConnectedSdc(dataStore.getId(), 
host.getId());
-                if (StringUtils.isBlank(sdcId)) {
-                    alertHostSdcDisconnection(host);
-                    throw new CloudRuntimeException("Unable to grant access to 
snapshot: " + dataObject.getId() + ", no Sdc connected with host ip: " + 
host.getPrivateIpAddress());
-                }
-
                 final ScaleIOGatewayClient client = 
getScaleIOClient(dataStore.getId());
                 return 
client.mapVolumeToSdc(ScaleIOUtil.getVolumePath(snapshot.getPath()), sdcId);
             }
@@ -236,40 +234,29 @@ public class ScaleIOPrimaryDataStoreDriver implements 
PrimaryDataStoreDriver {
         }
 
         try {
+            final String sdcId = getConnectedSdc(dataStore.getId(), 
host.getId());
+            if (StringUtils.isBlank(sdcId)) {
+                LOGGER.warn(String.format("Unable to revoke access for %s: %s, 
no Sdc connected with host ip: %s", dataObject.getType(), dataObject.getId(), 
host.getPrivateIpAddress()));
+                return;
+            }
+            final ScaleIOGatewayClient client = 
getScaleIOClient(dataStore.getId());
             if (DataObjectType.VOLUME.equals(dataObject.getType())) {
                 final VolumeVO volume = volumeDao.findById(dataObject.getId());
                 LOGGER.debug("Revoking access for PowerFlex volume: " + 
volume.getPath());
-
-                final String sdcId = getConnectedSdc(dataStore.getId(), 
host.getId());
-                if (StringUtils.isBlank(sdcId)) {
-                    throw new CloudRuntimeException("Unable to revoke access 
for volume: " + dataObject.getId() + ", no Sdc connected with host ip: " + 
host.getPrivateIpAddress());
-                }
-
-                final ScaleIOGatewayClient client = 
getScaleIOClient(dataStore.getId());
                 
client.unmapVolumeFromSdc(ScaleIOUtil.getVolumePath(volume.getPath()), sdcId);
             } else if (DataObjectType.TEMPLATE.equals(dataObject.getType())) {
                 final VMTemplateStoragePoolVO templatePoolRef = 
vmTemplatePoolDao.findByPoolTemplate(dataStore.getId(), dataObject.getId(), 
null);
                 LOGGER.debug("Revoking access for PowerFlex template volume: " 
+ templatePoolRef.getInstallPath());
-
-                final String sdcId = getConnectedSdc(dataStore.getId(), 
host.getId());
-                if (StringUtils.isBlank(sdcId)) {
-                    throw new CloudRuntimeException("Unable to revoke access 
for template: " + dataObject.getId() + ", no Sdc connected with host ip: " + 
host.getPrivateIpAddress());
-                }
-
-                final ScaleIOGatewayClient client = 
getScaleIOClient(dataStore.getId());
                 
client.unmapVolumeFromSdc(ScaleIOUtil.getVolumePath(templatePoolRef.getInstallPath()),
 sdcId);
             } else if (DataObjectType.SNAPSHOT.equals(dataObject.getType())) {
                 SnapshotInfo snapshot = (SnapshotInfo) dataObject;
                 LOGGER.debug("Revoking access for PowerFlex volume snapshot: " 
+ snapshot.getPath());
-
-                final String sdcId = getConnectedSdc(dataStore.getId(), 
host.getId());
-                if (StringUtils.isBlank(sdcId)) {
-                    throw new CloudRuntimeException("Unable to revoke access 
for snapshot: " + dataObject.getId() + ", no Sdc connected with host ip: " + 
host.getPrivateIpAddress());
-                }
-
-                final ScaleIOGatewayClient client = 
getScaleIOClient(dataStore.getId());
                 
client.unmapVolumeFromSdc(ScaleIOUtil.getVolumePath(snapshot.getPath()), sdcId);
             }
+            if (client.listVolumesMappedToSdc(sdcId).isEmpty()) {
+                sdcManager = ComponentContext.inject(sdcManager);
+                sdcManager.stopSDC(host, dataStore);
+            }
         } catch (Exception e) {
             LOGGER.warn("Failed to revoke access due to: " + e.getMessage(), 
e);
         }
@@ -286,11 +273,16 @@ public class ScaleIOPrimaryDataStoreDriver implements 
PrimaryDataStoreDriver {
 
             final String sdcId = getConnectedSdc(dataStore.getId(), 
host.getId());
             if (StringUtils.isBlank(sdcId)) {
-                throw new CloudRuntimeException("Unable to revoke access for 
volume: " + volumePath + ", no Sdc connected with host ip: " + 
host.getPrivateIpAddress());
+                LOGGER.warn(String.format("Unable to revoke access for volume: 
%s, no Sdc connected with host ip: %s", volumePath, 
host.getPrivateIpAddress()));
+                return;
             }
 
             final ScaleIOGatewayClient client = 
getScaleIOClient(dataStore.getId());
             client.unmapVolumeFromSdc(ScaleIOUtil.getVolumePath(volumePath), 
sdcId);
+            if (client.listVolumesMappedToSdc(sdcId).isEmpty()) {
+                sdcManager = ComponentContext.inject(sdcManager);
+                sdcManager.stopSDC(host, dataStore);
+            }
         } catch (Exception e) {
             LOGGER.warn("Failed to revoke access due to: " + e.getMessage(), 
e);
         }
@@ -1363,6 +1355,28 @@ public class ScaleIOPrimaryDataStoreDriver implements 
PrimaryDataStoreDriver {
         return true;
     }
 
+    /**
+     * Indicates that this driver supplies custom storage stats for its pools
+     * (see {@code getCustomStorageStats}).
+     *
+     * @return always true
+     */
+    @Override
+    public boolean poolProvidesCustomStorageStats() {
+        return true;
+    }
+
+    /**
+     * Collects the custom statistics of the given storage pool.
+     * The only statistic reported is the value of {@code getConnectedSdcsCount()} from the
+     * gateway client, stored under the key {@code ScaleIOUtil.CONNECTED_SDC_COUNT_STAT}.
+     *
+     * @param pool the storage pool to query; rejected by the precondition check when null
+     * @return the collected stats; the map is empty when the stats could not be fetched
+     */
+    @Override
+    public Map<String, String> getCustomStorageStats(StoragePool pool) {
+        Preconditions.checkArgument(pool != null, "pool cannot be null");
+        Map<String, String> customStats = new HashMap<>();
+
+        try {
+            final ScaleIOGatewayClient client = getScaleIOClient(pool.getId());
+            int connectedSdcsCount = client.getConnectedSdcsCount();
+            customStats.put(ScaleIOUtil.CONNECTED_SDC_COUNT_STAT, String.valueOf(connectedSdcsCount));
+        } catch (Exception e) {
+            // Best effort: a failure here must not fail the caller, but keep the stack trace for diagnosis
+            String errMsg = "Unable to get custom storage stats for the pool: " + pool.getId() + " due to " + e.getMessage();
+            LOGGER.error(errMsg, e);
+        }
+
+        return customStats;
+    }
+
     @Override
     public Pair<Long, Long> getStorageStats(StoragePool storagePool) {
         Preconditions.checkArgument(storagePool != null, "storagePool cannot 
be null");
@@ -1375,7 +1389,7 @@ public class ScaleIOPrimaryDataStoreDriver implements 
PrimaryDataStoreDriver {
                 Long usedBytes = poolStatistics.getNetUsedCapacityInBytes();
                 return new Pair<Long, Long>(capacityBytes, usedBytes);
             }
-        }  catch (Exception e) {
+        } catch (Exception e) {
             String errMsg = "Unable to get storage stats for the pool: " + 
storagePool.getId() + " due to " + e.getMessage();
             LOGGER.warn(errMsg);
             throw new CloudRuntimeException(errMsg, e);
@@ -1430,6 +1444,16 @@ public class ScaleIOPrimaryDataStoreDriver implements 
PrimaryDataStoreDriver {
         }
     }
 
+    /**
+     * Tells whether the host may prepare access to the given pool.
+     *
+     * @param host the host that wants to access the pool
+     * @param pool the storage pool
+     * @return false when either argument is null; otherwise the result of the SDC manager's
+     *         connections-limit check for the pool
+     */
+    @Override
+    public boolean canHostPrepareStoragePoolAccess(Host host, StoragePool pool) {
+        if (host != null && pool != null) {
+            sdcManager = ComponentContext.inject(sdcManager);
+            return sdcManager.areSDCConnectionsWithinLimit(pool.getId());
+        }
+
+        return false;
+    }
+
     private void alertHostSdcDisconnection(Host host) {
         if (host == null) {
             return;
diff --git 
a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/ScaleIOPrimaryDataStoreLifeCycle.java
 
b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/ScaleIOPrimaryDataStoreLifeCycle.java
index 17150699923..2d7aca11f84 100644
--- 
a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/ScaleIOPrimaryDataStoreLifeCycle.java
+++ 
b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/ScaleIOPrimaryDataStoreLifeCycle.java
@@ -260,8 +260,6 @@ public class ScaleIOPrimaryDataStoreLifeCycle implements 
PrimaryDataStoreLifeCyc
             throw new CloudRuntimeException("Unsupported hypervisor type: " + 
cluster.getHypervisorType().toString());
         }
 
-        checkConnectedSdcs(dataStore.getId());
-
         PrimaryDataStoreInfo primaryDataStoreInfo = (PrimaryDataStoreInfo) 
dataStore;
         List<HostVO> hostsInCluster = 
resourceManager.listAllUpAndEnabledHosts(Host.Type.Routing, 
primaryDataStoreInfo.getClusterId(),
                 primaryDataStoreInfo.getPodId(), 
primaryDataStoreInfo.getDataCenterId());
@@ -278,14 +276,12 @@ public class ScaleIOPrimaryDataStoreLifeCycle implements 
PrimaryDataStoreLifeCyc
                     poolHosts.add(host);
                 }
             } catch (Exception e) {
-                LOGGER.warn("Unable to establish a connection between " + host 
+ " and " + primaryDataStoreInfo, e);
+                LOGGER.warn("Unable to establish a connection between host: " 
+ host + " and pool: " + dataStore + "on the cluster: " + 
primaryDataStoreInfo.getClusterId(), e);
             }
         }
 
         if (poolHosts.isEmpty()) {
             LOGGER.warn("No host can access storage pool '" + 
primaryDataStoreInfo + "' on cluster '" + primaryDataStoreInfo.getClusterId() + 
"'.");
-            primaryDataStoreDao.expunge(primaryDataStoreInfo.getId());
-            throw new CloudRuntimeException("Failed to create storage pool in 
the cluster: " + primaryDataStoreInfo.getClusterId() + " as it is not 
accessible to hosts");
         }
 
         dataStoreHelper.attachCluster(dataStore);
@@ -303,8 +299,6 @@ public class ScaleIOPrimaryDataStoreLifeCycle implements 
PrimaryDataStoreLifeCyc
             throw new CloudRuntimeException("Unsupported hypervisor type: " + 
hypervisorType.toString());
         }
 
-        checkConnectedSdcs(dataStore.getId());
-
         LOGGER.debug("Attaching the pool to each of the hosts in the zone: " + 
scope.getScopeId());
         List<HostVO> hosts = 
resourceManager.listAllUpAndEnabledHostsInOneZoneByHypervisor(hypervisorType, 
scope.getScopeId());
         List<HostVO> poolHosts = new ArrayList<HostVO>();
@@ -314,35 +308,17 @@ public class ScaleIOPrimaryDataStoreLifeCycle implements 
PrimaryDataStoreLifeCyc
                     poolHosts.add(host);
                 }
             } catch (Exception e) {
-                LOGGER.warn("Unable to establish a connection between " + host 
+ " and " + dataStore, e);
+                LOGGER.warn("Unable to establish a connection between host: " 
+ host + " and pool: " + dataStore + "in the zone: " + scope.getScopeId(), e);
             }
         }
         if (poolHosts.isEmpty()) {
-            LOGGER.warn("No host can access storage pool " + dataStore + " in 
this zone.");
-            primaryDataStoreDao.expunge(dataStore.getId());
-            throw new CloudRuntimeException("Failed to create storage pool as 
it is not accessible to hosts.");
+            LOGGER.warn("No host can access storage pool " + dataStore + " in 
the zone: " + scope.getScopeId());
         }
 
         dataStoreHelper.attachZone(dataStore);
         return true;
     }
 
-    private void checkConnectedSdcs(Long dataStoreId) {
-        boolean haveConnectedSdcs = false;
-        try {
-            ScaleIOGatewayClient client = 
ScaleIOGatewayClientConnectionPool.getInstance().getClient(dataStoreId, 
storagePoolDetailsDao);
-            haveConnectedSdcs = client.haveConnectedSdcs();
-        } catch (NoSuchAlgorithmException | KeyManagementException | 
URISyntaxException e) {
-            LOGGER.error(String.format("Failed to create storage pool for 
datastore: %s", dataStoreId), e);
-            throw new CloudRuntimeException(String.format("Failed to establish 
connection with PowerFlex Gateway to create storage pool for datastore: %s", 
dataStoreId));
-        }
-
-        if (!haveConnectedSdcs) {
-            LOGGER.debug(String.format("No connected SDCs found for the 
PowerFlex storage pool of datastore: %s", dataStoreId));
-            throw new CloudRuntimeException(String.format("Failed to create 
storage pool as connected SDCs not found for datastore: %s", dataStoreId));
-        }
-    }
-
     @Override
     public boolean maintain(DataStore store) {
         storagePoolAutomation.maintain(store);
diff --git 
a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/manager/ScaleIOSDCManager.java
 
b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/manager/ScaleIOSDCManager.java
new file mode 100644
index 00000000000..696643cb17a
--- /dev/null
+++ 
b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/manager/ScaleIOSDCManager.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.storage.datastore.manager;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+
+import com.cloud.host.Host;
+
+public interface ScaleIOSDCManager {
+    /**
+     * Checks whether the SDC connections of the storage pool are within the connected clients limit.
+     * @param storagePoolId the storage pool id
+     * @return true if SDC connections are within limit, false otherwise
+     *         (ScaleIOSDCManagerImpl also returns false when the check itself fails)
+     */
+    boolean areSDCConnectionsWithinLimit(Long storagePoolId);
+
+    /**
+     * Prepares/starts the SDC on the host.
+     * @param host the host
+     * @param dataStore the datastore
+     * @return SDC Id of the host
+     *         (ScaleIOSDCManagerImpl returns null when the SDC is not connected after its wait time)
+     */
+    String prepareSDC(Host host, DataStore dataStore);
+
+    /**
+     * Stops the SDC on the host.
+     * @param host the host
+     * @param dataStore the datastore
+     * @return true if SDC stopped on the host
+     *         (ScaleIOSDCManagerImpl also returns true when no SDC is connected for the host)
+     */
+    boolean stopSDC(Host host, DataStore dataStore);
+}
diff --git 
a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/manager/ScaleIOSDCManagerImpl.java
 
b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/manager/ScaleIOSDCManagerImpl.java
new file mode 100644
index 00000000000..b121a1da66f
--- /dev/null
+++ 
b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/manager/ScaleIOSDCManagerImpl.java
@@ -0,0 +1,346 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.storage.datastore.manager;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import javax.inject.Inject;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStore;
+import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
+import org.apache.cloudstack.storage.datastore.client.ScaleIOGatewayClient;
+import 
org.apache.cloudstack.storage.datastore.client.ScaleIOGatewayClientConnectionPool;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailsDao;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Logger;
+import org.springframework.stereotype.Component;
+
+import com.cloud.agent.AgentManager;
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.PrepareStorageClientAnswer;
+import com.cloud.agent.api.PrepareStorageClientCommand;
+import com.cloud.agent.api.UnprepareStorageClientCommand;
+import com.cloud.configuration.Config;
+import com.cloud.exception.AgentUnavailableException;
+import com.cloud.exception.OperationTimedoutException;
+import com.cloud.host.Host;
+import com.cloud.storage.StorageManager;
+import com.cloud.storage.StoragePoolHostVO;
+import com.cloud.storage.dao.StoragePoolHostDao;
+import com.cloud.utils.NumbersUtil;
+import com.cloud.utils.db.GlobalLock;
+import com.cloud.utils.exception.CloudRuntimeException;
+
+@Component
+public class ScaleIOSDCManagerImpl implements ScaleIOSDCManager {
+    private static final Logger LOGGER = 
Logger.getLogger(ScaleIOSDCManagerImpl.class);
+
+    @Inject
+    AgentManager agentManager;
+    @Inject
+    StoragePoolHostDao storagePoolHostDao;
+    @Inject
+    StoragePoolDetailsDao storagePoolDetailsDao;
+    @Inject
+    ConfigurationDao configDao;
+
+    private static final String POWERFLEX_SDC_HOSTID_SYSTEMID_LOCK_FORMAT = 
"PowerFlexSDC-HostId:%s-SystemId:%s";
+    private static final String POWERFLEX_SDC_SYSTEMID_LOCK_FORMAT = 
"PowerFlexSDC-SystemId:%s";
+
+    /**
+     * No-arg constructor; performs no initialization of its own.
+     */
+    public ScaleIOSDCManagerImpl() {
+
+    }
+
+    /**
+     * Compares the connected SDCs count reported by the gateway client with the value of
+     * {@code StorageManager.STORAGE_POOL_CONNECTED_CLIENTS_LIMIT} for the pool.
+     *
+     * @param storagePoolId the storage pool id
+     * @return true when the limit is zero or negative, or when the connected count is below the limit;
+     *         false when the limit is reached or the check fails with an exception
+     */
+    @Override
+    public boolean areSDCConnectionsWithinLimit(Long storagePoolId) {
+        try {
+            final int limit = StorageManager.STORAGE_POOL_CONNECTED_CLIENTS_LIMIT.valueIn(storagePoolId);
+            if (limit <= 0) {
+                // A non-positive limit skips the gateway query altogether
+                return true;
+            }
+
+            final int connected = getScaleIOClient(storagePoolId).getConnectedSdcsCount();
+            final boolean withinLimit = connected < limit;
+            final String logFormat = withinLimit
+                    ? "Current connected SDCs count: %d - SDC connections are within the limit (%d) on PowerFlex Storage with pool id: %d"
+                    : "Current connected SDCs count: %d - SDC connections limit (%d) reached on PowerFlex Storage with pool id: %d";
+            LOGGER.debug(String.format(logFormat, connected, limit, storagePoolId));
+            return withinLimit;
+        } catch (Exception e) {
+            LOGGER.warn("Unable to check SDC connections for the PowerFlex storage pool with id: " + storagePoolId + " due to " + e.getMessage(), e);
+            return false;
+        }
+    }
+
+    /**
+     * Prepares/starts the SDC on the host for the given PowerFlex storage pool and waits for it to connect.
+     * <p>
+     * The whole operation runs under a host+system lock. When the SDC is not already connected, a second,
+     * system-wide lock is taken before the connections limit check and held until the method returns.
+     * Both locks are released in the finally block.
+     *
+     * @param host the host on which the SDC has to be prepared
+     * @param dataStore the PowerFlex storage pool
+     * @return the SDC id when the SDC is already connected or connects within the wait time; null when
+     *         the host did not provide an SDC id or the SDC did not connect in time
+     * @throws CloudRuntimeException when the system id of the pool is not found, a lock cannot be obtained,
+     *         the connections limit check does not pass, or the SDC cannot be prepared on the host
+     */
+    @Override
+    public String prepareSDC(Host host, DataStore dataStore) {
+        // Guard the lookup result before reading its value, so a missing detail ends in the
+        // CloudRuntimeException below rather than a NullPointerException
+        String systemId = Optional.ofNullable(storagePoolDetailsDao.findDetail(dataStore.getId(), ScaleIOGatewayClient.STORAGE_POOL_SYSTEM_ID))
+                .map(detail -> detail.getValue())
+                .orElse(null);
+        if (systemId == null) {
+            throw new CloudRuntimeException("Unable to prepare SDC, failed to get the system id for PowerFlex storage pool: " + dataStore.getName());
+        }
+
+        GlobalLock hostIdStorageSystemIdLock = null;
+        GlobalLock storageSystemIdLock = null;
+        try {
+            String hostIdStorageSystemIdLockString = String.format(POWERFLEX_SDC_HOSTID_SYSTEMID_LOCK_FORMAT, host.getId(), systemId);
+            hostIdStorageSystemIdLock = GlobalLock.getInternLock(hostIdStorageSystemIdLockString);
+            if (hostIdStorageSystemIdLock == null) {
+                throw new CloudRuntimeException("Unable to prepare SDC, couldn't get global lock on " + hostIdStorageSystemIdLockString);
+            }
+
+            int storagePoolMaxWaitSeconds = NumbersUtil.parseInt(configDao.getValue(Config.StoragePoolMaxWaitSeconds.key()), 3600);
+            if (!hostIdStorageSystemIdLock.lock(storagePoolMaxWaitSeconds)) {
+                LOGGER.debug("Unable to prepare SDC, couldn't lock on " + hostIdStorageSystemIdLockString);
+                throw new CloudRuntimeException("Unable to prepare SDC, couldn't lock on " + hostIdStorageSystemIdLockString);
+            }
+
+            long poolId = dataStore.getId();
+            long hostId = host.getId();
+            String sdcId = getConnectedSdc(poolId, hostId);
+            if (StringUtils.isNotBlank(sdcId)) {
+                LOGGER.debug(String.format("SDC %s already connected for the pool: %d on host: %d, no need to prepare/start it", sdcId, poolId, hostId));
+                return sdcId;
+            }
+
+            String storageSystemIdLockString = String.format(POWERFLEX_SDC_SYSTEMID_LOCK_FORMAT, systemId);
+            storageSystemIdLock = GlobalLock.getInternLock(storageSystemIdLockString);
+            if (storageSystemIdLock == null) {
+                LOGGER.error("Unable to prepare SDC, couldn't get global lock on: " + storageSystemIdLockString);
+                throw new CloudRuntimeException("Unable to prepare SDC, couldn't get global lock on " + storageSystemIdLockString);
+            }
+
+            if (!storageSystemIdLock.lock(storagePoolMaxWaitSeconds)) {
+                LOGGER.error("Unable to prepare SDC, couldn't lock on " + storageSystemIdLockString);
+                throw new CloudRuntimeException("Unable to prepare SDC, couldn't lock on " + storageSystemIdLockString);
+            }
+
+            if (!areSDCConnectionsWithinLimit(poolId)) {
+                String errorMsg = String.format("Unable to check SDC connections or the connections limit reached for Powerflex storage (System ID: %s)", systemId);
+                LOGGER.error(errorMsg);
+                throw new CloudRuntimeException(errorMsg);
+            }
+
+            sdcId = prepareSDCOnHost(host, dataStore, systemId);
+            StoragePoolHostVO storagePoolHost = storagePoolHostDao.findByPoolHost(poolId, hostId);
+
+            if (StringUtils.isBlank(sdcId)) {
+                if (storagePoolHost != null) {
+                    storagePoolHostDao.deleteStoragePoolHostDetails(hostId, poolId);
+                }
+                // Without an SDC id there is nothing to poll the gateway for, so skip the connection wait
+                return null;
+            }
+
+            // Record the SDC id of the host for this pool (stored in the local path of the pool-host mapping)
+            if (storagePoolHost == null) {
+                storagePoolHost = new StoragePoolHostVO(poolId, hostId, sdcId);
+                storagePoolHostDao.persist(storagePoolHost);
+            } else {
+                storagePoolHost.setLocalPath(sdcId);
+                storagePoolHostDao.update(storagePoolHost.getId(), storagePoolHost);
+            }
+
+            int waitTimeInSecs = 15; // Wait for 15 secs (usual tests with SDC service start took 10-15 secs)
+            if (hostSdcConnected(sdcId, poolId, waitTimeInSecs)) {
+                return sdcId;
+            }
+            return null;
+        } finally {
+            if (storageSystemIdLock != null) {
+                storageSystemIdLock.unlock();
+                storageSystemIdLock.releaseRef();
+            }
+            if (hostIdStorageSystemIdLock != null) {
+                hostIdStorageSystemIdLock.unlock();
+                hostIdStorageSystemIdLock.releaseRef();
+            }
+        }
+    }
+
+    /**
+     * Sends the PrepareStorageClientCommand to the host and extracts the SDC id from the answer details.
+     * The SDC id is taken from the {@code ScaleIOGatewayClient.SDC_ID} detail when present, otherwise it is
+     * resolved through the gateway from the {@code ScaleIOGatewayClient.SDC_GUID} detail.
+     *
+     * @param host the host on which the SDC has to be prepared
+     * @param dataStore the PowerFlex storage pool
+     * @param systemId the PowerFlex system id, passed to the host in the command details
+     * @return the SDC id, or null when the answer carries no usable SDC details
+     * @throws CloudRuntimeException when the command cannot be sent, or the answer is missing, unsuccessful
+     *         or not a PrepareStorageClientAnswer
+     */
+    private String prepareSDCOnHost(Host host, DataStore dataStore, String systemId) {
+        LOGGER.debug(String.format("Preparing SDC on the host %s (%s)", host.getId(), host.getName()));
+        Map<String,String> details = new HashMap<>();
+        details.put(ScaleIOGatewayClient.STORAGE_POOL_SYSTEM_ID, systemId);
+        PrepareStorageClientCommand cmd = new PrepareStorageClientCommand(((PrimaryDataStore) dataStore).getPoolType(), dataStore.getUuid(), details);
+        int timeoutSeconds = 60;
+        cmd.setWait(timeoutSeconds);
+
+        Answer answer;
+        try {
+            answer = agentManager.send(host.getId(), cmd);
+        } catch (AgentUnavailableException | OperationTimedoutException e) {
+            String err = String.format("Failed to prepare SDC on the host %s, due to: %s", host.getName(), e.getMessage());
+            LOGGER.error(err, e);
+            // Keep the original exception as the cause so the failure can be traced
+            throw new CloudRuntimeException(err, e);
+        }
+
+        if (answer == null) {
+            String err = String.format("Unable to prepare SDC on the host %s", host.getName());
+            LOGGER.error(err);
+            throw new CloudRuntimeException(err);
+        }
+
+        if (!answer.getResult()) {
+            String err = String.format("Unable to prepare SDC on the host %s, due to: %s", host.getName(), answer.getDetails());
+            LOGGER.error(err);
+            throw new CloudRuntimeException(err);
+        }
+
+        // Check the answer type explicitly instead of letting a blind cast fail with a ClassCastException
+        if (!(answer instanceof PrepareStorageClientAnswer)) {
+            String err = String.format("Unable to prepare SDC on the host %s, unexpected answer type: %s", host.getName(), answer.getClass().getSimpleName());
+            LOGGER.error(err);
+            throw new CloudRuntimeException(err);
+        }
+
+        Map<String,String> poolDetails = ((PrepareStorageClientAnswer) answer).getDetailsMap();
+        if (MapUtils.isEmpty(poolDetails)) {
+            LOGGER.warn(String.format("PowerFlex storage SDC details not found on the host: %s, try (re)install SDC and restart agent", host.getId()));
+            return null;
+        }
+
+        String sdcId = null;
+        if (poolDetails.containsKey(ScaleIOGatewayClient.SDC_ID)) {
+            sdcId = poolDetails.get(ScaleIOGatewayClient.SDC_ID);
+        } else if (poolDetails.containsKey(ScaleIOGatewayClient.SDC_GUID)) {
+            String sdcGuid = poolDetails.get(ScaleIOGatewayClient.SDC_GUID);
+            sdcId = getHostSdcId(sdcGuid, dataStore.getId());
+        }
+
+        if (StringUtils.isBlank(sdcId)) {
+            LOGGER.warn(String.format("Couldn't retrieve PowerFlex storage SDC details from the host: %s, try (re)install SDC and restart agent", host.getId()));
+            return null;
+        }
+
+        return sdcId;
+    }
+
+    /**
+     * Stops the SDC on the host for the given PowerFlex storage pool, under the host+system lock.
+     *
+     * @param host the host on which the SDC has to be stopped
+     * @param dataStore the PowerFlex storage pool
+     * @return true when no SDC is connected for the host or the SDC was unprepared on the host;
+     *         false when unpreparing the SDC on the host failed
+     * @throws CloudRuntimeException when the system id of the pool is not found or the lock cannot be obtained
+     */
+    @Override
+    public boolean stopSDC(Host host, DataStore dataStore) {
+        // Guard the lookup result before reading its value, so a missing detail ends in the
+        // CloudRuntimeException below rather than a NullPointerException
+        String systemId = Optional.ofNullable(storagePoolDetailsDao.findDetail(dataStore.getId(), ScaleIOGatewayClient.STORAGE_POOL_SYSTEM_ID))
+                .map(detail -> detail.getValue())
+                .orElse(null);
+        if (systemId == null) {
+            throw new CloudRuntimeException("Unable to unprepare SDC, failed to get the system id for PowerFlex storage pool: " + dataStore.getName());
+        }
+
+        GlobalLock lock = null;
+        try {
+            String hostIdStorageSystemIdLockString = String.format(POWERFLEX_SDC_HOSTID_SYSTEMID_LOCK_FORMAT, host.getId(), systemId);
+            lock = GlobalLock.getInternLock(hostIdStorageSystemIdLockString);
+            if (lock == null) {
+                throw new CloudRuntimeException("Unable to unprepare SDC, couldn't get global lock on " + hostIdStorageSystemIdLockString);
+            }
+
+            int storagePoolMaxWaitSeconds = NumbersUtil.parseInt(configDao.getValue(Config.StoragePoolMaxWaitSeconds.key()), 3600);
+            if (!lock.lock(storagePoolMaxWaitSeconds)) {
+                LOGGER.debug("Unable to unprepare SDC, couldn't lock on " + hostIdStorageSystemIdLockString);
+                throw new CloudRuntimeException("Unable to unprepare SDC, couldn't lock on " + hostIdStorageSystemIdLockString);
+            }
+
+            long poolId = dataStore.getId();
+            long hostId = host.getId();
+            String sdcId = getConnectedSdc(poolId, hostId);
+            if (StringUtils.isBlank(sdcId)) {
+                LOGGER.debug("SDC not connected, no need to unprepare it");
+                return true;
+            }
+
+            return unprepareSDCOnHost(host, dataStore);
+        } finally {
+            if (lock != null) {
+                lock.unlock();
+                lock.releaseRef();
+            }
+        }
+    }
+
+    /**
+     * Sends the UnprepareStorageClientCommand to the host.
+     * Failures are logged and reported through the return value, never thrown.
+     *
+     * @param host the host on which the SDC has to be unprepared
+     * @param dataStore the PowerFlex storage pool
+     * @return true when the host answered with a successful result; false when the command could not
+     *         be sent, no answer was received, or the answer reports a failure
+     */
+    private boolean unprepareSDCOnHost(Host host, DataStore dataStore) {
+        LOGGER.debug(String.format("Unpreparing SDC on the host %s (%s)", host.getId(), host.getName()));
+        UnprepareStorageClientCommand cmd = new UnprepareStorageClientCommand(((PrimaryDataStore) dataStore).getPoolType(), dataStore.getUuid());
+        int timeoutSeconds = 60;
+        cmd.setWait(timeoutSeconds);
+
+        Answer unprepareStorageClientAnswer;
+        try {
+            unprepareStorageClientAnswer = agentManager.send(host.getId(), cmd);
+        } catch (AgentUnavailableException | OperationTimedoutException e) {
+            String err = String.format("Failed to unprepare SDC on the host %s due to: %s", host.getName(), e.getMessage());
+            LOGGER.error(err, e);
+            return false;
+        }
+
+        // Same null-answer guard as in prepareSDCOnHost, so a missing answer is not dereferenced
+        if (unprepareStorageClientAnswer == null) {
+            LOGGER.error(String.format("Unable to unprepare SDC on the host %s", host.getName()));
+            return false;
+        }
+
+        if (!unprepareStorageClientAnswer.getResult()) {
+            String err = String.format("Unable to unprepare SDC on the host %s due to: %s", host.getName(), unprepareStorageClientAnswer.getDetails());
+            LOGGER.error(err);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Resolves the SDC id for the given SDC guid through the gateway client of the pool.
+     *
+     * @param sdcGuid the SDC guid reported by the host
+     * @param poolId the storage pool id
+     * @return the value returned by the gateway client's {@code getSdcIdByGuid}
+     * @throws CloudRuntimeException when the gateway client cannot be obtained or the lookup fails
+     */
+    private String getHostSdcId(String sdcGuid, long poolId) {
+        try {
+            LOGGER.debug(String.format("Try to get host SDC Id for pool: %s, with SDC guid %s", poolId, sdcGuid));
+            ScaleIOGatewayClient client = getScaleIOClient(poolId);
+            return client.getSdcIdByGuid(sdcGuid);
+        } catch (Exception e) {
+            LOGGER.error(String.format("Failed to get host SDC Id for pool: %s", poolId), e);
+            // Chain the original exception so callers can see the root cause
+            throw new CloudRuntimeException(String.format("Failed to establish connection with PowerFlex Gateway to get host SDC Id for pool: %s", poolId), e);
+        }
+    }
+
+    /**
+     * Looks up the SDC id stored for the host in the pool-host mapping (its local path) and
+     * returns it only when the gateway client reports that SDC as connected.
+     *
+     * @param poolId the storage pool id
+     * @param hostId the host id
+     * @return the connected SDC id; null when there is no mapping, the SDC is not connected,
+     *         or the check fails with an exception
+     */
+    private String getConnectedSdc(long poolId, long hostId) {
+        String connectedSdcId = null;
+        try {
+            final StoragePoolHostVO poolHostMapping = storagePoolHostDao.findByPoolHost(poolId, hostId);
+            if (poolHostMapping != null && getScaleIOClient(poolId).isSdcConnected(poolHostMapping.getLocalPath())) {
+                connectedSdcId = poolHostMapping.getLocalPath();
+            }
+        } catch (Exception e) {
+            LOGGER.warn("Unable to get connected SDC for the host: " + hostId + " and storage pool: " + poolId + " due to " + e.getMessage(), e);
+        }
+
+        return connectedSdcId;
+    }
+
+    /**
+     * Polls the gateway once per second until the SDC is connected or the wait time is used up,
+     * then performs one final check.
+     *
+     * @param sdcId the SDC id to check
+     * @param poolId the storage pool id
+     * @param waitTimeInSecs the maximum time to wait, in seconds
+     * @return true as soon as the SDC is reported connected; otherwise the result of the final check
+     * @throws CloudRuntimeException when the connection check itself fails (see isHostSdcConnected)
+     */
+    private boolean hostSdcConnected(String sdcId, long poolId, int waitTimeInSecs) {
+        LOGGER.debug(String.format("Waiting (for %d secs) for the SDC %s of the pool id: %d to connect", waitTimeInSecs, sdcId, poolId));
+        int timeBetweenTries = 1000; // Try more frequently (every sec) and return early if connected
+        while (waitTimeInSecs > 0) {
+            if (isHostSdcConnected(sdcId, poolId)) {
+                return true;
+            }
+            waitTimeInSecs--;
+            try {
+                Thread.sleep(timeBetweenTries);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag for the caller and stop waiting; the final check below still runs
+                Thread.currentThread().interrupt();
+                LOGGER.debug("Interrupted while waiting for the SDC to connect, stopped waiting");
+                break;
+            }
+        }
+        return isHostSdcConnected(sdcId, poolId);
+    }
+
+    /**
+     * Asks the gateway client of the pool whether the given SDC is connected.
+     *
+     * @param sdcId the SDC id to check
+     * @param poolId the storage pool id
+     * @return the value returned by the gateway client's {@code isSdcConnected}
+     * @throws CloudRuntimeException when the gateway client cannot be obtained or the check fails
+     */
+    private boolean isHostSdcConnected(String sdcId, long poolId) {
+        try {
+            final ScaleIOGatewayClient client = getScaleIOClient(poolId);
+            return client.isSdcConnected(sdcId);
+        } catch (Exception e) {
+            LOGGER.error("Failed to check host SDC connection", e);
+            // Chain the original exception so callers can see the root cause
+            throw new CloudRuntimeException("Failed to establish connection with PowerFlex Gateway to check host SDC connection", e);
+        }
+    }
+
+    /**
+     * Obtains the gateway client for the storage pool from the shared connection pool instance.
+     *
+     * @param storagePoolId the storage pool id
+     * @return the gateway client for the pool
+     * @throws Exception propagated from the connection pool's {@code getClient}
+     */
+    private ScaleIOGatewayClient getScaleIOClient(final Long storagePoolId) throws Exception {
+        final ScaleIOGatewayClientConnectionPool connectionPool = ScaleIOGatewayClientConnectionPool.getInstance();
+        return connectionPool.getClient(storagePoolId, storagePoolDetailsDao);
+    }
+}
diff --git 
a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/provider/ScaleIOHostListener.java
 
b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/provider/ScaleIOHostListener.java
index bb269e85a95..f812ed8cce2 100644
--- 
a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/provider/ScaleIOHostListener.java
+++ 
b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/provider/ScaleIOHostListener.java
@@ -69,12 +69,33 @@ public class ScaleIOHostListener implements 
HypervisorHostListener {
     public boolean hostConnect(long hostId, long poolId) {
         HostVO host = _hostDao.findById(hostId);
         if (host == null) {
-            s_logger.error("Failed to add host by HostListener as host was not 
found with id : " + hostId);
+            s_logger.error("Failed to connect host by HostListener as host was 
not found with id : " + hostId);
             return false;
         }
 
         StoragePool storagePool = 
(StoragePool)_dataStoreMgr.getDataStore(poolId, DataStoreRole.Primary);
+        StoragePoolHostVO storagePoolHost = 
_storagePoolHostDao.findByPoolHost(poolId, hostId);
+        String sdcId = getSdcIdOfHost(host, storagePool);
+        if (StringUtils.isBlank(sdcId)) {
+            if (storagePoolHost != null) {
+                _storagePoolHostDao.deleteStoragePoolHostDetails(hostId, 
poolId);
+            }
+        } else {
+            if (storagePoolHost == null) {
+                storagePoolHost = new StoragePoolHostVO(poolId, hostId, sdcId);
+                _storagePoolHostDao.persist(storagePoolHost);
+            } else {
+                storagePoolHost.setLocalPath(sdcId);
+                _storagePoolHostDao.update(storagePoolHost.getId(), 
storagePoolHost);
+            }
+            s_logger.info("Connection established between storage pool: " + 
storagePool + " and host: " + hostId);
+        }
+        return true;
+    }
 
+    private String getSdcIdOfHost(HostVO host, StoragePool storagePool) {
+        long hostId = host.getId();
+        long poolId = storagePool.getId();
         String systemId = _storagePoolDetailsDao.findDetail(poolId, 
ScaleIOGatewayClient.STORAGE_POOL_SYSTEM_ID).getValue();
         if (systemId == null) {
             throw new CloudRuntimeException("Failed to get the system id for 
PowerFlex storage pool " + storagePool.getName());
@@ -86,10 +107,10 @@ public class ScaleIOHostListener implements 
HypervisorHostListener {
         ModifyStoragePoolAnswer answer  = sendModifyStoragePoolCommand(cmd, 
storagePool, hostId);
         Map<String,String> poolDetails = answer.getPoolInfo().getDetails();
         if (MapUtils.isEmpty(poolDetails)) {
-            String msg = "SDC details not found on the host: " + hostId + ", 
(re)install SDC and restart agent";
+            String msg = "PowerFlex storage SDC details not found on the host: 
" + hostId + ", (re)install SDC and restart agent";
             s_logger.warn(msg);
             _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, 
host.getDataCenterId(), host.getPodId(), "SDC not found on host: " + 
host.getUuid(), msg);
-            return false;
+            return null;
         }
 
         String sdcId = null;
@@ -101,30 +122,13 @@ public class ScaleIOHostListener implements HypervisorHostListener {
         }
 
         if (StringUtils.isBlank(sdcId)) {
-            String msg = "Couldn't retrieve SDC details from the host: " + hostId + ", (re)install SDC and restart agent";
+            String msg = "Couldn't retrieve PowerFlex storage SDC details from the host: " + hostId + ", (re)install SDC and restart agent";
             s_logger.warn(msg);
             _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "SDC details not found on host: " + host.getUuid(), msg);
-            return false;
-        }
-
-        if (!isHostSdcConnected(sdcId, poolId)) {
-            s_logger.warn("SDC not connected on the host: " + hostId);
-            String msg = "SDC not connected on the host: " + hostId + ", reconnect the SDC to MDM and restart agent";
-            _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "SDC disconnected on host: " + host.getUuid(), msg);
-            return false;
+            return null;
         }
 
-        StoragePoolHostVO storagePoolHost = _storagePoolHostDao.findByPoolHost(poolId, hostId);
-        if (storagePoolHost == null) {
-            storagePoolHost = new StoragePoolHostVO(poolId, hostId, sdcId);
-            _storagePoolHostDao.persist(storagePoolHost);
-        } else {
-            storagePoolHost.setLocalPath(sdcId);
-            _storagePoolHostDao.update(storagePoolHost.getId(), storagePoolHost);
-        }
-
-        s_logger.info("Connection established between storage pool: " + storagePool + " and host: " + hostId);
-        return true;
+        return sdcId;
     }
 
     private String getHostSdcId(String sdcGuid, long poolId) {
@@ -138,16 +142,6 @@ public class ScaleIOHostListener implements HypervisorHostListener {
         }
     }
 
-    private boolean isHostSdcConnected(String sdcId, long poolId) {
-        try {
-            ScaleIOGatewayClient client = ScaleIOGatewayClientConnectionPool.getInstance().getClient(poolId, _storagePoolDetailsDao);
-            return client.isSdcConnected(sdcId);
-        } catch (NoSuchAlgorithmException | KeyManagementException | URISyntaxException e) {
-            s_logger.error("Failed to check host sdc connection", e);
-            throw new CloudRuntimeException("Failed to establish connection with PowerFlex Gateway to check host sdc connection");
-        }
-    }
-
     private ModifyStoragePoolAnswer sendModifyStoragePoolCommand(ModifyStoragePoolCommand cmd, StoragePool storagePool, long hostId) {
         Answer answer = _agentMgr.easySend(hostId, cmd);
 
@@ -156,15 +150,15 @@ public class ScaleIOHostListener implements HypervisorHostListener {
         }
 
         if (!answer.getResult()) {
-            String msg = "Unable to attach storage pool " + storagePool.getId() + " to host " + hostId;
+            String msg = "Unable to attach  PowerFlex storage pool " + storagePool.getId() + " to host " + hostId;
 
             _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, storagePool.getDataCenterId(), storagePool.getPodId(), msg, msg);
 
-            throw new CloudRuntimeException("Unable to establish a connection from agent to storage pool " + storagePool.getId() + " due to " + answer.getDetails() +
+            throw new CloudRuntimeException("Unable to establish a connection from agent to  PowerFlex storage pool " + storagePool.getId() + " due to " + answer.getDetails() +
                     " (" + storagePool.getId() + ")");
         }
 
-        assert (answer instanceof ModifyStoragePoolAnswer) : "ModifyStoragePoolAnswer expected ; Pool = " + storagePool.getId() + " Host = " + hostId;
+        assert (answer instanceof ModifyStoragePoolAnswer) : "ModifyStoragePoolAnswer expected ; PowerFlex Storage Pool = " + storagePool.getId() + " Host = " + hostId;
 
         return (ModifyStoragePoolAnswer) answer;
     }
diff --git a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/util/ScaleIOUtil.java b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/util/ScaleIOUtil.java
index 736a43df691..e7b06267a51 100644
--- a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/util/ScaleIOUtil.java
+++ b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/util/ScaleIOUtil.java
@@ -49,6 +49,16 @@ public class ScaleIOUtil {
 
     private static final String RESCAN_CMD = "drv_cfg --rescan";
 
+    private static final String SDC_SERVICE_STATUS_CMD = "systemctl status scini";
+    private static final String SDC_SERVICE_START_CMD = "systemctl start scini";
+    private static final String SDC_SERVICE_STOP_CMD = "systemctl stop scini";
+    private static final String SDC_SERVICE_RESTART_CMD = "systemctl restart scini";
+
+    private static final String SDC_SERVICE_IS_ACTIVE_CMD = "systemctl is-active scini";
+    private static final String SDC_SERVICE_IS_ENABLED_CMD = "systemctl is-enabled scini";
+    private static final String SDC_SERVICE_ENABLE_CMD = "systemctl enable scini";
+
+    public static final String CONNECTED_SDC_COUNT_STAT = "ConnectedSDCCount";
     /**
      * Cmd for querying volumes in SDC
      * Sample output for cmd: drv_cfg --query_vols:
@@ -182,4 +192,39 @@ public class ScaleIOUtil {
 
         return String.format("%s:%s", volumePath, volumeName);
     }
+
+    public static boolean isSDCServiceInstalled() {
+        int exitValue = Script.runSimpleBashScriptForExitValue(SDC_SERVICE_STATUS_CMD);
+        return exitValue != 4;
+    }
+
+    public static boolean isSDCServiceActive() {
+        int exitValue = Script.runSimpleBashScriptForExitValue(SDC_SERVICE_IS_ACTIVE_CMD);
+        return exitValue == 0;
+    }
+
+    public static boolean isSDCServiceEnabled() {
+        int exitValue = Script.runSimpleBashScriptForExitValue(SDC_SERVICE_IS_ENABLED_CMD);
+        return exitValue == 0;
+    }
+
+    public static boolean enableSDCService() {
+        int exitValue = Script.runSimpleBashScriptForExitValue(SDC_SERVICE_ENABLE_CMD);
+        return exitValue == 0;
+    }
+
+    public static boolean startSDCService() {
+        int exitValue = Script.runSimpleBashScriptForExitValue(SDC_SERVICE_START_CMD);
+        return exitValue == 0;
+    }
+
+    public static boolean stopSDCService() {
+        int exitValue = Script.runSimpleBashScriptForExitValue(SDC_SERVICE_STOP_CMD);
+        return exitValue == 0;
+    }
+
+    public static boolean restartSDCService() {
+        int exitValue = Script.runSimpleBashScriptForExitValue(SDC_SERVICE_RESTART_CMD);
+        return exitValue == 0;
+    }
 }
diff --git a/plugins/storage/volume/scaleio/src/main/resources/META-INF/cloudstack/storage-volume-scaleio/spring-storage-volume-scaleio-context.xml b/plugins/storage/volume/scaleio/src/main/resources/META-INF/cloudstack/storage-volume-scaleio/spring-storage-volume-scaleio-context.xml
index 8b86e212e29..55e74cddd6f 100755
--- a/plugins/storage/volume/scaleio/src/main/resources/META-INF/cloudstack/storage-volume-scaleio/spring-storage-volume-scaleio-context.xml
+++ b/plugins/storage/volume/scaleio/src/main/resources/META-INF/cloudstack/storage-volume-scaleio/spring-storage-volume-scaleio-context.xml
@@ -32,4 +32,6 @@
     <bean id="scaleioDataStoreProvider"
           
class="org.apache.cloudstack.storage.datastore.provider.ScaleIOPrimaryDatastoreProvider"
 />
 
+    <bean id="scaleioSDCManager" class="org.apache.cloudstack.storage.datastore.manager.ScaleIOSDCManagerImpl" />
+
 </beans>
diff --git a/plugins/storage/volume/scaleio/src/test/java/org/apache/cloudstack/storage/datastore/lifecycle/ScaleIOPrimaryDataStoreLifeCycleTest.java b/plugins/storage/volume/scaleio/src/test/java/org/apache/cloudstack/storage/datastore/lifecycle/ScaleIOPrimaryDataStoreLifeCycleTest.java
index 4a6e73a327d..e2f850be7ff 100644
--- a/plugins/storage/volume/scaleio/src/test/java/org/apache/cloudstack/storage/datastore/lifecycle/ScaleIOPrimaryDataStoreLifeCycleTest.java
+++ b/plugins/storage/volume/scaleio/src/test/java/org/apache/cloudstack/storage/datastore/lifecycle/ScaleIOPrimaryDataStoreLifeCycleTest.java
@@ -23,6 +23,7 @@ import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
@@ -123,9 +124,9 @@ public class ScaleIOPrimaryDataStoreLifeCycleTest {
         ScaleIOGatewayClientImpl client = mock(ScaleIOGatewayClientImpl.class);
         ScaleIOGatewayClientConnectionPool pool = mock(ScaleIOGatewayClientConnectionPool.class);
         scaleIOGatewayClientConnectionPoolMocked.when(() -> ScaleIOGatewayClientConnectionPool.getInstance()).thenReturn(pool);
-        when(pool.getClient(1L, storagePoolDetailsDao)).thenReturn(client);
+        lenient().when(pool.getClient(1L, storagePoolDetailsDao)).thenReturn(client);
 
-        when(client.haveConnectedSdcs()).thenReturn(true);
+        lenient().when(client.haveConnectedSdcs()).thenReturn(true);
 
         final ZoneScope scope = new ZoneScope(1L);
 
diff --git a/server/src/main/java/com/cloud/api/ApiDBUtils.java b/server/src/main/java/com/cloud/api/ApiDBUtils.java
index c45e60f8dd5..0d9447d2dd5 100644
--- a/server/src/main/java/com/cloud/api/ApiDBUtils.java
+++ b/server/src/main/java/com/cloud/api/ApiDBUtils.java
@@ -2027,8 +2027,8 @@ public class ApiDBUtils {
         return s_volJoinDao.newVolumeView(vr);
     }
 
-    public static StoragePoolResponse newStoragePoolResponse(StoragePoolJoinVO vr) {
-        return s_poolJoinDao.newStoragePoolResponse(vr);
+    public static StoragePoolResponse newStoragePoolResponse(StoragePoolJoinVO vr, boolean customStats) {
+        return s_poolJoinDao.newStoragePoolResponse(vr, customStats);
     }
 
     public static StorageTagResponse newStorageTagResponse(StoragePoolTagVO vr) {
diff --git a/server/src/main/java/com/cloud/api/ApiResponseHelper.java b/server/src/main/java/com/cloud/api/ApiResponseHelper.java
index f482b595b4e..e801d1f9b31 100644
--- a/server/src/main/java/com/cloud/api/ApiResponseHelper.java
+++ b/server/src/main/java/com/cloud/api/ApiResponseHelper.java
@@ -1439,7 +1439,7 @@ public class ApiResponseHelper implements ResponseGenerator {
     @Override
     public StoragePoolResponse createStoragePoolResponse(StoragePool pool) {
         List<StoragePoolJoinVO> viewPools = ApiDBUtils.newStoragePoolView(pool);
-        List<StoragePoolResponse> listPools = ViewResponseHelper.createStoragePoolResponse(viewPools.toArray(new StoragePoolJoinVO[viewPools.size()]));
+        List<StoragePoolResponse> listPools = ViewResponseHelper.createStoragePoolResponse(false, viewPools.toArray(new StoragePoolJoinVO[viewPools.size()]));
         assert listPools != null && listPools.size() == 1 : "There should be one storage pool returned";
         return listPools.get(0);
     }
diff --git a/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java b/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java
index a3a1a16b160..698c3d7fa33 100644
--- a/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java
+++ b/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java
@@ -2970,7 +2970,7 @@ public class QueryManagerImpl extends MutualExclusiveIdsManagerBase implements Q
     public ListResponse<StoragePoolResponse> searchForStoragePools(ListStoragePoolsCmd cmd) {
         Pair<List<StoragePoolJoinVO>, Integer> result = (ScopeType.HOST.name().equalsIgnoreCase(cmd.getScope()) && cmd.getHostId() != null) ?
                 searchForLocalStorages(cmd) : searchForStoragePoolsInternal(cmd);
-        return createStoragesPoolResponse(result);
+        return createStoragesPoolResponse(result, cmd.getCustomStats());
     }
 
     private Pair<List<StoragePoolJoinVO>, Integer> searchForLocalStorages(ListStoragePoolsCmd cmd) {
@@ -2998,10 +2998,10 @@ public class QueryManagerImpl extends MutualExclusiveIdsManagerBase implements Q
         }
     }
 
-    private ListResponse<StoragePoolResponse> createStoragesPoolResponse(Pair<List<StoragePoolJoinVO>, Integer> storagePools) {
+    private ListResponse<StoragePoolResponse> createStoragesPoolResponse(Pair<List<StoragePoolJoinVO>, Integer> storagePools, boolean getCustomStats) {
         ListResponse<StoragePoolResponse> response = new ListResponse<>();
 
-        List<StoragePoolResponse> poolResponses = ViewResponseHelper.createStoragePoolResponse(storagePools.first().toArray(new StoragePoolJoinVO[storagePools.first().size()]));
+        List<StoragePoolResponse> poolResponses = ViewResponseHelper.createStoragePoolResponse(getCustomStats, storagePools.first().toArray(new StoragePoolJoinVO[storagePools.first().size()]));
         Map<String, Long> poolUuidToIdMap = storagePools.first().stream().collect(Collectors.toMap(StoragePoolJoinVO::getUuid, StoragePoolJoinVO::getId, (a, b) -> a));
         for (StoragePoolResponse poolResponse : poolResponses) {
             DataStore store = dataStoreManager.getPrimaryDataStore(poolResponse.getId());
diff --git a/server/src/main/java/com/cloud/api/query/ViewResponseHelper.java b/server/src/main/java/com/cloud/api/query/ViewResponseHelper.java
index 934de8a2558..730a6628491 100644
--- a/server/src/main/java/com/cloud/api/query/ViewResponseHelper.java
+++ b/server/src/main/java/com/cloud/api/query/ViewResponseHelper.java
@@ -312,14 +312,14 @@ public class ViewResponseHelper {
         return new ArrayList<VolumeResponse>(vrDataList.values());
     }
 
-    public static List<StoragePoolResponse> createStoragePoolResponse(StoragePoolJoinVO... pools) {
+    public static List<StoragePoolResponse> createStoragePoolResponse(boolean customStats, StoragePoolJoinVO... pools) {
         LinkedHashMap<Long, StoragePoolResponse> vrDataList = new LinkedHashMap<>();
         // Initialise the vrdatalist with the input data
         for (StoragePoolJoinVO vr : pools) {
             StoragePoolResponse vrData = vrDataList.get(vr.getId());
             if (vrData == null) {
                 // first time encountering this vm
-                vrData = ApiDBUtils.newStoragePoolResponse(vr);
+                vrData = ApiDBUtils.newStoragePoolResponse(vr, customStats);
             } else {
                 // update tags
                 vrData = ApiDBUtils.fillStoragePoolDetails(vrData, vr);
diff --git a/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDao.java b/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDao.java
index 26ee3f01789..6e0b59492c0 100644
--- a/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDao.java
+++ b/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDao.java
@@ -28,7 +28,7 @@ import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 
 public interface StoragePoolJoinDao extends GenericDao<StoragePoolJoinVO, Long> {
 
-    StoragePoolResponse newStoragePoolResponse(StoragePoolJoinVO host);
+    StoragePoolResponse newStoragePoolResponse(StoragePoolJoinVO host, boolean customStats);
 
     StoragePoolResponse setStoragePoolResponse(StoragePoolResponse response, StoragePoolJoinVO host);
 
diff --git a/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDaoImpl.java b/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDaoImpl.java
index f3b832d1042..8c828ba2067 100644
--- a/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDaoImpl.java
+++ b/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDaoImpl.java
@@ -42,6 +42,7 @@ import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailVO;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailsDao;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+import org.apache.commons.collections.MapUtils;
 import org.apache.cloudstack.utils.jsinterpreter.TagAsRuleHelper;
 import org.apache.log4j.Logger;
 import org.springframework.stereotype.Component;
@@ -49,6 +50,7 @@ import org.springframework.stereotype.Component;
 import javax.inject.Inject;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 @Component
 public class StoragePoolJoinDaoImpl extends GenericDaoBase<StoragePoolJoinVO, Long> implements StoragePoolJoinDao {
@@ -100,7 +102,7 @@ public class StoragePoolJoinDaoImpl extends GenericDaoBase<StoragePoolJoinVO, Lo
     }
 
     @Override
-    public StoragePoolResponse newStoragePoolResponse(StoragePoolJoinVO pool) {
+    public StoragePoolResponse newStoragePoolResponse(StoragePoolJoinVO pool, boolean customStats) {
         StoragePool storagePool = storagePoolDao.findById(pool.getId());
         StoragePoolResponse poolResponse = new StoragePoolResponse();
         poolResponse.setId(pool.getUuid());
@@ -147,6 +149,13 @@ public class StoragePoolJoinDaoImpl extends GenericDaoBase<StoragePoolJoinVO, Lo
             PrimaryDataStoreDriver driver = (PrimaryDataStoreDriver) store.getDriver();
             long usedIops = driver.getUsedIops(storagePool);
             poolResponse.setAllocatedIops(usedIops);
+
+            if (customStats && driver.poolProvidesCustomStorageStats()) {
+                Map<String, String> storageCustomStats = driver.getCustomStorageStats(storagePool);
+                if (MapUtils.isNotEmpty(storageCustomStats)) {
+                    poolResponse.setCustomStats(storageCustomStats);
+                }
+            }
         }
 
         // TODO: StatsCollector does not persist data
diff --git a/server/src/main/java/com/cloud/deploy/DeploymentPlanningManagerImpl.java b/server/src/main/java/com/cloud/deploy/DeploymentPlanningManagerImpl.java
index 14667c2d47d..1f1a9c22358 100644
--- a/server/src/main/java/com/cloud/deploy/DeploymentPlanningManagerImpl.java
+++ b/server/src/main/java/com/cloud/deploy/DeploymentPlanningManagerImpl.java
@@ -1618,6 +1618,15 @@ StateListener<State, VirtualMachine.Event, VirtualMachine>, Configurable {
         }
 
         s_logger.debug("Host: " + host.getId() + (hostCanAccessSPool ? " can" : " cannot") + " access pool: " + pool.getId());
+        if (!hostCanAccessSPool) {
+            if (_storageMgr.canHostPrepareStoragePoolAccess(host, pool)) {
+                s_logger.debug("Host: " + host.getId() + " can prepare access to pool: " + pool.getId());
+                hostCanAccessSPool = true;
+            } else {
+                s_logger.debug("Host: " + host.getId() + " cannot prepare access to pool: " + pool.getId());
+            }
+        }
+
         return hostCanAccessSPool;
     }
 
diff --git a/server/src/main/java/com/cloud/storage/StorageManagerImpl.java b/server/src/main/java/com/cloud/storage/StorageManagerImpl.java
index a5fd7842964..de3ec02dc7a 100644
--- a/server/src/main/java/com/cloud/storage/StorageManagerImpl.java
+++ b/server/src/main/java/com/cloud/storage/StorageManagerImpl.java
@@ -573,6 +573,31 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
         return storeDriver instanceof PrimaryDataStoreDriver && ((PrimaryDataStoreDriver)storeDriver).canProvideStorageStats();
     }
 
+    @Override
+    public boolean poolProvidesCustomStorageStats(StoragePool pool) {
+        DataStoreProvider storeProvider = _dataStoreProviderMgr.getDataStoreProvider(pool.getStorageProviderName());
+        DataStoreDriver storeDriver = storeProvider.getDataStoreDriver();
+        return storeDriver instanceof PrimaryDataStoreDriver && ((PrimaryDataStoreDriver)storeDriver).poolProvidesCustomStorageStats();
+    }
+
+    @Override
+    public Map<String, String> getCustomStorageStats(StoragePool pool) {
+        if (pool == null) {
+            return null;
+        }
+
+        if (!pool.isManaged()) {
+            return null;
+        }
+
+        DataStoreProvider storeProvider = _dataStoreProviderMgr.getDataStoreProvider(pool.getStorageProviderName());
+        DataStoreDriver storeDriver = storeProvider.getDataStoreDriver();
+        if (storeDriver instanceof PrimaryDataStoreDriver) {
+            return ((PrimaryDataStoreDriver)storeDriver).getCustomStorageStats(pool);
+        }
+        return null;
+    }
+
     @Override
     public Answer getVolumeStats(StoragePool pool, Command cmd) {
         DataStoreProvider storeProvider = _dataStoreProviderMgr.getDataStoreProvider(pool.getStorageProviderName());
@@ -2649,6 +2674,21 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
         return false;
     }
 
+    @Override
+    public boolean canHostPrepareStoragePoolAccess(Host host, StoragePool pool) {
+        if (host == null || pool == null) {
+            return false;
+        }
+
+        if (!pool.isManaged()) {
+            return true;
+        }
+
+        DataStoreProvider storeProvider = _dataStoreProviderMgr.getDataStoreProvider(pool.getStorageProviderName());
+        DataStoreDriver storeDriver = storeProvider.getDataStoreDriver();
+        return storeDriver instanceof PrimaryDataStoreDriver && ((PrimaryDataStoreDriver)storeDriver).canHostPrepareStoragePoolAccess(host, pool);
+    }
+
     @Override
     @DB
     public Host getHost(long hostId) {
@@ -3824,6 +3864,7 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
                 STORAGE_POOL_DISK_WAIT,
                 STORAGE_POOL_CLIENT_TIMEOUT,
                 STORAGE_POOL_CLIENT_MAX_CONNECTIONS,
+                STORAGE_POOL_CONNECTED_CLIENTS_LIMIT,
                 STORAGE_POOL_IO_POLICY,
                 PRIMARY_STORAGE_DOWNLOAD_WAIT,
                 SecStorageMaxMigrateSessions,

Reply via email to