This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f45bbacc CASSSIDECAR-151: Adds sidecar endpoint for node 
decommissioning operation (#144)
f45bbacc is described below

commit f45bbacc42990e5196c927ed6169d82cd9faf5f6
Author: Arjun Ashok <arjun_as...@apple.com>
AuthorDate: Wed Jan 15 15:11:04 2025 -0800

    CASSSIDECAR-151: Adds sidecar endpoint for node decommissioning operation 
(#144)
    
    Patch by Arjun Ashok; Reviewed by Shailaja Koppu, Yifan Cai, Francisco 
Guerrero for CASSSIDECAR-151
---
 CHANGES.txt                                        |   1 +
 .../adapters/base/CassandraStorageOperations.java  |  20 ++
 .../base/GossipDependentStorageJmxOperations.java  |  12 ++
 .../adapters/base/StorageJmxOperations.java        |  12 ++
 .../cassandra/sidecar/common/ApiEndpointsV1.java   |   2 +-
 .../common/request/NodeDecommissionRequest.java    |  46 +++++
 .../cassandra/sidecar/client/RequestContext.java   |  13 ++
 .../cassandra/sidecar/client/SidecarClient.java    |  13 ++
 .../sidecar/client/SidecarClientTest.java          |  19 ++
 .../sidecar/common/server/StorageOperations.java   |  12 ++
 .../cassandra/sidecar/job/NodeDecommissionJob.java | 100 ++++++++++
 .../cassandra/sidecar/job/OperationalJob.java      |  35 ++--
 .../sidecar/job/OperationalJobManager.java         |  30 ++-
 .../sidecar/job/OperationalJobTracker.java         |  29 ++-
 .../sidecar/routes/ListOperationalJobsHandler.java |   2 +-
 .../sidecar/routes/NodeDecommissionHandler.java    | 110 +++++++++++
 .../sidecar/routes/OperationalJobHandler.java      |  56 +++---
 .../cassandra/sidecar/server/MainModule.java       |   5 +
 .../sidecar/utils/OperationalJobUtils.java         |  61 ++++++
 ...kenZeroElectorateMembershipIntegrationTest.java |   2 +
 .../routes/NodeDecommissionIntegrationTest.java    | 130 +++++++++++++
 .../sidecar/testing/BootstrapBBUtils.java          |  16 ++
 .../sidecar/testing/IntegrationTestBase.java       |  38 +++-
 .../sidecar/job/OperationalJobManagerTest.java     |  29 +--
 .../cassandra/sidecar/job/OperationalJobTest.java  |  18 ++
 .../sidecar/job/OperationalJobTrackerTest.java     |  46 ++---
 .../routes/ListOperationalJobsHandlerTest.java     |   6 +
 .../routes/NodeDecommissionHandlerTest.java        | 207 +++++++++++++++++++++
 .../sidecar/routes/OperationalJobHandlerTest.java  |   3 +
 29 files changed, 965 insertions(+), 108 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 0e71aa10..0bde3ab1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Adds sidecar endpoint for node decommissioning operation (CASSANDRASC-151)
  * Rename field in ListCdcSegmentsResponse (CASSSIDECAR-184)
  * Ensure memory consistency from PeriodicTask executions and expose richer 
ScheduleDecision (CASSSIDECAR-181)
  * Refactor access to delegate methods to simplify (CASSSIDECAR-182)
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
index 4f2e1b4f..49b3ec9d 100644
--- 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
@@ -213,4 +213,24 @@ public class CassandraStorageOperations implements 
StorageOperations
         jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
                  .forceKeyspaceCleanup(concurrency, keyspace, table);
     }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String operationMode()
+    {
+        StorageJmxOperations ssProxy = 
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+        return ssProxy.getOperationMode();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void decommission(boolean force)
+    {
+        StorageJmxOperations ssProxy = 
jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+        ssProxy.decommission(force);
+    }
 }
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java
index 5b641ee8..bbdb29c4 100644
--- 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java
@@ -159,4 +159,16 @@ public class GossipDependentStorageJmxOperations 
implements StorageJmxOperations
         LOGGER.warn("Gossip is disabled and unavailable for the operation");
         throw new OperationUnavailableException("Gossip is required for the 
operation but it is disabled");
     }
+
+    @Override
+    public void decommission(boolean force) throws IllegalStateException, 
IllegalArgumentException, UnsupportedOperationException
+    {
+        delegate.decommission(force);
+    }
+
+    @Override
+    public String getOperationMode()
+    {
+        return delegate.getOperationMode();
+    }
 }
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java
index d8f94125..ca18db04 100644
--- 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java
@@ -159,4 +159,16 @@ public interface StorageJmxOperations
      * @throws InterruptedException it does not really throw but declared in 
MBean
      */
     int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tables) 
throws IOException, ExecutionException, InterruptedException;
+
+    /**
+     * Triggers the node decommission operation
+     * @param force force decommission, bypassing RF checks, when this flag is 
set
+     */
+    void decommission(boolean force) throws IllegalStateException, 
IllegalArgumentException, UnsupportedOperationException;
+
+    /**
+     * Fetch the operation-mode of the node
+     * @return string representation of theoperation-mode
+     */
+    String getOperationMode();
 }
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
index 108bc6e1..e608913e 100644
--- 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
@@ -125,7 +125,7 @@ public final class ApiEndpointsV1
     public static final String PER_OPERATIONAL_JOB = OPERATIONAL_JOBS + '/' + 
OPERATIONAL_JOB_ID_PATH_PARAM;
     public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 + 
CASSANDRA + OPERATIONAL_JOBS;
     public static final String OPERATIONAL_JOB_ROUTE = API_V1 + CASSANDRA + 
PER_OPERATIONAL_JOB;
-
+    public static final String NODE_DECOMMISSION_ROUTE = API_V1 + CASSANDRA + 
"/operations/decommission";
     private ApiEndpointsV1()
     {
         throw new IllegalStateException(getClass() + " is a constants 
container and shall not be instantiated");
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeDecommissionRequest.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeDecommissionRequest.java
new file mode 100644
index 00000000..48269f28
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeDecommissionRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.request;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+
+/**
+ * Represents a request to execute node decommission operation
+ */
+public class NodeDecommissionRequest extends 
JsonRequest<OperationalJobResponse>
+{
+    /**
+     * Constructs a request to execute a node decommission operation
+     */
+    public NodeDecommissionRequest()
+    {
+        super(ApiEndpointsV1.NODE_DECOMMISSION_ROUTE);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public HttpMethod method()
+    {
+        return HttpMethod.PUT;
+    }
+}
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java 
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
index 65fad58d..938ef0ac 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
@@ -38,6 +38,7 @@ import 
org.apache.cassandra.sidecar.common.request.GossipInfoRequest;
 import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest;
 import org.apache.cassandra.sidecar.common.request.ListOperationalJobsRequest;
 import org.apache.cassandra.sidecar.common.request.ListSnapshotFilesRequest;
+import org.apache.cassandra.sidecar.common.request.NodeDecommissionRequest;
 import org.apache.cassandra.sidecar.common.request.NodeSettingsRequest;
 import org.apache.cassandra.sidecar.common.request.OperationalJobRequest;
 import org.apache.cassandra.sidecar.common.request.Request;
@@ -76,6 +77,7 @@ public class RequestContext
     protected static final RingRequest RING_REQUEST = new RingRequest();
     protected static final GossipInfoRequest GOSSIP_INFO_REQUEST = new 
GossipInfoRequest();
     protected static final ListOperationalJobsRequest LIST_JOBS_REQUEST = new 
ListOperationalJobsRequest();
+    protected static final NodeDecommissionRequest NODE_DECOMMISSION_REQUEST = 
new NodeDecommissionRequest();
     protected static final RetryPolicy DEFAULT_RETRY_POLICY = new 
NoRetryPolicy();
     protected static final RetryPolicy 
DEFAULT_EXPONENTIAL_BACKOFF_RETRY_POLICY =
     new ExponentialBackoffRetryPolicy(10, 500L, 60_000L);
@@ -512,6 +514,17 @@ public class RequestContext
             return request(LIST_JOBS_REQUEST);
         }
 
+        /**
+         * Sets the {@code request} to be a {@link NodeDecommissionRequest} 
and returns a reference to this Builder
+         * enabling method chaining.
+         *
+         * @return a reference to this Builder
+         */
+        public Builder nodeDecommissionRequest()
+        {
+            return request(NODE_DECOMMISSION_REQUEST);
+        }
+
         /**
          * Sets the {@code retryPolicy} to be an
          * {@link 
org.apache.cassandra.sidecar.client.retry.ExponentialBackoffRetryPolicy} 
configured with
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java 
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index f896a79e..7be50b5d 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -664,6 +664,19 @@ public class SidecarClient implements AutoCloseable, 
SidecarClientBlobRestoreExt
                                             .build());
     }
 
+    /**
+     * Executes the node decommission request using the default retry policy 
and configured selection policy
+     * @param instance the instance where the request will be executed
+     * @return a completable future of the jobs list
+     */
+    public CompletableFuture<OperationalJobResponse> 
nodeDecommission(SidecarInstance instance)
+    {
+        return executor.executeRequestAsync(requestBuilder()
+                                            
.singleInstanceSelectionPolicy(instance)
+                                            .nodeDecommissionRequest()
+                                            .build());
+    }
+
     /**
      * Returns a copy of the request builder with the default parameters 
configured for the client.
      *
diff --git 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index f32356ae..7fc1f908 100644
--- 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++ 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -1324,6 +1324,25 @@ abstract class SidecarClientTest
         }
     }
 
+    @Test
+    public void testNodeDecommission() throws Exception
+    {
+        UUID jobId = UUID.randomUUID();
+        String nodeDecommissionString = "{\"jobId\":\"" + jobId + 
"\",\"jobStatus\":\"SUCCEEDED\",\"instance\":\"127.0.0.1\"}";
+
+        MockResponse response = new MockResponse()
+                                .setResponseCode(OK.code())
+                                .setHeader("content-type", "application/json")
+                                .setBody(nodeDecommissionString);
+        enqueue(response);
+
+        SidecarInstanceImpl sidecarInstance = 
RequestExecutorTest.newSidecarInstance(servers.get(0));
+        OperationalJobResponse result = 
client.nodeDecommission(sidecarInstance).get(30, TimeUnit.SECONDS);
+        assertThat(result).isNotNull();
+        assertThat(result.status()).isEqualTo(OperationalJobStatus.SUCCEEDED);
+        validateResponseServed(ApiEndpointsV1.NODE_DECOMMISSION_ROUTE);
+    }
+
     @Test
     void testFailsWithOneAttemptPerServer()
     {
diff --git 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
index c13c2a99..26113e3d 100644
--- 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
+++ 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java
@@ -102,4 +102,16 @@ public interface StorageOperations
     {
         outOfRangeDataCleanup(keyspace, table, 1);
     }
+
+    /**
+     * @return the operation-mode of the Cassandra instance
+     */
+    String operationMode();
+
+    /**
+     * Triggers the node decommission operation
+     *
+     * @param force force decommission, bypassing RF checks, when this flag is 
set
+     */
+    void decommission(boolean force);
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDecommissionJob.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDecommissionJob.java
new file mode 100644
index 00000000..32a05fa9
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDecommissionJob.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+
+/**
+ * Implementation of {@link OperationalJob} to perform node decommission 
operation.
+ */
+public class NodeDecommissionJob extends OperationalJob
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(NodeDecommissionJob.class);
+    private static final String OPERATION = "decommission";
+    private final boolean isForce;
+    protected StorageOperations storageOperations;
+
+    public NodeDecommissionJob(UUID jobId, StorageOperations storageOps, 
boolean isForce)
+    {
+        super(jobId);
+        this.storageOperations = storageOps;
+        this.isForce = isForce;
+    }
+
+    @Override
+    public boolean isRunningOnCassandra()
+    {
+        String operationMode = storageOperations.operationMode();
+        return "LEAVING".equals(operationMode) || 
"DECOMMISSIONED".equals(operationMode);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public OperationalJobStatus status()
+    {
+        String operationMode = storageOperations.operationMode();
+
+        if ("LEAVING".equals(operationMode))
+        {
+            return OperationalJobStatus.RUNNING;
+        }
+        else if ("DECOMMISSIONED".equals(operationMode))
+        {
+            return OperationalJobStatus.SUCCEEDED;
+        }
+        else
+        {
+            return super.status();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void executeInternal()
+    {
+        if (isRunningOnCassandra())
+        {
+            LOGGER.info("Not executing job as an ongoing or completed 
decommission operation was found jobId={}", this.jobId());
+            return;
+        }
+
+        LOGGER.info("Executing decommission operation. jobId={}", 
this.jobId());
+        storageOperations.decommission(isForce);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String name()
+    {
+        return OPERATION;
+    }
+}
+
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java 
b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java
index 80ec374c..0955176e 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.sidecar.job;
 
 import java.time.Duration;
 import java.util.UUID;
-import java.util.concurrent.TimeoutException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +41,7 @@ public abstract class OperationalJob implements Task<Void>
     private static final Logger LOGGER = 
LoggerFactory.getLogger(OperationalJob.class);
 
     // use v1 time-based uuid
-    public final UUID jobId;
+    private final UUID jobId;
 
     private final Promise<Void> executionPromise;
     private volatile boolean isExecuting = false;
@@ -59,6 +58,11 @@ public abstract class OperationalJob implements Task<Void>
         this.executionPromise = Promise.promise();
     }
 
+    public UUID jobId()
+    {
+        return jobId;
+    }
+
     @Override
     public final Void result()
     {
@@ -95,12 +99,17 @@ public abstract class OperationalJob implements Task<Void>
         return referenceTimestampInMillis - createdAt > ttlInMillis;
     }
 
+    /**
+     * The concrete-job-specific implementation to determine if the job is 
running on the Cassandra node.
+     * @return true if the job is running on the Cassandra node. For example, 
node decommission is tracked by the
+     * operationMode exposed from Cassandra.
+     */
+    public abstract boolean isRunningOnCassandra();
+
     /**
      * Determines the status of the job. OperationalJob subclasses could 
choose to override the method.
      * <p>
      * For long-lived jobs, the implementations should return the {@link 
OperationalJobStatus#RUNNING} status intelligently.
-     * The condition of {@link OperationalJobStatus#RUNNING} is 
implementation-specific.
-     * For example, node decommission is tracked by the operationMode exposed 
from Cassandra.
      * If the operationMode is LEAVING, the corresponding OperationalJob is 
{@link OperationalJobStatus#RUNNING}.
      * In this case, even if the OperationalJobStatus determined from this 
method is {@link OperationalJobStatus#CREATED},
      * the concrete implementation can override and return {@link 
OperationalJobStatus#RUNNING}.
@@ -113,7 +122,8 @@ public abstract class OperationalJob implements Task<Void>
     public OperationalJobStatus status()
     {
         Future<Void> fut = asyncResult();
-        if (!isExecuting)
+        // Jobs that are created and yet to be picked up by the executor thread
+        if (!isExecuting && !fut.isComplete())
         {
             return OperationalJobStatus.CREATED;
         }
@@ -140,12 +150,12 @@ public abstract class OperationalJob implements Task<Void>
      * Get the async result with waiting for at most the specified wait time
      * <p>
      * Note: This call does not block the calling thread.
-     * The call-site should handle the possible failed future with {@link 
TimeoutException} from this method.
      *
      * @param executorPool executor pool to run the timer
      * @param waitTime     maximum time to wait before returning
-     * @return the async result or a failed future of {@link 
OperationalJobException} with the cause
-     * {@link TimeoutException} after exceeding the wait time
+     * @return a future that is either the result of the configured timeout 
based on {@code waitTime} or the async
+     * result. A succeeded future here, represents either a timeout or the 
result of the job and a failure is
+     * represented by an exception thrown by the job execution, within the 
configured timeout.
      */
     public Future<Void> asyncResult(TaskExecutorPool executorPool, Duration 
waitTime)
     {
@@ -163,9 +173,8 @@ public abstract class OperationalJob implements Task<Void>
         // Completes as soon as any future succeeds, or when all futures fail. 
Note that maxWaitTimePromise is
         // closed as soon as resultFut completes
         return Future.any(maxWaitTimeFut, resultFut)
-                     // We want to return the result when applicable, of 
course.
-                     // If this lambda below is evaluated, both futures are 
completed;
-                     // Depending on whether timeout flag is set, it either 
throws or complete with result
+                     // If this lambda below is evaluated, either one of the 
futures have completed;
+                     // In either case, the future corresponding to the job 
execution is returned
                      .compose(f -> {
                          boolean isTimeout = maxWaitTimeFut.result();
                          if (isTimeout)
@@ -179,10 +188,8 @@ public abstract class OperationalJob implements Task<Void>
 
     /**
      * OperationalJob body. The implementation is executed in a blocking 
manner.
-     *
-     * @throws OperationalJobException OperationalJobException that wraps job 
failure
      */
-    protected abstract void executeInternal() throws OperationalJobException;
+    protected abstract void executeInternal();
 
     /**
      * Execute the job behavior as specified in the internal execution {@link 
#executeInternal()},
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
index 034fd567..dd5d199e 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
@@ -24,7 +24,8 @@ import java.util.stream.Collectors;
 import javax.inject.Inject;
 
 import com.google.inject.Singleton;
-import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
 import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
 
 /**
@@ -35,15 +36,18 @@ public class OperationalJobManager
 {
     private final OperationalJobTracker jobTracker;
 
+    private final TaskExecutorPool internalExecutorPool;
+
     /**
      * Creates a manager instance with a default sized job-tracker.
      *
      * @param jobTracker the tracker for the operational jobs
      */
     @Inject
-    public OperationalJobManager(OperationalJobTracker jobTracker)
+    public OperationalJobManager(OperationalJobTracker jobTracker, 
ExecutorPools executorPools)
     {
         this.jobTracker = jobTracker;
+        this.internalExecutorPool = executorPools.internal();
     }
 
     /**
@@ -53,7 +57,7 @@ public class OperationalJobManager
      */
     public List<OperationalJob> allInflightJobs()
     {
-        return jobTracker.getJobsView().values()
+        return jobTracker.jobsView().values()
                          .stream()
                          .filter(j -> !j.asyncResult().isComplete())
                          .collect(Collectors.toList());
@@ -76,22 +80,30 @@ public class OperationalJobManager
      * The job execution failure behavior is tracked within the {@link 
OperationalJob}.
      *
      * @param job OperationalJob instance to submit
-     * @return OperationalJob instance that is submitted
      * @throws OperationalJobConflictException when the same operational job 
is already running on Cassandra
      */
-    public OperationalJob trySubmitJob(OperationalJob job) throws 
OperationalJobConflictException
+    public void trySubmitJob(OperationalJob job) throws 
OperationalJobConflictException
     {
         checkConflict(job);
 
         // New job is submitted for all cases when we do not have a 
corresponding downstream job
-        return jobTracker.computeIfAbsent(job.jobId, jobId -> job);
+        jobTracker.computeIfAbsent(job.jobId(), jobId -> {
+            internalExecutorPool.executeBlocking(job::execute);
+            return job;
+        });
     }
 
+    /**
+     * Checks the job tracker for existing inflight jobs with the same 
operation before checking downstream for
+     * corresponding running job on the Cassandra node as a conflict of the 
job being submitted.
+     * @param job instance of the job to check conflicts for
+     * @throws OperationalJobConflictException when a conflicting inflight job 
is found
+     */
     private void checkConflict(OperationalJob job) throws 
OperationalJobConflictException
     {
-        // The job is not yet submitted (and running), but its status 
indicates that there is an identical job running on Cassandra already
-        // In this case, this job submission is rejected.
-        if (job.status() == OperationalJobStatus.RUNNING)
+        // If there are no tracked running jobs for same operation, then we 
confirm downstream
+        // Downstream check is done in most cases - by design
+        if (!jobTracker.inflightJobsByOperation(job.name()).isEmpty() || 
job.isRunningOnCassandra())
         {
             throw new OperationalJobConflictException("The same operational 
job is already running on Cassandra. operationName='" + job.name() + '\'');
         }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
index 649a8a0b..6d87510b 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
@@ -20,16 +20,19 @@ package org.apache.cassandra.sidecar.job;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
 import org.apache.cassandra.sidecar.config.ServiceConfiguration;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.VisibleForTesting;
@@ -65,16 +68,17 @@ public class OperationalJobTracker
                 if (map.size() > initialCapacity)
                 {
                     OperationalJob job = eldest.getValue();
-                    if (job.status().isCompleted() && 
job.isStale(System.currentTimeMillis(), ONE_DAY_TTL))
+                    OperationalJobStatus status = job.status();
+                    if (status.isCompleted() && 
job.isStale(System.currentTimeMillis(), ONE_DAY_TTL))
                     {
                         LOGGER.debug("Expiring completed and stale job due to 
job tracker has reached max size. jobId={} status={} createdAt={}",
-                                     job.jobId, job.status(), 
job.creationTime());
+                                     job.jobId(), status, job.creationTime());
                         return true;
                     }
                     else
                     {
                         LOGGER.warn("Job tracker reached max size, but the 
eldest job is not completed yet. " +
-                                    "Not evicting. jobId={} status={}", 
job.jobId, job.status());
+                                    "Not evicting. jobId={} status={}", 
job.jobId(), status);
                         // TODO: Optionally trigger cleanup to fetch next 
oldest to evict
                     }
                 }
@@ -100,15 +104,30 @@ public class OperationalJobTracker
      * @return an immutable copy of the underlying mapping
      */
     @NotNull
-    Map<UUID, OperationalJob> getJobsView()
+    Map<UUID, OperationalJob> jobsView()
     {
         return Collections.unmodifiableMap(map);
     }
 
+    /**
+     * Filters the inflight (created or running) jobs matching the job name 
from the jobsView
+     * @return list of inflight jobs being tracked
+     */
+    @NotNull
+    List<OperationalJob> inflightJobsByOperation(String operation)
+    {
+        return jobsView().values()
+                         .stream()
+                         .filter(j -> (j.name().equals(operation)) &&
+                                      (j.status() == 
OperationalJobStatus.RUNNING ||
+                                       j.status() == 
OperationalJobStatus.CREATED))
+                         .collect(Collectors.toList());
+    }
+
     @VisibleForTesting
     OperationalJob put(OperationalJob job)
     {
-        return map.put(job.jobId, job);
+        return map.put(job.jobId(), job);
     }
 
     @VisibleForTesting
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandler.java
index ba7a24dc..93a0861d 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandler.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandler.java
@@ -61,7 +61,7 @@ public class ListOperationalJobsHandler extends 
AbstractHandler<Void>
         ListOperationalJobsResponse listResponse = new 
ListOperationalJobsResponse();
         jobManager.allInflightJobs()
                   .stream()
-                  .map(job -> new OperationalJobResponse(job.jobId, RUNNING, 
job.name(), null))
+                  .map(job -> new OperationalJobResponse(job.jobId(), RUNNING, 
job.name(), null))
                   .forEach(listResponse::addJob);
         context.json(listResponse);
     }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandler.java
new file mode 100644
index 00000000..231d8fdb
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandler.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
+import org.apache.cassandra.sidecar.job.NodeDecommissionJob;
+import org.apache.cassandra.sidecar.job.OperationalJobManager;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.OperationalJobUtils;
+
+import static 
org.apache.cassandra.sidecar.utils.RequestUtils.parseBooleanQueryParam;
+
+/**
+ * Provides REST API for asynchronously decommissioning the corresponding 
Cassandra node
+ */
+public class NodeDecommissionHandler extends AbstractHandler<Boolean>
+{
+    private final OperationalJobManager jobManager;
+    private final ServiceConfiguration config;
+
+    /**
+     * Constructs a handler with the provided {@code metadataFetcher}
+     *
+     * @param metadataFetcher the interface to retrieve instance metadata
+     * @param executorPools   the executor pools for blocking executions
+     * @param validator       a validator instance to validate 
Cassandra-specific input
+     */
+    @Inject
+    protected NodeDecommissionHandler(InstanceMetadataFetcher metadataFetcher,
+                                      ExecutorPools executorPools,
+                                      ServiceConfiguration 
serviceConfiguration,
+                                      CassandraInputValidator validator,
+                                      OperationalJobManager jobManager)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.jobManager = jobManager;
+        this.config = serviceConfiguration;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void handleInternal(RoutingContext context,
+                               HttpServerRequest httpRequest,
+                               String host,
+                               SocketAddress remoteAddress,
+                               Boolean isForce)
+    {
+        StorageOperations operations = 
metadataFetcher.delegate(host).storageOperations();
+        NodeDecommissionJob job = new NodeDecommissionJob(UUIDs.timeBased(), 
operations, isForce);
+        try
+        {
+            jobManager.trySubmitJob(job);
+        }
+        catch (OperationalJobConflictException oje)
+        {
+            String reason = oje.getMessage();
+            logger.error("Conflicting job encountered. reason={}", reason);
+            
context.response().setStatusCode(HttpResponseStatus.CONFLICT.code());
+            context.json(new OperationalJobResponse(job.jobId(), 
OperationalJobStatus.FAILED, job.name(), reason));
+            return;
+        }
+
+        // Get the result, waiting for the specified wait time for result
+        job.asyncResult(executorPools.service(),
+                        
Duration.of(config.operationalJobExecutionMaxWaitTimeInMillis(), 
ChronoUnit.MILLIS))
+           .onComplete(v -> 
OperationalJobUtils.sendStatusBasedResponse(context, job));
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected Boolean extractParamsOrThrow(RoutingContext context)
+    {
+        return parseBooleanQueryParam(context.request(), "force", false);
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/routes/OperationalJobHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/OperationalJobHandler.java
index 162903af..1f0eb12f 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/routes/OperationalJobHandler.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/OperationalJobHandler.java
@@ -25,22 +25,20 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 import io.vertx.core.http.HttpServerRequest;
 import io.vertx.core.net.SocketAddress;
 import io.vertx.ext.web.RoutingContext;
-import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
-import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.job.OperationalJob;
 import org.apache.cassandra.sidecar.job.OperationalJobManager;
 import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.OperationalJobUtils;
 
 import static 
org.apache.cassandra.sidecar.common.ApiEndpointsV1.OPERATIONAL_JOB_ID_PATH_PARAM;
-import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
 import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
 
 /**
  * Handler for retrieving the status of async operational jobs running on the 
sidecar
  */
-public class OperationalJobHandler extends AbstractHandler<Void>
+public class OperationalJobHandler extends AbstractHandler<UUID>
 {
     private final OperationalJobManager jobManager;
 
@@ -54,16 +52,16 @@ public class OperationalJobHandler extends 
AbstractHandler<Void>
         this.jobManager = jobManager;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    protected Void extractParamsOrThrow(RoutingContext context)
+    public void handleInternal(RoutingContext context,
+                               HttpServerRequest httpRequest,
+                               String host,
+                               SocketAddress remoteAddress,
+                               UUID jobId)
     {
-        return null;
-    }
-
-    @Override
-    public void handleInternal(RoutingContext context, HttpServerRequest 
httpRequest, String host, SocketAddress remoteAddress, Void request)
-    {
-        UUID jobId = validatedJobIdParam(context);
         executorPools.service()
                      .executeBlocking(() -> {
                          OperationalJob job = jobManager.getJobIfExists(jobId);
@@ -75,11 +73,20 @@ public class OperationalJobHandler extends 
AbstractHandler<Void>
                          }
                          return job;
                      })
-                     .onFailure(cause -> processFailure(cause, context, host, 
remoteAddress, request))
-                     .onSuccess(job -> sendStatusBasedResponse(context, jobId, 
job));
+                     .onFailure(cause -> processFailure(cause, context, host, 
remoteAddress, jobId))
+                     .onSuccess(job -> 
OperationalJobUtils.sendStatusBasedResponse(context, job));
     }
 
-    UUID validatedJobIdParam(RoutingContext context)
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected UUID extractParamsOrThrow(RoutingContext context)
+    {
+        return validatedJobIdParam(context);
+    }
+
+    private UUID validatedJobIdParam(RoutingContext context)
     {
         String requestJobId = 
context.pathParam(OPERATIONAL_JOB_ID_PATH_PARAM.substring(1));
         if (requestJobId == null)
@@ -96,24 +103,9 @@ public class OperationalJobHandler extends 
AbstractHandler<Void>
         catch (IllegalArgumentException e)
         {
             logger.info("Invalid jobId. jobId={}", requestJobId);
-            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, 
String.format("Invalid job ID provided :%s.", requestJobId));
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
+                                    String.format("Invalid job ID provided: 
%s.", requestJobId));
         }
         return jobId;
     }
-
-    public void sendStatusBasedResponse(RoutingContext context, UUID jobId, 
OperationalJob job)
-    {
-        OperationalJobStatus status = job.status();
-        if (status.isCompleted())
-        {
-            context.response().setStatusCode(HttpResponseStatus.OK.code());
-        }
-        else
-        {
-            
context.response().setStatusCode(HttpResponseStatus.ACCEPTED.code());
-        }
-
-        String reason = status == FAILED ? 
job.asyncResult().cause().getMessage() : null;
-        context.json(new OperationalJobResponse(jobId, status, job.name(), 
reason));
-    }
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java 
b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
index e8051006..10a9b321 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
@@ -106,6 +106,7 @@ import 
org.apache.cassandra.sidecar.routes.FileStreamHandler;
 import org.apache.cassandra.sidecar.routes.GossipInfoHandler;
 import org.apache.cassandra.sidecar.routes.JsonErrorHandler;
 import org.apache.cassandra.sidecar.routes.ListOperationalJobsHandler;
+import org.apache.cassandra.sidecar.routes.NodeDecommissionHandler;
 import org.apache.cassandra.sidecar.routes.OperationalJobHandler;
 import org.apache.cassandra.sidecar.routes.RingHandler;
 import org.apache.cassandra.sidecar.routes.RoutingOrder;
@@ -280,6 +281,7 @@ public class MainModule extends AbstractModule
                               ConnectedClientStatsHandler 
connectedClientStatsHandler,
                               OperationalJobHandler operationalJobHandler,
                               ListOperationalJobsHandler 
listOperationalJobsHandler,
+                              NodeDecommissionHandler nodeDecommissionHandler,
                               ErrorHandler errorHandler)
     {
         Router router = Router.router(vertx);
@@ -379,6 +381,9 @@ public class MainModule extends AbstractModule
         router.get(ApiEndpointsV1.LIST_OPERATIONAL_JOBS_ROUTE)
               .handler(listOperationalJobsHandler);
 
+        router.put(ApiEndpointsV1.NODE_DECOMMISSION_ROUTE)
+              .handler(nodeDecommissionHandler);
+
         router.get(ApiEndpointsV1.RING_ROUTE_PER_KEYSPACE)
               .handler(ringHandler);
 
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java
new file mode 100644
index 00000000..e36f1a89
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.job.OperationalJob;
+
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
+
+/**
+ * Utility class for OperationalJob framework operations.
+ */
+public class OperationalJobUtils
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(OperationalJobUtils.class);
+
+    /**
+     * In the operational job context, sends a {@link OperationalJobResponse} 
based on the status of the job.
+     *
+     * @param context the request context
+     * @param job     the operational job to reports status on
+     */
+    public static void sendStatusBasedResponse(RoutingContext context, 
OperationalJob job)
+    {
+        OperationalJobStatus status = job.status();
+        LOGGER.info("Job completion status={} jobId={}", status, job.jobId());
+        if (status.isCompleted())
+        {
+            context.response().setStatusCode(HttpResponseStatus.OK.code());
+        }
+        else
+        {
+            
context.response().setStatusCode(HttpResponseStatus.ACCEPTED.code());
+        }
+
+        String reason = status == FAILED ? 
job.asyncResult().cause().getMessage() : null;
+        context.json(new OperationalJobResponse(job.jobId(), status, 
job.name(), reason));
+    }
+}
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest.java
index fdb0c45b..6f335b85 100644
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest.java
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.slf4j.Logger;
@@ -68,6 +69,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 /**
  * Integration tests for the {@link 
MostReplicatedKeyspaceTokenZeroElectorateMembership} class
  */
+@Tag("heavy")
 class MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest.class);
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java
new file mode 100644
index 00000000..fadd012e
--- /dev/null
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+
+import static org.apache.cassandra.sidecar.AssertionUtils.loopAssert;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test the node decommission endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionIntegrationTest extends IntegrationTestBase
+{
+    @CassandraIntegrationTest(nodesPerDc = 2)
+    void decommissionNodeDefault(VertxTestContext context)
+    {
+        final AtomicReference<String> jobId = new AtomicReference<>();
+        String testRoute = 
"/api/v1/cassandra/operations/decommission?force=true";
+        testWithClient(client -> client.put(server.actualPort(), "127.0.0.1", 
testRoute)
+                                       .send(context.succeeding(response -> {
+                                           OperationalJobResponse 
decommissionResponse = response.bodyAsJson(OperationalJobResponse.class);
+                                           
assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+                                           
jobId.set(String.valueOf(decommissionResponse.jobId()));
+                                       })));
+        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+        assertThat(jobId.get()).isNotNull();
+        pollStatusForState(jobId.get(), SUCCEEDED, null);
+        context.completeNow();
+    }
+
+    @CassandraIntegrationTest(nodesPerDc = 2)
+    void decommissionNodeWithFailure(VertxTestContext context)
+    {
+        String testRoute = "/api/v1/cassandra/operations/decommission";
+        testWithClient(client -> client.put(server.actualPort(), "127.0.0.1", 
testRoute)
+                                       .send(context.succeeding(response -> {
+                                           OperationalJobResponse 
decommissionResponse = response.bodyAsJson(OperationalJobResponse.class);
+                                           
assertThat(decommissionResponse.status()).isEqualTo(FAILED);
+                                           
assertThat(decommissionResponse.jobId()).isNotNull();
+                                           String jobId = 
String.valueOf(decommissionResponse.jobId());
+                                           assertThat(jobId).isNotNull();
+                                           context.completeNow();
+                                       })));
+
+    }
+
+    private void pollStatusForState(String uuid,
+                                    OperationalJobStatus expectedStatus,
+                                    String expectedReason)
+    {
+        String status = "/api/v1/cassandra/operational-jobs/" + uuid;
+        AtomicBoolean stateReached = new AtomicBoolean(false);
+        AtomicInteger counter = new AtomicInteger(0);
+        loopAssert(30, () -> {
+            counter.incrementAndGet();
+            // TODO: optionally create a helper method in the base class to 
get response in the blocking manner
+            HttpResponse<Buffer> resp;
+            try
+            {
+                resp = client.get(server.actualPort(), "127.0.0.1", status)
+                                                  .send()
+                                                  .toCompletionStage()
+                                                  .toCompletableFuture()
+                                                  .get();
+                logger.info("Success Status Response code: {}", 
resp.statusCode());
+                logger.info("Status Response: {}", resp.bodyAsString());
+                if (resp.statusCode() == HttpResponseStatus.OK.code())
+                {
+                    stateReached.set(true);
+                    OperationalJobResponse jobStatusResp = 
resp.bodyAsJson(OperationalJobResponse.class);
+                    
assertThat(jobStatusResp.jobId()).isEqualTo(UUID.fromString(uuid));
+                    
assertThat(jobStatusResp.status()).isEqualTo(expectedStatus);
+                    
assertThat(jobStatusResp.reason()).isEqualTo(expectedReason);
+                    
assertThat(jobStatusResp.operation()).isEqualTo("decommission");
+                }
+                else
+                {
+                    
assertThat(resp.statusCode()).isEqualTo(HttpResponseStatus.ACCEPTED.code());
+                    OperationalJobResponse jobStatusResp = 
resp.bodyAsJson(OperationalJobResponse.class);
+                    
assertThat(jobStatusResp.jobId()).isEqualTo(UUID.fromString(uuid));
+                }
+                logger.info("Request completed");
+                assertThat(stateReached.get()).isTrue();
+            }
+            catch (InterruptedException | ExecutionException e)
+            {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+}
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/testing/BootstrapBBUtils.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/testing/BootstrapBBUtils.java
index ab5ec788..a3420e2f 100644
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/testing/BootstrapBBUtils.java
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/testing/BootstrapBBUtils.java
@@ -50,4 +50,20 @@ public class BootstrapBBUtils
                        .make(TypeResolutionStrategy.Lazy.INSTANCE, typePool)
                        .load(cl, ClassLoadingStrategy.Default.INJECTION);
     }
+
+    public static void installDecommissionIntercepter(ClassLoader cl, Class<?> 
interceptor)
+    {
+
+        // "org.apache.cassandra.service.StorageService"  "operationMode"
+        //  "org.apache.cassandra.tcm.sequences.InProgressSequences" "isLeave"
+        TypePool typePool = TypePool.Default.of(cl);
+        TypeDescription description = 
typePool.describe("org.apache.cassandra.service.StorageService")
+                                              .resolve();
+        new ByteBuddy().rebase(description, 
ClassFileLocator.ForClassLoader.of(cl))
+                       .method(named("operationMode"))
+                       .intercept(MethodDelegation.to(interceptor))
+                       // Defer class loading until all dependencies are loaded
+                       .make(TypeResolutionStrategy.Lazy.INSTANCE, typePool)
+                       .load(cl, ClassLoadingStrategy.Default.INJECTION);
+    }
 }
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
index 7cdedaf7..5934e504 100644
--- 
a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
@@ -195,6 +195,11 @@ public abstract class IntegrationTestBase
         return -1;
     }
 
+    protected void testWithClient(Consumer<WebClient> tester)
+    {
+        testWithClient(true, tester);
+    }
+
     protected void testWithClient(VertxTestContext context, 
Consumer<WebClient> tester) throws Exception
     {
         testWithClient(context, true, tester);
@@ -204,6 +209,37 @@ public abstract class IntegrationTestBase
                                   boolean waitForCluster,
                                   Consumer<WebClient> tester)
     throws Exception
+    {
+        testWithClient(waitForCluster, tester);
+         // wait until the test completes
+        assertThat(context.awaitCompletion(2, TimeUnit.MINUTES)).isTrue();
+    }
+
+    protected void testWithClient(boolean waitForCluster,
+                                  Consumer<WebClient> tester)
+    {
+        CassandraAdapterDelegate delegate = 
sidecarTestContext.instancesMetadata()
+                                                              
.instanceFromId(1)
+                                                              .delegate();
+
+        assertThat(delegate).isNotNull();
+        if (delegate.isNativeUp() || !waitForCluster)
+        {
+            tester.accept(client);
+        }
+        else
+        {
+            vertx.eventBus().localConsumer(ON_CASSANDRA_CQL_READY.address(), 
(Message<JsonObject> message) -> {
+                if (message.body().getInteger("cassandraInstanceId") == 1)
+                {
+                    tester.accept(client);
+                }
+            });
+        }
+    }
+
+    protected void testWithClientBlocking(boolean waitForCluster,
+                                     Consumer<WebClient> tester)
     {
         CassandraAdapterDelegate delegate = 
sidecarTestContext.instancesMetadata()
                                                               
.instanceFromId(1)
@@ -224,8 +260,6 @@ public abstract class IntegrationTestBase
             });
         }
 
-        // wait until the test completes
-        assertThat(context.awaitCompletion(2, TimeUnit.MINUTES)).isTrue();
     }
 
     protected void createTestKeyspace()
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
index f2f8307c..b680e5e9 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
@@ -30,10 +30,8 @@ import io.vertx.core.Vertx;
 import 
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
-import org.apache.cassandra.sidecar.config.ServiceConfiguration;
-import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
 import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
-import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
 import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
@@ -55,33 +53,30 @@ import static org.mockito.Mockito.when;
  */
 class OperationalJobManagerTest
 {
-    @Mock
-    SidecarConfiguration mockConfig;
-
     protected Vertx vertx;
 
+    protected ExecutorPools executorPool;
+
     @BeforeEach
     void setup()
     {
         vertx = Vertx.vertx();
+        executorPool = new ExecutorPools(vertx, new 
ServiceConfigurationImpl());
         MockitoAnnotations.openMocks(this);
-        ServiceConfiguration mockServiceConfig = 
mock(ServiceConfiguration.class);
-        when(mockConfig.serviceConfiguration()).thenReturn(mockServiceConfig);
-        
when(mockServiceConfig.operationalJobExecutionMaxWaitTimeInMillis()).thenReturn(5000L);
     }
 
     @Test
     void testWithNoDownstreamJob()
     {
         OperationalJobTracker tracker = new OperationalJobTracker(4);
-        OperationalJobManager manager = new OperationalJobManager(tracker);
+        OperationalJobManager manager = new OperationalJobManager(tracker, 
executorPool);
 
         OperationalJob testJob = 
OperationalJobTest.createOperationalJob(SUCCEEDED);
         manager.trySubmitJob(testJob);
         testJob.execute(Promise.promise());
         assertThat(testJob.asyncResult().isComplete()).isTrue();
         assertThat(testJob.status()).isEqualTo(SUCCEEDED);
-        assertThat(tracker.get(testJob.jobId)).isNotNull();
+        assertThat(tracker.get(testJob.jobId())).isNotNull();
     }
 
     @Test
@@ -93,7 +88,7 @@ class OperationalJobManagerTest
         TaskExecutorPool mockExecPool = mock(TaskExecutorPool.class);
         when(mockPools.internal()).thenReturn(mockExecPool);
         when(mockExecPool.runBlocking(any())).thenReturn(null);
-        OperationalJobManager manager = new OperationalJobManager(tracker);
+        OperationalJobManager manager = new OperationalJobManager(tracker, 
executorPool);
         assertThatThrownBy(() -> manager.trySubmitJob(runningJob))
         .isExactlyInstanceOf(OperationalJobConflictException.class)
         .hasMessage("The same operational job is already running on Cassandra. 
operationName='Operation X'");
@@ -105,7 +100,7 @@ class OperationalJobManagerTest
         UUID jobId = UUIDs.timeBased();
 
         OperationalJobTracker tracker = new OperationalJobTracker(4);
-        OperationalJobManager manager = new OperationalJobManager(tracker);
+        OperationalJobManager manager = new OperationalJobManager(tracker, 
executorPool);
 
         OperationalJob testJob = 
OperationalJobTest.createOperationalJob(jobId, Duration.ofSeconds(10));
 
@@ -123,11 +118,17 @@ class OperationalJobManagerTest
         UUID jobId = UUIDs.timeBased();
 
         OperationalJobTracker tracker = new OperationalJobTracker(4);
-        OperationalJobManager manager = new OperationalJobManager(tracker);
+        OperationalJobManager manager = new OperationalJobManager(tracker, 
executorPool);
 
         String msg = "Test Job failed";
         OperationalJob failingJob = new OperationalJob(jobId)
         {
+            @Override
+            public boolean isRunningOnCassandra()
+            {
+                return false;
+            }
+
             @Override
             protected void executeInternal() throws OperationalJobException
             {
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java 
b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java
index 9e2bd51f..81f9c9ad 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java
@@ -59,6 +59,12 @@ class OperationalJobTest
             {
             }
 
+            @Override
+            public boolean isRunningOnCassandra()
+            {
+                return jobStatus == OperationalJobStatus.RUNNING;
+            }
+
             @Override
             public OperationalJobStatus status()
             {
@@ -82,6 +88,12 @@ class OperationalJobTest
     {
         return new OperationalJob(jobId)
         {
+            @Override
+            public boolean isRunningOnCassandra()
+            {
+                return false;
+            }
+
             @Override
             protected void executeInternal() throws OperationalJobException
             {
@@ -122,6 +134,12 @@ class OperationalJobTest
         String msg = "Test Job failed";
         OperationalJob failingJob = new OperationalJob(UUIDs.timeBased())
         {
+            @Override
+            public boolean isRunningOnCassandra()
+            {
+                return false;
+            }
+
             @Override
             protected void executeInternal() throws OperationalJobException
             {
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTrackerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTrackerTest.java
index f6a49d4c..1840ab97 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTrackerTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTrackerTest.java
@@ -31,8 +31,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import com.datastax.driver.core.utils.UUIDs;
-import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
-import 
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
 
 import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
 import static 
org.apache.cassandra.sidecar.job.OperationalJobTest.createOperationalJob;
@@ -53,19 +51,7 @@ class OperationalJobTrackerTest
     OperationalJob job4 = createOperationalJob(SUCCEEDED);
 
     long twoDaysAgo = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2);
-    OperationalJob jobWithStaleCreationTime = new 
OperationalJob(UUIDs.startOf(twoDaysAgo))
-    {
-        @Override
-        protected void executeInternal() throws OperationalJobException
-        {
-        }
-
-        @Override
-        public OperationalJobStatus status()
-        {
-            return SUCCEEDED;
-        }
-    };
+    OperationalJob jobWithStaleCreationTime = 
createOperationalJob(UUIDs.startOf(twoDaysAgo), SUCCEEDED);
 
     @BeforeEach
     void setUp()
@@ -78,18 +64,18 @@ class OperationalJobTrackerTest
     {
         jobTracker.put(job1);
         jobTracker.put(job2);
-        assertThat(jobTracker.get(job1.jobId)).isSameAs(job1);
-        assertThat(jobTracker.get(job2.jobId)).isSameAs(job2);
+        assertThat(jobTracker.get(job1.jobId())).isSameAs(job1);
+        assertThat(jobTracker.get(job2.jobId())).isSameAs(job2);
     }
 
     @Test
     void testComputeIfAbsent()
     {
         jobTracker.put(job1);
-        OperationalJob job = jobTracker.computeIfAbsent(job1.jobId, v -> job3);
+        OperationalJob job = jobTracker.computeIfAbsent(job1.jobId(), v -> 
job3);
         assertThat(job).isNotSameAs(job3);
         assertThat(job).isSameAs(job1);
-        assertThat(jobTracker.get(job1.jobId)).isSameAs(job1);
+        assertThat(jobTracker.get(job1.jobId())).isSameAs(job1);
     }
 
     @Test
@@ -103,10 +89,10 @@ class OperationalJobTrackerTest
         assertThat(jobTracker.size())
         .describedAs("Although the tracker initial size is 3, no job is 
evicted since all jobs are still running")
         .isEqualTo(4);
-        assertThat(jobTracker.get(job1.jobId)).isNotNull();
-        assertThat(jobTracker.get(job2.jobId)).isNotNull();
-        assertThat(jobTracker.get(job3.jobId)).isNotNull();
-        assertThat(jobTracker.get(job4.jobId)).isNotNull();
+        assertThat(jobTracker.get(job1.jobId())).isNotNull();
+        assertThat(jobTracker.get(job2.jobId())).isNotNull();
+        assertThat(jobTracker.get(job3.jobId())).isNotNull();
+        assertThat(jobTracker.get(job4.jobId())).isNotNull();
     }
 
     @Test
@@ -118,10 +104,10 @@ class OperationalJobTrackerTest
         jobTracker.put(job3);
 
         assertThat(jobTracker.size()).isEqualTo(3);
-        assertThat(jobTracker.get(job1.jobId)).isNotNull();
-        assertThat(jobTracker.get(job2.jobId)).isNotNull();
-        assertThat(jobTracker.get(job3.jobId)).isNotNull();
-        assertThat(jobTracker.get(jobWithStaleCreationTime.jobId)).isNull();
+        assertThat(jobTracker.get(job1.jobId())).isNotNull();
+        assertThat(jobTracker.get(job2.jobId())).isNotNull();
+        assertThat(jobTracker.get(job3.jobId())).isNotNull();
+        assertThat(jobTracker.get(jobWithStaleCreationTime.jobId())).isNull();
     }
 
     @Test
@@ -131,9 +117,9 @@ class OperationalJobTrackerTest
         jobTracker.put(job1);
         jobTracker.put(job2);
 
-        Map<UUID, OperationalJob> view = jobTracker.getJobsView();
+        Map<UUID, OperationalJob> view = jobTracker.jobsView();
         assertThat(view.size()).isEqualTo(2);
-        assertThatThrownBy(() -> view.put(job3.jobId, job3))
+        assertThatThrownBy(() -> view.put(job3.jobId(), job3))
         .isExactlyInstanceOf(UnsupportedOperationException.class);
     }
 
@@ -152,7 +138,7 @@ class OperationalJobTrackerTest
         executorService.shutdown();
         executorService.awaitTermination(5, TimeUnit.SECONDS);
         assertThat(tracker.size()).isEqualTo(one);
-        assertThat(tracker.getJobsView().values().iterator().next())
+        assertThat(tracker.jobsView().values().iterator().next())
         .describedAs("Only the last job is kept")
         .isSameAs(sortedJobs.get(sortedJobs.size() - 1));
     }
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandlerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandlerTest.java
index 57b9ff4b..c2a5640d 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandlerTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandlerTest.java
@@ -149,6 +149,12 @@ class ListOperationalJobsHandlerTest
             super(jobId);
         }
 
+        @Override
+        public boolean isRunningOnCassandra()
+        {
+            return false;
+        }
+
         @Override
         protected void executeInternal() throws OperationalJobException
         {
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandlerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandlerTest.java
new file mode 100644
index 00000000..5b2dde64
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandlerTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.server.Server;
+import org.mockito.AdditionalAnswers;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
+import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link NodeDecommissionHandler}
+ */
+@ExtendWith(VertxExtension.class)
+public class NodeDecommissionHandlerTest
+{
+    static final Logger LOGGER = 
LoggerFactory.getLogger(NodeDecommissionHandlerTest.class);
+    Vertx vertx;
+    Server server;
+    StorageOperations mockStorageOperations = mock(StorageOperations.class);
+    @BeforeEach
+    void before() throws InterruptedException
+    {
+        Injector injector;
+        Module testOverride = Modules.override(new TestModule())
+                                     .with(new 
NodeDecommissionHandlerTest.NodeDecommissionTestModule());
+        injector = Guice.createInjector(Modules.override(new MainModule())
+                                               .with(testOverride));
+        vertx = injector.getInstance(Vertx.class);
+        server = injector.getInstance(Server.class);
+        VertxTestContext context = new VertxTestContext();
+        server.start()
+              .onSuccess(s -> context.completeNow())
+              .onFailure(context::failNow);
+        context.awaitCompletion(5, TimeUnit.SECONDS);
+    }
+
+    @AfterEach
+    void after() throws InterruptedException
+    {
+        CountDownLatch closeLatch = new CountDownLatch(1);
+        server.close().onSuccess(res -> closeLatch.countDown());
+        if (closeLatch.await(60, TimeUnit.SECONDS))
+            LOGGER.info("Close event received before timeout.");
+        else
+            LOGGER.error("Close event timed out.");
+    }
+
+    @Test
+    void testDecommissionLongRunning(VertxTestContext context)
+    {
+        when(mockStorageOperations.operationMode()).thenReturn("NORMAL");
+        doAnswer(AdditionalAnswers.answersWithDelay(6000, invocation -> null))
+        .when(mockStorageOperations).decommission(anyBoolean());
+
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/api/v1/cassandra/operations/decommission";
+        client.put(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_ACCEPTED)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(ACCEPTED.code());
+                  OperationalJobResponse decommissionResponse = 
response.bodyAsJson(OperationalJobResponse.class);
+                  assertThat(decommissionResponse).isNotNull();
+                  assertThat(decommissionResponse.status()).isEqualTo(RUNNING);
+                  context.completeNow();
+              }));
+    }
+
+    @Test
+    void testDecommissionCompleted(VertxTestContext context)
+    {
+        when(mockStorageOperations.operationMode()).thenReturn("NORMAL");
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/api/v1/cassandra/operations/decommission";
+        client.put(server.actualPort(), "127.0.0.1", testRoute)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(OK.code());
+                  LOGGER.info("Decommission Response: {}", 
response.bodyAsString());
+
+                  OperationalJobResponse decommissionResponse = 
response.bodyAsJson(OperationalJobResponse.class);
+                  assertThat(decommissionResponse).isNotNull();
+                  
assertThat(decommissionResponse.status()).isEqualTo(SUCCEEDED);
+                  context.completeNow();
+              }));
+    }
+
+    @Test
+    void testDecommissionFailed(VertxTestContext context)
+    {
+        when(mockStorageOperations.operationMode()).thenReturn("NORMAL");
+        doThrow(new RuntimeException("Simulated 
failure")).when(mockStorageOperations).decommission(anyBoolean());
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/api/v1/cassandra/operations/decommission";
+        client.put(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(OK.code());
+                  context.completeNow();
+              }));
+    }
+
+    @Test
+    void testDecommissionConflict(VertxTestContext context)
+    {
+        when(mockStorageOperations.operationMode()).thenReturn("LEAVING");
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/api/v1/cassandra/operations/decommission";
+        client.put(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_CONFLICT)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(CONFLICT.code());
+
+                  LOGGER.info("Decommission Response: {}", 
response.bodyAsString());
+                  OperationalJobResponse decommissionResponse = 
response.bodyAsJson(OperationalJobResponse.class);
+                  assertThat(decommissionResponse).isNotNull();
+                  assertThat(decommissionResponse.jobId()).isNotNull();
+                  context.completeNow();
+              }));
+    }
+
+    /**
+     * Test guice module for Node Decommission handler tests
+     */
+    class NodeDecommissionTestModule extends AbstractModule
+    {
+        @Provides
+        @Singleton
+        public InstancesMetadata instanceMetadata()
+        {
+            final int instanceId = 100;
+            final String host = "127.0.0.1";
+            final InstanceMetadata instanceMetadata = 
mock(InstanceMetadata.class);
+            when(instanceMetadata.host()).thenReturn(host);
+            when(instanceMetadata.port()).thenReturn(9042);
+            when(instanceMetadata.id()).thenReturn(instanceId);
+            when(instanceMetadata.stagingDir()).thenReturn("");
+
+            CassandraAdapterDelegate delegate = 
mock(CassandraAdapterDelegate.class);
+
+            
when(delegate.storageOperations()).thenReturn(mockStorageOperations);
+            when(instanceMetadata.delegate()).thenReturn(delegate);
+
+            InstancesMetadata mockInstancesMetadata = 
mock(InstancesMetadata.class);
+            
when(mockInstancesMetadata.instances()).thenReturn(Collections.singletonList(instanceMetadata));
+            
when(mockInstancesMetadata.instanceFromId(instanceId)).thenReturn(instanceMetadata);
+            
when(mockInstancesMetadata.instanceFromHost(host)).thenReturn(instanceMetadata);
+
+            return mockInstancesMetadata;
+        }
+    }
+
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/routes/OperationalJobHandlerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/OperationalJobHandlerTest.java
index 39becc6f..f4d0e98c 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/routes/OperationalJobHandlerTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/OperationalJobHandlerTest.java
@@ -173,12 +173,15 @@ class OperationalJobHandlerTest
             OperationalJobManager mockManager = 
mock(OperationalJobManager.class);
             OperationalJob runningMock = mock(OperationalJob.class);
             Promise<Void> p = Promise.promise();
+            when(runningMock.jobId()).thenReturn(runningUuid);
             
when(runningMock.status()).thenReturn(OperationalJobStatus.RUNNING);
             when(runningMock.asyncResult()).thenReturn(p.future());
             OperationalJob completedMock = mock(OperationalJob.class);
+            when(completedMock.jobId()).thenReturn(completedUuid);
             
when(completedMock.status()).thenReturn(OperationalJobStatus.SUCCEEDED);
             when(completedMock.name()).thenReturn("testCompleted");
             OperationalJob failedMock = mock(OperationalJob.class);
+            when(failedMock.jobId()).thenReturn(failedUuid);
             when(failedMock.status()).thenReturn(OperationalJobStatus.FAILED);
             
when(failedMock.asyncResult()).thenReturn(Future.failedFuture("Test failed"));
             when(failedMock.name()).thenReturn("testFailed");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to