This is an automated email from the ASF dual-hosted git repository. frankgh pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new f45bbacc CASSSIDECAR-151: Adds sidecar endpoint for node decommissioning operation (#144) f45bbacc is described below commit f45bbacc42990e5196c927ed6169d82cd9faf5f6 Author: Arjun Ashok <arjun_as...@apple.com> AuthorDate: Wed Jan 15 15:11:04 2025 -0800 CASSSIDECAR-151: Adds sidecar endpoint for node decommissioning operation (#144) Patch by Arjun Ashok; Reviewed by Shailaja Koppu, Yifan Cai, Francisco Guerrero for CASSSIDECAR-151 --- CHANGES.txt | 1 + .../adapters/base/CassandraStorageOperations.java | 20 ++ .../base/GossipDependentStorageJmxOperations.java | 12 ++ .../adapters/base/StorageJmxOperations.java | 12 ++ .../cassandra/sidecar/common/ApiEndpointsV1.java | 2 +- .../common/request/NodeDecommissionRequest.java | 46 +++++ .../cassandra/sidecar/client/RequestContext.java | 13 ++ .../cassandra/sidecar/client/SidecarClient.java | 13 ++ .../sidecar/client/SidecarClientTest.java | 19 ++ .../sidecar/common/server/StorageOperations.java | 12 ++ .../cassandra/sidecar/job/NodeDecommissionJob.java | 100 ++++++++++ .../cassandra/sidecar/job/OperationalJob.java | 35 ++-- .../sidecar/job/OperationalJobManager.java | 30 ++- .../sidecar/job/OperationalJobTracker.java | 29 ++- .../sidecar/routes/ListOperationalJobsHandler.java | 2 +- .../sidecar/routes/NodeDecommissionHandler.java | 110 +++++++++++ .../sidecar/routes/OperationalJobHandler.java | 56 +++--- .../cassandra/sidecar/server/MainModule.java | 5 + .../sidecar/utils/OperationalJobUtils.java | 61 ++++++ ...kenZeroElectorateMembershipIntegrationTest.java | 2 + .../routes/NodeDecommissionIntegrationTest.java | 130 +++++++++++++ .../sidecar/testing/BootstrapBBUtils.java | 16 ++ .../sidecar/testing/IntegrationTestBase.java | 38 +++- .../sidecar/job/OperationalJobManagerTest.java | 29 +-- .../cassandra/sidecar/job/OperationalJobTest.java | 18 ++ .../sidecar/job/OperationalJobTrackerTest.java | 46 ++--- .../routes/ListOperationalJobsHandlerTest.java | 
6 + .../routes/NodeDecommissionHandlerTest.java | 207 +++++++++++++++++++++ .../sidecar/routes/OperationalJobHandlerTest.java | 3 + 29 files changed, 965 insertions(+), 108 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 0e71aa10..0bde3ab1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.0.0 ----- + * Adds sidecar endpoint for node decommissioning operation (CASSANDRASC-151) * Rename field in ListCdcSegmentsResponse (CASSSIDECAR-184) * Ensure memory consistency from PeriodicTask executions and expose richer ScheduleDecision (CASSSIDECAR-181) * Refactor access to delegate methods to simplify (CASSSIDECAR-182) diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java index 4f2e1b4f..49b3ec9d 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java @@ -213,4 +213,24 @@ public class CassandraStorageOperations implements StorageOperations jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME) .forceKeyspaceCleanup(concurrency, keyspace, table); } + + /** + * {@inheritDoc} + */ + @Override + public String operationMode() + { + StorageJmxOperations ssProxy = jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME); + return ssProxy.getOperationMode(); + } + + /** + * {@inheritDoc} + */ + @Override + public void decommission(boolean force) + { + StorageJmxOperations ssProxy = jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME); + ssProxy.decommission(force); + } } diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java index 5b641ee8..bbdb29c4 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java @@ -159,4 +159,16 @@ public class GossipDependentStorageJmxOperations implements StorageJmxOperations LOGGER.warn("Gossip is disabled and unavailable for the operation"); throw new OperationUnavailableException("Gossip is required for the operation but it is disabled"); } + + @Override + public void decommission(boolean force) throws IllegalStateException, IllegalArgumentException, UnsupportedOperationException + { + delegate.decommission(force); + } + + @Override + public String getOperationMode() + { + return delegate.getOperationMode(); + } } diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java index d8f94125..ca18db04 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java @@ -159,4 +159,16 @@ public interface StorageJmxOperations * @throws InterruptedException it does not really throw but declared in MBean */ int forceKeyspaceCleanup(int jobs, String keyspaceName, String... 
tables) throws IOException, ExecutionException, InterruptedException; + + /** + * Triggers the node decommission operation + * @param force force decommission, bypassing RF checks, when this flag is set + */ + void decommission(boolean force) throws IllegalStateException, IllegalArgumentException, UnsupportedOperationException; + + /** + * Fetch the operation-mode of the node + * @return string representation of the operation-mode + */ + String getOperationMode(); } diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java index 108bc6e1..e608913e 100644 --- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java @@ -125,7 +125,7 @@ public final class ApiEndpointsV1 public static final String PER_OPERATIONAL_JOB = OPERATIONAL_JOBS + '/' + OPERATIONAL_JOB_ID_PATH_PARAM; public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 + CASSANDRA + OPERATIONAL_JOBS; public static final String OPERATIONAL_JOB_ROUTE = API_V1 + CASSANDRA + PER_OPERATIONAL_JOB; - + public static final String NODE_DECOMMISSION_ROUTE = API_V1 + CASSANDRA + "/operations/decommission"; private ApiEndpointsV1() { throw new IllegalStateException(getClass() + " is a constants container and shall not be instantiated"); diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeDecommissionRequest.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeDecommissionRequest.java new file mode 100644 index 00000000..48269f28 --- /dev/null +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeDecommissionRequest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.request; + +import io.netty.handler.codec.http.HttpMethod; +import org.apache.cassandra.sidecar.common.ApiEndpointsV1; +import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; + +/** + * Represents a request to execute node decommission operation + */ +public class NodeDecommissionRequest extends JsonRequest<OperationalJobResponse> +{ + /** + * Constructs a request to execute a node decommission operation + */ + public NodeDecommissionRequest() + { + super(ApiEndpointsV1.NODE_DECOMMISSION_ROUTE); + } + + /** + * {@inheritDoc} + */ + @Override + public HttpMethod method() + { + return HttpMethod.PUT; + } +} diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java index 65fad58d..938ef0ac 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java @@ -38,6 +38,7 @@ import org.apache.cassandra.sidecar.common.request.GossipInfoRequest; import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest; import org.apache.cassandra.sidecar.common.request.ListOperationalJobsRequest; import 
org.apache.cassandra.sidecar.common.request.ListSnapshotFilesRequest; +import org.apache.cassandra.sidecar.common.request.NodeDecommissionRequest; import org.apache.cassandra.sidecar.common.request.NodeSettingsRequest; import org.apache.cassandra.sidecar.common.request.OperationalJobRequest; import org.apache.cassandra.sidecar.common.request.Request; @@ -76,6 +77,7 @@ public class RequestContext protected static final RingRequest RING_REQUEST = new RingRequest(); protected static final GossipInfoRequest GOSSIP_INFO_REQUEST = new GossipInfoRequest(); protected static final ListOperationalJobsRequest LIST_JOBS_REQUEST = new ListOperationalJobsRequest(); + protected static final NodeDecommissionRequest NODE_DECOMMISSION_REQUEST = new NodeDecommissionRequest(); protected static final RetryPolicy DEFAULT_RETRY_POLICY = new NoRetryPolicy(); protected static final RetryPolicy DEFAULT_EXPONENTIAL_BACKOFF_RETRY_POLICY = new ExponentialBackoffRetryPolicy(10, 500L, 60_000L); @@ -512,6 +514,17 @@ public class RequestContext return request(LIST_JOBS_REQUEST); } + /** + * Sets the {@code request} to be a {@link NodeDecommissionRequest} and returns a reference to this Builder + * enabling method chaining. 
+ * + * @return a reference to this Builder + */ + public Builder nodeDecommissionRequest() + { + return request(NODE_DECOMMISSION_REQUEST); + } + /** * Sets the {@code retryPolicy} to be an * {@link org.apache.cassandra.sidecar.client.retry.ExponentialBackoffRetryPolicy} configured with diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java index f896a79e..7be50b5d 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java @@ -664,6 +664,19 @@ public class SidecarClient implements AutoCloseable, SidecarClientBlobRestoreExt .build()); } + /** + * Executes the node decommission request using the default retry policy and configured selection policy + * @param instance the instance where the request will be executed + * @return a completable future of the operational job response + */ + public CompletableFuture<OperationalJobResponse> nodeDecommission(SidecarInstance instance) + { + return executor.executeRequestAsync(requestBuilder() + .singleInstanceSelectionPolicy(instance) + .nodeDecommissionRequest() + .build()); + } + /** * Returns a copy of the request builder with the default parameters configured for the client.
* diff --git a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java index f32356ae..7fc1f908 100644 --- a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java +++ b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java @@ -1324,6 +1324,25 @@ abstract class SidecarClientTest } } + @Test + public void testNodeDecommission() throws Exception + { + UUID jobId = UUID.randomUUID(); + String nodeDecommissionString = "{\"jobId\":\"" + jobId + "\",\"jobStatus\":\"SUCCEEDED\",\"instance\":\"127.0.0.1\"}"; + + MockResponse response = new MockResponse() + .setResponseCode(OK.code()) + .setHeader("content-type", "application/json") + .setBody(nodeDecommissionString); + enqueue(response); + + SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(servers.get(0)); + OperationalJobResponse result = client.nodeDecommission(sidecarInstance).get(30, TimeUnit.SECONDS); + assertThat(result).isNotNull(); + assertThat(result.status()).isEqualTo(OperationalJobStatus.SUCCEEDED); + validateResponseServed(ApiEndpointsV1.NODE_DECOMMISSION_ROUTE); + } + @Test void testFailsWithOneAttemptPerServer() { diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java index c13c2a99..26113e3d 100644 --- a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java +++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java @@ -102,4 +102,16 @@ public interface StorageOperations { outOfRangeDataCleanup(keyspace, table, 1); } + + /** + * @return the operation-mode of the Cassandra instance + */ + String operationMode(); + + /** + * Triggers the node decommission operation + * + * 
@param force force decommission, bypassing RF checks, when this flag is set + */ + void decommission(boolean force); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDecommissionJob.java b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDecommissionJob.java new file mode 100644 index 00000000..32a05fa9 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeDecommissionJob.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.common.server.StorageOperations; + +/** + * Implementation of {@link OperationalJob} to perform node decommission operation. 
+ */ +public class NodeDecommissionJob extends OperationalJob +{ + private static final Logger LOGGER = LoggerFactory.getLogger(NodeDecommissionJob.class); + private static final String OPERATION = "decommission"; + private final boolean isForce; + protected StorageOperations storageOperations; + + public NodeDecommissionJob(UUID jobId, StorageOperations storageOps, boolean isForce) + { + super(jobId); + this.storageOperations = storageOps; + this.isForce = isForce; + } + + @Override + public boolean isRunningOnCassandra() + { + String operationMode = storageOperations.operationMode(); + return "LEAVING".equals(operationMode) || "DECOMMISSIONED".equals(operationMode); + } + + /** + * {@inheritDoc} + */ + @Override + public OperationalJobStatus status() + { + String operationMode = storageOperations.operationMode(); + + if ("LEAVING".equals(operationMode)) + { + return OperationalJobStatus.RUNNING; + } + else if ("DECOMMISSIONED".equals(operationMode)) + { + return OperationalJobStatus.SUCCEEDED; + } + else + { + return super.status(); + } + } + + /** + * {@inheritDoc} + */ + @Override + protected void executeInternal() + { + if (isRunningOnCassandra()) + { + LOGGER.info("Not executing job as an ongoing or completed decommission operation was found jobId={}", this.jobId()); + return; + } + + LOGGER.info("Executing decommission operation. 
jobId={}", this.jobId()); + storageOperations.decommission(isForce); + } + + /** + * {@inheritDoc} + */ + @Override + public String name() + { + return OPERATION; + } +} + diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java index 80ec374c..0955176e 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java @@ -20,7 +20,6 @@ package org.apache.cassandra.sidecar.job; import java.time.Duration; import java.util.UUID; -import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +41,7 @@ public abstract class OperationalJob implements Task<Void> private static final Logger LOGGER = LoggerFactory.getLogger(OperationalJob.class); // use v1 time-based uuid - public final UUID jobId; + private final UUID jobId; private final Promise<Void> executionPromise; private volatile boolean isExecuting = false; @@ -59,6 +58,11 @@ public abstract class OperationalJob implements Task<Void> this.executionPromise = Promise.promise(); } + public UUID jobId() + { + return jobId; + } + @Override public final Void result() { @@ -95,12 +99,17 @@ public abstract class OperationalJob implements Task<Void> return referenceTimestampInMillis - createdAt > ttlInMillis; } + /** + * The concrete-job-specific implementation to determine if the job is running on the Cassandra node. + * @return true if the job is running on the Cassandra node. For example, node decommission is tracked by the + * operationMode exposed from Cassandra. + */ + public abstract boolean isRunningOnCassandra(); + /** * Determines the status of the job. OperationalJob subclasses could choose to override the method. * <p> * For long-lived jobs, the implementations should return the {@link OperationalJobStatus#RUNNING} status intelligently. 
- * The condition of {@link OperationalJobStatus#RUNNING} is implementation-specific. - * For example, node decommission is tracked by the operationMode exposed from Cassandra. * If the operationMode is LEAVING, the corresponding OperationalJob is {@link OperationalJobStatus#RUNNING}. * In this case, even if the OperationalJobStatus determined from this method is {@link OperationalJobStatus#CREATED}, * the concrete implementation can override and return {@link OperationalJobStatus#RUNNING}. @@ -113,7 +122,8 @@ public abstract class OperationalJob implements Task<Void> public OperationalJobStatus status() { Future<Void> fut = asyncResult(); - if (!isExecuting) + // Jobs that are created and yet to be picked up by the executor thread + if (!isExecuting && !fut.isComplete()) { return OperationalJobStatus.CREATED; } @@ -140,12 +150,12 @@ public abstract class OperationalJob implements Task<Void> * Get the async result with waiting for at most the specified wait time * <p> * Note: This call does not block the calling thread. - * The call-site should handle the possible failed future with {@link TimeoutException} from this method. * * @param executorPool executor pool to run the timer * @param waitTime maximum time to wait before returning - * @return the async result or a failed future of {@link OperationalJobException} with the cause - * {@link TimeoutException} after exceeding the wait time + * @return a future that is either the result of the configured timeout based on {@code waitTime} or the async + * result. A succeeded future here, represents either a timeout or the result of the job and a failure is + * represented by an exception thrown by the job execution, within the configured timeout. */ public Future<Void> asyncResult(TaskExecutorPool executorPool, Duration waitTime) { @@ -163,9 +173,8 @@ public abstract class OperationalJob implements Task<Void> // Completes as soon as any future succeeds, or when all futures fail. 
Note that maxWaitTimePromise is // closed as soon as resultFut completes return Future.any(maxWaitTimeFut, resultFut) - // We want to return the result when applicable, of course. - // If this lambda below is evaluated, both futures are completed; - // Depending on whether timeout flag is set, it either throws or complete with result + // If this lambda below is evaluated, either one of the futures have completed; + // In either case, the future corresponding to the job execution is returned .compose(f -> { boolean isTimeout = maxWaitTimeFut.result(); if (isTimeout) @@ -179,10 +188,8 @@ public abstract class OperationalJob implements Task<Void> /** * OperationalJob body. The implementation is executed in a blocking manner. - * - * @throws OperationalJobException OperationalJobException that wraps job failure */ - protected abstract void executeInternal() throws OperationalJobException; + protected abstract void executeInternal(); /** * Execute the job behavior as specified in the internal execution {@link #executeInternal()}, diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java index 034fd567..dd5d199e 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java @@ -24,7 +24,8 @@ import java.util.stream.Collectors; import javax.inject.Inject; import com.google.inject.Singleton; -import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException; /** @@ -35,15 +36,18 @@ public class OperationalJobManager { private final OperationalJobTracker jobTracker; + private final TaskExecutorPool internalExecutorPool; + /** * 
Creates a manager instance with a default sized job-tracker. * * @param jobTracker the tracker for the operational jobs */ @Inject - public OperationalJobManager(OperationalJobTracker jobTracker) + public OperationalJobManager(OperationalJobTracker jobTracker, ExecutorPools executorPools) { this.jobTracker = jobTracker; + this.internalExecutorPool = executorPools.internal(); } /** @@ -53,7 +57,7 @@ public class OperationalJobManager */ public List<OperationalJob> allInflightJobs() { - return jobTracker.getJobsView().values() + return jobTracker.jobsView().values() .stream() .filter(j -> !j.asyncResult().isComplete()) .collect(Collectors.toList()); @@ -76,22 +80,30 @@ public class OperationalJobManager * The job execution failure behavior is tracked within the {@link OperationalJob}. * * @param job OperationalJob instance to submit - * @return OperationalJob instance that is submitted * @throws OperationalJobConflictException when the same operational job is already running on Cassandra */ - public OperationalJob trySubmitJob(OperationalJob job) throws OperationalJobConflictException + public void trySubmitJob(OperationalJob job) throws OperationalJobConflictException { checkConflict(job); // New job is submitted for all cases when we do not have a corresponding downstream job - return jobTracker.computeIfAbsent(job.jobId, jobId -> job); + jobTracker.computeIfAbsent(job.jobId(), jobId -> { + internalExecutorPool.executeBlocking(job::execute); + return job; + }); } + /** + * Checks the job tracker for existing inflight jobs with the same operation before checking downstream for + * corresponding running job on the Cassandra node as a conflict of the job being submitted. 
+ * @param job instance of the job to check conflicts for + * @throws OperationalJobConflictException when a conflicting inflight job is found + */ private void checkConflict(OperationalJob job) throws OperationalJobConflictException { - // The job is not yet submitted (and running), but its status indicates that there is an identical job running on Cassandra already - // In this case, this job submission is rejected. - if (job.status() == OperationalJobStatus.RUNNING) + // If there are no tracked running jobs for same operation, then we confirm downstream + // Downstream check is done in most cases - by design + if (!jobTracker.inflightJobsByOperation(job.name()).isEmpty() || job.isRunningOnCassandra()) { throw new OperationalJobConflictException("The same operational job is already running on Cassandra. operationName='" + job.name() + '\''); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java index 649a8a0b..6d87510b 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java @@ -20,16 +20,19 @@ package org.apache.cassandra.sidecar.job; import java.util.Collections; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.inject.Inject; import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; import org.apache.cassandra.sidecar.config.ServiceConfiguration; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.VisibleForTesting; @@ -65,16 +68,17 @@ public class OperationalJobTracker if (map.size() > initialCapacity) { 
OperationalJob job = eldest.getValue(); - if (job.status().isCompleted() && job.isStale(System.currentTimeMillis(), ONE_DAY_TTL)) + OperationalJobStatus status = job.status(); + if (status.isCompleted() && job.isStale(System.currentTimeMillis(), ONE_DAY_TTL)) { LOGGER.debug("Expiring completed and stale job due to job tracker has reached max size. jobId={} status={} createdAt={}", - job.jobId, job.status(), job.creationTime()); + job.jobId(), status, job.creationTime()); return true; } else { LOGGER.warn("Job tracker reached max size, but the eldest job is not completed yet. " + - "Not evicting. jobId={} status={}", job.jobId, job.status()); + "Not evicting. jobId={} status={}", job.jobId(), status); // TODO: Optionally trigger cleanup to fetch next oldest to evict } } @@ -100,15 +104,30 @@ public class OperationalJobTracker * @return an immutable copy of the underlying mapping */ @NotNull - Map<UUID, OperationalJob> getJobsView() + Map<UUID, OperationalJob> jobsView() { return Collections.unmodifiableMap(map); } + /** + * Filters the inflight (created or running) jobs matching the job name from the jobsView + * @return list of inflight jobs being tracked + */ + @NotNull + List<OperationalJob> inflightJobsByOperation(String operation) + { + return jobsView().values() + .stream() + .filter(j -> (j.name().equals(operation)) && + (j.status() == OperationalJobStatus.RUNNING || + j.status() == OperationalJobStatus.CREATED)) + .collect(Collectors.toList()); + } + @VisibleForTesting OperationalJob put(OperationalJob job) { - return map.put(job.jobId, job); + return map.put(job.jobId(), job); } @VisibleForTesting diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandler.java index ba7a24dc..93a0861d 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandler.java +++ 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandler.java @@ -61,7 +61,7 @@ public class ListOperationalJobsHandler extends AbstractHandler<Void> ListOperationalJobsResponse listResponse = new ListOperationalJobsResponse(); jobManager.allInflightJobs() .stream() - .map(job -> new OperationalJobResponse(job.jobId, RUNNING, job.name(), null)) + .map(job -> new OperationalJobResponse(job.jobId(), RUNNING, job.name(), null)) .forEach(listResponse::addJob); context.json(listResponse); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandler.java new file mode 100644 index 00000000..231d8fdb --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandler.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.routes; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; + +import com.datastax.driver.core.utils.UUIDs; +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException; +import org.apache.cassandra.sidecar.job.NodeDecommissionJob; +import org.apache.cassandra.sidecar.job.OperationalJobManager; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.apache.cassandra.sidecar.utils.OperationalJobUtils; + +import static org.apache.cassandra.sidecar.utils.RequestUtils.parseBooleanQueryParam; + +/** + * Provides REST API for asynchronously decommissioning the corresponding Cassandra node + */ +public class NodeDecommissionHandler extends AbstractHandler<Boolean> +{ + private final OperationalJobManager jobManager; + private final ServiceConfiguration config; + + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the interface to retrieve instance metadata + * @param executorPools the executor pools for blocking executions + * @param validator a validator instance to validate Cassandra-specific input + */ + @Inject + protected NodeDecommissionHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + ServiceConfiguration serviceConfiguration, + CassandraInputValidator 
validator, + OperationalJobManager jobManager) + { + super(metadataFetcher, executorPools, validator); + this.jobManager = jobManager; + this.config = serviceConfiguration; + } + + /** + * {@inheritDoc} + */ + @Override + public void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + String host, + SocketAddress remoteAddress, + Boolean isForce) + { + StorageOperations operations = metadataFetcher.delegate(host).storageOperations(); + NodeDecommissionJob job = new NodeDecommissionJob(UUIDs.timeBased(), operations, isForce); + try + { + jobManager.trySubmitJob(job); + } + catch (OperationalJobConflictException oje) + { + String reason = oje.getMessage(); + logger.error("Conflicting job encountered. reason={}", reason); + context.response().setStatusCode(HttpResponseStatus.CONFLICT.code()); + context.json(new OperationalJobResponse(job.jobId(), OperationalJobStatus.FAILED, job.name(), reason)); + return; + } + + // Get the result, waiting for the specified wait time for result + job.asyncResult(executorPools.service(), + Duration.of(config.operationalJobExecutionMaxWaitTimeInMillis(), ChronoUnit.MILLIS)) + .onComplete(v -> OperationalJobUtils.sendStatusBasedResponse(context, job)); + } + + /** + * {@inheritDoc} + */ + @Override + protected Boolean extractParamsOrThrow(RoutingContext context) + { + return parseBooleanQueryParam(context.request(), "force", false); + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/OperationalJobHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/OperationalJobHandler.java index 162903af..1f0eb12f 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/routes/OperationalJobHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/OperationalJobHandler.java @@ -25,22 +25,20 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.net.SocketAddress; import 
io.vertx.ext.web.RoutingContext; -import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; -import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.job.OperationalJob; import org.apache.cassandra.sidecar.job.OperationalJobManager; import org.apache.cassandra.sidecar.utils.CassandraInputValidator; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.apache.cassandra.sidecar.utils.OperationalJobUtils; import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.OPERATIONAL_JOB_ID_PATH_PARAM; -import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED; import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; /** * Handler for retrieving the status of async operational jobs running on the sidecar */ -public class OperationalJobHandler extends AbstractHandler<Void> +public class OperationalJobHandler extends AbstractHandler<UUID> { private final OperationalJobManager jobManager; @@ -54,16 +52,16 @@ public class OperationalJobHandler extends AbstractHandler<Void> this.jobManager = jobManager; } + /** + * {@inheritDoc} + */ @Override - protected Void extractParamsOrThrow(RoutingContext context) + public void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + String host, + SocketAddress remoteAddress, + UUID jobId) { - return null; - } - - @Override - public void handleInternal(RoutingContext context, HttpServerRequest httpRequest, String host, SocketAddress remoteAddress, Void request) - { - UUID jobId = validatedJobIdParam(context); executorPools.service() .executeBlocking(() -> { OperationalJob job = jobManager.getJobIfExists(jobId); @@ -75,11 +73,20 @@ public class OperationalJobHandler extends AbstractHandler<Void> } return job; }) - .onFailure(cause -> processFailure(cause, context, host, remoteAddress, request)) - .onSuccess(job -> 
sendStatusBasedResponse(context, jobId, job)); + .onFailure(cause -> processFailure(cause, context, host, remoteAddress, jobId)) + .onSuccess(job -> OperationalJobUtils.sendStatusBasedResponse(context, job)); } - UUID validatedJobIdParam(RoutingContext context) + /** + * {@inheritDoc} + */ + @Override + protected UUID extractParamsOrThrow(RoutingContext context) + { + return validatedJobIdParam(context); + } + + private UUID validatedJobIdParam(RoutingContext context) { String requestJobId = context.pathParam(OPERATIONAL_JOB_ID_PATH_PARAM.substring(1)); if (requestJobId == null) @@ -96,24 +103,9 @@ public class OperationalJobHandler extends AbstractHandler<Void> catch (IllegalArgumentException e) { logger.info("Invalid jobId. jobId={}", requestJobId); - throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, String.format("Invalid job ID provided :%s.", requestJobId)); + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, + String.format("Invalid job ID provided: %s.", requestJobId)); } return jobId; } - - public void sendStatusBasedResponse(RoutingContext context, UUID jobId, OperationalJob job) - { - OperationalJobStatus status = job.status(); - if (status.isCompleted()) - { - context.response().setStatusCode(HttpResponseStatus.OK.code()); - } - else - { - context.response().setStatusCode(HttpResponseStatus.ACCEPTED.code()); - } - - String reason = status == FAILED ? 
job.asyncResult().cause().getMessage() : null; - context.json(new OperationalJobResponse(jobId, status, job.name(), reason)); - } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java index e8051006..10a9b321 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java @@ -106,6 +106,7 @@ import org.apache.cassandra.sidecar.routes.FileStreamHandler; import org.apache.cassandra.sidecar.routes.GossipInfoHandler; import org.apache.cassandra.sidecar.routes.JsonErrorHandler; import org.apache.cassandra.sidecar.routes.ListOperationalJobsHandler; +import org.apache.cassandra.sidecar.routes.NodeDecommissionHandler; import org.apache.cassandra.sidecar.routes.OperationalJobHandler; import org.apache.cassandra.sidecar.routes.RingHandler; import org.apache.cassandra.sidecar.routes.RoutingOrder; @@ -280,6 +281,7 @@ public class MainModule extends AbstractModule ConnectedClientStatsHandler connectedClientStatsHandler, OperationalJobHandler operationalJobHandler, ListOperationalJobsHandler listOperationalJobsHandler, + NodeDecommissionHandler nodeDecommissionHandler, ErrorHandler errorHandler) { Router router = Router.router(vertx); @@ -379,6 +381,9 @@ public class MainModule extends AbstractModule router.get(ApiEndpointsV1.LIST_OPERATIONAL_JOBS_ROUTE) .handler(listOperationalJobsHandler); + router.put(ApiEndpointsV1.NODE_DECOMMISSION_ROUTE) + .handler(nodeDecommissionHandler); + router.get(ApiEndpointsV1.RING_ROUTE_PER_KEYSPACE) .handler(ringHandler); diff --git a/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java b/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java new file mode 100644 index 00000000..e36f1a89 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/OperationalJobUtils.java 
@@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; +import org.apache.cassandra.sidecar.job.OperationalJob; + +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED; + +/** + * Utility class for OperationalJob framework operations. + */ +public class OperationalJobUtils +{ + private static final Logger LOGGER = LoggerFactory.getLogger(OperationalJobUtils.class); + + /** + * In the operational job context, sends a {@link OperationalJobResponse} based on the status of the job. 
+ * + * @param context the request context + * @param job the operational job to report status on + */ + public static void sendStatusBasedResponse(RoutingContext context, OperationalJob job) + { + OperationalJobStatus status = job.status(); + LOGGER.info("Job completion status={} jobId={}", status, job.jobId()); + if (status.isCompleted()) + { + context.response().setStatusCode(HttpResponseStatus.OK.code()); + } + else + { + context.response().setStatusCode(HttpResponseStatus.ACCEPTED.code()); + } + + String reason = status == FAILED ? job.asyncResult().cause().getMessage() : null; + context.json(new OperationalJobResponse(job.jobId(), status, job.name(), reason)); + } +} diff --git a/server/src/test/integration/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest.java b/server/src/test/integration/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest.java index fdb0c45b..6f335b85 100644 --- a/server/src/test/integration/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest.java +++ b/server/src/test/integration/org/apache/cassandra/sidecar/coordination/MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.slf4j.Logger; @@ -68,6 +69,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** * Integration tests for the {@link MostReplicatedKeyspaceTokenZeroElectorateMembership} class */ +@Tag("heavy") class MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest { private static final Logger LOGGER = LoggerFactory.getLogger(MostReplicatedKeyspaceTokenZeroElectorateMembershipIntegrationTest.class); diff --git
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java b/server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java new file mode 100644 index 00000000..fadd012e --- /dev/null +++ b/server/src/test/integration/org/apache/cassandra/sidecar/routes/NodeDecommissionIntegrationTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; +import org.apache.cassandra.sidecar.testing.IntegrationTestBase; +import org.apache.cassandra.testing.CassandraIntegrationTest; + +import static org.apache.cassandra.sidecar.AssertionUtils.loopAssert; +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED; +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING; +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** + * Test the node decommission endpoint with cassandra container. 
+ */ +@ExtendWith(VertxExtension.class) +public class NodeDecommissionIntegrationTest extends IntegrationTestBase +{ + @CassandraIntegrationTest(nodesPerDc = 2) + void decommissionNodeDefault(VertxTestContext context) + { + final AtomicReference<String> jobId = new AtomicReference<>(); + String testRoute = "/api/v1/cassandra/operations/decommission?force=true"; + testWithClient(client -> client.put(server.actualPort(), "127.0.0.1", testRoute) + .send(context.succeeding(response -> { + OperationalJobResponse decommissionResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(decommissionResponse.status()).isEqualTo(RUNNING); + jobId.set(String.valueOf(decommissionResponse.jobId())); + }))); + Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS); + assertThat(jobId.get()).isNotNull(); + pollStatusForState(jobId.get(), SUCCEEDED, null); + context.completeNow(); + } + + @CassandraIntegrationTest(nodesPerDc = 2) + void decommissionNodeWithFailure(VertxTestContext context) + { + String testRoute = "/api/v1/cassandra/operations/decommission"; + testWithClient(client -> client.put(server.actualPort(), "127.0.0.1", testRoute) + .send(context.succeeding(response -> { + OperationalJobResponse decommissionResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(decommissionResponse.status()).isEqualTo(FAILED); + assertThat(decommissionResponse.jobId()).isNotNull(); + String jobId = String.valueOf(decommissionResponse.jobId()); + assertThat(jobId).isNotNull(); + context.completeNow(); + }))); + + } + + private void pollStatusForState(String uuid, + OperationalJobStatus expectedStatus, + String expectedReason) + { + String status = "/api/v1/cassandra/operational-jobs/" + uuid; + AtomicBoolean stateReached = new AtomicBoolean(false); + AtomicInteger counter = new AtomicInteger(0); + loopAssert(30, () -> { + counter.incrementAndGet(); + // TODO: optionally create a helper method in the base class to get response in the blocking manner + 
HttpResponse<Buffer> resp; + try + { + resp = client.get(server.actualPort(), "127.0.0.1", status) + .send() + .toCompletionStage() + .toCompletableFuture() + .get(); + logger.info("Success Status Response code: {}", resp.statusCode()); + logger.info("Status Response: {}", resp.bodyAsString()); + if (resp.statusCode() == HttpResponseStatus.OK.code()) + { + stateReached.set(true); + OperationalJobResponse jobStatusResp = resp.bodyAsJson(OperationalJobResponse.class); + assertThat(jobStatusResp.jobId()).isEqualTo(UUID.fromString(uuid)); + assertThat(jobStatusResp.status()).isEqualTo(expectedStatus); + assertThat(jobStatusResp.reason()).isEqualTo(expectedReason); + assertThat(jobStatusResp.operation()).isEqualTo("decommission"); + } + else + { + assertThat(resp.statusCode()).isEqualTo(HttpResponseStatus.ACCEPTED.code()); + OperationalJobResponse jobStatusResp = resp.bodyAsJson(OperationalJobResponse.class); + assertThat(jobStatusResp.jobId()).isEqualTo(UUID.fromString(uuid)); + } + logger.info("Request completed"); + assertThat(stateReached.get()).isTrue(); + } + catch (InterruptedException | ExecutionException e) + { + throw new RuntimeException(e); + } + }); + } +} diff --git a/server/src/test/integration/org/apache/cassandra/sidecar/testing/BootstrapBBUtils.java b/server/src/test/integration/org/apache/cassandra/sidecar/testing/BootstrapBBUtils.java index ab5ec788..a3420e2f 100644 --- a/server/src/test/integration/org/apache/cassandra/sidecar/testing/BootstrapBBUtils.java +++ b/server/src/test/integration/org/apache/cassandra/sidecar/testing/BootstrapBBUtils.java @@ -50,4 +50,20 @@ public class BootstrapBBUtils .make(TypeResolutionStrategy.Lazy.INSTANCE, typePool) .load(cl, ClassLoadingStrategy.Default.INJECTION); } + + public static void installDecommissionIntercepter(ClassLoader cl, Class<?> interceptor) + { + + // "org.apache.cassandra.service.StorageService" "operationMode" + // "org.apache.cassandra.tcm.sequences.InProgressSequences" "isLeave" + TypePool 
typePool = TypePool.Default.of(cl); + TypeDescription description = typePool.describe("org.apache.cassandra.service.StorageService") + .resolve(); + new ByteBuddy().rebase(description, ClassFileLocator.ForClassLoader.of(cl)) + .method(named("operationMode")) + .intercept(MethodDelegation.to(interceptor)) + // Defer class loading until all dependencies are loaded + .make(TypeResolutionStrategy.Lazy.INSTANCE, typePool) + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } } diff --git a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java index 7cdedaf7..5934e504 100644 --- a/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java +++ b/server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java @@ -195,6 +195,11 @@ public abstract class IntegrationTestBase return -1; } + protected void testWithClient(Consumer<WebClient> tester) + { + testWithClient(true, tester); + } + protected void testWithClient(VertxTestContext context, Consumer<WebClient> tester) throws Exception { testWithClient(context, true, tester); @@ -204,6 +209,37 @@ public abstract class IntegrationTestBase boolean waitForCluster, Consumer<WebClient> tester) throws Exception + { + testWithClient(waitForCluster, tester); + // wait until the test completes + assertThat(context.awaitCompletion(2, TimeUnit.MINUTES)).isTrue(); + } + + protected void testWithClient(boolean waitForCluster, + Consumer<WebClient> tester) + { + CassandraAdapterDelegate delegate = sidecarTestContext.instancesMetadata() + .instanceFromId(1) + .delegate(); + + assertThat(delegate).isNotNull(); + if (delegate.isNativeUp() || !waitForCluster) + { + tester.accept(client); + } + else + { + vertx.eventBus().localConsumer(ON_CASSANDRA_CQL_READY.address(), (Message<JsonObject> message) -> { + if (message.body().getInteger("cassandraInstanceId") == 
1) + { + tester.accept(client); + } + }); + } + } + + protected void testWithClientBlocking(boolean waitForCluster, + Consumer<WebClient> tester) { CassandraAdapterDelegate delegate = sidecarTestContext.instancesMetadata() .instanceFromId(1) @@ -224,8 +260,6 @@ public abstract class IntegrationTestBase }); } - // wait until the test completes - assertThat(context.awaitCompletion(2, TimeUnit.MINUTES)).isTrue(); } protected void createTestKeyspace() diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java index f2f8307c..b680e5e9 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java @@ -30,10 +30,8 @@ import io.vertx.core.Vertx; import org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; -import org.apache.cassandra.sidecar.config.ServiceConfiguration; -import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl; import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException; -import org.mockito.Mock; import org.mockito.MockitoAnnotations; import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING; @@ -55,33 +53,30 @@ import static org.mockito.Mockito.when; */ class OperationalJobManagerTest { - @Mock - SidecarConfiguration mockConfig; - protected Vertx vertx; + protected ExecutorPools executorPool; + @BeforeEach void setup() { vertx = Vertx.vertx(); + executorPool = new ExecutorPools(vertx, new ServiceConfigurationImpl()); MockitoAnnotations.openMocks(this); - ServiceConfiguration mockServiceConfig = mock(ServiceConfiguration.class); - 
when(mockConfig.serviceConfiguration()).thenReturn(mockServiceConfig); - when(mockServiceConfig.operationalJobExecutionMaxWaitTimeInMillis()).thenReturn(5000L); } @Test void testWithNoDownstreamJob() { OperationalJobTracker tracker = new OperationalJobTracker(4); - OperationalJobManager manager = new OperationalJobManager(tracker); + OperationalJobManager manager = new OperationalJobManager(tracker, executorPool); OperationalJob testJob = OperationalJobTest.createOperationalJob(SUCCEEDED); manager.trySubmitJob(testJob); testJob.execute(Promise.promise()); assertThat(testJob.asyncResult().isComplete()).isTrue(); assertThat(testJob.status()).isEqualTo(SUCCEEDED); - assertThat(tracker.get(testJob.jobId)).isNotNull(); + assertThat(tracker.get(testJob.jobId())).isNotNull(); } @Test @@ -93,7 +88,7 @@ class OperationalJobManagerTest TaskExecutorPool mockExecPool = mock(TaskExecutorPool.class); when(mockPools.internal()).thenReturn(mockExecPool); when(mockExecPool.runBlocking(any())).thenReturn(null); - OperationalJobManager manager = new OperationalJobManager(tracker); + OperationalJobManager manager = new OperationalJobManager(tracker, executorPool); assertThatThrownBy(() -> manager.trySubmitJob(runningJob)) .isExactlyInstanceOf(OperationalJobConflictException.class) .hasMessage("The same operational job is already running on Cassandra. 
operationName='Operation X'"); @@ -105,7 +100,7 @@ class OperationalJobManagerTest UUID jobId = UUIDs.timeBased(); OperationalJobTracker tracker = new OperationalJobTracker(4); - OperationalJobManager manager = new OperationalJobManager(tracker); + OperationalJobManager manager = new OperationalJobManager(tracker, executorPool); OperationalJob testJob = OperationalJobTest.createOperationalJob(jobId, Duration.ofSeconds(10)); @@ -123,11 +118,17 @@ class OperationalJobManagerTest UUID jobId = UUIDs.timeBased(); OperationalJobTracker tracker = new OperationalJobTracker(4); - OperationalJobManager manager = new OperationalJobManager(tracker); + OperationalJobManager manager = new OperationalJobManager(tracker, executorPool); String msg = "Test Job failed"; OperationalJob failingJob = new OperationalJob(jobId) { + @Override + public boolean isRunningOnCassandra() + { + return false; + } + @Override protected void executeInternal() throws OperationalJobException { diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java index 9e2bd51f..81f9c9ad 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java @@ -59,6 +59,12 @@ class OperationalJobTest { } + @Override + public boolean isRunningOnCassandra() + { + return jobStatus == OperationalJobStatus.RUNNING; + } + @Override public OperationalJobStatus status() { @@ -82,6 +88,12 @@ class OperationalJobTest { return new OperationalJob(jobId) { + @Override + public boolean isRunningOnCassandra() + { + return false; + } + @Override protected void executeInternal() throws OperationalJobException { @@ -122,6 +134,12 @@ class OperationalJobTest String msg = "Test Job failed"; OperationalJob failingJob = new OperationalJob(UUIDs.timeBased()) { + @Override + public boolean isRunningOnCassandra() + { + return 
false; + } + @Override protected void executeInternal() throws OperationalJobException { diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTrackerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTrackerTest.java index f6a49d4c..1840ab97 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTrackerTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTrackerTest.java @@ -31,8 +31,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import com.datastax.driver.core.utils.UUIDs; -import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; -import org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException; import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED; import static org.apache.cassandra.sidecar.job.OperationalJobTest.createOperationalJob; @@ -53,19 +51,7 @@ class OperationalJobTrackerTest OperationalJob job4 = createOperationalJob(SUCCEEDED); long twoDaysAgo = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2); - OperationalJob jobWithStaleCreationTime = new OperationalJob(UUIDs.startOf(twoDaysAgo)) - { - @Override - protected void executeInternal() throws OperationalJobException - { - } - - @Override - public OperationalJobStatus status() - { - return SUCCEEDED; - } - }; + OperationalJob jobWithStaleCreationTime = createOperationalJob(UUIDs.startOf(twoDaysAgo), SUCCEEDED); @BeforeEach void setUp() @@ -78,18 +64,18 @@ class OperationalJobTrackerTest { jobTracker.put(job1); jobTracker.put(job2); - assertThat(jobTracker.get(job1.jobId)).isSameAs(job1); - assertThat(jobTracker.get(job2.jobId)).isSameAs(job2); + assertThat(jobTracker.get(job1.jobId())).isSameAs(job1); + assertThat(jobTracker.get(job2.jobId())).isSameAs(job2); } @Test void testComputeIfAbsent() { jobTracker.put(job1); - OperationalJob job = jobTracker.computeIfAbsent(job1.jobId, v -> job3); + 
OperationalJob job = jobTracker.computeIfAbsent(job1.jobId(), v -> job3); assertThat(job).isNotSameAs(job3); assertThat(job).isSameAs(job1); - assertThat(jobTracker.get(job1.jobId)).isSameAs(job1); + assertThat(jobTracker.get(job1.jobId())).isSameAs(job1); } @Test @@ -103,10 +89,10 @@ class OperationalJobTrackerTest assertThat(jobTracker.size()) .describedAs("Although the tracker initial size is 3, no job is evicted since all jobs are still running") .isEqualTo(4); - assertThat(jobTracker.get(job1.jobId)).isNotNull(); - assertThat(jobTracker.get(job2.jobId)).isNotNull(); - assertThat(jobTracker.get(job3.jobId)).isNotNull(); - assertThat(jobTracker.get(job4.jobId)).isNotNull(); + assertThat(jobTracker.get(job1.jobId())).isNotNull(); + assertThat(jobTracker.get(job2.jobId())).isNotNull(); + assertThat(jobTracker.get(job3.jobId())).isNotNull(); + assertThat(jobTracker.get(job4.jobId())).isNotNull(); } @Test @@ -118,10 +104,10 @@ class OperationalJobTrackerTest jobTracker.put(job3); assertThat(jobTracker.size()).isEqualTo(3); - assertThat(jobTracker.get(job1.jobId)).isNotNull(); - assertThat(jobTracker.get(job2.jobId)).isNotNull(); - assertThat(jobTracker.get(job3.jobId)).isNotNull(); - assertThat(jobTracker.get(jobWithStaleCreationTime.jobId)).isNull(); + assertThat(jobTracker.get(job1.jobId())).isNotNull(); + assertThat(jobTracker.get(job2.jobId())).isNotNull(); + assertThat(jobTracker.get(job3.jobId())).isNotNull(); + assertThat(jobTracker.get(jobWithStaleCreationTime.jobId())).isNull(); } @Test @@ -131,9 +117,9 @@ class OperationalJobTrackerTest jobTracker.put(job1); jobTracker.put(job2); - Map<UUID, OperationalJob> view = jobTracker.getJobsView(); + Map<UUID, OperationalJob> view = jobTracker.jobsView(); assertThat(view.size()).isEqualTo(2); - assertThatThrownBy(() -> view.put(job3.jobId, job3)) + assertThatThrownBy(() -> view.put(job3.jobId(), job3)) .isExactlyInstanceOf(UnsupportedOperationException.class); } @@ -152,7 +138,7 @@ class OperationalJobTrackerTest 
executorService.shutdown(); executorService.awaitTermination(5, TimeUnit.SECONDS); assertThat(tracker.size()).isEqualTo(one); - assertThat(tracker.getJobsView().values().iterator().next()) + assertThat(tracker.jobsView().values().iterator().next()) .describedAs("Only the last job is kept") .isSameAs(sortedJobs.get(sortedJobs.size() - 1)); } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandlerTest.java index 57b9ff4b..c2a5640d 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandlerTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandlerTest.java @@ -149,6 +149,12 @@ class ListOperationalJobsHandlerTest super(jobId); } + @Override + public boolean isRunningOnCassandra() + { + return false; + } + @Override protected void executeInternal() throws OperationalJobException { diff --git a/server/src/test/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandlerTest.java new file mode 100644 index 00000000..5b2dde64 --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/routes/NodeDecommissionHandlerTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.predicate.ResponsePredicate; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.server.MainModule; +import org.apache.cassandra.sidecar.server.Server; +import org.mockito.AdditionalAnswers; + +import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; +import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT; 
+import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING; +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link NodeDecommissionHandler} + */ +@ExtendWith(VertxExtension.class) +public class NodeDecommissionHandlerTest +{ + static final Logger LOGGER = LoggerFactory.getLogger(NodeDecommissionHandlerTest.class); + Vertx vertx; + Server server; + StorageOperations mockStorageOperations = mock(StorageOperations.class); + @BeforeEach + void before() throws InterruptedException + { + Injector injector; + Module testOverride = Modules.override(new TestModule()) + .with(new NodeDecommissionHandlerTest.NodeDecommissionTestModule()); + injector = Guice.createInjector(Modules.override(new MainModule()) + .with(testOverride)); + vertx = injector.getInstance(Vertx.class); + server = injector.getInstance(Server.class); + VertxTestContext context = new VertxTestContext(); + server.start() + .onSuccess(s -> context.completeNow()) + .onFailure(context::failNow); + context.awaitCompletion(5, TimeUnit.SECONDS); + } + + @AfterEach + void after() throws InterruptedException + { + CountDownLatch closeLatch = new CountDownLatch(1); + server.close().onSuccess(res -> closeLatch.countDown()); + if (closeLatch.await(60, TimeUnit.SECONDS)) + LOGGER.info("Close event received before timeout."); + else + LOGGER.error("Close event timed out."); + } + + @Test + void testDecommissionLongRunning(VertxTestContext context) + { + when(mockStorageOperations.operationMode()).thenReturn("NORMAL"); + doAnswer(AdditionalAnswers.answersWithDelay(6000, invocation -> 
null)) + .when(mockStorageOperations).decommission(anyBoolean()); + + WebClient client = WebClient.create(vertx); + String testRoute = "/api/v1/cassandra/operations/decommission"; + client.put(server.actualPort(), "127.0.0.1", testRoute) + .expect(ResponsePredicate.SC_ACCEPTED) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(ACCEPTED.code()); + OperationalJobResponse decommissionResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(decommissionResponse).isNotNull(); + assertThat(decommissionResponse.status()).isEqualTo(RUNNING); + context.completeNow(); + })); + } + + @Test + void testDecommissionCompleted(VertxTestContext context) + { + when(mockStorageOperations.operationMode()).thenReturn("NORMAL"); + WebClient client = WebClient.create(vertx); + String testRoute = "/api/v1/cassandra/operations/decommission"; + client.put(server.actualPort(), "127.0.0.1", testRoute) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + LOGGER.info("Decommission Response: {}", response.bodyAsString()); + + OperationalJobResponse decommissionResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(decommissionResponse).isNotNull(); + assertThat(decommissionResponse.status()).isEqualTo(SUCCEEDED); + context.completeNow(); + })); + } + + @Test + void testDecommissionFailed(VertxTestContext context) + { + when(mockStorageOperations.operationMode()).thenReturn("NORMAL"); + doThrow(new RuntimeException("Simulated failure")).when(mockStorageOperations).decommission(anyBoolean()); + WebClient client = WebClient.create(vertx); + String testRoute = "/api/v1/cassandra/operations/decommission"; + client.put(server.actualPort(), "127.0.0.1", testRoute) + .expect(ResponsePredicate.SC_OK) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + context.completeNow(); + })); + } + + @Test + void 
testDecommissionConflict(VertxTestContext context) + { + when(mockStorageOperations.operationMode()).thenReturn("LEAVING"); + WebClient client = WebClient.create(vertx); + String testRoute = "/api/v1/cassandra/operations/decommission"; + client.put(server.actualPort(), "127.0.0.1", testRoute) + .expect(ResponsePredicate.SC_CONFLICT) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(CONFLICT.code()); + + LOGGER.info("Decommission Response: {}", response.bodyAsString()); + OperationalJobResponse decommissionResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(decommissionResponse).isNotNull(); + assertThat(decommissionResponse.jobId()).isNotNull(); + context.completeNow(); + })); + } + + /** + * Test guice module for Node Decommission handler tests + */ + class NodeDecommissionTestModule extends AbstractModule + { + @Provides + @Singleton + public InstancesMetadata instanceMetadata() + { + final int instanceId = 100; + final String host = "127.0.0.1"; + final InstanceMetadata instanceMetadata = mock(InstanceMetadata.class); + when(instanceMetadata.host()).thenReturn(host); + when(instanceMetadata.port()).thenReturn(9042); + when(instanceMetadata.id()).thenReturn(instanceId); + when(instanceMetadata.stagingDir()).thenReturn(""); + + CassandraAdapterDelegate delegate = mock(CassandraAdapterDelegate.class); + + when(delegate.storageOperations()).thenReturn(mockStorageOperations); + when(instanceMetadata.delegate()).thenReturn(delegate); + + InstancesMetadata mockInstancesMetadata = mock(InstancesMetadata.class); + when(mockInstancesMetadata.instances()).thenReturn(Collections.singletonList(instanceMetadata)); + when(mockInstancesMetadata.instanceFromId(instanceId)).thenReturn(instanceMetadata); + when(mockInstancesMetadata.instanceFromHost(host)).thenReturn(instanceMetadata); + + return mockInstancesMetadata; + } + } + +} diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/routes/OperationalJobHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/routes/OperationalJobHandlerTest.java index 39becc6f..f4d0e98c 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/routes/OperationalJobHandlerTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/routes/OperationalJobHandlerTest.java @@ -173,12 +173,15 @@ class OperationalJobHandlerTest OperationalJobManager mockManager = mock(OperationalJobManager.class); OperationalJob runningMock = mock(OperationalJob.class); Promise<Void> p = Promise.promise(); + when(runningMock.jobId()).thenReturn(runningUuid); when(runningMock.status()).thenReturn(OperationalJobStatus.RUNNING); when(runningMock.asyncResult()).thenReturn(p.future()); OperationalJob completedMock = mock(OperationalJob.class); + when(completedMock.jobId()).thenReturn(completedUuid); when(completedMock.status()).thenReturn(OperationalJobStatus.SUCCEEDED); when(completedMock.name()).thenReturn("testCompleted"); OperationalJob failedMock = mock(OperationalJob.class); + when(failedMock.jobId()).thenReturn(failedUuid); when(failedMock.status()).thenReturn(OperationalJobStatus.FAILED); when(failedMock.asyncResult()).thenReturn(Future.failedFuture("Test failed")); when(failedMock.name()).thenReturn("testFailed"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org