nvharikrishna commented on code in PR #309: URL: https://github.com/apache/cassandra-sidecar/pull/309#discussion_r3180505098
########## client-common/src/test/java/org/apache/cassandra/sidecar/common/response/LiveMigrationFilesVerificationResponseTest.java: ########## @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.response; + +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class LiveMigrationFilesVerificationResponseTest +{ + private final ObjectMapper mapper = new ObjectMapper(); + + @Test + void testSerializationRoundTrip() throws Exception + { + LiveMigrationFilesVerificationResponse original = new LiveMigrationFilesVerificationResponse( + "test-id-123", + "MD5", + "COMPLETED", + "192.168.1.100", + 9042, + 5, + 10, + 100, + 15, + 20, + 3, + 85 + ); + + // Serialize to JSON + String json = mapper.writeValueAsString(original); + + // Deserialize back to object + LiveMigrationFilesVerificationResponse deserialized = + mapper.readValue(json, LiveMigrationFilesVerificationResponse.class); + + // Verify all fields match + assertThat(deserialized.id()).isEqualTo(original.id()); + 
assertThat(deserialized.digestAlgorithm()).isEqualTo(original.digestAlgorithm()); + assertThat(deserialized.state()).isEqualTo(original.state()); + assertThat(deserialized.source()).isEqualTo(original.source()); + assertThat(deserialized.port()).isEqualTo(original.port()); + assertThat(deserialized.filesNotFoundAtSource()).isEqualTo(original.filesNotFoundAtSource()); + assertThat(deserialized.filesNotFoundAtDestination()).isEqualTo(original.filesNotFoundAtDestination()); + assertThat(deserialized.metadataMatched()).isEqualTo(original.metadataMatched()); + assertThat(deserialized.metadataMismatches()).isEqualTo(original.metadataMismatches()); + assertThat(deserialized.digestMismatches()).isEqualTo(original.digestMismatches()); + assertThat(deserialized.digestVerificationFailures()).isEqualTo(original.digestVerificationFailures()); + assertThat(deserialized.filesMatched()).isEqualTo(original.filesMatched()); + assertThat(deserialized.isVerificationSuccessful()).isEqualTo(original.isVerificationSuccessful()); Review Comment: Now it appears only once after using parameterised tests. ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/response/DigestResponse.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.response; + +import java.util.Objects; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.cassandra.sidecar.common.request.data.Digest; +import org.apache.cassandra.sidecar.common.request.data.MD5Digest; +import org.apache.cassandra.sidecar.common.request.data.XXHash32Digest; + +/** + * Response object containing a cryptographic digest value for file verification purposes. + */ +public class DigestResponse +{ + @JsonProperty("digest") + public final String digest; + @JsonProperty("digestAlgorithm") + public final String digestAlgorithm; + + @JsonCreator + public DigestResponse(@JsonProperty("digest") String digest, + @JsonProperty("digestAlgorithm") String digestAlgorithm) + { + Objects.requireNonNull(digest, "digest is required"); + Objects.requireNonNull(digestAlgorithm, "digestAlgorithm is required"); + this.digest = digest; + this.digestAlgorithm = digestAlgorithm; + } Review Comment: Done ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/response/LiveMigrationFilesVerificationResponse.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.response; + +import java.util.Objects; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Response object for live migration file verification operations, containing statistics about + * files not found at source/target and digest mismatches during verification. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class LiveMigrationFilesVerificationResponse +{ + private final String id; + private final String state; + private final String source; + private final int port; + private final int filesNotFoundAtSource; + private final int filesNotFoundAtDestination; + private final int metadataMatched; + private final int metadataMismatches; + private final int digestMismatches; + private final int digestVerificationFailures; + private final int filesMatched; + private final String digestAlgorithm; + + @JsonCreator + public LiveMigrationFilesVerificationResponse(@JsonProperty("id") String id, + @JsonProperty("digestAlgorithm") String digestAlgorithm, + @JsonProperty("state") String state, + @JsonProperty("source") String source, + @JsonProperty("port") int port, + @JsonProperty("filesNotFoundAtSource") int filesNotFoundAtSource, + @JsonProperty("filesNotFoundAtDestination") int filesNotFoundAtDestination, + @JsonProperty("metadataMatched") int metadataMatched, + @JsonProperty("metadataMismatches") int 
metadataMismatches, + @JsonProperty("digestMismatches") int digestMismatches, + @JsonProperty("digestVerificationFailures") int digestVerificationFailures, + @JsonProperty("filesMatched") int filesMatched) + { + Objects.requireNonNull(id, "id of files verification task must be specified"); + Objects.requireNonNull(state, "state of files verification task must be specified"); + this.id = id; + this.digestAlgorithm = digestAlgorithm; + this.state = state; Review Comment: done ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCancelFilesVerificationTaskHandler.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager; +import org.apache.cassandra.sidecar.livemigration.LiveMigrationTask; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler for canceling an active live migration files verification task. + * Accepts a task ID and cancels the corresponding verification task on the specified host. + */ +public class LiveMigrationCancelFilesVerificationTaskHandler extends AbstractHandler<String> implements AccessProtected Review Comment: done ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationConcurrencyLimitHandler.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Handler; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.concurrent.ConcurrencyLimiter; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler that enforces concurrency limits for live migration file operations. + * Returns HTTP 503 (SERVICE_UNAVAILABLE) when the maximum concurrent file requests limit is exceeded. 
+ */ +@Singleton +public class LiveMigrationConcurrencyLimitHandler implements Handler<RoutingContext> +{ + private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationConcurrencyLimitHandler.class); + + private final ConcurrencyLimiter concurrencyLimiter; + + @Inject + public LiveMigrationConcurrencyLimitHandler(SidecarConfiguration sidecarConfiguration) + { + this.concurrencyLimiter = + new ConcurrencyLimiter(() -> sidecarConfiguration.liveMigrationConfiguration().maxConcurrentFileRequests()); + } + + @Override + public void handle(RoutingContext rc) + { + if (!concurrencyLimiter.tryAcquire()) + { + LOGGER.warn("Too many concurrent live migration file requests. Path={}", rc.request().path()); + rc.fail(wrapHttpException(HttpResponseStatus.SERVICE_UNAVAILABLE, Review Comment: It is discussed here: https://github.com/apache/cassandra-sidecar/pull/309#discussion_r2868621502 ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationFileDigestHandler.java: ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFileDigestRequest; +import org.apache.cassandra.sidecar.common.response.DigestResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.handlers.FileStreamHandler; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.DigestAlgorithm; +import org.apache.cassandra.sidecar.utils.DigestAlgorithmFactory; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static org.apache.cassandra.sidecar.common.request.LiveMigrationFileDigestRequest.DIGEST_ALGORITHM_PARAM; +import static org.apache.cassandra.sidecar.utils.AsyncFileDigestCalculator.calculateDigest; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler for calculating and returning file digests during live migration. + * Supported digest algorithms are determined by {@link DigestAlgorithmFactory} and specified + * via the {@link LiveMigrationFileDigestRequest#DIGEST_ALGORITHM_PARAM} query parameter. 
+ */ +public class LiveMigrationFileDigestHandler extends AbstractHandler<DigestAlgorithm> implements AccessProtected +{ + private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationFileDigestHandler.class); + + private final Vertx vertx; + private final DigestAlgorithmFactory digestAlgorithmFactory; + + @Inject + public LiveMigrationFileDigestHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator, + Vertx vertx, + DigestAlgorithmFactory digestAlgorithmFactory) + { + super(metadataFetcher, executorPools, validator); + this.vertx = vertx; + this.digestAlgorithmFactory = digestAlgorithmFactory; + } + + @Override + protected DigestAlgorithm extractParamsOrThrow(RoutingContext context) + { + String digestAlgorithmParam = getDigestAlgorithmParam(context); + try + { + return digestAlgorithmFactory.getDigestAlgorithm(digestAlgorithmParam, 0); + } + catch (IllegalArgumentException e) + { + LOGGER.error("Unexpected error while getting digest algorithm for {}", digestAlgorithmParam, e); + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, e.getMessage()); + } Review Comment: Done ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskManager.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.annotations.VisibleForTesting; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions; +import org.jetbrains.annotations.NotNull; + +/** + * Centralized singleton manager for all live migration tasks across Cassandra instances. + * Enforces the constraint that only one {@link LiveMigrationTask} (of any type) can be active per instance at a time, + * preventing concurrent data copy and files verification operations that could lead to resource conflicts or data integrity issues. + * This singleton is shared across {@link DataCopyTaskManager} and {@link FilesVerificationTaskManager} to ensure + * proper coordination and mutual exclusion of tasks per instance. + */ +@Singleton +public class LiveMigrationTaskManager +{ + @VisibleForTesting + final ConcurrentHashMap<Integer, LiveMigrationTask<?>> currentTasks = new ConcurrentHashMap<>(); + + private final InstancesMetadata instancesMetadata; + + @Inject + public LiveMigrationTaskManager(InstancesMetadata instancesMetadata) + { + this.instancesMetadata = instancesMetadata; + } + + /** + * Attempts to submit a new task for the specified instance. 
+ * Only one task (of any type) can be active per instance at a time. + * + * @param instanceId the instance ID + * @param newTask the task to submit + * @return true if the task was accepted, false if another task is already in progress + */ + public boolean submitTask(int instanceId, LiveMigrationTask<?> newTask) + { + return currentTasks.compute(instanceId, (integer, taskInMap) -> { + if (taskInMap == null) + { + return newTask; + } + + if (!taskInMap.isCompleted()) + { + // Reject new task if existing task is still in progress + return taskInMap; + } + else + { + // Accept new task if existing task has completed + return newTask; + } + }) == newTask; + } + + /** + * Returns all live migration tasks for given currentHost. + * This includes both active and completed tasks that haven't been replaced. + * + * @param currentHost the host where sidecar is running + * @return list containing at most one task (empty if no task has ever been submitted for this host) + */ + @SuppressWarnings("ConstantValue") + public List<LiveMigrationTask<?>> getAllTasks(@NotNull String currentHost) + { + InstanceMetadata localInstance = instancesMetadata.instanceFromHost(currentHost); + if (localInstance == null) + { + throw new IllegalStateException("No instance found for host: " + currentHost); + } + Review Comment: removed ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationGetFilesVerificationTaskHandler.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager; +import org.apache.cassandra.sidecar.livemigration.LiveMigrationTask; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler for retrieving a specific live migration files verification task by task ID. + * Returns the task details if found, or a 404 error if the task does not exist on the specified host. 
+ */ +public class LiveMigrationGetFilesVerificationTaskHandler extends AbstractHandler<String> implements AccessProtected +{ + private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationGetFilesVerificationTaskHandler.class); + + private final FilesVerificationTaskManager taskManager; + + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the interface to retrieve instance metadata + * @param executorPools the executor pools for blocking executions + * @param validator a validator instance to validate Cassandra-specific input + */ + @Inject + public LiveMigrationGetFilesVerificationTaskHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator, + FilesVerificationTaskManager taskManager) + { + super(metadataFetcher, executorPools, validator); + this.taskManager = taskManager; + } + + @Override + protected String extractParamsOrThrow(RoutingContext context) + { + String taskId = context.pathParam("taskId"); + if (taskId == null || taskId.isEmpty()) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "taskId is required"); + } + + return taskId; + } + + @Override + protected void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + @NotNull String host, + SocketAddress remoteAddress, + String taskId) + { + try + { + LiveMigrationTask<LiveMigrationFilesVerificationResponse> task = taskManager.getTask(taskId, host); + LOGGER.info("Found live migration task with taskId={} on host={}", taskId, host); Review Comment: done ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateFilesVerificationTaskHandler.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.Json; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest; +import org.apache.cassandra.sidecar.common.response.LiveMigrationTaskCreationResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskInProgressException; +import org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import 
org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT; +import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE; +import static org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY; +import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.LIVE_MIGRATION_FILES_VERIFICATION_TASKS_ROUTE; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * HTTP handler for creating file digest verification tasks during live migration. + * Manages concurrent verification tasks per instance and orchestrates the verification process. + */ +public class LiveMigrationCreateFilesVerificationTaskHandler extends AbstractHandler<LiveMigrationFilesVerificationRequest> implements AccessProtected Review Comment: done ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/response/LiveMigrationTaskCreationResponse.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.response; + +import java.util.Objects; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Response object returned when a live migration task is created. + * Contains the task identifier and a URL to query the task status. + */ +public class LiveMigrationTaskCreationResponse +{ + private final String taskId; + private final String statusUrl; + + @JsonCreator + public LiveMigrationTaskCreationResponse(@JsonProperty("taskId") String taskId, + @JsonProperty("statusUrl") String statusUrl) + { + Objects.requireNonNull(taskId, "taskId cannot be null"); + Objects.requireNonNull(statusUrl, "statusUrl cannot be null"); + + this.taskId = taskId; + this.statusUrl = statusUrl; Review Comment: done ########## client-common/src/test/java/org/apache/cassandra/sidecar/common/response/LiveMigrationFilesVerificationResponseTest.java: ########## @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.common.response; + +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class LiveMigrationFilesVerificationResponseTest +{ + private final ObjectMapper mapper = new ObjectMapper(); + + @Test + void testSerializationRoundTrip() throws Exception Review Comment: done ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCancelFilesVerificationTaskHandler.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager; +import org.apache.cassandra.sidecar.livemigration.LiveMigrationTask; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler for canceling an active live migration files verification task. + * Accepts a task ID and cancels the corresponding verification task on the specified host. 
+ */ +public class LiveMigrationCancelFilesVerificationTaskHandler extends AbstractHandler<String> implements AccessProtected +{ + private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationCancelFilesVerificationTaskHandler.class); + + private final FilesVerificationTaskManager taskManager; + + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the interface to retrieve instance metadata + * @param executorPools the executor pools for blocking executions + * @param validator a validator instance to validate Cassandra-specific input + */ + @Inject + public LiveMigrationCancelFilesVerificationTaskHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator, + FilesVerificationTaskManager taskManager) + { + super(metadataFetcher, executorPools, validator); + this.taskManager = taskManager; + } + + @Override + protected String extractParamsOrThrow(RoutingContext context) + { + return context.pathParam("taskId"); Review Comment: done ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationDigestHandlerWrapper.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import com.google.inject.Inject; +import io.vertx.core.Handler; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFileDigestRequest; + +import static org.apache.cassandra.sidecar.common.request.LiveMigrationFileDigestRequest.DIGEST_ALGORITHM_PARAM; + +/** + * Wrapper handler that conditionally delegates to {@link LiveMigrationFileDigestHandler} based on the presence + * of the {@link LiveMigrationFileDigestRequest#DIGEST_ALGORITHM_PARAM} query parameter, otherwise + * passes control to the next handler in the chain. + */ +public class LiveMigrationDigestHandlerWrapper implements Handler<RoutingContext> Review Comment: Realised that I had missed marking several other handlers as Singleton, so I have marked those handlers as Singleton too. ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateFilesVerificationTaskHandler.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.Json; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest; +import org.apache.cassandra.sidecar.common.response.LiveMigrationTaskCreationResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskInProgressException; +import org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT; +import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE; +import static org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY; +import static 
org.apache.cassandra.sidecar.common.ApiEndpointsV1.LIVE_MIGRATION_FILES_VERIFICATION_TASKS_ROUTE; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * HTTP handler for creating file digest verification tasks during live migration. + * Manages concurrent verification tasks per instance and orchestrates the verification process. + */ +public class LiveMigrationCreateFilesVerificationTaskHandler extends AbstractHandler<LiveMigrationFilesVerificationRequest> implements AccessProtected +{ + private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationCreateFilesVerificationTaskHandler.class); + + private final FilesVerificationTaskManager filesVerificationTaskManager; + private final LiveMigrationMap liveMigrationMap; + + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the interface to retrieve instance metadata + * @param executorPools the executor pools for blocking executions + * @param validator a validator instance to validate Cassandra-specific input + */ + @Inject + protected LiveMigrationCreateFilesVerificationTaskHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator, + LiveMigrationMap liveMigrationMap, + FilesVerificationTaskManager filesVerificationTaskManager) + { + super(metadataFetcher, executorPools, validator); + this.liveMigrationMap = liveMigrationMap; + this.filesVerificationTaskManager = filesVerificationTaskManager; + } + + @Override + protected LiveMigrationFilesVerificationRequest extractParamsOrThrow(RoutingContext context) + { + try + { + return Json.decodeValue(context.body().buffer(), LiveMigrationFilesVerificationRequest.class); + } + catch (DecodeException decodeException) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, + "Failed to parse request body, please ensure that the request is valid.", + decodeException); + } + catch (IllegalArgumentException e) + { + 
throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, e.getMessage(), e); + } + } + + @Override + protected void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + @NotNull String host, + SocketAddress remoteAddress, + LiveMigrationFilesVerificationRequest request) + { + LOGGER.debug("Received files verification request for host {} with maxConcurrency {}", Review Comment: removed it ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationGetAllFilesVerificationTasksHandler.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import com.google.inject.Inject; +import io.vertx.core.Handler; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager; +import org.apache.cassandra.sidecar.livemigration.LiveMigrationTask; + +import static org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY; +import static org.apache.cassandra.sidecar.handlers.AbstractHandler.extractHostAddressWithoutPort; + +/** + * Handler for retrieving all active live migration files verification tasks. + * Returns a list of all verification tasks running on the local host. + */ +public class LiveMigrationGetAllFilesVerificationTasksHandler implements Handler<RoutingContext>, AccessProtected Review Comment: Done ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationFileDigestHandler.java: ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFileDigestRequest; +import org.apache.cassandra.sidecar.common.response.DigestResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.handlers.FileStreamHandler; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.DigestAlgorithm; +import org.apache.cassandra.sidecar.utils.DigestAlgorithmFactory; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static org.apache.cassandra.sidecar.common.request.LiveMigrationFileDigestRequest.DIGEST_ALGORITHM_PARAM; +import static org.apache.cassandra.sidecar.utils.AsyncFileDigestCalculator.calculateDigest; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler for calculating and returning file digests during live migration. 
+ * Supported digest algorithms are determined by {@link DigestAlgorithmFactory} and specified + * via the {@link LiveMigrationFileDigestRequest#DIGEST_ALGORITHM_PARAM} query parameter. + */ +public class LiveMigrationFileDigestHandler extends AbstractHandler<DigestAlgorithm> implements AccessProtected +{ + private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationFileDigestHandler.class); + + private final Vertx vertx; + private final DigestAlgorithmFactory digestAlgorithmFactory; + + @Inject + public LiveMigrationFileDigestHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator, + Vertx vertx, + DigestAlgorithmFactory digestAlgorithmFactory) + { + super(metadataFetcher, executorPools, validator); + this.vertx = vertx; + this.digestAlgorithmFactory = digestAlgorithmFactory; + } + + @Override + protected DigestAlgorithm extractParamsOrThrow(RoutingContext context) + { + String digestAlgorithmParam = getDigestAlgorithmParam(context); + try + { + return digestAlgorithmFactory.getDigestAlgorithm(digestAlgorithmParam, 0); + } + catch (IllegalArgumentException e) + { + LOGGER.error("Unexpected error while getting digest algorithm for {}", digestAlgorithmParam, e); + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, e.getMessage()); + } + } + + @Override + protected void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + @NotNull String host, + SocketAddress remoteAddress, + DigestAlgorithm digestAlgorithm) + { + String file = context.get(FileStreamHandler.FILE_PATH_CONTEXT_KEY); + + if (file == null) + { + LOGGER.error("File path not found in context"); + context.fail(wrapHttpException(INTERNAL_SERVER_ERROR, "File path not available")); + return; + } + calculateDigest(vertx, file, digestAlgorithm) + .onComplete(ar -> { Review Comment: > digestVerifier.verify(file) It returns the path, and 'DigestVerifier' validates the path against a known digest. 
So re-using `org.apache.cassandra.sidecar.utils.AsyncFileDigestCalculator#calculateDigest(io.vertx.core.Vertx, java.lang.String, org.apache.cassandra.sidecar.utils.DigestAlgorithm)` which is used by DigestVerifier too. ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskManager.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.annotations.VisibleForTesting; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions; +import org.jetbrains.annotations.NotNull; + +/** + * Centralized singleton manager for all live migration tasks across Cassandra instances. 
+ * Enforces the constraint that only one {@link LiveMigrationTask} (of any type) can be active per instance at a time, + * preventing concurrent data copy and files verification operations that could lead to resource conflicts or data integrity issues. + * This singleton is shared across {@link DataCopyTaskManager} and {@link FilesVerificationTaskManager} to ensure + * proper coordination and mutual exclusion of tasks per instance. + */ +@Singleton +public class LiveMigrationTaskManager +{ + @VisibleForTesting + final ConcurrentHashMap<Integer, LiveMigrationTask<?>> currentTasks = new ConcurrentHashMap<>(); + + private final InstancesMetadata instancesMetadata; + + @Inject + public LiveMigrationTaskManager(InstancesMetadata instancesMetadata) + { + this.instancesMetadata = instancesMetadata; + } + + /** + * Attempts to submit a new task for the specified instance. + * Only one task (of any type) can be active per instance at a time. + * + * @param instanceId the instance ID + * @param newTask the task to submit + * @return true if the task was accepted, false if another task is already in progress + */ + public boolean submitTask(int instanceId, LiveMigrationTask<?> newTask) + { + return currentTasks.compute(instanceId, (integer, taskInMap) -> { Review Comment: done ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateFilesVerificationTaskHandler.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.Json; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest; +import org.apache.cassandra.sidecar.common.response.LiveMigrationTaskCreationResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskInProgressException; +import org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static 
io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT; +import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE; +import static org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY; +import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.LIVE_MIGRATION_FILES_VERIFICATION_TASKS_ROUTE; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * HTTP handler for creating file digest verification tasks during live migration. + * Manages concurrent verification tasks per instance and orchestrates the verification process. + */ +public class LiveMigrationCreateFilesVerificationTaskHandler extends AbstractHandler<LiveMigrationFilesVerificationRequest> implements AccessProtected +{ + private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationCreateFilesVerificationTaskHandler.class); + + private final FilesVerificationTaskManager filesVerificationTaskManager; + private final LiveMigrationMap liveMigrationMap; + + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the interface to retrieve instance metadata + * @param executorPools the executor pools for blocking executions + * @param validator a validator instance to validate Cassandra-specific input + */ + @Inject + protected LiveMigrationCreateFilesVerificationTaskHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator, + LiveMigrationMap liveMigrationMap, + FilesVerificationTaskManager filesVerificationTaskManager) + { + super(metadataFetcher, executorPools, validator); + this.liveMigrationMap = liveMigrationMap; + this.filesVerificationTaskManager = filesVerificationTaskManager; + } + + @Override + protected LiveMigrationFilesVerificationRequest 
extractParamsOrThrow(RoutingContext context) + { + try + { + return Json.decodeValue(context.body().buffer(), LiveMigrationFilesVerificationRequest.class); + } + catch (DecodeException decodeException) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, + "Failed to parse request body, please ensure that the request is valid.", + decodeException); + } + catch (IllegalArgumentException e) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, e.getMessage(), e); + } + } + + @Override + protected void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + @NotNull String host, + SocketAddress remoteAddress, + LiveMigrationFilesVerificationRequest request) + { + LOGGER.debug("Received files verification request for host {} with maxConcurrency {}", + host, request.maxConcurrency()); + InstanceMetadata localInstanceMetadata; + try + { + localInstanceMetadata = metadataFetcher.instance(host); + } + catch (NoSuchCassandraInstanceException e) + { + LOGGER.error("Failed to fetch instance metadata for host={}", host); + context.fail(wrapHttpException(SERVICE_UNAVAILABLE, e)); + return; + } Review Comment: I wanted to return a 503 instead of a 500, so I am catching the exception and wrapping it in an HttpException. ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskManager.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.annotations.VisibleForTesting; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions; +import org.jetbrains.annotations.NotNull; + +/** + * Centralized singleton manager for all live migration tasks across Cassandra instances. + * Enforces the constraint that only one {@link LiveMigrationTask} (of any type) can be active per instance at a time, + * preventing concurrent data copy and files verification operations that could lead to resource conflicts or data integrity issues. + * This singleton is shared across {@link DataCopyTaskManager} and {@link FilesVerificationTaskManager} to ensure + * proper coordination and mutual exclusion of tasks per instance. + */ +@Singleton +public class LiveMigrationTaskManager +{ + @VisibleForTesting + final ConcurrentHashMap<Integer, LiveMigrationTask<?>> currentTasks = new ConcurrentHashMap<>(); + + private final InstancesMetadata instancesMetadata; + + @Inject + public LiveMigrationTaskManager(InstancesMetadata instancesMetadata) + { + this.instancesMetadata = instancesMetadata; + } + + /** + * Attempts to submit a new task for the specified instance. 
+ * Only one task (of any type) can be active per instance at a time. + * + * @param instanceId the instance ID + * @param newTask the task to submit + * @return true if the task was accepted, false if another task is already in progress + */ + public boolean submitTask(int instanceId, LiveMigrationTask<?> newTask) + { + return currentTasks.compute(instanceId, (integer, taskInMap) -> { + if (taskInMap == null) + { + return newTask; + } + + if (!taskInMap.isCompleted()) + { + // Reject new task if existing task is still in progress + return taskInMap; + } + else + { + // Accept new task if existing task has completed + return newTask; + } + }) == newTask; + } + + /** + * Returns all live migration tasks for given currentHost. + * This includes both active and completed tasks that haven't been replaced. + * + * @param currentHost the host where sidecar is running + * @return list containing at most one task (empty if no task has ever been submitted for this host) + */ + @SuppressWarnings("ConstantValue") + public List<LiveMigrationTask<?>> getAllTasks(@NotNull String currentHost) + { + InstanceMetadata localInstance = instancesMetadata.instanceFromHost(currentHost); + if (localInstance == null) + { + throw new IllegalStateException("No instance found for host: " + currentHost); + } + + LiveMigrationTask<?> task = currentTasks.get(localInstance.id()); + return task == null ? Collections.emptyList() : Collections.singletonList(task); + } + + /** + * Returns the live migration task with the specified task ID. 
+ * + * @param taskId ID of the task to retrieve + * @param currentHost the host where sidecar is running + * @return the LiveMigrationTask matching the given taskId + * @throws LiveMigrationExceptions.LiveMigrationTaskNotFoundException if no task found with the given ID + */ + public LiveMigrationTask<?> getTask(@NotNull String taskId, + @NotNull String currentHost) throws LiveMigrationExceptions.LiveMigrationTaskNotFoundException + { + return getLiveMigrationTask(taskId, currentHost); + } + + /** + * Cancels the live migration task with the specified task ID. + * + * @param taskId ID of the task to cancel + * @param currentHost the host where sidecar is running + * @return the cancelled LiveMigrationTask + * @throws LiveMigrationExceptions.LiveMigrationTaskNotFoundException if no task found with the given ID + */ + public LiveMigrationTask<?> cancelTask(@NotNull String taskId, + @NotNull String currentHost) throws LiveMigrationExceptions.LiveMigrationTaskNotFoundException + { + LiveMigrationTask<?> taskInProgress = getLiveMigrationTask(taskId, currentHost); + + // Cancelling the task + taskInProgress.cancel(); + + return taskInProgress; + } + + @SuppressWarnings("ConstantValue") + private LiveMigrationTask<?> getLiveMigrationTask(@NotNull String taskId, @NotNull String currentHost) + { + InstanceMetadata localInstance = instancesMetadata.instanceFromHost(currentHost); + if (localInstance == null) + { + throw new IllegalStateException("No instance found for host: " + currentHost); + } Review Comment: removed ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationGetFilesVerificationTaskHandler.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager; +import org.apache.cassandra.sidecar.livemigration.LiveMigrationTask; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler for retrieving a specific live migration files verification task by 
task ID. + * Returns the task details if found, or a 404 error if the task does not exist on the specified host. + */ +public class LiveMigrationGetFilesVerificationTaskHandler extends AbstractHandler<String> implements AccessProtected Review Comment: done ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFilesVerificationTaskFactory.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.livemigration; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest; +import org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.utils.DigestVerifierFactory; +import org.apache.cassandra.sidecar.utils.SidecarClientProvider; + +/** + * Factory for creating {@link LiveMigrationFilesVerificationTask} instances used to verify file digests + * between source and destination instances during live migration operation. + */ +@Singleton +public class LiveMigrationFilesVerificationTaskFactory +{ + private final Vertx vertx; + private final SidecarClientProvider sidecarClientProvider; + private final ExecutorPools executorPools; + private final DigestVerifierFactory digestVerifierFactory; + private final SidecarConfiguration sidecarConfiguration; + + @Inject + public LiveMigrationFilesVerificationTaskFactory(Vertx vertx, + ExecutorPools executorPools, + SidecarConfiguration sidecarConfiguration, + SidecarClientProvider sidecarClientProvider, + DigestVerifierFactory digestVerifierFactory) + { + this.vertx = vertx; + this.sidecarClientProvider = sidecarClientProvider; + this.executorPools = executorPools; + this.digestVerifierFactory = digestVerifierFactory; + this.sidecarConfiguration = sidecarConfiguration; + } + + public LiveMigrationTask<LiveMigrationFilesVerificationResponse> create(String id, + String source, + int port, + LiveMigrationFilesVerificationRequest request, + InstanceMetadata localInstanceMetadata) + { + return LiveMigrationFilesVerificationTask.builder() + .id(id) + .source(source) + .port(port) + .vertx(vertx) + 
.executorPools(executorPools) + .sidecarClient(sidecarClientProvider.get()) Review Comment: SidecarClientProvider is a singleton, and `.get()` returns its internally initialized client, so only one client instance is used. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

