This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 35138a46 CASSSIDECAR-150: Adds job management framework and APIs (#139)
35138a46 is described below

commit 35138a46905db3b3cf5f1c3920f46e126b0b3259
Author: Arjun Ashok <arjun_as...@apple.com>
AuthorDate: Thu Dec 12 15:29:52 2024 -0800

    CASSSIDECAR-150: Adds job management framework and APIs (#139)
    
    
    Patch by Arjun Ashok; Reviewed by Shailaja Koppu, Yifan Cai, Venkata 
Harikrishna Nukala, Francisco Guerrero for CASSSIDECAR-150
---
 CHANGES.txt                                        |   1 +
 .../cassandra/sidecar/common/ApiEndpointsV1.java   |   8 +
 .../sidecar/common/data/OperationalJobStatus.java  |  48 +++++
 .../common/request/ListOperationalJobsRequest.java |  46 +++++
 .../common/request/OperationalJobRequest.java      |  50 +++++
 .../response/ListOperationalJobsResponse.java      |  51 +++++
 .../common/response/OperationalJobResponse.java    |  89 +++++++++
 .../cassandra/sidecar/client/RequestContext.java   |  26 +++
 .../cassandra/sidecar/client/SidecarClient.java    |  43 ++++-
 .../sidecar/client/SidecarClientTest.java          | 121 +++++++++---
 .../server/exceptions/OperationalJobException.java |  52 +++++
 .../sidecar/config/ServiceConfiguration.java       |  13 ++
 .../config/yaml/ServiceConfigurationImpl.java      |  57 ++++++
 .../OperationalJobConflictException.java           |  32 +++
 .../cassandra/sidecar/job/OperationalJob.java      | 215 +++++++++++++++++++++
 .../sidecar/job/OperationalJobManager.java         |  99 ++++++++++
 .../sidecar/job/OperationalJobTracker.java         | 119 ++++++++++++
 .../sidecar/routes/ListOperationalJobsHandler.java |  68 +++++++
 .../sidecar/routes/OperationalJobHandler.java      | 119 ++++++++++++
 .../cassandra/sidecar/server/MainModule.java       |  10 +
 .../cassandra/sidecar/tasks/PeriodicTask.java      |  34 +---
 .../sidecar/tasks/{PeriodicTask.java => Task.java} |  62 +-----
 .../sidecar/job/OperationalJobManagerTest.java     | 144 ++++++++++++++
 .../cassandra/sidecar/job/OperationalJobTest.java  | 186 ++++++++++++++++++
 .../sidecar/job/OperationalJobTrackerTest.java     | 159 +++++++++++++++
 .../routes/ListOperationalJobsHandlerTest.java     | 157 +++++++++++++++
 .../sidecar/routes/OperationalJobHandlerTest.java  | 192 ++++++++++++++++++
 27 files changed, 2087 insertions(+), 114 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 6cc3016f..1c8d31a7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Add support for operational job management of long-running node operations 
(CASSSIDECAR-150)
  * CDC directory not initialized properly in InstanceMetadata (CASSSIDECAR-171)
  * yaml configuration defaults to a file that doesn't exist (CASSSIDECAR-122)
  * Add advanced driver settings to allow taking in password or certificates 
for Cassandra connection (CASSSIDECAR-159)
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
index be49b511..108bc6e1 100644
--- 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
@@ -29,6 +29,7 @@ public final class ApiEndpointsV1
 
     public static final String HEALTH = "/__health";
     public static final String CASSANDRA = "/cassandra";
+
     public static final String NATIVE = "/native";
     public static final String JMX = "/jmx";
     public static final String KEYSPACE_PATH_PARAM = ":keyspace";
@@ -39,6 +40,8 @@ public final class ApiEndpointsV1
     public static final String UPLOAD_ID_PATH_PARAM = ":uploadId";
     public static final String JOB_ID_PATH_PARAM = ":jobId";
 
+    public static final String OPERATIONAL_JOB_ID_PATH_PARAM = ":operationId";
+
     public static final String PER_KEYSPACE = "/keyspaces/" + 
KEYSPACE_PATH_PARAM;
     public static final String PER_TABLE = "/tables/" + TABLE_PATH_PARAM;
     public static final String PER_SNAPSHOT = "/snapshots/" + 
SNAPSHOT_PATH_PARAM;
@@ -118,6 +121,11 @@ public final class ApiEndpointsV1
 
     public static final String CONNECTED_CLIENT_STATS_ROUTE = API_V1 + 
CASSANDRA + "/stats/connected-clients";
 
+    public static final String OPERATIONAL_JOBS = "/operational-jobs";
+    public static final String PER_OPERATIONAL_JOB = OPERATIONAL_JOBS + '/' + 
OPERATIONAL_JOB_ID_PATH_PARAM;
+    public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 + 
CASSANDRA + OPERATIONAL_JOBS;
+    public static final String OPERATIONAL_JOB_ROUTE = API_V1 + CASSANDRA + 
PER_OPERATIONAL_JOB;
+
     private ApiEndpointsV1()
     {
         throw new IllegalStateException(getClass() + " is a constants 
container and shall not be instantiated");
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/OperationalJobStatus.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/OperationalJobStatus.java
new file mode 100644
index 00000000..6a2d0600
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/OperationalJobStatus.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.data;
+
+/**
+ * Encapsulates the states of the job lifecycle.
+ * Operational jobs are the ones running on Cassandra, e.g. decommission, etc.
+ */
+public enum OperationalJobStatus
+{
+    /**
+     * The operational job is created
+     */
+    CREATED,
+    /**
+     * The operational job is running on Cassandra
+     */
+    RUNNING,
+    /**
+     * The operational job succeeds
+     */
+    SUCCEEDED,
+    /**
+     * The operational job fails
+     */
+    FAILED;
+
+    public boolean isCompleted()
+    {
+        return this == SUCCEEDED || this == FAILED;
+    }
+}
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ListOperationalJobsRequest.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ListOperationalJobsRequest.java
new file mode 100644
index 00000000..b472bd70
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ListOperationalJobsRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.request;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import 
org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse;
+
+/**
+ * Represents a request to retrieve the list of async operational jobs
+ */
+public class ListOperationalJobsRequest extends 
JsonRequest<ListOperationalJobsResponse>
+{
+    /**
+     * Constructs a request to retrieve the list of running operational jobs
+     */
+    public ListOperationalJobsRequest()
+    {
+        super(ApiEndpointsV1.LIST_OPERATIONAL_JOBS_ROUTE);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public HttpMethod method()
+    {
+        return HttpMethod.GET;
+    }
+}
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/OperationalJobRequest.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/OperationalJobRequest.java
new file mode 100644
index 00000000..402c7d86
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/OperationalJobRequest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.request;
+
+import java.util.UUID;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+
+/**
+ * Represents a request to retrieve the status of a async operational job
+ */
+public class OperationalJobRequest extends JsonRequest<OperationalJobResponse>
+{
+
+    /**
+     * Constructs a request to retrieve status for a specified operational job
+     */
+    public OperationalJobRequest(UUID jobId)
+    {
+        super(ApiEndpointsV1.OPERATIONAL_JOB_ROUTE
+              .replaceAll(ApiEndpointsV1.OPERATIONAL_JOB_ID_PATH_PARAM, 
jobId.toString()));
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public HttpMethod method()
+    {
+        return HttpMethod.GET;
+    }
+}
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListOperationalJobsResponse.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListOperationalJobsResponse.java
new file mode 100644
index 00000000..1f9f1eaa
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListOperationalJobsResponse.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Response structure of the list operational jobs API
+ */
+public class ListOperationalJobsResponse
+{
+    private final List<OperationalJobResponse> jobs;
+
+    /**
+     * Constructs a {@link ListOperationalJobsResponse} object.
+     */
+    public ListOperationalJobsResponse()
+    {
+        this.jobs = new ArrayList<>();
+    }
+
+    public void addJob(OperationalJobResponse job)
+    {
+        jobs.add(job);
+    }
+
+    @JsonProperty("jobs")
+    public List<OperationalJobResponse> jobs()
+    {
+        return jobs;
+    }
+}
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/OperationalJobResponse.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/OperationalJobResponse.java
new file mode 100644
index 00000000..271f398a
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/OperationalJobResponse.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import java.util.UUID;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+
+/**
+ * Response structure of the operational jobs API
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class OperationalJobResponse
+{
+    private final UUID jobId;
+    private final OperationalJobStatus status;
+    private final String operation;
+    private final String reason;
+
+    @JsonCreator
+    public OperationalJobResponse(@JsonProperty("jobId") UUID jobId,
+                                  @JsonProperty("jobStatus") 
OperationalJobStatus status,
+                                  @JsonProperty("operation") String operation,
+                                  @JsonProperty("reason") String reason)
+    {
+        this.jobId = jobId;
+        this.status = status;
+        this.operation = operation;
+        this.reason = reason;
+    }
+
+    /**
+     * @return job id of operational job
+     */
+    @JsonProperty("jobId")
+    public UUID jobId()
+    {
+        return jobId;
+    }
+
+    /**
+     * @return status of the job
+     */
+    @JsonProperty("jobStatus")
+    public OperationalJobStatus status()
+    {
+        return status;
+    }
+
+    /**
+     * @return operation of the job
+     */
+    @JsonProperty("operation")
+    public String operation()
+    {
+        return operation;
+    }
+
+    /**
+     * @return reason for job failure
+     */
+    @JsonProperty("reason")
+    public String reason()
+    {
+        return reason;
+    }
+
+}
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java 
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
index 96082a61..65fad58d 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.sidecar.client;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.cassandra.sidecar.client.retry.ExponentialBackoffRetryPolicy;
 import org.apache.cassandra.sidecar.client.retry.NoRetryPolicy;
@@ -35,8 +36,10 @@ import 
org.apache.cassandra.sidecar.common.request.ConnectedClientStatsRequest;
 import org.apache.cassandra.sidecar.common.request.CreateSnapshotRequest;
 import org.apache.cassandra.sidecar.common.request.GossipInfoRequest;
 import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest;
+import org.apache.cassandra.sidecar.common.request.ListOperationalJobsRequest;
 import org.apache.cassandra.sidecar.common.request.ListSnapshotFilesRequest;
 import org.apache.cassandra.sidecar.common.request.NodeSettingsRequest;
+import org.apache.cassandra.sidecar.common.request.OperationalJobRequest;
 import org.apache.cassandra.sidecar.common.request.Request;
 import org.apache.cassandra.sidecar.common.request.RingRequest;
 import org.apache.cassandra.sidecar.common.request.SSTableComponentRequest;
@@ -72,6 +75,7 @@ public class RequestContext
     protected static final NodeSettingsRequest NODE_SETTINGS_REQUEST = new 
NodeSettingsRequest();
     protected static final RingRequest RING_REQUEST = new RingRequest();
     protected static final GossipInfoRequest GOSSIP_INFO_REQUEST = new 
GossipInfoRequest();
+    protected static final ListOperationalJobsRequest LIST_JOBS_REQUEST = new 
ListOperationalJobsRequest();
     protected static final RetryPolicy DEFAULT_RETRY_POLICY = new 
NoRetryPolicy();
     protected static final RetryPolicy 
DEFAULT_EXPONENTIAL_BACKOFF_RETRY_POLICY =
     new ExponentialBackoffRetryPolicy(10, 500L, 60_000L);
@@ -486,6 +490,28 @@ public class RequestContext
             return request(new ConnectedClientStatsRequest());
         }
 
+        /**
+         * Sets the {@code request} to be a {@link OperationalJobRequest} and 
returns a reference to this Builder
+         * enabling method chaining.
+         *
+         * @return a reference to this Builder
+         */
+        public Builder operationalJobRequest(UUID jobId)
+        {
+            return request(new OperationalJobRequest(jobId));
+        }
+
+        /**
+         * Sets the {@code request} to be a {@link ListOperationalJobsRequest} 
and returns a reference to this Builder
+         * enabling method chaining.
+         *
+         * @return a reference to this Builder
+         */
+        public Builder listOperationalJobsRequest()
+        {
+            return request(LIST_JOBS_REQUEST);
+        }
+
         /**
          * Sets the {@code retryPolicy} to be an
          * {@link 
org.apache.cassandra.sidecar.client.retry.ExponentialBackoffRetryPolicy} 
configured with
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java 
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index 1925e0ad..9e69432c 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -51,8 +51,10 @@ import 
org.apache.cassandra.sidecar.common.request.data.UpdateRestoreJobRequestP
 import 
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
 import org.apache.cassandra.sidecar.common.response.GossipInfoResponse;
 import org.apache.cassandra.sidecar.common.response.HealthResponse;
+import 
org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse;
 import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse;
 import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
 import org.apache.cassandra.sidecar.common.response.RingResponse;
 import org.apache.cassandra.sidecar.common.response.SSTableImportResponse;
 import org.apache.cassandra.sidecar.common.response.SchemaResponse;
@@ -582,15 +584,46 @@ public class SidecarClient implements AutoCloseable, 
SidecarClientBlobRestoreExt
     }
 
     /**
-     * Executes the connected client stats request using the default retry 
policy and configured selection policy
+     * Executes the connected client stats request using the default retry 
policy and provided {@code instance}.
      *
+     * @param instance the instance where the request will be executed
      * @return a completable future of the connected client stats
      */
-    public CompletableFuture<ConnectedClientStatsResponse> 
connectedClientStats()
+    public CompletableFuture<ConnectedClientStatsResponse> 
connectedClientStats(SidecarInstance instance)
     {
-        return executeRequestAsync(requestBuilder()
-                                   .connectedClientStatsRequest()
-                                   .build());
+        return executor.executeRequestAsync(requestBuilder()
+                                            
.singleInstanceSelectionPolicy(instance)
+                                            .connectedClientStatsRequest()
+                                            .build());
+    }
+
+    /**
+     * Executes the operational job request using the default retry policy and 
provided {@code instance}.
+     *
+     * @param instance the instance where the request will be executed
+     * @param jobId    the unique operational job identifier
+     * @return a completable future of the operational job response
+     */
+    public CompletableFuture<OperationalJobResponse> 
operationalJobs(SidecarInstance instance, UUID jobId)
+    {
+        return executor.executeRequestAsync(requestBuilder()
+                                            
.singleInstanceSelectionPolicy(instance)
+                                            .operationalJobRequest(jobId)
+                                            .build());
+    }
+
+    /**
+     * Executes the list operational jobs request using the default retry 
policy and provided {@code instance}.
+     *
+     * @param instance the instance where the request will be executed
+     * @return a completable future of the list of operational jobs
+     */
+    public CompletableFuture<ListOperationalJobsResponse> 
listOperationalJobs(SidecarInstance instance)
+    {
+        return executor.executeRequestAsync(requestBuilder()
+                                            
.singleInstanceSelectionPolicy(instance)
+                                            .listOperationalJobsRequest()
+                                            .build());
     }
 
     /**
diff --git 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index 8ac0bb3a..3933c066 100644
--- 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++ 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -59,6 +59,7 @@ import 
org.apache.cassandra.sidecar.client.request.RequestExecutorTest;
 import org.apache.cassandra.sidecar.client.retry.RetryAction;
 import org.apache.cassandra.sidecar.client.retry.RetryPolicy;
 import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
 import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets;
 import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest;
 import org.apache.cassandra.sidecar.common.request.NodeSettingsRequest;
@@ -69,8 +70,10 @@ import 
org.apache.cassandra.sidecar.common.request.data.XXHash32Digest;
 import 
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
 import org.apache.cassandra.sidecar.common.response.GossipInfoResponse;
 import org.apache.cassandra.sidecar.common.response.HealthResponse;
+import 
org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse;
 import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse;
 import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
 import org.apache.cassandra.sidecar.common.response.RingResponse;
 import org.apache.cassandra.sidecar.common.response.SSTableImportResponse;
 import org.apache.cassandra.sidecar.common.response.SchemaResponse;
@@ -89,6 +92,7 @@ import static 
io.netty.handler.codec.http.HttpResponseStatus.OK;
 import static io.netty.handler.codec.http.HttpResponseStatus.PARTIAL_CONTENT;
 import static 
org.apache.cassandra.sidecar.common.ApiEndpointsV1.JOB_ID_PATH_PARAM;
 import static 
org.apache.cassandra.sidecar.common.ApiEndpointsV1.KEYSPACE_PATH_PARAM;
+import static 
org.apache.cassandra.sidecar.common.ApiEndpointsV1.OPERATIONAL_JOB_ID_PATH_PARAM;
 import static 
org.apache.cassandra.sidecar.common.ApiEndpointsV1.TABLE_PATH_PARAM;
 import static 
org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32;
 import static 
org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32_SEED;
@@ -1266,6 +1270,56 @@ abstract class SidecarClientTest
         }
     }
 
+    @Test
+    public void testOperationalJobs() throws Exception
+    {
+        UUID jobId = UUID.randomUUID();
+        String jobStatusAsString = "{\"jobId\":\"" + jobId + 
"\",\"jobStatus\":\"RUNNING\",\"operation\":\"test\"}";
+
+        MockResponse response = new MockResponse()
+                                .setResponseCode(OK.code())
+                                .setHeader("content-type", "application/json")
+                                .setBody(jobStatusAsString);
+        enqueue(response);
+
+        for (MockWebServer server : servers)
+        {
+            SidecarInstanceImpl sidecarInstance = 
RequestExecutorTest.newSidecarInstance(server);
+            OperationalJobResponse result = 
client.operationalJobs(sidecarInstance, jobId).get(30, TimeUnit.SECONDS);
+            assertThat(result).isNotNull();
+            assertThat(result.jobId()).isEqualTo(jobId);
+            
assertThat(result.status()).isEqualTo(OperationalJobStatus.RUNNING);
+            assertThat(result.operation()).isEqualTo("test");
+            validateResponseServed(server,
+                                   
ApiEndpointsV1.OPERATIONAL_JOB_ROUTE.replaceAll(OPERATIONAL_JOB_ID_PATH_PARAM, 
jobId.toString()),
+                                   req -> { });
+        }
+    }
+
+    @Test
+    public void testlistOperationalJobs() throws Exception
+    {
+        UUID jobId = UUID.randomUUID();
+        String listJobsString = "{\"jobs\":[{\"jobId\":\"" + jobId + 
"\",\"status\":\"RUNNING\",\"failureReason\":\"\",\"operation\":\"test\"}]}";
+
+        MockResponse response = new MockResponse()
+                                .setResponseCode(OK.code())
+                                .setHeader("content-type", "application/json")
+                                .setBody(listJobsString);
+
+        enqueue(response);
+
+        for (MockWebServer server : servers)
+        {
+            SidecarInstanceImpl sidecarInstance = 
RequestExecutorTest.newSidecarInstance(server);
+            ListOperationalJobsResponse result = 
client.listOperationalJobs(sidecarInstance).get(30, TimeUnit.SECONDS);
+            assertThat(result).isNotNull();
+            assertThat(result.jobs()).isNotNull();
+            assertThat(result.jobs().get(0).jobId()).isEqualTo(jobId);
+            validateResponseServed(server, 
ApiEndpointsV1.LIST_OPERATIONAL_JOBS_ROUTE, req -> { });
+        }
+    }
+
     @Test
     void testFailsWithOneAttemptPerServer()
     {
@@ -1416,28 +1470,33 @@ abstract class SidecarClientTest
 
         MockResponse response = new 
MockResponse().setResponseCode(OK.code()).setBody(connectedClientStatsResponseAsString);
         enqueue(response);
-        ConnectedClientStatsResponse result = 
client.connectedClientStats().get();
 
-        assertThat(result).isNotNull();
-        assertThat(result.clientConnections()).isNotNull().hasSize(1);
-        assertThat(result.totalConnectedClients()).isNotNull().isEqualTo(1);
-        
assertThat(result.connectionsByUser()).isNotNull().containsKey("anonymous");
-        ClientConnectionEntry entry = 
result.clientConnections().iterator().next();
-        assertThat(entry.address()).isEqualTo("127.0.0.1");
-        assertThat(entry.port()).isEqualTo(54628);
-        assertThat(entry.sslEnabled()).isEqualTo(false);
-        
assertThat(entry.sslCipherSuite()).isEqualTo("TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256");
-        assertThat(entry.sslProtocol()).isEqualTo("TLSv1.2");
-        assertThat(entry.protocolVersion()).isEqualTo("5");
-        assertThat(entry.username()).isEqualTo("anonymous");
-        assertThat(entry.requestCount()).isEqualTo(39);
-        assertThat(entry.driverName()).isEqualTo("DataStax Java Driver");
-        assertThat(entry.driverVersion()).isEqualTo("3.11.3");
-        assertThat(entry.keyspaceName()).isEqualTo("test");
-        assertThat(entry.authenticationMode()).isEqualTo("MutualTls");
-        assertThat(entry.authenticationMetadata()).containsKey("identity");
-        assertThat(entry.clientOptions()).containsKeys("CQL_VERSION", 
"DRIVER_NAME", "DRIVER_VERSION");
-        validateResponseServed(ApiEndpointsV1.CONNECTED_CLIENT_STATS_ROUTE);
+        for (MockWebServer server : servers)
+        {
+            SidecarInstanceImpl sidecarInstance = 
RequestExecutorTest.newSidecarInstance(server);
+            ConnectedClientStatsResponse result = 
client.connectedClientStats(sidecarInstance).get();
+
+            assertThat(result).isNotNull();
+            assertThat(result.clientConnections()).isNotNull().hasSize(1);
+            
assertThat(result.totalConnectedClients()).isNotNull().isEqualTo(1);
+            
assertThat(result.connectionsByUser()).isNotNull().containsKey("anonymous");
+            ClientConnectionEntry entry = 
result.clientConnections().iterator().next();
+            assertThat(entry.address()).isEqualTo("127.0.0.1");
+            assertThat(entry.port()).isEqualTo(54628);
+            assertThat(entry.sslEnabled()).isEqualTo(false);
+            
assertThat(entry.sslCipherSuite()).isEqualTo("TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256");
+            assertThat(entry.sslProtocol()).isEqualTo("TLSv1.2");
+            assertThat(entry.protocolVersion()).isEqualTo("5");
+            assertThat(entry.username()).isEqualTo("anonymous");
+            assertThat(entry.requestCount()).isEqualTo(39);
+            assertThat(entry.driverName()).isEqualTo("DataStax Java Driver");
+            assertThat(entry.driverVersion()).isEqualTo("3.11.3");
+            assertThat(entry.keyspaceName()).isEqualTo("test");
+            assertThat(entry.authenticationMode()).isEqualTo("MutualTls");
+            assertThat(entry.authenticationMetadata()).containsKey("identity");
+            assertThat(entry.clientOptions()).containsKeys("CQL_VERSION", 
"DRIVER_NAME", "DRIVER_VERSION");
+            validateResponseServed(server, 
ApiEndpointsV1.CONNECTED_CLIENT_STATS_ROUTE, req -> { });
+        }
     }
 
     private void enqueue(MockResponse response)
@@ -1459,18 +1518,28 @@ abstract class SidecarClientTest
     {
         for (MockWebServer server : servers)
         {
-            if (server.getRequestCount() > 0)
+            if (validateResponseServed(server, expectedEndpointPath, 
serverReceivedRequestVerifier))
             {
-                assertThat(server.getRequestCount()).isEqualTo(1);
-                RecordedRequest request = server.takeRequest();
-                serverReceivedRequestVerifier.accept(request);
-                assertThat(request.getPath()).isEqualTo(expectedEndpointPath);
                 return;
             }
         }
         fail("The request was not served by any of the provided servers");
     }
 
+    private boolean validateResponseServed(MockWebServer server, String 
expectedEndpointPath, Consumer<RecordedRequest> serverReceivedRequestVerifier)
+    throws InterruptedException
+    {
+        if (server.getRequestCount() > 0)
+        {
+            assertThat(server.getRequestCount()).isEqualTo(1);
+            RecordedRequest request = server.takeRequest();
+            serverReceivedRequestVerifier.accept(request);
+            assertThat(request.getPath()).isEqualTo(expectedEndpointPath);
+            return true;
+        }
+        return false;
+    }
+
     private InputStream resourceInputStream(String name)
     {
         InputStream inputStream = 
getClass().getClassLoader().getResourceAsStream(name);
diff --git 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/exceptions/OperationalJobException.java
 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/exceptions/OperationalJobException.java
new file mode 100644
index 00000000..7e29bf40
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/exceptions/OperationalJobException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.server.exceptions;
+
+import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils;
+
+/**
+ * Exception thrown when an operational job conflict is detected
+ */
+public class OperationalJobException extends RuntimeException
+{
+    public static OperationalJobException wraps(Throwable throwable)
+    {
+        // extract the OperationalJobException from the stacktrace, if exists
+        // otherwise, wraps the throwable as OperationalJobException
+        OperationalJobException oje = ThrowableUtils.getCause(throwable, 
OperationalJobException.class);
+        if (oje != null)
+        {
+            return oje;
+        }
+        else
+        {
+            return new OperationalJobException(throwable.getMessage(), 
throwable);
+        }
+    }
+
+    public OperationalJobException(String message)
+    {
+        super(message);
+    }
+
+    public OperationalJobException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java
index a5150f79..341b3c04 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java
@@ -89,6 +89,19 @@ public interface ServiceConfiguration
      */
     int serverVerticleInstances();
 
+    /**
+     * TODO: move operationalJob related configuration to its own class, when 
the number of configurable fields grows in the future
+     * @return the size of the operational job tracker LRU cache
+     */
+    int operationalJobTrackerSize();
+
+    /**
+     * @return the max wait time in milliseconds for operational job to run 
internally before returning the http response;
+     *         if the job finishes before the max wait time, it returns 
immediately on completion;
+     *         otherwise, a response indicating the job is still running is 
returned after the max wait time.
+     */
+    long operationalJobExecutionMaxWaitTimeInMillis();
+
     /**
      * @return the throttling configuration
      */
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ServiceConfigurationImpl.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ServiceConfigurationImpl.java
index 8f2c9879..0f442bb0 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ServiceConfigurationImpl.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ServiceConfigurationImpl.java
@@ -57,7 +57,11 @@ public class ServiceConfigurationImpl implements 
ServiceConfiguration
     public static final String ALLOWABLE_SKEW_IN_MINUTES_PROPERTY = 
"allowable_time_skew_in_minutes";
     public static final int DEFAULT_ALLOWABLE_SKEW_IN_MINUTES = 60;
     private static final String SERVER_VERTICLE_INSTANCES_PROPERTY = 
"server_verticle_instances";
+    private static final String OPERATIONAL_JOB_TRACKER_SIZE_PROPERTY = 
"operations_job_tracker_size";
+    private static final String 
OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME_MILLIS_PROPERTY = 
"operations_job_sync_response_timeout";
     private static final int DEFAULT_SERVER_VERTICLE_INSTANCES = 1;
+    private static final int DEFAULT_OPERATIONAL_JOB_TRACKER_SIZE = 64;
+    private static final long 
DEFAULT_OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME_MILLIS = 
TimeUnit.SECONDS.toMillis(5);
     public static final String THROTTLE_PROPERTY = "throttle";
     public static final String SSTABLE_UPLOAD_PROPERTY = "sstable_upload";
     public static final String SSTABLE_IMPORT_PROPERTY = "sstable_import";
@@ -102,6 +106,12 @@ public class ServiceConfigurationImpl implements 
ServiceConfiguration
     @JsonProperty(value = SERVER_VERTICLE_INSTANCES_PROPERTY, defaultValue = 
DEFAULT_SERVER_VERTICLE_INSTANCES + "")
     protected final int serverVerticleInstances;
 
+    @JsonProperty(value = OPERATIONAL_JOB_TRACKER_SIZE_PROPERTY, defaultValue 
= DEFAULT_OPERATIONAL_JOB_TRACKER_SIZE + "")
+    protected final int operationalJobTrackerSize;
+
+    @JsonProperty(value = 
OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME_MILLIS_PROPERTY)
+    protected final long operationalJobExecutionMaxWaitTimeMillis;
+
     @JsonProperty(value = THROTTLE_PROPERTY)
     protected final ThrottleConfiguration throttleConfiguration;
 
@@ -152,6 +162,8 @@ public class ServiceConfigurationImpl implements 
ServiceConfiguration
         acceptBacklog = builder.acceptBacklog;
         allowableSkewInMinutes = builder.allowableSkewInMinutes;
         serverVerticleInstances = builder.serverVerticleInstances;
+        operationalJobTrackerSize = builder.operationalJobTrackerSize;
+        operationalJobExecutionMaxWaitTimeMillis = 
builder.operationalJobExecutionMaxWaitTimeMillis;
         throttleConfiguration = builder.throttleConfiguration;
         sstableUploadConfiguration = builder.sstableUploadConfiguration;
         sstableImportConfiguration = builder.sstableImportConfiguration;
@@ -243,6 +255,26 @@ public class ServiceConfigurationImpl implements 
ServiceConfiguration
         return serverVerticleInstances;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    @JsonProperty(value = OPERATIONAL_JOB_TRACKER_SIZE_PROPERTY)
+    public int operationalJobTrackerSize()
+    {
+        return operationalJobTrackerSize;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    @JsonProperty(value = 
OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME_MILLIS_PROPERTY)
+    public long operationalJobExecutionMaxWaitTimeInMillis()
+    {
+        return operationalJobExecutionMaxWaitTimeMillis;
+    }
+
     /**
      * {@inheritDoc}
      */
@@ -352,6 +384,8 @@ public class ServiceConfigurationImpl implements 
ServiceConfiguration
         protected int acceptBacklog = DEFAULT_ACCEPT_BACKLOG;
         protected int allowableSkewInMinutes = 
DEFAULT_ALLOWABLE_SKEW_IN_MINUTES;
         protected int serverVerticleInstances = 
DEFAULT_SERVER_VERTICLE_INSTANCES;
+        protected int operationalJobTrackerSize = 
DEFAULT_OPERATIONAL_JOB_TRACKER_SIZE;
+        protected long operationalJobExecutionMaxWaitTimeMillis = 
DEFAULT_OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME_MILLIS;
         protected ThrottleConfiguration throttleConfiguration = new 
ThrottleConfigurationImpl();
         protected SSTableUploadConfiguration sstableUploadConfiguration = new 
SSTableUploadConfigurationImpl();
         protected SSTableImportConfiguration sstableImportConfiguration = new 
SSTableImportConfigurationImpl();
@@ -461,6 +495,29 @@ public class ServiceConfigurationImpl implements 
ServiceConfiguration
             return update(b -> b.serverVerticleInstances = 
serverVerticleInstances);
         }
 
+        /**
+         * Sets the {@code operationalJobTrackerSize} and returns a reference 
to this Builder enabling method chaining.
+         *
+         * @param operationalJobTrackerSize the {@code 
operationalJobTrackerSize} to set
+         * @return a reference to this Builder
+         */
+        public Builder operationalJobTrackerSize(int operationalJobTrackerSize)
+        {
+            return update(b -> b.operationalJobTrackerSize = 
operationalJobTrackerSize);
+        }
+
+        /**
+         * Sets the {@code operationalJobExecutionMaxWaitTimeMillis} and 
returns a reference to this Builder
+         * enabling method chaining.
+         *
+         * @param operationalJobExecutionMaxWaitTimeMillis the {@code 
operationalJobExecutionMaxWaitTimeMillis} to set
+         * @return a reference to this Builder
+         */
+        public Builder operationalJobExecutionMaxWaitTimeMillis(int 
operationalJobExecutionMaxWaitTimeMillis)
+        {
+            return update(b -> b.operationalJobExecutionMaxWaitTimeMillis = 
operationalJobExecutionMaxWaitTimeMillis);
+        }
+
         /**
          * Sets the {@code throttleConfiguration} and returns a reference to 
this Builder enabling method chaining.
          *
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/exceptions/OperationalJobConflictException.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/exceptions/OperationalJobConflictException.java
new file mode 100644
index 00000000..94316fb4
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/exceptions/OperationalJobConflictException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.exceptions;
+
+import 
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
+
+/**
+ * When the operational job to be submitted is conflicting with another one 
that is running on Cassandra
+ */
+public class OperationalJobConflictException extends OperationalJobException
+{
+    public OperationalJobConflictException(String message)
+    {
+        super(message);
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java 
b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java
new file mode 100644
index 00000000..1076b44f
--- /dev/null
+++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.utils.UUIDs;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import 
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
+import org.apache.cassandra.sidecar.common.utils.Preconditions;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.tasks.Task;
+
+/**
+ * An abstract class representing operational jobs that run on Cassandra
+ */
+public abstract class OperationalJob implements Task<Void>
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(OperationalJob.class);
+
+    // use v1 time-based uuid
+    public final UUID jobId;
+
+    private final Promise<Void> executionPromise;
+    private volatile boolean isExecuting = false;
+
+    /**
+     * Constructs a job with a unique UUID, in Pending state
+     *
+     * @param jobId UUID representing the Job to be created
+     */
+    protected OperationalJob(UUID jobId)
+    {
+        Preconditions.checkArgument(jobId.version() == 1, "OperationalJob 
accepts only time-based UUID");
+        this.jobId = jobId;
+        this.executionPromise = Promise.promise();
+    }
+
+    @Override
+    public final Void result()
+    {
+        // Update when operational jobs can contain result rather than status
+        throw new UnsupportedOperationException("No result is expected from an 
OperationalJob");
+    }
+
+    /**
+     * @return unix timestamp of the job creation time in milliseconds
+     */
+    public long creationTime()
+    {
+        return UUIDs.unixTimestamp(jobId);
+    }
+
+    /**
+     * @return whether the operational job is executing or not.
+     */
+    public boolean isExecuting()
+    {
+        return isExecuting;
+    }
+
+    /**
+     * Determine whether the operational job is stale by considering both the 
referenceTimestampInMillis and the ttlInMillis
+     *
+     * @return true if the job's life duration has exceeded ttlInMillis; 
otherwise, false
+     */
+    public boolean isStale(long referenceTimestampInMillis, long ttlInMillis)
+    {
+        long createdAt = creationTime();
+        Preconditions.checkArgument(referenceTimestampInMillis >= createdAt, 
"Invalid referenceTimestampInMillis");
+        Preconditions.checkArgument(ttlInMillis >= 0, "Invalid ttlInMillis");
+        return referenceTimestampInMillis - createdAt > ttlInMillis;
+    }
+
+    /**
+     * Determines the status of the job. OperationalJob subclasses could 
choose to override the method.
+     * <p>
+     * For long-lived jobs, the implementations should return the {@link 
OperationalJobStatus#RUNNING} status intelligently.
+     * The condition of {@link OperationalJobStatus#RUNNING} is 
implementation-specific.
+     * For example, node decommission is tracked by the operationMode exposed 
from Cassandra.
+     * If the operationMode is LEAVING, the corresponding OperationalJob is 
{@link OperationalJobStatus#RUNNING}.
+     * In this case, even if the OperationalJobStatus determined from this 
method is {@link OperationalJobStatus#CREATED},
+     * the concrete implementation can override and return {@link 
OperationalJobStatus#RUNNING}.
+     * <p>
+     * For short-lived jobs, i.e. the result is known right away, the 
implementations do not return the {@link OperationalJobStatus#RUNNING} status.
+     * They return either {@link OperationalJobStatus#SUCCEEDED} or {@link 
OperationalJobStatus#FAILED}
+     *
+     * @return status of the OperationalJob execution
+     */
+    public OperationalJobStatus status()
+    {
+        Future<Void> fut = asyncResult();
+        if (!isExecuting)
+        {
+            return OperationalJobStatus.CREATED;
+        }
+        if (!fut.isComplete())
+        {
+            return OperationalJobStatus.RUNNING;
+        }
+        else if (fut.failed())
+        {
+            return OperationalJobStatus.FAILED;
+        }
+        else
+        {
+            return OperationalJobStatus.SUCCEEDED;
+        }
+    }
+
+    public Future<Void> asyncResult()
+    {
+        return executionPromise.future();
+    }
+
+    /**
+     * Get the async result with waiting for at most the specified wait time
+     * <p>
+     * Note: This call does not block the calling thread.
+     * The call-site should handle the possible failed future with {@link 
TimeoutException} from this method.
+     *
+     * @param executorPool executor pool to run the timer
+     * @param waitTime     maximum time to wait before returning
+     * @return the async result or a failed future of {@link 
OperationalJobException} with the cause
+     * {@link TimeoutException} after exceeding the wait time
+     */
+    public Future<Void> asyncResult(TaskExecutorPool executorPool, Duration 
waitTime)
+    {
+        Future<Void> resultFut = asyncResult();
+        if (resultFut.isComplete())
+        {
+            return resultFut;
+        }
+
+        // complete the max wait time promise either when exceeding the wait 
time, or the result is available
+        Promise<Boolean> maxWaitTimePromise = Promise.promise();
+        executorPool.setTimer(waitTime.toMillis(), d -> 
maxWaitTimePromise.tryComplete(true)); // complete with true, meaning timeout
+        resultFut.onComplete(res -> maxWaitTimePromise.tryComplete(false)); // 
complete with false, meaning not timeout
+        Future<Boolean> maxWaitTimeFut = maxWaitTimePromise.future();
+        // Completes as soon as any future succeeds, or when all futures fail. 
Note that maxWaitTimePromise is
+        // closed as soon as resultFut completes
+        return Future.any(maxWaitTimeFut, resultFut)
+                     // We want to return the result when applicable, of 
course.
+                     // If this lambda below is evaluated, both futures are 
completed;
+                     // Depending on whether timeout flag is set, it either 
throws or complete with result
+                     .compose(f -> {
+                         boolean isTimeout = maxWaitTimeFut.result();
+                         if (isTimeout)
+                         {
+                             return Future.succeededFuture();
+                         }
+                         // otherwise, the result of the job is available
+                         return resultFut;
+                     });
+    }
+
+    /**
+     * OperationalJob body. The implementation is executed in a blocking 
manner.
+     *
+     * @throws OperationalJobException OperationalJobException that wraps job 
failure
+     */
+    protected abstract void executeInternal() throws OperationalJobException;
+
+    /**
+     * Execute the job behavior as specified in the internal execution {@link 
#executeInternal()},
+     * while tracking the status of the job's lifecycle.
+     */
+    @Override
+    public void execute(Promise<Void> promise)
+    {
+        isExecuting = true;
+        LOGGER.info("Executing job. jobId={}", jobId);
+        try
+        {
+            // Blocking call to perform concrete job-specific execution, 
returning the status
+            executeInternal();
+            executionPromise.tryComplete();
+            promise.tryComplete();
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Complete job execution. jobId={} status={}", 
jobId, status());
+            }
+        }
+        catch (Throwable e)
+        {
+            OperationalJobException oje = OperationalJobException.wraps(e);
+            LOGGER.error("Job execution failed. jobId={} reason='{}'", jobId, 
oje.getMessage(), oje);
+            executionPromise.tryFail(oje);
+            promise.tryFail(oje);
+        }
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
new file mode 100644
index 00000000..034fd567
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import javax.inject.Inject;
+
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
+
+/**
+ * An abstraction of the management and tracking of long-running jobs running 
on the sidecar.
+ */
+@Singleton
+public class OperationalJobManager
+{
+    private final OperationalJobTracker jobTracker;
+
+    /**
+     * Creates a manager instance with a default sized job-tracker.
+     *
+     * @param jobTracker the tracker for the operational jobs
+     */
+    @Inject
+    public OperationalJobManager(OperationalJobTracker jobTracker)
+    {
+        this.jobTracker = jobTracker;
+    }
+
+    /**
+     * Fetches the inflight jobs being tracked on the sidecar
+     *
+     * @return instances of the jobs that are in pending or running states
+     */
+    public List<OperationalJob> allInflightJobs()
+    {
+        return jobTracker.getJobsView().values()
+                         .stream()
+                         .filter(j -> !j.asyncResult().isComplete())
+                         .collect(Collectors.toList());
+    }
+
+    /**
+     * Fetch the job using its UUID
+     *
+     * @param jobId identifier of the job
+     * @return instance of the job or null
+     */
+    public OperationalJob getJobIfExists(UUID jobId)
+    {
+        return jobTracker.get(jobId);
+    }
+
+    /**
+     * Try to submit the job to execute asynchronously, if it is not currently 
being
+     * tracked and not running. The job is triggered on a separate internal 
thread-pool.
+     * The job execution failure behavior is tracked within the {@link 
OperationalJob}.
+     *
+     * @param job OperationalJob instance to submit
+     * @return OperationalJob instance that is submitted
+     * @throws OperationalJobConflictException when the same operational job 
is already running on Cassandra
+     */
+    public OperationalJob trySubmitJob(OperationalJob job) throws 
OperationalJobConflictException
+    {
+        checkConflict(job);
+
+        // New job is submitted for all cases when we do not have a 
corresponding downstream job
+        return jobTracker.computeIfAbsent(job.jobId, jobId -> job);
+    }
+
+    private void checkConflict(OperationalJob job) throws 
OperationalJobConflictException
+    {
+        // The job is not yet submitted (and running), but its status 
indicates that there is an identical job running on Cassandra already
+        // In this case, this job submission is rejected.
+        if (job.status() == OperationalJobStatus.RUNNING)
+        {
+            throw new OperationalJobConflictException("The same operational 
job is already running on Cassandra. operationName='" + job.name() + '\'');
+        }
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
new file mode 100644
index 00000000..649a8a0b
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Tracks and stores the results of long-running jobs running on the sidecar
+ */
+@Singleton
+public class OperationalJobTracker
+{
+    public static final long ONE_DAY_TTL = TimeUnit.DAYS.toMillis(1); // todo: 
consider making it configurable
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(OperationalJobTracker.class);
+    private final Map<UUID, OperationalJob> map;
+
+    @Inject
+    public OperationalJobTracker(ServiceConfiguration serviceConfiguration)
+    {
+        this(serviceConfiguration.operationalJobTrackerSize());
+    }
+
+    public OperationalJobTracker(int initialCapacity)
+    {
+        map = Collections.synchronizedMap(new LinkedHashMap<UUID, 
OperationalJob>(initialCapacity)
+        {
+            /**
+             * {@inheritDoc}
+             */
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<UUID, 
OperationalJob> eldest)
+            {
+                // We have reached capacity and the oldest entry is either 
ready for cleanup or stale
+                if (map.size() > initialCapacity)
+                {
+                    OperationalJob job = eldest.getValue();
+                    if (job.status().isCompleted() && 
job.isStale(System.currentTimeMillis(), ONE_DAY_TTL))
+                    {
+                        LOGGER.debug("Expiring completed and stale job due to 
job tracker has reached max size. jobId={} status={} createdAt={}",
+                                     job.jobId, job.status(), 
job.creationTime());
+                        return true;
+                    }
+                    else
+                    {
+                        LOGGER.warn("Job tracker reached max size, but the 
eldest job is not completed yet. " +
+                                    "Not evicting. jobId={} status={}", 
job.jobId, job.status());
+                        // TODO: Optionally trigger cleanup to fetch next 
oldest to evict
+                    }
+                }
+
+                return false;
+            }
+        });
+    }
+
+    public OperationalJob computeIfAbsent(UUID key, Function<UUID, 
OperationalJob> mappingFunction)
+    {
+        return map.computeIfAbsent(key, mappingFunction);
+    }
+
+    public OperationalJob get(UUID key)
+    {
+        return map.get(key);
+    }
+
+    /**
+     * Returns an immutable copy of the underlying map, to provide a 
consistent view of the map, minimizing contention
+     *
+     * @return an immutable copy of the underlying mapping
+     */
+    @NotNull
+    Map<UUID, OperationalJob> getJobsView()
+    {
+        return Collections.unmodifiableMap(map);
+    }
+
+    @VisibleForTesting
+    OperationalJob put(OperationalJob job)
+    {
+        return map.put(job.jobId, job);
+    }
+
+    @VisibleForTesting
+    int size()
+    {
+        return map.size();
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandler.java
new file mode 100644
index 00000000..ba7a24dc
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandler.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import javax.inject.Inject;
+
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import 
org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.job.OperationalJobManager;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+
+/**
+ * Handler for retrieving the all the jobs running on the sidecar
+ */
+public class ListOperationalJobsHandler extends AbstractHandler<Void>
+{
+    private final OperationalJobManager jobManager;
+
+    @Inject
+    public ListOperationalJobsHandler(InstanceMetadataFetcher metadataFetcher,
+                                      ExecutorPools executorPools,
+                                      CassandraInputValidator validator,
+                                      OperationalJobManager jobManager)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.jobManager = jobManager;
+    }
+
+    @Override
+    protected Void extractParamsOrThrow(RoutingContext context)
+    {
+        return null;
+    }
+
+    @Override
+    protected void handleInternal(RoutingContext context, HttpServerRequest 
httpRequest, String host, SocketAddress remoteAddress, Void request)
+    {
+        ListOperationalJobsResponse listResponse = new 
ListOperationalJobsResponse();
+        jobManager.allInflightJobs()
+                  .stream()
+                  .map(job -> new OperationalJobResponse(job.jobId, RUNNING, 
job.name(), null))
+                  .forEach(listResponse::addJob);
+        context.json(listResponse);
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/routes/OperationalJobHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/OperationalJobHandler.java
new file mode 100644
index 00000000..162903af
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/OperationalJobHandler.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.UUID;
+import javax.inject.Inject;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.job.OperationalJob;
+import org.apache.cassandra.sidecar.job.OperationalJobManager;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static 
org.apache.cassandra.sidecar.common.ApiEndpointsV1.OPERATIONAL_JOB_ID_PATH_PARAM;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED;
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * Handler for retrieving the status of async operational jobs running on the 
sidecar
+ */
+public class OperationalJobHandler extends AbstractHandler<Void>
+{
+    private final OperationalJobManager jobManager;
+
+    @Inject
+    public OperationalJobHandler(InstanceMetadataFetcher metadataFetcher,
+                                 ExecutorPools executorPools,
+                                 CassandraInputValidator validator,
+                                 OperationalJobManager jobManager)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.jobManager = jobManager;
+    }
+
+    @Override
+    protected Void extractParamsOrThrow(RoutingContext context)
+    {
+        return null;
+    }
+
+    @Override
+    public void handleInternal(RoutingContext context, HttpServerRequest 
httpRequest, String host, SocketAddress remoteAddress, Void request)
+    {
+        UUID jobId = validatedJobIdParam(context);
+        executorPools.service()
+                     .executeBlocking(() -> {
+                         OperationalJob job = jobManager.getJobIfExists(jobId);
+                         if (job == null)
+                         {
+                             logger.info("No operational job found with the 
jobId. jobId={}", jobId);
+                             throw 
wrapHttpException(HttpResponseStatus.NOT_FOUND,
+                                                     String.format("Unknown 
job with ID: %s. Please retry the operation.", jobId));
+                         }
+                         return job;
+                     })
+                     .onFailure(cause -> processFailure(cause, context, host, 
remoteAddress, request))
+                     .onSuccess(job -> sendStatusBasedResponse(context, jobId, 
job));
+    }
+
+    UUID validatedJobIdParam(RoutingContext context)
+    {
+        String requestJobId = 
context.pathParam(OPERATIONAL_JOB_ID_PATH_PARAM.substring(1));
+        if (requestJobId == null)
+        {
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
+                                    OPERATIONAL_JOB_ID_PATH_PARAM + " is 
required but not supplied");
+        }
+
+        UUID jobId;
+        try
+        {
+            jobId = UUID.fromString(requestJobId);
+        }
+        catch (IllegalArgumentException e)
+        {
+            logger.info("Invalid jobId. jobId={}", requestJobId);
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, 
String.format("Invalid job ID provided :%s.", requestJobId));
+        }
+        return jobId;
+    }
+
+    public void sendStatusBasedResponse(RoutingContext context, UUID jobId, 
OperationalJob job)
+    {
+        OperationalJobStatus status = job.status();
+        if (status.isCompleted())
+        {
+            context.response().setStatusCode(HttpResponseStatus.OK.code());
+        }
+        else
+        {
+            
context.response().setStatusCode(HttpResponseStatus.ACCEPTED.code());
+        }
+
+        String reason = status == FAILED ? 
job.asyncResult().cause().getMessage() : null;
+        context.json(new OperationalJobResponse(jobId, status, job.name(), 
reason));
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java 
b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
index e135e6a1..31499527 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
@@ -99,6 +99,8 @@ import 
org.apache.cassandra.sidecar.routes.DiskSpaceProtectionHandler;
 import org.apache.cassandra.sidecar.routes.FileStreamHandler;
 import org.apache.cassandra.sidecar.routes.GossipInfoHandler;
 import org.apache.cassandra.sidecar.routes.JsonErrorHandler;
+import org.apache.cassandra.sidecar.routes.ListOperationalJobsHandler;
+import org.apache.cassandra.sidecar.routes.OperationalJobHandler;
 import org.apache.cassandra.sidecar.routes.RingHandler;
 import org.apache.cassandra.sidecar.routes.RoutingOrder;
 import org.apache.cassandra.sidecar.routes.SchemaHandler;
@@ -269,6 +271,8 @@ public class MainModule extends AbstractModule
                               CreateRestoreSliceHandler 
createRestoreSliceHandler,
                               RestoreJobProgressHandler 
restoreJobProgressHandler,
                               ConnectedClientStatsHandler 
connectedClientStatsHandler,
+                              OperationalJobHandler operationalJobHandler,
+                              ListOperationalJobsHandler 
listOperationalJobsHandler,
                               ErrorHandler errorHandler)
     {
         Router router = Router.router(vertx);
@@ -362,6 +366,12 @@ public class MainModule extends AbstractModule
         router.get(ApiEndpointsV1.CONNECTED_CLIENT_STATS_ROUTE)
               .handler(connectedClientStatsHandler);
 
+        router.get(ApiEndpointsV1.OPERATIONAL_JOB_ROUTE)
+              .handler(operationalJobHandler);
+
+        router.get(ApiEndpointsV1.LIST_OPERATIONAL_JOBS_ROUTE)
+              .handler(listOperationalJobsHandler);
+
         router.get(ApiEndpointsV1.RING_ROUTE_PER_KEYSPACE)
               .handler(ringHandler);
 
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTask.java 
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTask.java
index 7f7c53dc..6525b2a8 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTask.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTask.java
@@ -20,12 +20,10 @@ package org.apache.cassandra.sidecar.tasks;
 
 import java.util.concurrent.TimeUnit;
 
-import io.vertx.core.Promise;
-
 /**
  * An interface that defines a periodic task that will be executed during the 
lifecycle of Cassandra Sidecar
  */
-public interface PeriodicTask
+public interface PeriodicTask extends Task<Void>
 {
     /**
      * @return delay in the specified {@link #delayUnit()} for periodic task
@@ -56,18 +54,6 @@ public interface PeriodicTask
         return delayUnit();
     }
 
-    /**
-     * Defines the task body.
-     * The method can be considered as executing in a single thread.
-     *
-     * <br><b>NOTE:</b> the {@code promise} must be completed (as either 
succeeded or failed) at the end of the run.
-     * Failing to do so, the {@link PeriodicTaskExecutor} will not be able to 
schedule a new run.
-     * See {@link PeriodicTaskExecutor#executeInternal} for details.
-     *
-     * @param promise a promise when the execution completes
-     */
-    void execute(Promise<Void> promise);
-
     /**
      * Register the periodic task executor at the task. By default, it is 
no-op.
      * If the reference to the executor is needed, the concrete {@link 
PeriodicTask} can implement this method
@@ -88,21 +74,9 @@ public interface PeriodicTask
         return false;
     }
 
-    /**
-     * Close any resources it opened.
-     * Implementation note: it is encouraged to handle the exceptions during 
close()
-     */
-    default void close()
-    {
-    }
-
-    /**
-     * @return descriptive name of the task. It prefers simple class name, if 
it is non-empty;
-     * otherwise, it returns the full class name
-     */
-    default String name()
+    @Override
+    default Void result()
     {
-        String simpleName = this.getClass().getSimpleName();
-        return simpleName.isEmpty() ? this.getClass().getName() : simpleName;
+        throw new UnsupportedOperationException("No result is expected from a 
Periodic task");
     }
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTask.java 
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/Task.java
similarity index 50%
copy from 
server/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTask.java
copy to server/src/main/java/org/apache/cassandra/sidecar/tasks/Task.java
index 7f7c53dc..d63d3bbd 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTask.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/tasks/Task.java
@@ -18,75 +18,31 @@
 
 package org.apache.cassandra.sidecar.tasks;
 
-import java.util.concurrent.TimeUnit;
-
 import io.vertx.core.Promise;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * An interface that defines a periodic task that will be executed during the 
lifecycle of Cassandra Sidecar
+ *  * An interface that defines a task that will be executed during the 
lifecycle of Cassandra Sidecar
+ * @param <T>
  */
-public interface PeriodicTask
+public interface Task<T>
 {
-    /**
-     * @return delay in the specified {@link #delayUnit()} for periodic task
-     */
-    long delay();
-
-    /**
-     * @return the unit for the {@link #delay()}, if not specified defaults to 
milliseconds
-     */
-    default TimeUnit delayUnit()
-    {
-        return TimeUnit.MILLISECONDS;
-    }
-
-    /**
-     * @return the initial delay for the task, defaults to the {@link #delay()}
-     */
-    default long initialDelay()
-    {
-        return delay();
-    }
-
-    /**
-     * @return the units for the {@link #initialDelay()}, if not specified 
defaults to {@link #delayUnit()}
-     */
-    default TimeUnit initialDelayUnit()
-    {
-        return delayUnit();
-    }
-
     /**
      * Defines the task body.
      * The method can be considered as executing in a single thread.
      *
      * <br><b>NOTE:</b> the {@code promise} must be completed (as either 
succeeded or failed) at the end of the run.
-     * Failing to do so, the {@link PeriodicTaskExecutor} will not be able to 
schedule a new run.
-     * See {@link PeriodicTaskExecutor#executeInternal} for details.
+     * Failing to do so, the executor will not be able to trigger a new run.
      *
      * @param promise a promise when the execution completes
      */
-    void execute(Promise<Void> promise);
+    void execute(Promise<T> promise);
 
     /**
-     * Register the periodic task executor at the task. By default, it is 
no-op.
-     * If the reference to the executor is needed, the concrete {@link 
PeriodicTask} can implement this method
-     *
-     * @param executor the executor that manages the task
+     * @return the result of task execution. Null is returned if there is no 
result
      */
-    default void registerPeriodicTaskExecutor(PeriodicTaskExecutor executor)
-    {
-    }
-
-    /**
-     * Specify whether the task should be skipped.
-     * // TODO: consider returning the reason to skip, instead of just a 
boolean value
-     * @return {@code true} to skip; otherwise, return {@code false}
-     */
-    default boolean shouldSkip()
-    {
-        return false;
-    }
+    @Nullable
+    T result();
 
     /**
      * Close any resources it opened.
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
new file mode 100644
index 00000000..f2f8307c
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.time.Duration;
+import java.util.UUID;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.datastax.driver.core.utils.UUIDs;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import 
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING;
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests to validate the Job submission behavior for scenarios which are a 
combination of values for
+ *
+ * <ul>
+ * <ol> 1) Downstream job existence,</ol>
+ * <ol> 2) Cached job (null (not in cache), Completed/Failed job, Running 
job), and</ol>
+ * <ol> 3) Request UUID (null (no header), UUID)</ol>
+ * </ul>
+ */
+class OperationalJobManagerTest
+{
+    @Mock
+    SidecarConfiguration mockConfig;
+
+    protected Vertx vertx;
+
+    @BeforeEach
+    void setup()
+    {
+        vertx = Vertx.vertx();
+        MockitoAnnotations.openMocks(this);
+        ServiceConfiguration mockServiceConfig = 
mock(ServiceConfiguration.class);
+        when(mockConfig.serviceConfiguration()).thenReturn(mockServiceConfig);
+        
when(mockServiceConfig.operationalJobExecutionMaxWaitTimeInMillis()).thenReturn(5000L);
+    }
+
+    @Test
+    void testWithNoDownstreamJob()
+    {
+        OperationalJobTracker tracker = new OperationalJobTracker(4);
+        OperationalJobManager manager = new OperationalJobManager(tracker);
+
+        OperationalJob testJob = 
OperationalJobTest.createOperationalJob(SUCCEEDED);
+        manager.trySubmitJob(testJob);
+        testJob.execute(Promise.promise());
+        assertThat(testJob.asyncResult().isComplete()).isTrue();
+        assertThat(testJob.status()).isEqualTo(SUCCEEDED);
+        assertThat(tracker.get(testJob.jobId)).isNotNull();
+    }
+
+    @Test
+    void testWithRunningDownstreamJob()
+    {
+        OperationalJob runningJob = 
OperationalJobTest.createOperationalJob(RUNNING);
+        OperationalJobTracker tracker = new OperationalJobTracker(4);
+        ExecutorPools mockPools = mock(ExecutorPools.class);
+        TaskExecutorPool mockExecPool = mock(TaskExecutorPool.class);
+        when(mockPools.internal()).thenReturn(mockExecPool);
+        when(mockExecPool.runBlocking(any())).thenReturn(null);
+        OperationalJobManager manager = new OperationalJobManager(tracker);
+        assertThatThrownBy(() -> manager.trySubmitJob(runningJob))
+        .isExactlyInstanceOf(OperationalJobConflictException.class)
+        .hasMessage("The same operational job is already running on Cassandra. 
operationName='Operation X'");
+    }
+
+    @Test
+    void testWithLongRunningJob()
+    {
+        UUID jobId = UUIDs.timeBased();
+
+        OperationalJobTracker tracker = new OperationalJobTracker(4);
+        OperationalJobManager manager = new OperationalJobManager(tracker);
+
+        OperationalJob testJob = 
OperationalJobTest.createOperationalJob(jobId, Duration.ofSeconds(10));
+
+        manager.trySubmitJob(testJob);
+        // execute the job async.
+        vertx.executeBlocking(testJob::execute);
+        // by the time of checking, the job should still be running. It runs 
for 10 seconds.
+        assertThat(testJob.asyncResult().isComplete()).isFalse();
+        assertThat(tracker.get(jobId)).isNotNull();
+    }
+
+    @Test
+    void testWithFailingJob()
+    {
+        UUID jobId = UUIDs.timeBased();
+
+        OperationalJobTracker tracker = new OperationalJobTracker(4);
+        OperationalJobManager manager = new OperationalJobManager(tracker);
+
+        String msg = "Test Job failed";
+        OperationalJob failingJob = new OperationalJob(jobId)
+        {
+            @Override
+            protected void executeInternal() throws OperationalJobException
+            {
+                throw new OperationalJobException(msg);
+            }
+        };
+
+        manager.trySubmitJob(failingJob);
+        failingJob.execute(Promise.promise());
+        assertThat(failingJob.asyncResult().isComplete()).isTrue();
+        assertThat(failingJob.asyncResult().failed()).isTrue();
+        assertThat(tracker.get(jobId)).isNotNull();
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java 
b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java
new file mode 100644
index 00000000..9e2bd51f
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.Test;
+
+import com.datastax.driver.core.utils.UUIDs;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import 
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
+
+import static org.apache.cassandra.sidecar.AssertionUtils.loopAssert;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests to validate the Job APIs
+ */
+class OperationalJobTest
+{
+    private final TaskExecutorPool executorPool = new 
ExecutorPools(Vertx.vertx(), new ServiceConfigurationImpl()).internal();
+
+    public static OperationalJob createOperationalJob(OperationalJobStatus 
jobStatus)
+    {
+        return createOperationalJob(UUIDs.timeBased(), jobStatus);
+    }
+
+    public static OperationalJob createOperationalJob(UUID jobId, 
OperationalJobStatus jobStatus)
+    {
+        return new OperationalJob(jobId)
+        {
+            @Override
+            protected void executeInternal() throws OperationalJobException
+            {
+            }
+
+            @Override
+            public OperationalJobStatus status()
+            {
+                return jobStatus;
+            }
+
+            @Override
+            public String name()
+            {
+                return "Operation X";
+            }
+        };
+    }
+
+    public static OperationalJob createOperationalJob(UUID jobId, Duration 
jobDuration)
+    {
+        return createOperationalJob(jobId, jobDuration, null);
+    }
+
+    public static OperationalJob createOperationalJob(UUID jobId, Duration 
jobDuration, OperationalJobException jobFailure)
+    {
+        return new OperationalJob(jobId)
+        {
+            @Override
+            protected void executeInternal() throws OperationalJobException
+            {
+                if (jobDuration != null)
+                {
+                    
Uninterruptibles.sleepUninterruptibly(jobDuration.toMillis(), 
TimeUnit.MILLISECONDS);
+                }
+
+                if (jobFailure != null)
+                {
+                    throw jobFailure;
+                }
+            }
+
+            @Override
+            public String name()
+            {
+                return "Operation X";
+            }
+        };
+    }
+
+    @Test
+    void testJobCompletion()
+    {
+        OperationalJob job = 
createOperationalJob(OperationalJobStatus.SUCCEEDED);
+        Promise<Void> p = Promise.promise();
+        job.execute(p);
+        Future<Void> future = p.future();
+        assertThat(future.succeeded()).isTrue();
+        assertThat(job.asyncResult().succeeded()).isTrue();
+        assertThat(job.status()).isEqualTo(OperationalJobStatus.SUCCEEDED);
+    }
+
+    @Test
+    void testJobFailed()
+    {
+        String msg = "Test Job failed";
+        OperationalJob failingJob = new OperationalJob(UUIDs.timeBased())
+        {
+            @Override
+            protected void executeInternal() throws OperationalJobException
+            {
+                throw new OperationalJobException(msg);
+            }
+        };
+
+        Promise<Void> p = Promise.promise();
+        failingJob.execute(p);
+
+        Future<Void> future = p.future();
+        assertThat(future.failed()).isTrue();
+        assertThat(future.cause())
+        .isExactlyInstanceOf(OperationalJobException.class)
+        .hasMessage(msg);
+        assertThat(failingJob.status()).isEqualTo(OperationalJobStatus.FAILED);
+        assertThat(failingJob.asyncResult().failed()).isTrue();
+        assertThat(failingJob.asyncResult().cause())
+        .isExactlyInstanceOf(OperationalJobException.class)
+        .hasMessage(msg);
+    }
+
+    @Test
+    void testGetAsyncResultInWaitTime()
+    {
+        OperationalJob longRunning = createOperationalJob(UUIDs.timeBased(), 
Duration.ofMillis(500L));
+        executorPool.executeBlocking(longRunning::execute);
+        Duration waitTime = Duration.ofSeconds(2);
+        Future<Void> result = longRunning.asyncResult(executorPool, waitTime);
+        // it should finish in around 500 ms.
+        loopAssert(1, () -> assertThat(result.succeeded()).isTrue());
+    }
+
+    @Test
+    void testGetFailedAsyncResultInWaitTime()
+    {
+        OperationalJobException jobFailure = new OperationalJobException("Job 
fails");
+        OperationalJob longButFailedJob = 
createOperationalJob(UUIDs.timeBased(), Duration.ofMillis(500L), jobFailure);
+        executorPool.executeBlocking(longButFailedJob::execute);
+        Duration waitTime = Duration.ofSeconds(2);
+        Future<Void> result = longButFailedJob.asyncResult(executorPool, 
waitTime);
+        // it should finish in around 500 ms.
+        loopAssert(1, () -> {
+            assertThat(result.failed()).isTrue();
+            assertThat(result.cause()).isEqualTo(jobFailure);
+        });
+    }
+
+    @Test
+    void testGetAsyncResultExceedsWaitTime()
+    {
+        OperationalJob longRunning = createOperationalJob(UUIDs.timeBased(), 
Duration.ofMillis(5000L));
+        executorPool.executeBlocking(longRunning::execute);
+        Duration waitTime = Duration.ofMillis(200L);
+        Future<Void> result = longRunning.asyncResult(executorPool, waitTime);
+        loopAssert(1, () -> {
+            // the composite future is completed in 200ms. The operational job 
is still running, so the isExecuting should return true too.
+            assertThat(result.succeeded()).isTrue();
+            assertThat(longRunning.isExecuting()).isTrue();
+        });
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTrackerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTrackerTest.java
new file mode 100644
index 00000000..f6a49d4c
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTrackerTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import 
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
+
+import static 
org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED;
+import static 
org.apache.cassandra.sidecar.job.OperationalJobTest.createOperationalJob;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests to validate job tracking
+ */
+class OperationalJobTrackerTest
+{
+    private OperationalJobTracker jobTracker;
+    private static final int trackerSize = 3;
+
+    OperationalJob job1 = createOperationalJob(SUCCEEDED);
+    OperationalJob job2 = createOperationalJob(SUCCEEDED);
+    OperationalJob job3 = createOperationalJob(SUCCEEDED);
+    OperationalJob job4 = createOperationalJob(SUCCEEDED);
+
+    long twoDaysAgo = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2);
+    OperationalJob jobWithStaleCreationTime = new 
OperationalJob(UUIDs.startOf(twoDaysAgo))
+    {
+        @Override
+        protected void executeInternal() throws OperationalJobException
+        {
+        }
+
+        @Override
+        public OperationalJobStatus status()
+        {
+            return SUCCEEDED;
+        }
+    };
+
+    @BeforeEach
+    void setUp()
+    {
+        jobTracker = new OperationalJobTracker(trackerSize);
+    }
+
+    @Test
+    void testPutAndGet()
+    {
+        jobTracker.put(job1);
+        jobTracker.put(job2);
+        assertThat(jobTracker.get(job1.jobId)).isSameAs(job1);
+        assertThat(jobTracker.get(job2.jobId)).isSameAs(job2);
+    }
+
+    @Test
+    void testComputeIfAbsent()
+    {
+        jobTracker.put(job1);
+        OperationalJob job = jobTracker.computeIfAbsent(job1.jobId, v -> job3);
+        assertThat(job).isNotSameAs(job3);
+        assertThat(job).isSameAs(job1);
+        assertThat(jobTracker.get(job1.jobId)).isSameAs(job1);
+    }
+
+    @Test
+    void testNoEviction()
+    {
+        jobTracker.put(job1);
+        jobTracker.put(job2);
+        jobTracker.put(job3);
+        jobTracker.put(job4);
+
+        assertThat(jobTracker.size())
+        .describedAs("Although the tracker initial size is 3, no job is 
evicted since all jobs are still running")
+        .isEqualTo(4);
+        assertThat(jobTracker.get(job1.jobId)).isNotNull();
+        assertThat(jobTracker.get(job2.jobId)).isNotNull();
+        assertThat(jobTracker.get(job3.jobId)).isNotNull();
+        assertThat(jobTracker.get(job4.jobId)).isNotNull();
+    }
+
+    @Test
+    void testRemoveEldestEntryEvictionOnExpiry()
+    {
+        jobTracker.put(jobWithStaleCreationTime);
+        jobTracker.put(job1);
+        jobTracker.put(job2);
+        jobTracker.put(job3);
+
+        assertThat(jobTracker.size()).isEqualTo(3);
+        assertThat(jobTracker.get(job1.jobId)).isNotNull();
+        assertThat(jobTracker.get(job2.jobId)).isNotNull();
+        assertThat(jobTracker.get(job3.jobId)).isNotNull();
+        assertThat(jobTracker.get(jobWithStaleCreationTime.jobId)).isNull();
+    }
+
+    @Test
+    void testGetViewImmutable()
+    {
+        // Test the immutable view returned by getView
+        jobTracker.put(job1);
+        jobTracker.put(job2);
+
+        Map<UUID, OperationalJob> view = jobTracker.getJobsView();
+        assertThat(view.size()).isEqualTo(2);
+        assertThatThrownBy(() -> view.put(job3.jobId, job3))
+        .isExactlyInstanceOf(UnsupportedOperationException.class);
+    }
+
+    @Test
+    void testConcurrentAccess() throws Exception
+    {
+        int one = 1;
+        long pastTimestamp = System.currentTimeMillis() - 
OperationalJobTracker.ONE_DAY_TTL - 1000L;
+        OperationalJobTracker tracker = new OperationalJobTracker(one);
+        ExecutorService executorService = 
Executors.newFixedThreadPool(trackerSize);
+        List<OperationalJob> sortedJobs = IntStream.range(0, trackerSize + 10)
+                                                   .boxed()
+                                                   .map(i -> 
createOperationalJob(UUIDs.startOf(pastTimestamp + i), SUCCEEDED))
+                                                   
.collect(Collectors.toList());
+        sortedJobs.forEach(tracker::put);
+        executorService.shutdown();
+        executorService.awaitTermination(5, TimeUnit.SECONDS);
+        assertThat(tracker.size()).isEqualTo(one);
+        assertThat(tracker.getJobsView().values().iterator().next())
+        .describedAs("Only the last job is kept")
+        .isSameAs(sortedJobs.get(sortedJobs.size() - 1));
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandlerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandlerTest.java
new file mode 100644
index 00000000..57b9ff4b
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandlerTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import 
org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse;
+import 
org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException;
+import org.apache.cassandra.sidecar.job.OperationalJob;
+import org.apache.cassandra.sidecar.job.OperationalJobManager;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.server.Server;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link ListOperationalJobsHandler}
+ */
+@ExtendWith(VertxExtension.class)
+class ListOperationalJobsHandlerTest
+{
+    static final Logger LOGGER = 
LoggerFactory.getLogger(ListOperationalJobsHandlerTest.class);
+    Vertx vertx;
+    Server server;
+
+    static UUID runningUuid = UUIDs.timeBased();
+    static UUID runningUuid2 = UUIDs.timeBased();
+
+    static SampleOperationalJob running = new 
SampleOperationalJob(runningUuid);
+    static SampleOperationalJob running2 = new 
SampleOperationalJob(runningUuid2);
+
+    @BeforeEach
+    void before() throws InterruptedException
+    {
+        Injector injector;
+        Module testOverride = Modules.override(new TestModule())
+                                     .with(new 
ListOperationalJobsHandlerTest.ListJobsTestModule());
+        injector = Guice.createInjector(Modules.override(new MainModule())
+                                               .with(testOverride));
+        vertx = injector.getInstance(Vertx.class);
+        server = injector.getInstance(Server.class);
+        VertxTestContext context = new VertxTestContext();
+        server.start()
+              .onSuccess(s -> context.completeNow())
+              .onFailure(context::failNow);
+        context.awaitCompletion(5, TimeUnit.SECONDS);
+    }
+
+    @AfterEach
+    void after() throws InterruptedException
+    {
+        CountDownLatch closeLatch = new CountDownLatch(1);
+        server.close().onSuccess(res -> closeLatch.countDown());
+        if (closeLatch.await(60, TimeUnit.SECONDS))
+            LOGGER.info("Close event received before timeout.");
+        else
+            LOGGER.error("Close event timed out.");
+    }
+
+    @Test
+    void testListJobs(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/api/v1/cassandra/operational-jobs";
+        client.get(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(OK.code());
+                  ListOperationalJobsResponse listJobs = 
response.bodyAsJson(ListOperationalJobsResponse.class);
+                  assertThat(listJobs).isNotNull();
+                  assertThat(listJobs.jobs()).isNotNull();
+                  assertThat(listJobs.jobs().size()).isEqualTo(2);
+                  assertThat(listJobs.jobs().get(0).jobId()).isIn(runningUuid, 
runningUuid2);
+                  assertThat(listJobs.jobs().get(1).jobId()).isIn(runningUuid, 
runningUuid2);
+                  context.completeNow();
+              }));
+    }
+
+    static class ListJobsTestModule extends AbstractModule
+    {
+        @Provides
+        @Singleton
+        public OperationalJobManager jobManager()
+        {
+            List<OperationalJob> testJobs = Arrays.asList(running, running2);
+            OperationalJobManager mockManager = 
mock(OperationalJobManager.class);
+            when(mockManager.allInflightJobs()).thenReturn(testJobs);
+            return mockManager;
+        }
+    }
+
+    /**
+     * Concrete test implementation of the OperationalJob to be used by 
handler tests
+     */
+    public static class SampleOperationalJob extends OperationalJob
+    {
+        /**
+         * Constructs a job with a unique UUID, in Pending state
+         *
+         * @param jobId UUID representing the Job to be created
+         */
+        protected SampleOperationalJob(UUID jobId)
+        {
+            super(jobId);
+        }
+
+        @Override
+        protected void executeInternal() throws OperationalJobException
+        {
+        }
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/routes/OperationalJobHandlerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/OperationalJobHandlerTest.java
new file mode 100644
index 00000000..39becc6f
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/OperationalJobHandlerTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.job.OperationalJob;
+import org.apache.cassandra.sidecar.job.OperationalJobManager;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.server.Server;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link OperationalJobHandler}
+ */
+@ExtendWith(VertxExtension.class)
+class OperationalJobHandlerTest
+{
+    static final Logger LOGGER = 
LoggerFactory.getLogger(GossipInfoHandlerTest.class);
+    Vertx vertx;
+    Server server;
+
+    static UUID runningUuid = UUIDs.timeBased();
+    static UUID completedUuid = UUIDs.timeBased();
+    static UUID failedUuid = UUIDs.timeBased();
+
+    @BeforeEach
+    void before() throws InterruptedException
+    {
+        Injector injector;
+        Module testOverride = Modules.override(new TestModule())
+                                     .with(new 
OperationalJobsHandlerTestModule());
+        injector = Guice.createInjector(Modules.override(new MainModule())
+                                               .with(testOverride));
+        vertx = injector.getInstance(Vertx.class);
+        server = injector.getInstance(Server.class);
+        VertxTestContext context = new VertxTestContext();
+        server.start()
+              .onSuccess(s -> context.completeNow())
+              .onFailure(context::failNow);
+        context.awaitCompletion(5, TimeUnit.SECONDS);
+    }
+
+    @AfterEach
+    void after() throws InterruptedException
+    {
+        CountDownLatch closeLatch = new CountDownLatch(1);
+        server.close().onSuccess(res -> closeLatch.countDown());
+        if (closeLatch.await(60, TimeUnit.SECONDS))
+            LOGGER.info("Close event received before timeout.");
+        else
+            LOGGER.error("Close event timed out.");
+    }
+
+    @Test
+    void testGetJobStatusNonExistentJob(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String uuid = UUIDs.timeBased().toString();
+        String testRoute = "/api/v1/cassandra/operational-jobs/" + uuid;
+        client.get(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_NOT_FOUND)
+              .send(context.succeeding(response -> {
+                  
assertThat(response.statusCode()).isEqualTo(NOT_FOUND.code());
+                  context.completeNow();
+              }));
+    }
+
+    @Test
+    void testGetJobStatusRunningJob(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/api/v1/cassandra/operational-jobs/" + runningUuid;
+        client.get(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_ACCEPTED)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(ACCEPTED.code());
+                  context.completeNow();
+              }));
+    }
+
+    @Test
+    void testGetJobStatusCompletedJob(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/api/v1/cassandra/operational-jobs/" + 
completedUuid;
+        client.get(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(OK.code());
+                  OperationalJobResponse jobStatus = 
response.bodyAsJson(OperationalJobResponse.class);
+                  assertThat(jobStatus.jobId()).isEqualTo(completedUuid);
+                  
assertThat(jobStatus.status()).isEqualTo(OperationalJobStatus.SUCCEEDED);
+                  assertThat(jobStatus.operation()).isEqualTo("testCompleted");
+                  context.completeNow();
+              }));
+    }
+
+    @Test
+    void testGetJobStatusFailedJob(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/api/v1/cassandra/operational-jobs/" + failedUuid;
+        client.get(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(OK.code());
+                  OperationalJobResponse jobStatus = 
response.bodyAsJson(OperationalJobResponse.class);
+                  assertThat(jobStatus.jobId()).isEqualTo(failedUuid);
+                  
assertThat(jobStatus.status()).isEqualTo(OperationalJobStatus.FAILED);
+                  assertThat(jobStatus.operation()).isEqualTo("testFailed");
+                  assertThat(jobStatus.reason()).isEqualTo("Test failed");
+
+                  context.completeNow();
+              }));
+    }
+
+    static class OperationalJobsHandlerTestModule extends AbstractModule
+    {
+        @Provides
+        @Singleton
+        public OperationalJobManager jobManager()
+        {
+            OperationalJobManager mockManager = 
mock(OperationalJobManager.class);
+            OperationalJob runningMock = mock(OperationalJob.class);
+            Promise<Void> p = Promise.promise();
+            
when(runningMock.status()).thenReturn(OperationalJobStatus.RUNNING);
+            when(runningMock.asyncResult()).thenReturn(p.future());
+            OperationalJob completedMock = mock(OperationalJob.class);
+            
when(completedMock.status()).thenReturn(OperationalJobStatus.SUCCEEDED);
+            when(completedMock.name()).thenReturn("testCompleted");
+            OperationalJob failedMock = mock(OperationalJob.class);
+            when(failedMock.status()).thenReturn(OperationalJobStatus.FAILED);
+            
when(failedMock.asyncResult()).thenReturn(Future.failedFuture("Test failed"));
+            when(failedMock.name()).thenReturn("testFailed");
+
+            
when(mockManager.getJobIfExists(runningUuid)).thenReturn(runningMock);
+            
when(mockManager.getJobIfExists(completedUuid)).thenReturn(completedMock);
+            
when(mockManager.getJobIfExists(failedUuid)).thenReturn(failedMock);
+            return mockManager;
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to