This is an automated email from the ASF dual-hosted git repository. frankgh pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new 35138a46 CASSSIDECAR-150: Adds job management framework and APIs (#139) 35138a46 is described below commit 35138a46905db3b3cf5f1c3920f46e126b0b3259 Author: Arjun Ashok <arjun_as...@apple.com> AuthorDate: Thu Dec 12 15:29:52 2024 -0800 CASSSIDECAR-150: Adds job management framework and APIs (#139) Patch by Arjun Ashok; Reviewed by Shailaja Koppu, Yifan Cai, Venkata Harikrishna Nukala, Francisco Guerrero for CASSSIDECAR-150 --- CHANGES.txt | 1 + .../cassandra/sidecar/common/ApiEndpointsV1.java | 8 + .../sidecar/common/data/OperationalJobStatus.java | 48 +++++ .../common/request/ListOperationalJobsRequest.java | 46 +++++ .../common/request/OperationalJobRequest.java | 50 +++++ .../response/ListOperationalJobsResponse.java | 51 +++++ .../common/response/OperationalJobResponse.java | 89 +++++++++ .../cassandra/sidecar/client/RequestContext.java | 26 +++ .../cassandra/sidecar/client/SidecarClient.java | 43 ++++- .../sidecar/client/SidecarClientTest.java | 121 +++++++++--- .../server/exceptions/OperationalJobException.java | 52 +++++ .../sidecar/config/ServiceConfiguration.java | 13 ++ .../config/yaml/ServiceConfigurationImpl.java | 57 ++++++ .../OperationalJobConflictException.java | 32 +++ .../cassandra/sidecar/job/OperationalJob.java | 215 +++++++++++++++++++++ .../sidecar/job/OperationalJobManager.java | 99 ++++++++++ .../sidecar/job/OperationalJobTracker.java | 119 ++++++++++++ .../sidecar/routes/ListOperationalJobsHandler.java | 68 +++++++ .../sidecar/routes/OperationalJobHandler.java | 119 ++++++++++++ .../cassandra/sidecar/server/MainModule.java | 10 + .../cassandra/sidecar/tasks/PeriodicTask.java | 34 +--- .../sidecar/tasks/{PeriodicTask.java => Task.java} | 62 +----- .../sidecar/job/OperationalJobManagerTest.java | 144 ++++++++++++++ .../cassandra/sidecar/job/OperationalJobTest.java | 186 ++++++++++++++++++ .../sidecar/job/OperationalJobTrackerTest.java | 159 +++++++++++++++ 
.../routes/ListOperationalJobsHandlerTest.java | 157 +++++++++++++++ .../sidecar/routes/OperationalJobHandlerTest.java | 192 ++++++++++++++++++ 27 files changed, 2087 insertions(+), 114 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 6cc3016f..1c8d31a7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.0.0 ----- + * Add support for operational job management of long-running node operations (CASSSIDECAR-150) * CDC directory not initialized properly in InstanceMetadata (CASSSIDECAR-171) * yaml configuration defaults to a file that doesn't exist (CASSSIDECAR-122) * Add advanced driver settings to allow taking in password or certificates for Cassandra connection (CASSSIDECAR-159) diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java index be49b511..108bc6e1 100644 --- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java @@ -29,6 +29,7 @@ public final class ApiEndpointsV1 public static final String HEALTH = "/__health"; public static final String CASSANDRA = "/cassandra"; + public static final String NATIVE = "/native"; public static final String JMX = "/jmx"; public static final String KEYSPACE_PATH_PARAM = ":keyspace"; @@ -39,6 +40,8 @@ public final class ApiEndpointsV1 public static final String UPLOAD_ID_PATH_PARAM = ":uploadId"; public static final String JOB_ID_PATH_PARAM = ":jobId"; + public static final String OPERATIONAL_JOB_ID_PATH_PARAM = ":operationId"; + public static final String PER_KEYSPACE = "/keyspaces/" + KEYSPACE_PATH_PARAM; public static final String PER_TABLE = "/tables/" + TABLE_PATH_PARAM; public static final String PER_SNAPSHOT = "/snapshots/" + SNAPSHOT_PATH_PARAM; @@ -118,6 +121,11 @@ public final class ApiEndpointsV1 public static final String CONNECTED_CLIENT_STATS_ROUTE = 
API_V1 + CASSANDRA + "/stats/connected-clients"; + public static final String OPERATIONAL_JOBS = "/operational-jobs"; + public static final String PER_OPERATIONAL_JOB = OPERATIONAL_JOBS + '/' + OPERATIONAL_JOB_ID_PATH_PARAM; + public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 + CASSANDRA + OPERATIONAL_JOBS; + public static final String OPERATIONAL_JOB_ROUTE = API_V1 + CASSANDRA + PER_OPERATIONAL_JOB; + private ApiEndpointsV1() { throw new IllegalStateException(getClass() + " is a constants container and shall not be instantiated"); diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/OperationalJobStatus.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/OperationalJobStatus.java new file mode 100644 index 00000000..6a2d0600 --- /dev/null +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/data/OperationalJobStatus.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.data; + +/** + * Encapsulates the states of the job lifecycle. + * Operational jobs are the ones running on Cassandra, e.g. decommission, etc. 
+ */ +public enum OperationalJobStatus +{ + /** + * The operational job is created + */ + CREATED, + /** + * The operational job is running on Cassandra + */ + RUNNING, + /** + * The operational job succeeds + */ + SUCCEEDED, + /** + * The operational job fails + */ + FAILED; + + public boolean isCompleted() + { + return this == SUCCEEDED || this == FAILED; + } +} diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ListOperationalJobsRequest.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ListOperationalJobsRequest.java new file mode 100644 index 00000000..b472bd70 --- /dev/null +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ListOperationalJobsRequest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.common.request; + +import io.netty.handler.codec.http.HttpMethod; +import org.apache.cassandra.sidecar.common.ApiEndpointsV1; +import org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse; + +/** + * Represents a request to retrieve the list of async operational jobs + */ +public class ListOperationalJobsRequest extends JsonRequest<ListOperationalJobsResponse> +{ + /** + * Constructs a request to retrieve the list of running operational jobs + */ + public ListOperationalJobsRequest() + { + super(ApiEndpointsV1.LIST_OPERATIONAL_JOBS_ROUTE); + } + + /** + * {@inheritDoc} + */ + @Override + public HttpMethod method() + { + return HttpMethod.GET; + } +} diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/OperationalJobRequest.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/OperationalJobRequest.java new file mode 100644 index 00000000..402c7d86 --- /dev/null +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/OperationalJobRequest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.common.request; + +import java.util.UUID; + +import io.netty.handler.codec.http.HttpMethod; +import org.apache.cassandra.sidecar.common.ApiEndpointsV1; +import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; + +/** + * Represents a request to retrieve the status of a async operational job + */ +public class OperationalJobRequest extends JsonRequest<OperationalJobResponse> +{ + + /** + * Constructs a request to retrieve status for a specified operational job + */ + public OperationalJobRequest(UUID jobId) + { + super(ApiEndpointsV1.OPERATIONAL_JOB_ROUTE + .replaceAll(ApiEndpointsV1.OPERATIONAL_JOB_ID_PATH_PARAM, jobId.toString())); + } + + /** + * {@inheritDoc} + */ + @Override + public HttpMethod method() + { + return HttpMethod.GET; + } +} diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListOperationalJobsResponse.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListOperationalJobsResponse.java new file mode 100644 index 00000000..1f9f1eaa --- /dev/null +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListOperationalJobsResponse.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.response; + +import java.util.ArrayList; +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Response structure of the list operational jobs API + */ +public class ListOperationalJobsResponse +{ + private final List<OperationalJobResponse> jobs; + + /** + * Constructs a {@link ListOperationalJobsResponse} object. + */ + public ListOperationalJobsResponse() + { + this.jobs = new ArrayList<>(); + } + + public void addJob(OperationalJobResponse job) + { + jobs.add(job); + } + + @JsonProperty("jobs") + public List<OperationalJobResponse> jobs() + { + return jobs; + } +} diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/OperationalJobResponse.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/OperationalJobResponse.java new file mode 100644 index 00000000..271f398a --- /dev/null +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/OperationalJobResponse.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.common.response; + +import java.util.UUID; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; + +/** + * Response structure of the operational jobs API + */ +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class OperationalJobResponse +{ + private final UUID jobId; + private final OperationalJobStatus status; + private final String operation; + private final String reason; + + @JsonCreator + public OperationalJobResponse(@JsonProperty("jobId") UUID jobId, + @JsonProperty("jobStatus") OperationalJobStatus status, + @JsonProperty("operation") String operation, + @JsonProperty("reason") String reason) + { + this.jobId = jobId; + this.status = status; + this.operation = operation; + this.reason = reason; + } + + /** + * @return job id of operational job + */ + @JsonProperty("jobId") + public UUID jobId() + { + return jobId; + } + + /** + * @return status of the job + */ + @JsonProperty("jobStatus") + public OperationalJobStatus status() + { + return status; + } + + /** + * @return operation of the job + */ + @JsonProperty("operation") + public String operation() + { + return operation; + } + + /** + * @return reason for job failure + */ + @JsonProperty("reason") + public String reason() + { + return reason; + } + +} diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java index 96082a61..65fad58d 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java @@ -21,6 +21,7 @@ package 
org.apache.cassandra.sidecar.client; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import org.apache.cassandra.sidecar.client.retry.ExponentialBackoffRetryPolicy; import org.apache.cassandra.sidecar.client.retry.NoRetryPolicy; @@ -35,8 +36,10 @@ import org.apache.cassandra.sidecar.common.request.ConnectedClientStatsRequest; import org.apache.cassandra.sidecar.common.request.CreateSnapshotRequest; import org.apache.cassandra.sidecar.common.request.GossipInfoRequest; import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest; +import org.apache.cassandra.sidecar.common.request.ListOperationalJobsRequest; import org.apache.cassandra.sidecar.common.request.ListSnapshotFilesRequest; import org.apache.cassandra.sidecar.common.request.NodeSettingsRequest; +import org.apache.cassandra.sidecar.common.request.OperationalJobRequest; import org.apache.cassandra.sidecar.common.request.Request; import org.apache.cassandra.sidecar.common.request.RingRequest; import org.apache.cassandra.sidecar.common.request.SSTableComponentRequest; @@ -72,6 +75,7 @@ public class RequestContext protected static final NodeSettingsRequest NODE_SETTINGS_REQUEST = new NodeSettingsRequest(); protected static final RingRequest RING_REQUEST = new RingRequest(); protected static final GossipInfoRequest GOSSIP_INFO_REQUEST = new GossipInfoRequest(); + protected static final ListOperationalJobsRequest LIST_JOBS_REQUEST = new ListOperationalJobsRequest(); protected static final RetryPolicy DEFAULT_RETRY_POLICY = new NoRetryPolicy(); protected static final RetryPolicy DEFAULT_EXPONENTIAL_BACKOFF_RETRY_POLICY = new ExponentialBackoffRetryPolicy(10, 500L, 60_000L); @@ -486,6 +490,28 @@ public class RequestContext return request(new ConnectedClientStatsRequest()); } + /** + * Sets the {@code request} to be a {@link OperationalJobRequest} and returns a reference to this Builder + * enabling method chaining. 
+ * + * @return a reference to this Builder + */ + public Builder operationalJobRequest(UUID jobId) + { + return request(new OperationalJobRequest(jobId)); + } + + /** + * Sets the {@code request} to be a {@link ListOperationalJobsRequest} and returns a reference to this Builder + * enabling method chaining. + * + * @return a reference to this Builder + */ + public Builder listOperationalJobsRequest() + { + return request(LIST_JOBS_REQUEST); + } + /** * Sets the {@code retryPolicy} to be an * {@link org.apache.cassandra.sidecar.client.retry.ExponentialBackoffRetryPolicy} configured with diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java index 1925e0ad..9e69432c 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java @@ -51,8 +51,10 @@ import org.apache.cassandra.sidecar.common.request.data.UpdateRestoreJobRequestP import org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse; import org.apache.cassandra.sidecar.common.response.GossipInfoResponse; import org.apache.cassandra.sidecar.common.response.HealthResponse; +import org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse; import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse; import org.apache.cassandra.sidecar.common.response.NodeSettings; +import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; import org.apache.cassandra.sidecar.common.response.RingResponse; import org.apache.cassandra.sidecar.common.response.SSTableImportResponse; import org.apache.cassandra.sidecar.common.response.SchemaResponse; @@ -582,15 +584,46 @@ public class SidecarClient implements AutoCloseable, SidecarClientBlobRestoreExt } /** - * Executes the connected client stats request using the default retry policy and configured 
selection policy + * Executes the connected client stats request using the default retry policy and provided {@code instance}. * + * @param instance the instance where the request will be executed * @return a completable future of the connected client stats */ - public CompletableFuture<ConnectedClientStatsResponse> connectedClientStats() + public CompletableFuture<ConnectedClientStatsResponse> connectedClientStats(SidecarInstance instance) { - return executeRequestAsync(requestBuilder() - .connectedClientStatsRequest() - .build()); + return executor.executeRequestAsync(requestBuilder() + .singleInstanceSelectionPolicy(instance) + .connectedClientStatsRequest() + .build()); + } + + /** + * Executes the operational job request using the default retry policy and provided {@code instance}. + * + * @param instance the instance where the request will be executed + * @param jobId the unique operational job identifier + * @return a completable future of the operational job response + */ + public CompletableFuture<OperationalJobResponse> operationalJobs(SidecarInstance instance, UUID jobId) + { + return executor.executeRequestAsync(requestBuilder() + .singleInstanceSelectionPolicy(instance) + .operationalJobRequest(jobId) + .build()); + } + + /** + * Executes the list operational jobs request using the default retry policy and provided {@code instance}. 
+ * + * @param instance the instance where the request will be executed + * @return a completable future of the list of operational jobs + */ + public CompletableFuture<ListOperationalJobsResponse> listOperationalJobs(SidecarInstance instance) + { + return executor.executeRequestAsync(requestBuilder() + .singleInstanceSelectionPolicy(instance) + .listOperationalJobsRequest() + .build()); } /** diff --git a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java index 8ac0bb3a..3933c066 100644 --- a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java +++ b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java @@ -59,6 +59,7 @@ import org.apache.cassandra.sidecar.client.request.RequestExecutorTest; import org.apache.cassandra.sidecar.client.retry.RetryAction; import org.apache.cassandra.sidecar.client.retry.RetryPolicy; import org.apache.cassandra.sidecar.common.ApiEndpointsV1; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets; import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest; import org.apache.cassandra.sidecar.common.request.NodeSettingsRequest; @@ -69,8 +70,10 @@ import org.apache.cassandra.sidecar.common.request.data.XXHash32Digest; import org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse; import org.apache.cassandra.sidecar.common.response.GossipInfoResponse; import org.apache.cassandra.sidecar.common.response.HealthResponse; +import org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse; import org.apache.cassandra.sidecar.common.response.ListSnapshotFilesResponse; import org.apache.cassandra.sidecar.common.response.NodeSettings; +import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; import 
org.apache.cassandra.sidecar.common.response.RingResponse; import org.apache.cassandra.sidecar.common.response.SSTableImportResponse; import org.apache.cassandra.sidecar.common.response.SchemaResponse; @@ -89,6 +92,7 @@ import static io.netty.handler.codec.http.HttpResponseStatus.OK; import static io.netty.handler.codec.http.HttpResponseStatus.PARTIAL_CONTENT; import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.JOB_ID_PATH_PARAM; import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.KEYSPACE_PATH_PARAM; +import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.OPERATIONAL_JOB_ID_PATH_PARAM; import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.TABLE_PATH_PARAM; import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32; import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.CONTENT_XXHASH32_SEED; @@ -1266,6 +1270,56 @@ abstract class SidecarClientTest } } + @Test + public void testOperationalJobs() throws Exception + { + UUID jobId = UUID.randomUUID(); + String jobStatusAsString = "{\"jobId\":\"" + jobId + "\",\"jobStatus\":\"RUNNING\",\"operation\":\"test\"}"; + + MockResponse response = new MockResponse() + .setResponseCode(OK.code()) + .setHeader("content-type", "application/json") + .setBody(jobStatusAsString); + enqueue(response); + + for (MockWebServer server : servers) + { + SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(server); + OperationalJobResponse result = client.operationalJobs(sidecarInstance, jobId).get(30, TimeUnit.SECONDS); + assertThat(result).isNotNull(); + assertThat(result.jobId()).isEqualTo(jobId); + assertThat(result.status()).isEqualTo(OperationalJobStatus.RUNNING); + assertThat(result.operation()).isEqualTo("test"); + validateResponseServed(server, + ApiEndpointsV1.OPERATIONAL_JOB_ROUTE.replaceAll(OPERATIONAL_JOB_ID_PATH_PARAM, jobId.toString()), + req -> { }); + } + } + + @Test + public void 
testlistOperationalJobs() throws Exception + { + UUID jobId = UUID.randomUUID(); + String listJobsString = "{\"jobs\":[{\"jobId\":\"" + jobId + "\",\"status\":\"RUNNING\",\"failureReason\":\"\",\"operation\":\"test\"}]}"; + + MockResponse response = new MockResponse() + .setResponseCode(OK.code()) + .setHeader("content-type", "application/json") + .setBody(listJobsString); + + enqueue(response); + + for (MockWebServer server : servers) + { + SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(server); + ListOperationalJobsResponse result = client.listOperationalJobs(sidecarInstance).get(30, TimeUnit.SECONDS); + assertThat(result).isNotNull(); + assertThat(result.jobs()).isNotNull(); + assertThat(result.jobs().get(0).jobId()).isEqualTo(jobId); + validateResponseServed(server, ApiEndpointsV1.LIST_OPERATIONAL_JOBS_ROUTE, req -> { }); + } + } + @Test void testFailsWithOneAttemptPerServer() { @@ -1416,28 +1470,33 @@ abstract class SidecarClientTest MockResponse response = new MockResponse().setResponseCode(OK.code()).setBody(connectedClientStatsResponseAsString); enqueue(response); - ConnectedClientStatsResponse result = client.connectedClientStats().get(); - assertThat(result).isNotNull(); - assertThat(result.clientConnections()).isNotNull().hasSize(1); - assertThat(result.totalConnectedClients()).isNotNull().isEqualTo(1); - assertThat(result.connectionsByUser()).isNotNull().containsKey("anonymous"); - ClientConnectionEntry entry = result.clientConnections().iterator().next(); - assertThat(entry.address()).isEqualTo("127.0.0.1"); - assertThat(entry.port()).isEqualTo(54628); - assertThat(entry.sslEnabled()).isEqualTo(false); - assertThat(entry.sslCipherSuite()).isEqualTo("TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256"); - assertThat(entry.sslProtocol()).isEqualTo("TLSv1.2"); - assertThat(entry.protocolVersion()).isEqualTo("5"); - assertThat(entry.username()).isEqualTo("anonymous"); - assertThat(entry.requestCount()).isEqualTo(39); - 
assertThat(entry.driverName()).isEqualTo("DataStax Java Driver"); - assertThat(entry.driverVersion()).isEqualTo("3.11.3"); - assertThat(entry.keyspaceName()).isEqualTo("test"); - assertThat(entry.authenticationMode()).isEqualTo("MutualTls"); - assertThat(entry.authenticationMetadata()).containsKey("identity"); - assertThat(entry.clientOptions()).containsKeys("CQL_VERSION", "DRIVER_NAME", "DRIVER_VERSION"); - validateResponseServed(ApiEndpointsV1.CONNECTED_CLIENT_STATS_ROUTE); + for (MockWebServer server : servers) + { + SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(server); + ConnectedClientStatsResponse result = client.connectedClientStats(sidecarInstance).get(); + + assertThat(result).isNotNull(); + assertThat(result.clientConnections()).isNotNull().hasSize(1); + assertThat(result.totalConnectedClients()).isNotNull().isEqualTo(1); + assertThat(result.connectionsByUser()).isNotNull().containsKey("anonymous"); + ClientConnectionEntry entry = result.clientConnections().iterator().next(); + assertThat(entry.address()).isEqualTo("127.0.0.1"); + assertThat(entry.port()).isEqualTo(54628); + assertThat(entry.sslEnabled()).isEqualTo(false); + assertThat(entry.sslCipherSuite()).isEqualTo("TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256"); + assertThat(entry.sslProtocol()).isEqualTo("TLSv1.2"); + assertThat(entry.protocolVersion()).isEqualTo("5"); + assertThat(entry.username()).isEqualTo("anonymous"); + assertThat(entry.requestCount()).isEqualTo(39); + assertThat(entry.driverName()).isEqualTo("DataStax Java Driver"); + assertThat(entry.driverVersion()).isEqualTo("3.11.3"); + assertThat(entry.keyspaceName()).isEqualTo("test"); + assertThat(entry.authenticationMode()).isEqualTo("MutualTls"); + assertThat(entry.authenticationMetadata()).containsKey("identity"); + assertThat(entry.clientOptions()).containsKeys("CQL_VERSION", "DRIVER_NAME", "DRIVER_VERSION"); + validateResponseServed(server, ApiEndpointsV1.CONNECTED_CLIENT_STATS_ROUTE, req -> { }); + } } 
private void enqueue(MockResponse response) @@ -1459,18 +1518,28 @@ abstract class SidecarClientTest { for (MockWebServer server : servers) { - if (server.getRequestCount() > 0) + if (validateResponseServed(server, expectedEndpointPath, serverReceivedRequestVerifier)) { - assertThat(server.getRequestCount()).isEqualTo(1); - RecordedRequest request = server.takeRequest(); - serverReceivedRequestVerifier.accept(request); - assertThat(request.getPath()).isEqualTo(expectedEndpointPath); return; } } fail("The request was not served by any of the provided servers"); } + private boolean validateResponseServed(MockWebServer server, String expectedEndpointPath, Consumer<RecordedRequest> serverReceivedRequestVerifier) + throws InterruptedException + { + if (server.getRequestCount() > 0) + { + assertThat(server.getRequestCount()).isEqualTo(1); + RecordedRequest request = server.takeRequest(); + serverReceivedRequestVerifier.accept(request); + assertThat(request.getPath()).isEqualTo(expectedEndpointPath); + return true; + } + return false; + } + private InputStream resourceInputStream(String name) { InputStream inputStream = getClass().getClassLoader().getResourceAsStream(name); diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/exceptions/OperationalJobException.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/exceptions/OperationalJobException.java new file mode 100644 index 00000000..7e29bf40 --- /dev/null +++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/exceptions/OperationalJobException.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.server.exceptions; + +import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils; + +/** + * Exception thrown when an operational job fails; also the base type of more specific operational job exceptions + */ +public class OperationalJobException extends RuntimeException +{ + public static OperationalJobException wraps(Throwable throwable) + { + // extract the OperationalJobException from the stacktrace, if exists + // otherwise, wraps the throwable as OperationalJobException + OperationalJobException oje = ThrowableUtils.getCause(throwable, OperationalJobException.class); + if (oje != null) + { + return oje; + } + else + { + return new OperationalJobException(throwable.getMessage(), throwable); + } + } + + public OperationalJobException(String message) + { + super(message); + } + + public OperationalJobException(String message, Throwable cause) + { + super(message, cause); + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java b/server/src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java index a5150f79..341b3c04 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java @@ -89,6 +89,19 @@ public interface ServiceConfiguration */ int serverVerticleInstances(); + /** + * TODO: move 
operationalJob related configuration to its own class, when the number of configurable fields grows in the future + * @return the size of the operational job tracker LRU cache + */ + int operationalJobTrackerSize(); + + /** + * @return the max wait time in milliseconds for operational job to run internally before returning the http response; + * if the job finishes before the max wait time, it returns immediately on completion; + * otherwise, a response indicating the job is still running is returned after the max wait time. + */ + long operationalJobExecutionMaxWaitTimeInMillis(); + /** * @return the throttling configuration */ diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ServiceConfigurationImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ServiceConfigurationImpl.java index 8f2c9879..0f442bb0 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ServiceConfigurationImpl.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ServiceConfigurationImpl.java @@ -57,7 +57,11 @@ public class ServiceConfigurationImpl implements ServiceConfiguration public static final String ALLOWABLE_SKEW_IN_MINUTES_PROPERTY = "allowable_time_skew_in_minutes"; public static final int DEFAULT_ALLOWABLE_SKEW_IN_MINUTES = 60; private static final String SERVER_VERTICLE_INSTANCES_PROPERTY = "server_verticle_instances"; + private static final String OPERATIONAL_JOB_TRACKER_SIZE_PROPERTY = "operations_job_tracker_size"; + private static final String OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME_MILLIS_PROPERTY = "operations_job_sync_response_timeout"; private static final int DEFAULT_SERVER_VERTICLE_INSTANCES = 1; + private static final int DEFAULT_OPERATIONAL_JOB_TRACKER_SIZE = 64; + private static final long DEFAULT_OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME_MILLIS = TimeUnit.SECONDS.toMillis(5); public static final String THROTTLE_PROPERTY = "throttle"; public static final String SSTABLE_UPLOAD_PROPERTY = 
"sstable_upload"; public static final String SSTABLE_IMPORT_PROPERTY = "sstable_import"; @@ -102,6 +106,12 @@ public class ServiceConfigurationImpl implements ServiceConfiguration @JsonProperty(value = SERVER_VERTICLE_INSTANCES_PROPERTY, defaultValue = DEFAULT_SERVER_VERTICLE_INSTANCES + "") protected final int serverVerticleInstances; + @JsonProperty(value = OPERATIONAL_JOB_TRACKER_SIZE_PROPERTY, defaultValue = DEFAULT_OPERATIONAL_JOB_TRACKER_SIZE + "") + protected final int operationalJobTrackerSize; + + @JsonProperty(value = OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME_MILLIS_PROPERTY) + protected final long operationalJobExecutionMaxWaitTimeMillis; + @JsonProperty(value = THROTTLE_PROPERTY) protected final ThrottleConfiguration throttleConfiguration; @@ -152,6 +162,8 @@ public class ServiceConfigurationImpl implements ServiceConfiguration acceptBacklog = builder.acceptBacklog; allowableSkewInMinutes = builder.allowableSkewInMinutes; serverVerticleInstances = builder.serverVerticleInstances; + operationalJobTrackerSize = builder.operationalJobTrackerSize; + operationalJobExecutionMaxWaitTimeMillis = builder.operationalJobExecutionMaxWaitTimeMillis; throttleConfiguration = builder.throttleConfiguration; sstableUploadConfiguration = builder.sstableUploadConfiguration; sstableImportConfiguration = builder.sstableImportConfiguration; @@ -243,6 +255,26 @@ public class ServiceConfigurationImpl implements ServiceConfiguration return serverVerticleInstances; } + /** + * {@inheritDoc} + */ + @Override + @JsonProperty(value = OPERATIONAL_JOB_TRACKER_SIZE_PROPERTY) + public int operationalJobTrackerSize() + { + return operationalJobTrackerSize; + } + + /** + * {@inheritDoc} + */ + @Override + @JsonProperty(value = OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME_MILLIS_PROPERTY) + public long operationalJobExecutionMaxWaitTimeInMillis() + { + return operationalJobExecutionMaxWaitTimeMillis; + } + /** * {@inheritDoc} */ @@ -352,6 +384,8 @@ public class ServiceConfigurationImpl implements 
ServiceConfiguration protected int acceptBacklog = DEFAULT_ACCEPT_BACKLOG; protected int allowableSkewInMinutes = DEFAULT_ALLOWABLE_SKEW_IN_MINUTES; protected int serverVerticleInstances = DEFAULT_SERVER_VERTICLE_INSTANCES; + protected int operationalJobTrackerSize = DEFAULT_OPERATIONAL_JOB_TRACKER_SIZE; + protected long operationalJobExecutionMaxWaitTimeMillis = DEFAULT_OPERATIONAL_JOB_EXECUTION_MAX_WAIT_TIME_MILLIS; protected ThrottleConfiguration throttleConfiguration = new ThrottleConfigurationImpl(); protected SSTableUploadConfiguration sstableUploadConfiguration = new SSTableUploadConfigurationImpl(); protected SSTableImportConfiguration sstableImportConfiguration = new SSTableImportConfigurationImpl(); @@ -461,6 +495,29 @@ public class ServiceConfigurationImpl implements ServiceConfiguration return update(b -> b.serverVerticleInstances = serverVerticleInstances); } + /** + * Sets the {@code operationalJobTrackerSize} and returns a reference to this Builder enabling method chaining. + * + * @param operationalJobTrackerSize the {@code operationalJobTrackerSize} to set + * @return a reference to this Builder + */ + public Builder operationalJobTrackerSize(int operationalJobTrackerSize) + { + return update(b -> b.operationalJobTrackerSize = operationalJobTrackerSize); + } + + /** + * Sets the {@code operationalJobExecutionMaxWaitTimeMillis} and returns a reference to this Builder + * enabling method chaining. + * + * @param operationalJobExecutionMaxWaitTimeMillis the {@code operationalJobExecutionMaxWaitTimeMillis} to set + * @return a reference to this Builder + */ + public Builder operationalJobExecutionMaxWaitTimeMillis(int operationalJobExecutionMaxWaitTimeMillis) + { + return update(b -> b.operationalJobExecutionMaxWaitTimeMillis = operationalJobExecutionMaxWaitTimeMillis); + } + /** * Sets the {@code throttleConfiguration} and returns a reference to this Builder enabling method chaining. 
* diff --git a/server/src/main/java/org/apache/cassandra/sidecar/exceptions/OperationalJobConflictException.java b/server/src/main/java/org/apache/cassandra/sidecar/exceptions/OperationalJobConflictException.java new file mode 100644 index 00000000..94316fb4 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/exceptions/OperationalJobConflictException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.exceptions; + +import org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException; + +/** + * When the operational job to be submitted is conflicting with another one that is running on Cassandra + */ +public class OperationalJobConflictException extends OperationalJobException +{ + public OperationalJobConflictException(String message) + { + super(message); + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java new file mode 100644 index 00000000..1076b44f --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.job; + +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.TimeoutException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.utils.UUIDs; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException; +import org.apache.cassandra.sidecar.common.utils.Preconditions; +import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; +import org.apache.cassandra.sidecar.tasks.Task; + +/** + * An abstract class representing operational jobs that run on Cassandra + */ +public abstract class OperationalJob implements Task<Void> +{ + private static final Logger LOGGER = LoggerFactory.getLogger(OperationalJob.class); + + // use v1 time-based uuid + public final UUID jobId; + + private final Promise<Void> executionPromise; + private volatile boolean isExecuting = false; + + /** + * Constructs a job with a unique UUID, in Pending state + * + * @param jobId UUID representing the Job to be created + */ + protected OperationalJob(UUID jobId) + { + Preconditions.checkArgument(jobId.version() == 1, "OperationalJob accepts only time-based UUID"); + this.jobId = jobId; + this.executionPromise = Promise.promise(); + } + + @Override + public final Void result() + { + // Update when operational jobs can contain result rather than status + throw new UnsupportedOperationException("No result is expected from an OperationalJob"); + } + + /** + * @return unix timestamp of the job creation time in milliseconds + */ + public long creationTime() + { + return UUIDs.unixTimestamp(jobId); + } + + /** + * @return whether the operational job is executing or not. 
+ */ + public boolean isExecuting() + { + return isExecuting; + } + + /** + * Determine whether the operational job is stale by considering both the referenceTimestampInMillis and the ttlInMillis + * + * @return true if the job's life duration has exceeded ttlInMillis; otherwise, false + */ + public boolean isStale(long referenceTimestampInMillis, long ttlInMillis) + { + long createdAt = creationTime(); + Preconditions.checkArgument(referenceTimestampInMillis >= createdAt, "Invalid referenceTimestampInMillis"); + Preconditions.checkArgument(ttlInMillis >= 0, "Invalid ttlInMillis"); + return referenceTimestampInMillis - createdAt > ttlInMillis; + } + + /** + * Determines the status of the job. OperationalJob subclasses could choose to override the method. + * <p> + * For long-lived jobs, the implementations should return the {@link OperationalJobStatus#RUNNING} status intelligently. + * The condition of {@link OperationalJobStatus#RUNNING} is implementation-specific. + * For example, node decommission is tracked by the operationMode exposed from Cassandra. + * If the operationMode is LEAVING, the corresponding OperationalJob is {@link OperationalJobStatus#RUNNING}. + * In this case, even if the OperationalJobStatus determined from this method is {@link OperationalJobStatus#CREATED}, + * the concrete implementation can override and return {@link OperationalJobStatus#RUNNING}. + * <p> + * For short-lived jobs, i.e. the result is known right away, the implementations do not return the {@link OperationalJobStatus#RUNNING} status. 
+ * They return either {@link OperationalJobStatus#SUCCEEDED} or {@link OperationalJobStatus#FAILED} + * + * @return status of the OperationalJob execution + */ + public OperationalJobStatus status() + { + Future<Void> fut = asyncResult(); + if (!isExecuting) + { + return OperationalJobStatus.CREATED; + } + if (!fut.isComplete()) + { + return OperationalJobStatus.RUNNING; + } + else if (fut.failed()) + { + return OperationalJobStatus.FAILED; + } + else + { + return OperationalJobStatus.SUCCEEDED; + } + } + + public Future<Void> asyncResult() + { + return executionPromise.future(); + } + + /** + * Get the async result, waiting for at most the specified wait time + * <p> + * Note: This call does not block the calling thread. + * When the wait time is exceeded, the returned future succeeds (it is not failed with {@link TimeoutException}) while the job may still be running. + * + * @param executorPool executor pool to run the timer + * @param waitTime maximum time to wait before returning + * @return the async result when the job completes within the wait time; otherwise, a succeeded future + * completed after exceeding the wait time, regardless of the job outcome + */ + public Future<Void> asyncResult(TaskExecutorPool executorPool, Duration waitTime) + { + Future<Void> resultFut = asyncResult(); + if (resultFut.isComplete()) + { + return resultFut; + } + + // complete the max wait time promise either when exceeding the wait time, or the result is available + Promise<Boolean> maxWaitTimePromise = Promise.promise(); + executorPool.setTimer(waitTime.toMillis(), d -> maxWaitTimePromise.tryComplete(true)); // complete with true, meaning timeout + resultFut.onComplete(res -> maxWaitTimePromise.tryComplete(false)); // complete with false, meaning not timeout + Future<Boolean> maxWaitTimeFut = maxWaitTimePromise.future(); + // Completes as soon as any future succeeds, or when all futures fail. 
Note that maxWaitTimePromise is + // completed as soon as resultFut completes + return Future.any(maxWaitTimeFut, resultFut) + // We want to return the result when applicable, of course. + // If this lambda below is evaluated, at least one of the futures has succeeded; + // Depending on whether timeout flag is set, it either succeeds right away or completes with result + .compose(f -> { + boolean isTimeout = maxWaitTimeFut.result(); + if (isTimeout) + { + return Future.succeededFuture(); + } + // otherwise, the result of the job is available + return resultFut; + }); + } + + /** + * OperationalJob body. The implementation is executed in a blocking manner. + * + * @throws OperationalJobException OperationalJobException that wraps job failure + */ + protected abstract void executeInternal() throws OperationalJobException; + + /** + * Execute the job behavior as specified in the internal execution {@link #executeInternal()}, + * while tracking the status of the job's lifecycle. + */ + @Override + public void execute(Promise<Void> promise) + { + isExecuting = true; + LOGGER.info("Executing job. jobId={}", jobId); + try + { + // Blocking call to perform concrete job-specific execution + executeInternal(); + executionPromise.tryComplete(); + promise.tryComplete(); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Complete job execution. jobId={} status={}", jobId, status()); + } + } + catch (Throwable e) + { + OperationalJobException oje = OperationalJobException.wraps(e); + LOGGER.error("Job execution failed. 
jobId={} reason='{}'", jobId, oje.getMessage(), oje); + executionPromise.tryFail(oje); + promise.tryFail(oje); + } + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java new file mode 100644 index 00000000..034fd567 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import javax.inject.Inject; + +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException; + +/** + * An abstraction of the management and tracking of long-running jobs running on the sidecar. + */ +@Singleton +public class OperationalJobManager +{ + private final OperationalJobTracker jobTracker; + + /** + * Creates a manager instance with a default sized job-tracker. 
+ * + * @param jobTracker the tracker for the operational jobs + */ + @Inject + public OperationalJobManager(OperationalJobTracker jobTracker) + { + this.jobTracker = jobTracker; + } + + /** + * Fetches the inflight jobs being tracked on the sidecar + * + * @return instances of the jobs that are in pending or running states + */ + public List<OperationalJob> allInflightJobs() + { + return jobTracker.getJobsView().values() + .stream() + .filter(j -> !j.asyncResult().isComplete()) + .collect(Collectors.toList()); + } + + /** + * Fetch the job using its UUID + * + * @param jobId identifier of the job + * @return instance of the job or null + */ + public OperationalJob getJobIfExists(UUID jobId) + { + return jobTracker.get(jobId); + } + + /** + * Try to submit the job to execute asynchronously, if it is not currently being + * tracked and not running. The job is triggered on a separate internal thread-pool. + * The job execution failure behavior is tracked within the {@link OperationalJob}. + * + * @param job OperationalJob instance to submit + * @return OperationalJob instance that is submitted + * @throws OperationalJobConflictException when the same operational job is already running on Cassandra + */ + public OperationalJob trySubmitJob(OperationalJob job) throws OperationalJobConflictException + { + checkConflict(job); + + // New job is submitted for all cases when we do not have a corresponding downstream job + return jobTracker.computeIfAbsent(job.jobId, jobId -> job); + } + + private void checkConflict(OperationalJob job) throws OperationalJobConflictException + { + // The job is not yet submitted (and running), but its status indicates that there is an identical job running on Cassandra already + // In this case, this job submission is rejected. + if (job.status() == OperationalJobStatus.RUNNING) + { + throw new OperationalJobConflictException("The same operational job is already running on Cassandra. 
operationName='" + job.name() + '\''); + } + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java new file mode 100644 index 00000000..649a8a0b --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobTracker.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.job; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.VisibleForTesting; + +/** + * Tracks and stores the results of long-running jobs running on the sidecar + */ +@Singleton +public class OperationalJobTracker +{ + public static final long ONE_DAY_TTL = TimeUnit.DAYS.toMillis(1); // todo: consider making it configurable + + private static final Logger LOGGER = LoggerFactory.getLogger(OperationalJobTracker.class); + private final Map<UUID, OperationalJob> map; + + @Inject + public OperationalJobTracker(ServiceConfiguration serviceConfiguration) + { + this(serviceConfiguration.operationalJobTrackerSize()); + } + + public OperationalJobTracker(int initialCapacity) + { + map = Collections.synchronizedMap(new LinkedHashMap<UUID, OperationalJob>(initialCapacity) + { + /** + * {@inheritDoc} + */ + @Override + protected boolean removeEldestEntry(Map.Entry<UUID, OperationalJob> eldest) + { + // We have reached capacity and the oldest entry is either ready for cleanup or stale + if (map.size() > initialCapacity) + { + OperationalJob job = eldest.getValue(); + if (job.status().isCompleted() && job.isStale(System.currentTimeMillis(), ONE_DAY_TTL)) + { + LOGGER.debug("Expiring completed and stale job due to job tracker has reached max size. jobId={} status={} createdAt={}", + job.jobId, job.status(), job.creationTime()); + return true; + } + else + { + LOGGER.warn("Job tracker reached max size, but the eldest job is not completed yet. " + + "Not evicting. 
jobId={} status={}", job.jobId, job.status()); + // TODO: Optionally trigger cleanup to fetch next oldest to evict + } + } + + return false; + } + }); + } + + public OperationalJob computeIfAbsent(UUID key, Function<UUID, OperationalJob> mappingFunction) + { + return map.computeIfAbsent(key, mappingFunction); + } + + public OperationalJob get(UUID key) + { + return map.get(key); + } + + /** + * Returns an unmodifiable view of the underlying map; the view is not a copy and reflects subsequent changes to the map + * + * @return an unmodifiable view of the underlying mapping + */ + @NotNull + Map<UUID, OperationalJob> getJobsView() + { + return Collections.unmodifiableMap(map); + } + + @VisibleForTesting + OperationalJob put(OperationalJob job) + { + return map.put(job.jobId, job); + } + + @VisibleForTesting + int size() + { + return map.size(); + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandler.java new file mode 100644 index 00000000..ba7a24dc --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandler.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import javax.inject.Inject; + +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse; +import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.job.OperationalJobManager; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; + +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING; + +/** + * Handler for retrieving all the inflight jobs tracked on the sidecar + */ +public class ListOperationalJobsHandler extends AbstractHandler<Void> +{ + private final OperationalJobManager jobManager; + + @Inject + public ListOperationalJobsHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator, + OperationalJobManager jobManager) + { + super(metadataFetcher, executorPools, validator); + this.jobManager = jobManager; + } + + @Override + protected Void extractParamsOrThrow(RoutingContext context) + { + return null; + } + + @Override + protected void handleInternal(RoutingContext context, HttpServerRequest httpRequest, String host, SocketAddress remoteAddress, Void request) + { + ListOperationalJobsResponse listResponse = new ListOperationalJobsResponse(); + jobManager.allInflightJobs() + .stream() + .map(job -> new OperationalJobResponse(job.jobId, RUNNING, job.name(), null)) + .forEach(listResponse::addJob); + context.json(listResponse); + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/OperationalJobHandler.java 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/OperationalJobHandler.java new file mode 100644 index 00000000..162903af --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/OperationalJobHandler.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.UUID; +import javax.inject.Inject; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.job.OperationalJob; +import org.apache.cassandra.sidecar.job.OperationalJobManager; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; + +import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.OPERATIONAL_JOB_ID_PATH_PARAM; +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler for retrieving the status of async operational jobs running on the sidecar + */ +public class OperationalJobHandler extends AbstractHandler<Void> +{ + private final OperationalJobManager jobManager; + + @Inject + public OperationalJobHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator, + OperationalJobManager jobManager) + { + super(metadataFetcher, executorPools, validator); + this.jobManager = jobManager; + } + + /** Nothing is parsed up front (always returns {@code null}); the job ID is read from the path in handleInternal. */ @Override + protected Void extractParamsOrThrow(RoutingContext context) + { + return null; + } + + /** Looks the job up through the job manager on the service executor; an unknown ID fails with a NOT_FOUND HTTP exception handed to processFailure, otherwise sendStatusBasedResponse writes the reply. */ @Override + public void handleInternal(RoutingContext context, HttpServerRequest httpRequest, String host, SocketAddress remoteAddress, Void request) + { + UUID jobId = validatedJobIdParam(context); + executorPools.service() + .executeBlocking(() -> { + OperationalJob job = jobManager.getJobIfExists(jobId); + if (job == null) + { + logger.info("No operational job 
found with the jobId. jobId={}", jobId); + throw wrapHttpException(HttpResponseStatus.NOT_FOUND, + String.format("Unknown job with ID: %s. Please retry the operation.", jobId)); + } + return job; + }) + .onFailure(cause -> processFailure(cause, context, host, remoteAddress, request)) + .onSuccess(job -> sendStatusBasedResponse(context, jobId, job)); + } + + /** Reads the job ID path parameter and parses it as a UUID; a missing or malformed value raises a BAD_REQUEST HTTP exception. */ UUID validatedJobIdParam(RoutingContext context) + { + String requestJobId = context.pathParam(OPERATIONAL_JOB_ID_PATH_PARAM.substring(1)); + if (requestJobId == null) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, + OPERATIONAL_JOB_ID_PATH_PARAM + " is required but not supplied"); + } + + UUID jobId; + try + { + jobId = UUID.fromString(requestJobId); + } + catch (IllegalArgumentException e) + { + logger.info("Invalid jobId. jobId={}", requestJobId); + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, String.format("Invalid job ID provided :%s.", requestJobId)); + } + return jobId; + } + + /** Writes the job as JSON: 200 OK when the status reports completion, 202 ACCEPTED otherwise. The failure reason is filled in only for a FAILED job whose future carries a cause; a missing cause yields a null reason instead of a NullPointerException. */ public void sendStatusBasedResponse(RoutingContext context, UUID jobId, OperationalJob job) + { + OperationalJobStatus status = job.status(); + if (status.isCompleted()) + { + context.response().setStatusCode(HttpResponseStatus.OK.code()); + } + else + { + context.response().setStatusCode(HttpResponseStatus.ACCEPTED.code()); + } + + String reason = status == FAILED && job.asyncResult().cause() != null ? 
job.asyncResult().cause().getMessage() : null; + context.json(new OperationalJobResponse(jobId, status, job.name(), reason)); + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java index e135e6a1..31499527 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java @@ -99,6 +99,8 @@ import org.apache.cassandra.sidecar.routes.DiskSpaceProtectionHandler; import org.apache.cassandra.sidecar.routes.FileStreamHandler; import org.apache.cassandra.sidecar.routes.GossipInfoHandler; import org.apache.cassandra.sidecar.routes.JsonErrorHandler; +import org.apache.cassandra.sidecar.routes.ListOperationalJobsHandler; +import org.apache.cassandra.sidecar.routes.OperationalJobHandler; import org.apache.cassandra.sidecar.routes.RingHandler; import org.apache.cassandra.sidecar.routes.RoutingOrder; import org.apache.cassandra.sidecar.routes.SchemaHandler; @@ -269,6 +271,8 @@ public class MainModule extends AbstractModule CreateRestoreSliceHandler createRestoreSliceHandler, RestoreJobProgressHandler restoreJobProgressHandler, ConnectedClientStatsHandler connectedClientStatsHandler, + OperationalJobHandler operationalJobHandler, + ListOperationalJobsHandler listOperationalJobsHandler, ErrorHandler errorHandler) { Router router = Router.router(vertx); @@ -362,6 +366,12 @@ public class MainModule extends AbstractModule router.get(ApiEndpointsV1.CONNECTED_CLIENT_STATS_ROUTE) .handler(connectedClientStatsHandler); + router.get(ApiEndpointsV1.OPERATIONAL_JOB_ROUTE) + .handler(operationalJobHandler); + + router.get(ApiEndpointsV1.LIST_OPERATIONAL_JOBS_ROUTE) + .handler(listOperationalJobsHandler); + router.get(ApiEndpointsV1.RING_ROUTE_PER_KEYSPACE) .handler(ringHandler); diff --git a/server/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTask.java 
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTask.java index 7f7c53dc..6525b2a8 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTask.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTask.java @@ -20,12 +20,10 @@ package org.apache.cassandra.sidecar.tasks; import java.util.concurrent.TimeUnit; -import io.vertx.core.Promise; - /** * An interface that defines a periodic task that will be executed during the lifecycle of Cassandra Sidecar */ -public interface PeriodicTask +public interface PeriodicTask extends Task<Void> { /** * @return delay in the specified {@link #delayUnit()} for periodic task @@ -56,18 +54,6 @@ public interface PeriodicTask return delayUnit(); } - /** - * Defines the task body. - * The method can be considered as executing in a single thread. - * - * <br><b>NOTE:</b> the {@code promise} must be completed (as either succeeded or failed) at the end of the run. - * Failing to do so, the {@link PeriodicTaskExecutor} will not be able to schedule a new run. - * See {@link PeriodicTaskExecutor#executeInternal} for details. - * - * @param promise a promise when the execution completes - */ - void execute(Promise<Void> promise); - /** * Register the periodic task executor at the task. By default, it is no-op. * If the reference to the executor is needed, the concrete {@link PeriodicTask} can implement this method @@ -88,21 +74,9 @@ public interface PeriodicTask return false; } - /** - * Close any resources it opened. - * Implementation note: it is encouraged to handle the exceptions during close() - */ - default void close() - { - } - - /** - * @return descriptive name of the task. It prefers simple class name, if it is non-empty; - * otherwise, it returns the full class name - */ - default String name() + @Override + default Void result() { - String simpleName = this.getClass().getSimpleName(); - return simpleName.isEmpty() ? 
this.getClass().getName() : simpleName; + throw new UnsupportedOperationException("No result is expected from a Periodic task"); } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTask.java b/server/src/main/java/org/apache/cassandra/sidecar/tasks/Task.java similarity index 50% copy from server/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTask.java copy to server/src/main/java/org/apache/cassandra/sidecar/tasks/Task.java index 7f7c53dc..d63d3bbd 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/tasks/PeriodicTask.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/tasks/Task.java @@ -18,75 +18,31 @@ package org.apache.cassandra.sidecar.tasks; -import java.util.concurrent.TimeUnit; - import io.vertx.core.Promise; +import org.jetbrains.annotations.Nullable; /** - * An interface that defines a periodic task that will be executed during the lifecycle of Cassandra Sidecar + * An interface that defines a task that will be executed during the lifecycle of Cassandra Sidecar + * @param <T> the type of the result produced by the task */ -public interface PeriodicTask +public interface Task<T> { - /** - * @return delay in the specified {@link #delayUnit()} for periodic task - */ - long delay(); - - /** - * @return the unit for the {@link #delay()}, if not specified defaults to milliseconds - */ - default TimeUnit delayUnit() - { - return TimeUnit.MILLISECONDS; - } - - /** - * @return the initial delay for the task, defaults to the {@link #delay()} - */ - default long initialDelay() - { - return delay(); - } - - /** - * @return the units for the {@link #initialDelay()}, if not specified defaults to {@link #delayUnit()} - */ - default TimeUnit initialDelayUnit() - { - return delayUnit(); - } - /** * Defines the task body. * The method can be considered as executing in a single thread. * * <br><b>NOTE:</b> the {@code promise} must be completed (as either succeeded or failed) at the end of the run. 
- * Failing to do so, the {@link PeriodicTaskExecutor} will not be able to schedule a new run. - * See {@link PeriodicTaskExecutor#executeInternal} for details. + * Failing to do so, the executor will not be able to trigger a new run. * * @param promise a promise when the execution completes */ - void execute(Promise<Void> promise); + void execute(Promise<T> promise); /** - * Register the periodic task executor at the task. By default, it is no-op. - * If the reference to the executor is needed, the concrete {@link PeriodicTask} can implement this method - * - * @param executor the executor that manages the task + * @return the result of task execution. Null is returned if there is no result */ - default void registerPeriodicTaskExecutor(PeriodicTaskExecutor executor) - { - } - - /** - * Specify whether the task should be skipped. - * // TODO: consider returning the reason to skip, instead of just a boolean value - * @return {@code true} to skip; otherwise, return {@code false} - */ - default boolean shouldSkip() - { - return false; - } + @Nullable + T result(); /** * Close any resources it opened. diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java new file mode 100644 index 00000000..f2f8307c --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.time.Duration; +import java.util.UUID; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.datastax.driver.core.utils.UUIDs; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING; +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests to validate the Job submission behavior for scenarios which are a combination of values for + * + * <ol> + * <li>Downstream job existence,</li> + * <li>Cached job (null (not in cache), Completed/Failed job, Running job), and</li> + * <li>Request UUID (null (no header), UUID)</li> + * </ol> + */ +class OperationalJobManagerTest 
+{ + @Mock + SidecarConfiguration mockConfig; + + protected Vertx vertx; + + /** Creates a fresh Vertx and wires the mocked configuration to report a 5000 ms max wait time. NOTE(review): the Vertx instance is never closed in this class - consider an {@code @AfterEach} that closes it. */ @BeforeEach + void setup() + { + vertx = Vertx.vertx(); + MockitoAnnotations.openMocks(this); + ServiceConfiguration mockServiceConfig = mock(ServiceConfiguration.class); + when(mockConfig.serviceConfiguration()).thenReturn(mockServiceConfig); + when(mockServiceConfig.operationalJobExecutionMaxWaitTimeInMillis()).thenReturn(5000L); + } + + /** A submitted job that is executed to completion is still retrievable from the tracker. */ @Test + void testWithNoDownstreamJob() + { + OperationalJobTracker tracker = new OperationalJobTracker(4); + OperationalJobManager manager = new OperationalJobManager(tracker); + + OperationalJob testJob = OperationalJobTest.createOperationalJob(SUCCEEDED); + manager.trySubmitJob(testJob); + testJob.execute(Promise.promise()); + assertThat(testJob.asyncResult().isComplete()).isTrue(); + assertThat(testJob.status()).isEqualTo(SUCCEEDED); + assertThat(tracker.get(testJob.jobId)).isNotNull(); + } + + /** Submitting a job whose status() reports RUNNING is rejected with a conflict exception. NOTE(review): mockPools and mockExecPool are stubbed but never handed to the manager - they look unused. */ @Test + void testWithRunningDownstreamJob() + { + OperationalJob runningJob = OperationalJobTest.createOperationalJob(RUNNING); + OperationalJobTracker tracker = new OperationalJobTracker(4); + ExecutorPools mockPools = mock(ExecutorPools.class); + TaskExecutorPool mockExecPool = mock(TaskExecutorPool.class); + when(mockPools.internal()).thenReturn(mockExecPool); + when(mockExecPool.runBlocking(any())).thenReturn(null); + OperationalJobManager manager = new OperationalJobManager(tracker); + assertThatThrownBy(() -> manager.trySubmitJob(runningJob)) + .isExactlyInstanceOf(OperationalJobConflictException.class) + .hasMessage("The same operational job is already running on Cassandra. 
operationName='Operation X'"); + } + + /** A job that sleeps for 10 seconds is started through vertx.executeBlocking; checked right away it must be incomplete and tracked. */ @Test + void testWithLongRunningJob() + { + UUID jobId = UUIDs.timeBased(); + + OperationalJobTracker tracker = new OperationalJobTracker(4); + OperationalJobManager manager = new OperationalJobManager(tracker); + + OperationalJob testJob = OperationalJobTest.createOperationalJob(jobId, Duration.ofSeconds(10)); + + manager.trySubmitJob(testJob); + // execute the job async. + vertx.executeBlocking(testJob::execute); + // by the time of checking, the job should still be running. It runs for 10 seconds. + assertThat(testJob.asyncResult().isComplete()).isFalse(); + assertThat(tracker.get(jobId)).isNotNull(); + } + + /** A job whose executeInternal throws completes its async result as failed and remains tracked. */ @Test + void testWithFailingJob() + { + UUID jobId = UUIDs.timeBased(); + + OperationalJobTracker tracker = new OperationalJobTracker(4); + OperationalJobManager manager = new OperationalJobManager(tracker); + + String msg = "Test Job failed"; + OperationalJob failingJob = new OperationalJob(jobId) + { + @Override + protected void executeInternal() throws OperationalJobException + { + throw new OperationalJobException(msg); + } + }; + + manager.trySubmitJob(failingJob); + failingJob.execute(Promise.promise()); + assertThat(failingJob.asyncResult().isComplete()).isTrue(); + assertThat(failingJob.asyncResult().failed()).isTrue(); + assertThat(tracker.get(jobId)).isNotNull(); + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java new file mode 100644 index 00000000..9e2bd51f --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTest.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.jupiter.api.Test; + +import com.datastax.driver.core.utils.UUIDs; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; +import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl; + +import static org.apache.cassandra.sidecar.AssertionUtils.loopAssert; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests to validate the Job APIs + */ +class OperationalJobTest +{ + /** Internal executor pool used by the wait-time tests. NOTE(review): the Vertx created here is never closed by this class. */ private final TaskExecutorPool executorPool = new ExecutorPools(Vertx.vertx(), new ServiceConfigurationImpl()).internal(); + + /** Creates a no-op job with a time-based UUID whose status() always returns {@code jobStatus}. */ public static OperationalJob createOperationalJob(OperationalJobStatus jobStatus) + { + return createOperationalJob(UUIDs.timeBased(), jobStatus); + } + + /** Creates a no-op job named "Operation X" whose status() is pinned to {@code jobStatus}, regardless of execution. */ public static OperationalJob createOperationalJob(UUID jobId, OperationalJobStatus jobStatus) + { + return new OperationalJob(jobId) + { + @Override + protected void 
executeInternal() throws OperationalJobException + { + } + + @Override + public OperationalJobStatus status() + { + return jobStatus; + } + + @Override + public String name() + { + return "Operation X"; + } + }; + } + + /** Creates a job that sleeps for {@code jobDuration} and then returns normally. */ public static OperationalJob createOperationalJob(UUID jobId, Duration jobDuration) + { + return createOperationalJob(jobId, jobDuration, null); + } + + /** Creates a job named "Operation X" that sleeps uninterruptibly for {@code jobDuration} (if non-null) and then throws {@code jobFailure} (if non-null). */ public static OperationalJob createOperationalJob(UUID jobId, Duration jobDuration, OperationalJobException jobFailure) + { + return new OperationalJob(jobId) + { + @Override + protected void executeInternal() throws OperationalJobException + { + if (jobDuration != null) + { + Uninterruptibles.sleepUninterruptibly(jobDuration.toMillis(), TimeUnit.MILLISECONDS); + } + + if (jobFailure != null) + { + throw jobFailure; + } + } + + @Override + public String name() + { + return "Operation X"; + } + }; + } + + /** Executing a no-op job succeeds both the supplied promise and the job's own async result. */ @Test + void testJobCompletion() + { + OperationalJob job = createOperationalJob(OperationalJobStatus.SUCCEEDED); + Promise<Void> p = Promise.promise(); + job.execute(p); + Future<Void> future = p.future(); + assertThat(future.succeeded()).isTrue(); + assertThat(job.asyncResult().succeeded()).isTrue(); + assertThat(job.status()).isEqualTo(OperationalJobStatus.SUCCEEDED); + } + + /** An exception thrown from executeInternal fails the promise and the async result with an OperationalJobException carrying the message, and the status becomes FAILED. */ @Test + void testJobFailed() + { + String msg = "Test Job failed"; + OperationalJob failingJob = new OperationalJob(UUIDs.timeBased()) + { + @Override + protected void executeInternal() throws OperationalJobException + { + throw new OperationalJobException(msg); + } + }; + + Promise<Void> p = Promise.promise(); + failingJob.execute(p); + + Future<Void> future = p.future(); + assertThat(future.failed()).isTrue(); + assertThat(future.cause()) + .isExactlyInstanceOf(OperationalJobException.class) + .hasMessage(msg); + assertThat(failingJob.status()).isEqualTo(OperationalJobStatus.FAILED); + assertThat(failingJob.asyncResult().failed()).isTrue(); + assertThat(failingJob.asyncResult().cause()) + 
.isExactlyInstanceOf(OperationalJobException.class) + .hasMessage(msg); + } + + /** A 500 ms job queried with a 2 s wait time yields a succeeded future. */ @Test + void testGetAsyncResultInWaitTime() + { + OperationalJob longRunning = createOperationalJob(UUIDs.timeBased(), Duration.ofMillis(500L)); + executorPool.executeBlocking(longRunning::execute); + Duration waitTime = Duration.ofSeconds(2); + Future<Void> result = longRunning.asyncResult(executorPool, waitTime); + // it should finish in around 500 ms. + loopAssert(1, () -> assertThat(result.succeeded()).isTrue()); + } + + /** A 500 ms job that throws, queried with a 2 s wait time, yields a future failed with the job's exception. */ @Test + void testGetFailedAsyncResultInWaitTime() + { + OperationalJobException jobFailure = new OperationalJobException("Job fails"); + OperationalJob longButFailedJob = createOperationalJob(UUIDs.timeBased(), Duration.ofMillis(500L), jobFailure); + executorPool.executeBlocking(longButFailedJob::execute); + Duration waitTime = Duration.ofSeconds(2); + Future<Void> result = longButFailedJob.asyncResult(executorPool, waitTime); + // it should finish in around 500 ms. + loopAssert(1, () -> { + assertThat(result.failed()).isTrue(); + assertThat(result.cause()).isEqualTo(jobFailure); + }); + } + + /** A 5 s job queried with a 200 ms wait time yields a succeeded future while the job is still executing. */ @Test + void testGetAsyncResultExceedsWaitTime() + { + OperationalJob longRunning = createOperationalJob(UUIDs.timeBased(), Duration.ofMillis(5000L)); + executorPool.executeBlocking(longRunning::execute); + Duration waitTime = Duration.ofMillis(200L); + Future<Void> result = longRunning.asyncResult(executorPool, waitTime); + loopAssert(1, () -> { + // the composite future is completed in 200ms. The operational job is still running, so the isExecuting should return true too. 
+ assertThat(result.succeeded()).isTrue(); + assertThat(longRunning.isExecuting()).isTrue(); + }); + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTrackerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTrackerTest.java new file mode 100644 index 00000000..f6a49d4c --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobTrackerTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.job; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.datastax.driver.core.utils.UUIDs; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException; + +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED; +import static org.apache.cassandra.sidecar.job.OperationalJobTest.createOperationalJob; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests to validate job tracking + */ +class OperationalJobTrackerTest +{ + private OperationalJobTracker jobTracker; + private static final int trackerSize = 3; + + OperationalJob job1 = createOperationalJob(SUCCEEDED); + OperationalJob job2 = createOperationalJob(SUCCEEDED); + OperationalJob job3 = createOperationalJob(SUCCEEDED); + OperationalJob job4 = createOperationalJob(SUCCEEDED); + + long twoDaysAgo = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2); + /** Job whose time-based UUID starts two days in the past and whose status() is pinned to SUCCEEDED. */ OperationalJob jobWithStaleCreationTime = new OperationalJob(UUIDs.startOf(twoDaysAgo)) + { + @Override + protected void executeInternal() throws OperationalJobException + { + } + + @Override + public OperationalJobStatus status() + { + return SUCCEEDED; + } + }; + + @BeforeEach + void setUp() + { + jobTracker = new OperationalJobTracker(trackerSize); + } + + /** Jobs stored with put are returned by identity from get. */ @Test + void testPutAndGet() + { + jobTracker.put(job1); + jobTracker.put(job2); + assertThat(jobTracker.get(job1.jobId)).isSameAs(job1); + assertThat(jobTracker.get(job2.jobId)).isSameAs(job2); + } + + /** computeIfAbsent returns the job already tracked under the ID and ignores the supplied mapping. */ @Test + void testComputeIfAbsent() + { + 
jobTracker.put(job1); + OperationalJob job = jobTracker.computeIfAbsent(job1.jobId, v -> job3); + assertThat(job).isNotSameAs(job3); + assertThat(job).isSameAs(job1); + assertThat(jobTracker.get(job1.jobId)).isSameAs(job1); + } + + /** Four freshly created jobs put into a tracker of size 3 are all retained. NOTE(review): the describedAs text says the jobs are "still running", yet they are built with a pinned SUCCEEDED status - presumably they survive because they are recent; confirm against OperationalJobTracker. */ @Test + void testNoEviction() + { + jobTracker.put(job1); + jobTracker.put(job2); + jobTracker.put(job3); + jobTracker.put(job4); + + assertThat(jobTracker.size()) + .describedAs("Although the tracker initial size is 3, no job is evicted since all jobs are still running") + .isEqualTo(4); + assertThat(jobTracker.get(job1.jobId)).isNotNull(); + assertThat(jobTracker.get(job2.jobId)).isNotNull(); + assertThat(jobTracker.get(job3.jobId)).isNotNull(); + assertThat(jobTracker.get(job4.jobId)).isNotNull(); + } + + /** Once the size limit is exceeded, the job with the two-day-old UUID is evicted while the fresh ones stay. */ @Test + void testRemoveEldestEntryEvictionOnExpiry() + { + jobTracker.put(jobWithStaleCreationTime); + jobTracker.put(job1); + jobTracker.put(job2); + jobTracker.put(job3); + + assertThat(jobTracker.size()).isEqualTo(3); + assertThat(jobTracker.get(job1.jobId)).isNotNull(); + assertThat(jobTracker.get(job2.jobId)).isNotNull(); + assertThat(jobTracker.get(job3.jobId)).isNotNull(); + assertThat(jobTracker.get(jobWithStaleCreationTime.jobId)).isNull(); + } + + /** The map returned by getJobsView rejects put with UnsupportedOperationException. */ @Test + void testGetViewImmutable() + { + // Test the immutable view returned by getView + jobTracker.put(job1); + jobTracker.put(job2); + + Map<UUID, OperationalJob> view = jobTracker.getJobsView(); + assertThat(view.size()).isEqualTo(2); + assertThatThrownBy(() -> view.put(job3.jobId, job3)) + .isExactlyInstanceOf(UnsupportedOperationException.class); + } + + /** Puts 13 jobs older than ONE_DAY_TTL into a tracker of size 1 and expects only the last one to remain. NOTE(review): executorService is created but no task is ever submitted to it, so every put runs sequentially on the test thread and no concurrent access is exercised. */ @Test + void testConcurrentAccess() throws Exception + { + int one = 1; + long pastTimestamp = System.currentTimeMillis() - OperationalJobTracker.ONE_DAY_TTL - 1000L; + OperationalJobTracker tracker = new OperationalJobTracker(one); + ExecutorService executorService = Executors.newFixedThreadPool(trackerSize); + List<OperationalJob> sortedJobs = IntStream.range(0, trackerSize + 10) + .boxed() + .map(i -> 
createOperationalJob(UUIDs.startOf(pastTimestamp + i), SUCCEEDED)) + .collect(Collectors.toList()); + sortedJobs.forEach(tracker::put); + executorService.shutdown(); + executorService.awaitTermination(5, TimeUnit.SECONDS); + assertThat(tracker.size()).isEqualTo(one); + assertThat(tracker.getJobsView().values().iterator().next()) + .describedAs("Only the last job is kept") + .isSameAs(sortedJobs.get(sortedJobs.size() - 1)); + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandlerTest.java new file mode 100644 index 00000000..57b9ff4b --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/routes/ListOperationalJobsHandlerTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.utils.UUIDs; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.predicate.ResponsePredicate; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.common.response.ListOperationalJobsResponse; +import org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException; +import org.apache.cassandra.sidecar.job.OperationalJob; +import org.apache.cassandra.sidecar.job.OperationalJobManager; +import org.apache.cassandra.sidecar.server.MainModule; +import org.apache.cassandra.sidecar.server.Server; + +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link ListOperationalJobsHandler} + */ +@ExtendWith(VertxExtension.class) +class ListOperationalJobsHandlerTest +{ + static final Logger LOGGER = LoggerFactory.getLogger(ListOperationalJobsHandlerTest.class); + Vertx vertx; + Server server; + + static UUID runningUuid = UUIDs.timeBased(); + static UUID runningUuid2 = UUIDs.timeBased(); + + static 
SampleOperationalJob running = new SampleOperationalJob(runningUuid); + static SampleOperationalJob running2 = new SampleOperationalJob(runningUuid2); + + /** Builds the injector with the test overrides and starts the server, waiting up to 5 seconds. NOTE(review): the value returned by awaitCompletion and any failure recorded on the context are not checked here, so a failed or slow start does not abort the test at this point. */ @BeforeEach + void before() throws InterruptedException + { + Injector injector; + Module testOverride = Modules.override(new TestModule()) + .with(new ListOperationalJobsHandlerTest.ListJobsTestModule()); + injector = Guice.createInjector(Modules.override(new MainModule()) + .with(testOverride)); + vertx = injector.getInstance(Vertx.class); + server = injector.getInstance(Server.class); + VertxTestContext context = new VertxTestContext(); + server.start() + .onSuccess(s -> context.completeNow()) + .onFailure(context::failNow); + context.awaitCompletion(5, TimeUnit.SECONDS); + } + + /** Closes the server and waits up to 60 seconds for the close to succeed, logging the outcome; the latch is only counted down on success, so a failed close is logged as a timeout after the full wait. */ @AfterEach + void after() throws InterruptedException + { + CountDownLatch closeLatch = new CountDownLatch(1); + server.close().onSuccess(res -> closeLatch.countDown()); + if (closeLatch.await(60, TimeUnit.SECONDS)) + LOGGER.info("Close event received before timeout."); + else + LOGGER.error("Close event timed out."); + } + + /** The list endpoint answers 200 with exactly two jobs whose IDs are among those of the two jobs returned by the mocked manager. */ @Test + void testListJobs(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + String testRoute = "/api/v1/cassandra/operational-jobs"; + client.get(server.actualPort(), "127.0.0.1", testRoute) + .expect(ResponsePredicate.SC_OK) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + ListOperationalJobsResponse listJobs = response.bodyAsJson(ListOperationalJobsResponse.class); + assertThat(listJobs).isNotNull(); + assertThat(listJobs.jobs()).isNotNull(); + assertThat(listJobs.jobs().size()).isEqualTo(2); + assertThat(listJobs.jobs().get(0).jobId()).isIn(runningUuid, runningUuid2); + assertThat(listJobs.jobs().get(1).jobId()).isIn(runningUuid, runningUuid2); + context.completeNow(); + })); + } + + /** Guice module that supplies a mocked job manager reporting two in-flight jobs. */ static class ListJobsTestModule extends AbstractModule + { + @Provides + @Singleton + public OperationalJobManager jobManager() + { + 
List<OperationalJob> testJobs = Arrays.asList(running, running2); + OperationalJobManager mockManager = mock(OperationalJobManager.class); + when(mockManager.allInflightJobs()).thenReturn(testJobs); + return mockManager; + } + } + + /** + * Concrete test implementation of the OperationalJob to be used by handler tests + */ + public static class SampleOperationalJob extends OperationalJob + { + /** + * Constructs a job with a unique UUID, in Pending state + * + * @param jobId UUID representing the Job to be created + */ + protected SampleOperationalJob(UUID jobId) + { + super(jobId); + } + + /** Intentionally empty: this sample job does no work when executed. */ @Override + protected void executeInternal() throws OperationalJobException + { + } + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/routes/OperationalJobHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/routes/OperationalJobHandlerTest.java new file mode 100644 index 00000000..39becc6f --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/routes/OperationalJobHandlerTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.utils.UUIDs; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.predicate.ResponsePredicate; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; +import org.apache.cassandra.sidecar.job.OperationalJob; +import org.apache.cassandra.sidecar.job.OperationalJobManager; +import org.apache.cassandra.sidecar.server.MainModule; +import org.apache.cassandra.sidecar.server.Server; + +import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link OperationalJobHandler} + */ +@ExtendWith(VertxExtension.class) +class OperationalJobHandlerTest +{ + static final Logger LOGGER = LoggerFactory.getLogger(GossipInfoHandlerTest.class); + Vertx vertx; + Server 
server; + + static UUID runningUuid = UUIDs.timeBased(); + static UUID completedUuid = UUIDs.timeBased(); + static UUID failedUuid = UUIDs.timeBased(); + + @BeforeEach + void before() throws InterruptedException + { + Injector injector; + Module testOverride = Modules.override(new TestModule()) + .with(new OperationalJobsHandlerTestModule()); + injector = Guice.createInjector(Modules.override(new MainModule()) + .with(testOverride)); + vertx = injector.getInstance(Vertx.class); + server = injector.getInstance(Server.class); + VertxTestContext context = new VertxTestContext(); + server.start() + .onSuccess(s -> context.completeNow()) + .onFailure(context::failNow); + context.awaitCompletion(5, TimeUnit.SECONDS); + } + + @AfterEach + void after() throws InterruptedException + { + CountDownLatch closeLatch = new CountDownLatch(1); + server.close().onSuccess(res -> closeLatch.countDown()); + if (closeLatch.await(60, TimeUnit.SECONDS)) + LOGGER.info("Close event received before timeout."); + else + LOGGER.error("Close event timed out."); + } + + @Test + void testGetJobStatusNonExistentJob(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + String uuid = UUIDs.timeBased().toString(); + String testRoute = "/api/v1/cassandra/operational-jobs/" + uuid; + client.get(server.actualPort(), "127.0.0.1", testRoute) + .expect(ResponsePredicate.SC_NOT_FOUND) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(NOT_FOUND.code()); + context.completeNow(); + })); + } + + @Test + void testGetJobStatusRunningJob(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + String testRoute = "/api/v1/cassandra/operational-jobs/" + runningUuid; + client.get(server.actualPort(), "127.0.0.1", testRoute) + .expect(ResponsePredicate.SC_ACCEPTED) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(ACCEPTED.code()); + context.completeNow(); + })); + } + + @Test + void 
testGetJobStatusCompletedJob(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + String testRoute = "/api/v1/cassandra/operational-jobs/" + completedUuid; + client.get(server.actualPort(), "127.0.0.1", testRoute) + .expect(ResponsePredicate.SC_OK) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + OperationalJobResponse jobStatus = response.bodyAsJson(OperationalJobResponse.class); + assertThat(jobStatus.jobId()).isEqualTo(completedUuid); + assertThat(jobStatus.status()).isEqualTo(OperationalJobStatus.SUCCEEDED); + assertThat(jobStatus.operation()).isEqualTo("testCompleted"); + context.completeNow(); + })); + } + + @Test + void testGetJobStatusFailedJob(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + String testRoute = "/api/v1/cassandra/operational-jobs/" + failedUuid; + client.get(server.actualPort(), "127.0.0.1", testRoute) + .expect(ResponsePredicate.SC_OK) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + OperationalJobResponse jobStatus = response.bodyAsJson(OperationalJobResponse.class); + assertThat(jobStatus.jobId()).isEqualTo(failedUuid); + assertThat(jobStatus.status()).isEqualTo(OperationalJobStatus.FAILED); + assertThat(jobStatus.operation()).isEqualTo("testFailed"); + assertThat(jobStatus.reason()).isEqualTo("Test failed"); + + context.completeNow(); + })); + } + + static class OperationalJobsHandlerTestModule extends AbstractModule + { + @Provides + @Singleton + public OperationalJobManager jobManager() + { + OperationalJobManager mockManager = mock(OperationalJobManager.class); + OperationalJob runningMock = mock(OperationalJob.class); + Promise<Void> p = Promise.promise(); + when(runningMock.status()).thenReturn(OperationalJobStatus.RUNNING); + when(runningMock.asyncResult()).thenReturn(p.future()); + OperationalJob completedMock = mock(OperationalJob.class); + 
when(completedMock.status()).thenReturn(OperationalJobStatus.SUCCEEDED); + when(completedMock.name()).thenReturn("testCompleted"); + OperationalJob failedMock = mock(OperationalJob.class); + when(failedMock.status()).thenReturn(OperationalJobStatus.FAILED); + when(failedMock.asyncResult()).thenReturn(Future.failedFuture("Test failed")); + when(failedMock.name()).thenReturn("testFailed"); + + when(mockManager.getJobIfExists(runningUuid)).thenReturn(runningMock); + when(mockManager.getJobIfExists(completedUuid)).thenReturn(completedMock); + when(mockManager.getJobIfExists(failedUuid)).thenReturn(failedMock); + return mockManager; + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org