frankgh commented on code in PR #309: URL: https://github.com/apache/cassandra-sidecar/pull/309#discussion_r3170657610
########## client-common/src/main/java/org/apache/cassandra/sidecar/common/response/DigestResponse.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.response; + +import java.util.Objects; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.cassandra.sidecar.common.request.data.Digest; +import org.apache.cassandra.sidecar.common.request.data.MD5Digest; +import org.apache.cassandra.sidecar.common.request.data.XXHash32Digest; + +/** + * Response object containing a cryptographic digest value for file verification purposes. + */ +public class DigestResponse +{ + @JsonProperty("digest") + public final String digest; + @JsonProperty("digestAlgorithm") + public final String digestAlgorithm; + + @JsonCreator + public DigestResponse(@JsonProperty("digest") String digest, + @JsonProperty("digestAlgorithm") String digestAlgorithm) + { + Objects.requireNonNull(digest, "digest is required"); + Objects.requireNonNull(digestAlgorithm, "digestAlgorithm is required"); + this.digest = digest; + this.digestAlgorithm = digestAlgorithm; + } Review Comment: very small NIT, no need to change it ```suggestion this.digest = Objects.requireNonNull(digest, "digest is required"); this.digestAlgorithm = Objects.requireNonNull(digestAlgorithm, "digestAlgorithm is required"); ``` ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/response/LiveMigrationFilesVerificationResponse.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.response; + +import java.util.Objects; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Response object for live migration file verification operations, containing statistics about + * files not found at source/target and digest mismatches during verification. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class LiveMigrationFilesVerificationResponse +{ + private final String id; + private final String state; + private final String source; + private final int port; + private final int filesNotFoundAtSource; + private final int filesNotFoundAtDestination; + private final int metadataMatched; + private final int metadataMismatches; + private final int digestMismatches; + private final int digestVerificationFailures; + private final int filesMatched; + private final String digestAlgorithm; + + @JsonCreator + public LiveMigrationFilesVerificationResponse(@JsonProperty("id") String id, + @JsonProperty("digestAlgorithm") String digestAlgorithm, + @JsonProperty("state") String state, + @JsonProperty("source") String source, + @JsonProperty("port") int port, + @JsonProperty("filesNotFoundAtSource") int filesNotFoundAtSource, + @JsonProperty("filesNotFoundAtDestination") int filesNotFoundAtDestination, + @JsonProperty("metadataMatched") int metadataMatched, + @JsonProperty("metadataMismatches") int metadataMismatches, + @JsonProperty("digestMismatches") int digestMismatches, + @JsonProperty("digestVerificationFailures") int digestVerificationFailures, + @JsonProperty("filesMatched") int filesMatched) + { + Objects.requireNonNull(id, "id of files verification task must be specified"); + Objects.requireNonNull(state, "state of files verification task must be specified"); + this.id = id; + this.digestAlgorithm = digestAlgorithm; + this.state = state; Review Comment: Small NIT, feel free to ignore: ```suggestion this.id = Objects.requireNonNull(id, "id of files verification task must be specified"); this.digestAlgorithm = digestAlgorithm; this.state = Objects.requireNonNull(state, "state of files verification task must be specified"); ``` ########## CHANGES.txt: ########## @@ -1,5 +1,6 @@ 0.3.0 ----- + * Adding endpoint for verifying files post data copy during live migration (CASSSIDECAR-226) Review Comment: should move under 0.4.0 ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateFilesVerificationTaskHandler.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.Json; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest; +import org.apache.cassandra.sidecar.common.response.LiveMigrationTaskCreationResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskInProgressException; +import org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT; +import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE; +import static org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY; +import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.LIVE_MIGRATION_FILES_VERIFICATION_TASKS_ROUTE; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * HTTP handler for creating file digest verification tasks during live migration. + * Manages concurrent verification tasks per instance and orchestrates the verification process. + */ +public class LiveMigrationCreateFilesVerificationTaskHandler extends AbstractHandler<LiveMigrationFilesVerificationRequest> implements AccessProtected +{ + private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationCreateFilesVerificationTaskHandler.class); + + private final FilesVerificationTaskManager filesVerificationTaskManager; + private final LiveMigrationMap liveMigrationMap; + + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the interface to retrieve instance metadata + * @param executorPools the executor pools for blocking executions + * @param validator a validator instance to validate Cassandra-specific input + */ + @Inject + protected LiveMigrationCreateFilesVerificationTaskHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator, + LiveMigrationMap liveMigrationMap, + FilesVerificationTaskManager filesVerificationTaskManager) + { + super(metadataFetcher, executorPools, validator); + this.liveMigrationMap = liveMigrationMap; + this.filesVerificationTaskManager = filesVerificationTaskManager; + } + + @Override + protected LiveMigrationFilesVerificationRequest extractParamsOrThrow(RoutingContext context) + { + try + { + return Json.decodeValue(context.body().buffer(), LiveMigrationFilesVerificationRequest.class); + } + catch (DecodeException decodeException) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, + "Failed to parse request body, please ensure that the request is valid.", + decodeException); + } + catch (IllegalArgumentException e) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, e.getMessage(), e); + } + } + + @Override + protected void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + @NotNull String host, + SocketAddress remoteAddress, + LiveMigrationFilesVerificationRequest request) + { + LOGGER.debug("Received files verification request for host {} with maxConcurrency {}", + host, request.maxConcurrency()); + InstanceMetadata localInstanceMetadata; + try + { + localInstanceMetadata = metadataFetcher.instance(host); + } + catch (NoSuchCassandraInstanceException e) + { + LOGGER.error("Failed to fetch instance metadata for host={}", host); + context.fail(wrapHttpException(SERVICE_UNAVAILABLE, e)); + return; + } Review Comment: There's no need to handle these exceptions here. It is already explicitly handled in `org.apache.cassandra.sidecar.handlers.AbstractHandler#determineHttpException` ```suggestion InstanceMetadata localInstanceMetadata = metadataFetcher.instance(host); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationGetFilesVerificationTaskHandler.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager; +import org.apache.cassandra.sidecar.livemigration.LiveMigrationTask; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler for retrieving a specific live migration files verification task by task ID. + * Returns the task details if found, or a 404 error if the task does not exist on the specified host. + */ +public class LiveMigrationGetFilesVerificationTaskHandler extends AbstractHandler<String> implements AccessProtected Review Comment: ```suggestion @Singleton public class LiveMigrationGetFilesVerificationTaskHandler extends AbstractHandler<String> implements AccessProtected ``` ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCancelFilesVerificationTaskHandler.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager; +import org.apache.cassandra.sidecar.livemigration.LiveMigrationTask; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler for canceling an active live migration files verification task. + * Accepts a task ID and cancels the corresponding verification task on the specified host. + */ +public class LiveMigrationCancelFilesVerificationTaskHandler extends AbstractHandler<String> implements AccessProtected Review Comment: Annotate with Singleton ```suggestion @Singleton public class LiveMigrationCancelFilesVerificationTaskHandler extends AbstractHandler<String> implements AccessProtected ``` ########## client-common/src/test/java/org/apache/cassandra/sidecar/common/response/LiveMigrationFilesVerificationResponseTest.java: ########## @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.response; + +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class LiveMigrationFilesVerificationResponseTest +{ + private final ObjectMapper mapper = new ObjectMapper(); + + @Test + void testSerializationRoundTrip() throws Exception Review Comment: we can simplify with a ParameterizedTest ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/response/LiveMigrationTaskCreationResponse.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.response; + +import java.util.Objects; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Response object returned when a live migration task is created. + * Contains the task identifier and a URL to query the task status. + */ +public class LiveMigrationTaskCreationResponse +{ + private final String taskId; + private final String statusUrl; + + @JsonCreator + public LiveMigrationTaskCreationResponse(@JsonProperty("taskId") String taskId, + @JsonProperty("statusUrl") String statusUrl) + { + Objects.requireNonNull(taskId, "taskId cannot be null"); + Objects.requireNonNull(statusUrl, "statusUrl cannot be null"); + + this.taskId = taskId; + this.statusUrl = statusUrl; Review Comment: Small NIT, feel free to ignore ```suggestion this.taskId = Objects.requireNonNull(taskId, "taskId cannot be null"); this.statusUrl = Objects.requireNonNull(statusUrl, "statusUrl cannot be null"); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateFilesVerificationTaskHandler.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.Json; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest; +import org.apache.cassandra.sidecar.common.response.LiveMigrationTaskCreationResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskInProgressException; +import org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT; +import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE; +import static org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY; +import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.LIVE_MIGRATION_FILES_VERIFICATION_TASKS_ROUTE; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * HTTP handler for creating file digest verification tasks during live migration. + * Manages concurrent verification tasks per instance and orchestrates the verification process. + */ +public class LiveMigrationCreateFilesVerificationTaskHandler extends AbstractHandler<LiveMigrationFilesVerificationRequest> implements AccessProtected Review Comment: ```suggestion @Singleton public class LiveMigrationCreateFilesVerificationTaskHandler extends AbstractHandler<LiveMigrationFilesVerificationRequest> implements AccessProtected ``` ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCancelFilesVerificationTaskHandler.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager; +import org.apache.cassandra.sidecar.livemigration.LiveMigrationTask; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler for canceling an active live migration files verification task. + * Accepts a task ID and cancels the corresponding verification task on the specified host. + */ +public class LiveMigrationCancelFilesVerificationTaskHandler extends AbstractHandler<String> implements AccessProtected +{ + private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationCancelFilesVerificationTaskHandler.class); + + private final FilesVerificationTaskManager taskManager; + + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the interface to retrieve instance metadata + * @param executorPools the executor pools for blocking executions + * @param validator a validator instance to validate Cassandra-specific input + */ + @Inject + public LiveMigrationCancelFilesVerificationTaskHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator, + FilesVerificationTaskManager taskManager) + { + super(metadataFetcher, executorPools, validator); + this.taskManager = taskManager; + } + + @Override + protected String extractParamsOrThrow(RoutingContext context) + { + return context.pathParam("taskId"); Review Comment: can we do some validation of this parameter here? I don't think it can be null, but can it be empty? Is there some parameter validation that can be done ? ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationConcurrencyLimitHandler.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Handler; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.concurrent.ConcurrencyLimiter; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler that enforces concurrency limits for live migration file operations. + * Returns HTTP 503 (SERVICE_UNAVAILABLE) when the maximum concurrent file requests limit is exceeded. + */ +@Singleton +public class LiveMigrationConcurrencyLimitHandler implements Handler<RoutingContext> +{ + private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationConcurrencyLimitHandler.class); + + private final ConcurrencyLimiter concurrencyLimiter; + + @Inject + public LiveMigrationConcurrencyLimitHandler(SidecarConfiguration sidecarConfiguration) + { + this.concurrencyLimiter = + new ConcurrencyLimiter(() -> sidecarConfiguration.liveMigrationConfiguration().maxConcurrentFileRequests()); + } + + @Override + public void handle(RoutingContext rc) + { + if (!concurrencyLimiter.tryAcquire()) + { + LOGGER.warn("Too many concurrent live migration file requests. Path={}", rc.request().path()); + rc.fail(wrapHttpException(HttpResponseStatus.SERVICE_UNAVAILABLE, Review Comment: Should the status code be a 429 instead? maybe we should follow the implementation of `org.apache.cassandra.sidecar.utils.FileStreamer#acquire` ? ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationDigestHandlerWrapper.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import com.google.inject.Inject; +import io.vertx.core.Handler; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFileDigestRequest; + +import static org.apache.cassandra.sidecar.common.request.LiveMigrationFileDigestRequest.DIGEST_ALGORITHM_PARAM; + +/** + * Wrapper handler that conditionally delegates to {@link LiveMigrationFileDigestHandler} based on the presence + * of the {@link LiveMigrationFileDigestRequest#DIGEST_ALGORITHM_PARAM} query parameter, otherwise + * passes control to the next handler in the chain. + */ +public class LiveMigrationDigestHandlerWrapper implements Handler<RoutingContext> Review Comment: ```suggestion @Singleton public class LiveMigrationDigestHandlerWrapper implements Handler<RoutingContext> ``` ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFilesVerificationTaskFactory.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.livemigration; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest; +import org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.utils.DigestVerifierFactory; +import org.apache.cassandra.sidecar.utils.SidecarClientProvider; + +/** + * Factory for creating {@link LiveMigrationFilesVerificationTask} instances used to verify file digests + * between source and destination instances during live migration operation. + */ +@Singleton +public class LiveMigrationFilesVerificationTaskFactory +{ + private final Vertx vertx; + private final SidecarClientProvider sidecarClientProvider; + private final ExecutorPools executorPools; + private final DigestVerifierFactory digestVerifierFactory; + private final SidecarConfiguration sidecarConfiguration; + + @Inject + public LiveMigrationFilesVerificationTaskFactory(Vertx vertx, + ExecutorPools executorPools, + SidecarConfiguration sidecarConfiguration, + SidecarClientProvider sidecarClientProvider, + DigestVerifierFactory digestVerifierFactory) + { + this.vertx = vertx; + this.sidecarClientProvider = sidecarClientProvider; + this.executorPools = executorPools; + this.digestVerifierFactory = digestVerifierFactory; + this.sidecarConfiguration = sidecarConfiguration; + } + + public LiveMigrationTask<LiveMigrationFilesVerificationResponse> create(String id, + String source, + int port, + LiveMigrationFilesVerificationRequest request, + InstanceMetadata localInstanceMetadata) + { + return LiveMigrationFilesVerificationTask.builder() + .id(id) + .source(source) + .port(port) + .vertx(vertx) + .executorPools(executorPools) + .sidecarClient(sidecarClientProvider.get()) Review Comment: What's the side-effect of calling `sidecarClientProvider.get()`? I don't remember off the top of my head. Can you ensure that calling `sidecarClientProvider.get()` does not create a new client? We should reuse a single client throughout the application. ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateFilesVerificationTaskHandler.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.Json; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest; +import org.apache.cassandra.sidecar.common.response.LiveMigrationTaskCreationResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskInProgressException; +import org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT; +import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE; +import static org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY; +import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.LIVE_MIGRATION_FILES_VERIFICATION_TASKS_ROUTE; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * HTTP handler for creating file digest verification tasks during live migration. + * Manages concurrent verification tasks per instance and orchestrates the verification process. + */ +public class LiveMigrationCreateFilesVerificationTaskHandler extends AbstractHandler<LiveMigrationFilesVerificationRequest> implements AccessProtected +{ + private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationCreateFilesVerificationTaskHandler.class); + + private final FilesVerificationTaskManager filesVerificationTaskManager; + private final LiveMigrationMap liveMigrationMap; + + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the interface to retrieve instance metadata + * @param executorPools the executor pools for blocking executions + * @param validator a validator instance to validate Cassandra-specific input + */ + @Inject + protected LiveMigrationCreateFilesVerificationTaskHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator, + LiveMigrationMap liveMigrationMap, + FilesVerificationTaskManager filesVerificationTaskManager) + { + super(metadataFetcher, executorPools, validator); + this.liveMigrationMap = liveMigrationMap; + this.filesVerificationTaskManager = filesVerificationTaskManager; + } + + @Override + protected LiveMigrationFilesVerificationRequest extractParamsOrThrow(RoutingContext context) + { + try + { + return Json.decodeValue(context.body().buffer(), LiveMigrationFilesVerificationRequest.class); + } + catch (DecodeException decodeException) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, + "Failed to parse request body, please ensure that the request is valid.", + decodeException); + } + catch (IllegalArgumentException e) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, e.getMessage(), e); + } + } + + @Override + protected void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + @NotNull String host, + SocketAddress remoteAddress, + LiveMigrationFilesVerificationRequest request) + { + LOGGER.debug("Received files verification request for host {} with maxConcurrency {}", Review Comment: this log entry might be redundant, as we already have it in `org.apache.cassandra.sidecar.handlers.AbstractHandler#handle` where we log the request. I guess we just need to make sure that the `request` has a `toString()` method ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationFileDigestHandler.java: ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFileDigestRequest; +import org.apache.cassandra.sidecar.common.response.DigestResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.handlers.FileStreamHandler; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.DigestAlgorithm; +import org.apache.cassandra.sidecar.utils.DigestAlgorithmFactory; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static org.apache.cassandra.sidecar.common.request.LiveMigrationFileDigestRequest.DIGEST_ALGORITHM_PARAM; +import static org.apache.cassandra.sidecar.utils.AsyncFileDigestCalculator.calculateDigest; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler for calculating and returning file digests during live migration. + * Supported digest algorithms are determined by {@link DigestAlgorithmFactory} and specified + * via the {@link LiveMigrationFileDigestRequest#DIGEST_ALGORITHM_PARAM} query parameter. + */ +public class LiveMigrationFileDigestHandler extends AbstractHandler<DigestAlgorithm> implements AccessProtected +{ + private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationFileDigestHandler.class); + + private final Vertx vertx; + private final DigestAlgorithmFactory digestAlgorithmFactory; + + @Inject + public LiveMigrationFileDigestHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator, + Vertx vertx, + DigestAlgorithmFactory digestAlgorithmFactory) + { + super(metadataFetcher, executorPools, validator); + this.vertx = vertx; + this.digestAlgorithmFactory = digestAlgorithmFactory; + } + + @Override + protected DigestAlgorithm extractParamsOrThrow(RoutingContext context) + { + String digestAlgorithmParam = getDigestAlgorithmParam(context); + try + { + return digestAlgorithmFactory.getDigestAlgorithm(digestAlgorithmParam, 0); + } + catch (IllegalArgumentException e) + { + LOGGER.error("Unexpected error while getting digest algorithm for {}", digestAlgorithmParam, e); + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, e.getMessage()); + } + } + + @Override + protected void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + @NotNull String host, + SocketAddress remoteAddress, + DigestAlgorithm digestAlgorithm) + { + String file = context.get(FileStreamHandler.FILE_PATH_CONTEXT_KEY); + + if (file == null) + { + LOGGER.error("File path not found in context"); + context.fail(wrapHttpException(INTERNAL_SERVER_ERROR, "File path not available")); + return; + } + calculateDigest(vertx, file, digestAlgorithm) + .onComplete(ar -> { Review Comment: Why don't we re-use the existing machinery for digest verification? it feels like we are rewriting the same work instead of adjusting to the existing implementation. ``` DigestVerifier digestVerifier = digestVerifierFactory.verifier(httpRequest.headers()); digestVerifier.verify(file) ```` ########## client-common/src/test/java/org/apache/cassandra/sidecar/common/response/LiveMigrationFilesVerificationResponseTest.java: ########## @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.response; + +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class LiveMigrationFilesVerificationResponseTest +{ + private final ObjectMapper mapper = new ObjectMapper(); + + @Test + void testSerializationRoundTrip() throws Exception + { + LiveMigrationFilesVerificationResponse original = new LiveMigrationFilesVerificationResponse( + "test-id-123", + "MD5", + "COMPLETED", + "192.168.1.100", + 9042, + 5, + 10, + 100, + 15, + 20, + 3, + 85 + ); + + // Serialize to JSON + String json = mapper.writeValueAsString(original); + + // Deserialize back to object + LiveMigrationFilesVerificationResponse deserialized = + mapper.readValue(json, LiveMigrationFilesVerificationResponse.class); + + // Verify all fields match + assertThat(deserialized.id()).isEqualTo(original.id()); + assertThat(deserialized.digestAlgorithm()).isEqualTo(original.digestAlgorithm()); + assertThat(deserialized.state()).isEqualTo(original.state()); + assertThat(deserialized.source()).isEqualTo(original.source()); + assertThat(deserialized.port()).isEqualTo(original.port()); + assertThat(deserialized.filesNotFoundAtSource()).isEqualTo(original.filesNotFoundAtSource()); + assertThat(deserialized.filesNotFoundAtDestination()).isEqualTo(original.filesNotFoundAtDestination()); + assertThat(deserialized.metadataMatched()).isEqualTo(original.metadataMatched()); + assertThat(deserialized.metadataMismatches()).isEqualTo(original.metadataMismatches()); + assertThat(deserialized.digestMismatches()).isEqualTo(original.digestMismatches()); + assertThat(deserialized.digestVerificationFailures()).isEqualTo(original.digestVerificationFailures()); + assertThat(deserialized.filesMatched()).isEqualTo(original.filesMatched()); + assertThat(deserialized.isVerificationSuccessful()).isEqualTo(original.isVerificationSuccessful()); Review Comment: Let's move to a helper method and reuse for other tests. ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationFileDigestHandler.java: ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFileDigestRequest; +import org.apache.cassandra.sidecar.common.response.DigestResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.handlers.FileStreamHandler; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.DigestAlgorithm; +import org.apache.cassandra.sidecar.utils.DigestAlgorithmFactory; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static org.apache.cassandra.sidecar.common.request.LiveMigrationFileDigestRequest.DIGEST_ALGORITHM_PARAM; +import static org.apache.cassandra.sidecar.utils.AsyncFileDigestCalculator.calculateDigest; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler for calculating and returning file digests during live migration. + * Supported digest algorithms are determined by {@link DigestAlgorithmFactory} and specified + * via the {@link LiveMigrationFileDigestRequest#DIGEST_ALGORITHM_PARAM} query parameter. + */ +public class LiveMigrationFileDigestHandler extends AbstractHandler<DigestAlgorithm> implements AccessProtected +{ + private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationFileDigestHandler.class); + + private final Vertx vertx; + private final DigestAlgorithmFactory digestAlgorithmFactory; + + @Inject + public LiveMigrationFileDigestHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator, + Vertx vertx, + DigestAlgorithmFactory digestAlgorithmFactory) + { + super(metadataFetcher, executorPools, validator); + this.vertx = vertx; + this.digestAlgorithmFactory = digestAlgorithmFactory; + } + + @Override + protected DigestAlgorithm extractParamsOrThrow(RoutingContext context) + { + String digestAlgorithmParam = getDigestAlgorithmParam(context); + try + { + return digestAlgorithmFactory.getDigestAlgorithm(digestAlgorithmParam, 0); + } + catch (IllegalArgumentException e) + { + LOGGER.error("Unexpected error while getting digest algorithm for {}", digestAlgorithmParam, e); + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, e.getMessage()); + } Review Comment: I think it's redundant to handle the `IllegalArgumentException` here. This will be correctly handled on an exception here: `org.apache.cassandra.sidecar.handlers.AbstractHandler#processFailure` ```suggestion String digestAlgorithmParam = getDigestAlgorithmParam(context); return digestAlgorithmFactory.getDigestAlgorithm(digestAlgorithmParam, 0); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskManager.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.annotations.VisibleForTesting; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions; +import org.jetbrains.annotations.NotNull; + +/** + * Centralized singleton manager for all live migration tasks across Cassandra instances. + * Enforces the constraint that only one {@link LiveMigrationTask} (of any type) can be active per instance at a time, + * preventing concurrent data copy and files verification operations that could lead to resource conflicts or data integrity issues. + * This singleton is shared across {@link DataCopyTaskManager} and {@link FilesVerificationTaskManager} to ensure + * proper coordination and mutual exclusion of tasks per instance. + */ +@Singleton +public class LiveMigrationTaskManager +{ + @VisibleForTesting + final ConcurrentHashMap<Integer, LiveMigrationTask<?>> currentTasks = new ConcurrentHashMap<>(); + + private final InstancesMetadata instancesMetadata; + + @Inject + public LiveMigrationTaskManager(InstancesMetadata instancesMetadata) + { + this.instancesMetadata = instancesMetadata; + } + + /** + * Attempts to submit a new task for the specified instance. + * Only one task (of any type) can be active per instance at a time. + * + * @param instanceId the instance ID + * @param newTask the task to submit + * @return true if the task was accepted, false if another task is already in progress + */ + public boolean submitTask(int instanceId, LiveMigrationTask<?> newTask) + { + return currentTasks.compute(instanceId, (integer, taskInMap) -> { Review Comment: NIT ```suggestion return currentTasks.compute(instanceId, (ignored, taskInMap) -> { ``` ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskManager.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.annotations.VisibleForTesting; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions; +import org.jetbrains.annotations.NotNull; + +/** + * Centralized singleton manager for all live migration tasks across Cassandra instances. + * Enforces the constraint that only one {@link LiveMigrationTask} (of any type) can be active per instance at a time, + * preventing concurrent data copy and files verification operations that could lead to resource conflicts or data integrity issues. + * This singleton is shared across {@link DataCopyTaskManager} and {@link FilesVerificationTaskManager} to ensure + * proper coordination and mutual exclusion of tasks per instance. + */ +@Singleton +public class LiveMigrationTaskManager +{ + @VisibleForTesting + final ConcurrentHashMap<Integer, LiveMigrationTask<?>> currentTasks = new ConcurrentHashMap<>(); + + private final InstancesMetadata instancesMetadata; + + @Inject + public LiveMigrationTaskManager(InstancesMetadata instancesMetadata) + { + this.instancesMetadata = instancesMetadata; + } + + /** + * Attempts to submit a new task for the specified instance. + * Only one task (of any type) can be active per instance at a time. + * + * @param instanceId the instance ID + * @param newTask the task to submit + * @return true if the task was accepted, false if another task is already in progress + */ + public boolean submitTask(int instanceId, LiveMigrationTask<?> newTask) + { + return currentTasks.compute(instanceId, (integer, taskInMap) -> { + if (taskInMap == null) + { + return newTask; + } + + if (!taskInMap.isCompleted()) + { + // Reject new task if existing task is still in progress + return taskInMap; + } + else + { + // Accept new task if existing task has completed + return newTask; + } + }) == newTask; + } + + /** + * Returns all live migration tasks for given currentHost. + * This includes both active and completed tasks that haven't been replaced. + * + * @param currentHost the host where sidecar is running + * @return list containing at most one task (empty if no task has ever been submitted for this host) + */ + @SuppressWarnings("ConstantValue") + public List<LiveMigrationTask<?>> getAllTasks(@NotNull String currentHost) + { + InstanceMetadata localInstance = instancesMetadata.instanceFromHost(currentHost); + if (localInstance == null) + { + throw new IllegalStateException("No instance found for host: " + currentHost); + } + + LiveMigrationTask<?> task = currentTasks.get(localInstance.id()); + return task == null ? Collections.emptyList() : Collections.singletonList(task); + } + + /** + * Returns the live migration task with the specified task ID. + * + * @param taskId ID of the task to retrieve + * @param currentHost the host where sidecar is running + * @return the LiveMigrationTask matching the given taskId + * @throws LiveMigrationExceptions.LiveMigrationTaskNotFoundException if no task found with the given ID + */ + public LiveMigrationTask<?> getTask(@NotNull String taskId, + @NotNull String currentHost) throws LiveMigrationExceptions.LiveMigrationTaskNotFoundException + { + return getLiveMigrationTask(taskId, currentHost); + } + + /** + * Cancels the live migration task with the specified task ID. + * + * @param taskId ID of the task to cancel + * @param currentHost the host where sidecar is running + * @return the cancelled LiveMigrationTask + * @throws LiveMigrationExceptions.LiveMigrationTaskNotFoundException if no task found with the given ID + */ + public LiveMigrationTask<?> cancelTask(@NotNull String taskId, + @NotNull String currentHost) throws LiveMigrationExceptions.LiveMigrationTaskNotFoundException + { + LiveMigrationTask<?> taskInProgress = getLiveMigrationTask(taskId, currentHost); + + // Cancelling the task + taskInProgress.cancel(); + + return taskInProgress; + } + + @SuppressWarnings("ConstantValue") + private LiveMigrationTask<?> getLiveMigrationTask(@NotNull String taskId, @NotNull String currentHost) + { + InstanceMetadata localInstance = instancesMetadata.instanceFromHost(currentHost); + if (localInstance == null) + { + throw new IllegalStateException("No instance found for host: " + currentHost); + } Review Comment: not necessary to do a null check here ```suggestion ``` ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationGetAllFilesVerificationTasksHandler.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import com.google.inject.Inject; +import io.vertx.core.Handler; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager; +import org.apache.cassandra.sidecar.livemigration.LiveMigrationTask; + +import static org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY; +import static org.apache.cassandra.sidecar.handlers.AbstractHandler.extractHostAddressWithoutPort; + +/** + * Handler for retrieving all active live migration files verification tasks. + * Returns a list of all verification tasks running on the local host. + */ +public class LiveMigrationGetAllFilesVerificationTasksHandler implements Handler<RoutingContext>, AccessProtected Review Comment: ```suggestion @Singleton public class LiveMigrationGetAllFilesVerificationTasksHandler implements Handler<RoutingContext>, AccessProtected ``` ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationGetFilesVerificationTaskHandler.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager; +import org.apache.cassandra.sidecar.livemigration.LiveMigrationTask; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler for retrieving a specific live migration files verification task by task ID. + * Returns the task details if found, or a 404 error if the task does not exist on the specified host. + */ +public class LiveMigrationGetFilesVerificationTaskHandler extends AbstractHandler<String> implements AccessProtected +{ + private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationGetFilesVerificationTaskHandler.class); + + private final FilesVerificationTaskManager taskManager; + + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the interface to retrieve instance metadata + * @param executorPools the executor pools for blocking executions + * @param validator a validator instance to validate Cassandra-specific input + */ + @Inject + public LiveMigrationGetFilesVerificationTaskHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator, + FilesVerificationTaskManager taskManager) + { + super(metadataFetcher, executorPools, validator); + this.taskManager = taskManager; + } + + @Override + protected String extractParamsOrThrow(RoutingContext context) + { + String taskId = context.pathParam("taskId"); + if (taskId == null || taskId.isEmpty()) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "taskId is required"); + } + + return taskId; + } + + @Override + protected void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + @NotNull String host, + SocketAddress remoteAddress, + String taskId) + { + try + { + LiveMigrationTask<LiveMigrationFilesVerificationResponse> task = taskManager.getTask(taskId, host); + LOGGER.info("Found live migration task with taskId={} on host={}", taskId, host); Review Comment: seems too much, can we lower to debug? ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskManager.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.annotations.VisibleForTesting; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions; +import org.jetbrains.annotations.NotNull; + +/** + * Centralized singleton manager for all live migration tasks across Cassandra instances. + * Enforces the constraint that only one {@link LiveMigrationTask} (of any type) can be active per instance at a time, + * preventing concurrent data copy and files verification operations that could lead to resource conflicts or data integrity issues. + * This singleton is shared across {@link DataCopyTaskManager} and {@link FilesVerificationTaskManager} to ensure + * proper coordination and mutual exclusion of tasks per instance. + */ +@Singleton +public class LiveMigrationTaskManager +{ + @VisibleForTesting + final ConcurrentHashMap<Integer, LiveMigrationTask<?>> currentTasks = new ConcurrentHashMap<>(); + + private final InstancesMetadata instancesMetadata; + + @Inject + public LiveMigrationTaskManager(InstancesMetadata instancesMetadata) + { + this.instancesMetadata = instancesMetadata; + } + + /** + * Attempts to submit a new task for the specified instance. + * Only one task (of any type) can be active per instance at a time. + * + * @param instanceId the instance ID + * @param newTask the task to submit + * @return true if the task was accepted, false if another task is already in progress + */ + public boolean submitTask(int instanceId, LiveMigrationTask<?> newTask) + { + return currentTasks.compute(instanceId, (integer, taskInMap) -> { + if (taskInMap == null) + { + return newTask; + } + + if (!taskInMap.isCompleted()) + { + // Reject new task if existing task is still in progress + return taskInMap; + } + else + { + // Accept new task if existing task has completed + return newTask; + } + }) == newTask; + } + + /** + * Returns all live migration tasks for given currentHost. + * This includes both active and completed tasks that haven't been replaced. + * + * @param currentHost the host where sidecar is running + * @return list containing at most one task (empty if no task has ever been submitted for this host) + */ + @SuppressWarnings("ConstantValue") + public List<LiveMigrationTask<?>> getAllTasks(@NotNull String currentHost) + { + InstanceMetadata localInstance = instancesMetadata.instanceFromHost(currentHost); + if (localInstance == null) + { + throw new IllegalStateException("No instance found for host: " + currentHost); + } + Review Comment: this check is not required. instanceFromHost returns non-null results -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

