frankgh commented on code in PR #309:
URL: https://github.com/apache/cassandra-sidecar/pull/309#discussion_r3170657610


##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/response/DigestResponse.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.common.request.data.Digest;
+import org.apache.cassandra.sidecar.common.request.data.MD5Digest;
+import org.apache.cassandra.sidecar.common.request.data.XXHash32Digest;
+
+/**
+ * Response object containing a cryptographic digest value for file 
verification purposes.
+ */
+public class DigestResponse
+{
+    @JsonProperty("digest")
+    public final String digest;
+    @JsonProperty("digestAlgorithm")
+    public final String digestAlgorithm;
+
+    @JsonCreator
+    public DigestResponse(@JsonProperty("digest") String digest,
+                          @JsonProperty("digestAlgorithm") String 
digestAlgorithm)
+    {
+        Objects.requireNonNull(digest, "digest is required");
+        Objects.requireNonNull(digestAlgorithm, "digestAlgorithm is required");
+        this.digest = digest;
+        this.digestAlgorithm = digestAlgorithm;
+    }

Review Comment:
   very small NIT, no need to change it
   ```suggestion
           this.digest = Objects.requireNonNull(digest, "digest is required");
           this.digestAlgorithm = Objects.requireNonNull(digestAlgorithm, 
"digestAlgorithm is required");
   ```



##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/response/LiveMigrationFilesVerificationResponse.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Response object for live migration file verification operations, containing 
statistics about
+ * files not found at source/target and digest mismatches during verification.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class LiveMigrationFilesVerificationResponse
+{
+    private final String id;
+    private final String state;
+    private final String source;
+    private final int port;
+    private final int filesNotFoundAtSource;
+    private final int filesNotFoundAtDestination;
+    private final int metadataMatched;
+    private final int metadataMismatches;
+    private final int digestMismatches;
+    private final int digestVerificationFailures;
+    private final int filesMatched;
+    private final String digestAlgorithm;
+
+    @JsonCreator
+    public LiveMigrationFilesVerificationResponse(@JsonProperty("id") String 
id,
+                                                  
@JsonProperty("digestAlgorithm") String digestAlgorithm,
+                                                  @JsonProperty("state") 
String state,
+                                                  @JsonProperty("source") 
String source,
+                                                  @JsonProperty("port") int 
port,
+                                                  
@JsonProperty("filesNotFoundAtSource") int filesNotFoundAtSource,
+                                                  
@JsonProperty("filesNotFoundAtDestination") int filesNotFoundAtDestination,
+                                                  
@JsonProperty("metadataMatched") int metadataMatched,
+                                                  
@JsonProperty("metadataMismatches") int metadataMismatches,
+                                                  
@JsonProperty("digestMismatches") int digestMismatches,
+                                                  
@JsonProperty("digestVerificationFailures") int digestVerificationFailures,
+                                                  
@JsonProperty("filesMatched") int filesMatched)
+    {
+        Objects.requireNonNull(id, "id of files verification task must be 
specified");
+        Objects.requireNonNull(state, "state of files verification task must 
be specified");
+        this.id = id;
+        this.digestAlgorithm = digestAlgorithm;
+        this.state = state;

Review Comment:
   Small NIT, feel free to ignore:
   ```suggestion
           this.id = Objects.requireNonNull(id, "id of files verification task 
must be specified");
           this.digestAlgorithm = digestAlgorithm;
           this.state = Objects.requireNonNull(state, "state of files 
verification task must be specified");
   ```



##########
CHANGES.txt:
##########
@@ -1,5 +1,6 @@
 0.3.0
 -----
+ * Adding endpoint for verifying files post data copy during live migration 
(CASSSIDECAR-226)

Review Comment:
   This entry should move under the 0.4.0 section.



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateFilesVerificationTaskHandler.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers.livemigration;
+
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.DecodeException;
+import io.vertx.core.json.Json;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import 
org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest;
+import 
org.apache.cassandra.sidecar.common.response.LiveMigrationTaskCreationResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskInProgressException;
+import 
org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException;
+import org.apache.cassandra.sidecar.handlers.AbstractHandler;
+import org.apache.cassandra.sidecar.handlers.AccessProtected;
+import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT;
+import static 
io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE;
+import static 
org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY;
+import static 
org.apache.cassandra.sidecar.common.ApiEndpointsV1.LIVE_MIGRATION_FILES_VERIFICATION_TASKS_ROUTE;
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * HTTP handler for creating file digest verification tasks during live 
migration.
+ * Manages concurrent verification tasks per instance and orchestrates the 
verification process.
+ */
+public class LiveMigrationCreateFilesVerificationTaskHandler extends 
AbstractHandler<LiveMigrationFilesVerificationRequest> implements 
AccessProtected
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LiveMigrationCreateFilesVerificationTaskHandler.class);
+
+    private final FilesVerificationTaskManager filesVerificationTaskManager;
+    private final LiveMigrationMap liveMigrationMap;
+
+    /**
+     * Constructs a handler with the provided {@code metadataFetcher}
+     *
+     * @param metadataFetcher the interface to retrieve instance metadata
+     * @param executorPools   the executor pools for blocking executions
+     * @param validator       a validator instance to validate 
Cassandra-specific input
+     */
+    @Inject
+    protected 
LiveMigrationCreateFilesVerificationTaskHandler(InstanceMetadataFetcher 
metadataFetcher,
+                                                              ExecutorPools 
executorPools,
+                                                              
CassandraInputValidator validator,
+                                                              LiveMigrationMap 
liveMigrationMap,
+                                                              
FilesVerificationTaskManager filesVerificationTaskManager)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.liveMigrationMap = liveMigrationMap;
+        this.filesVerificationTaskManager = filesVerificationTaskManager;
+    }
+
+    @Override
+    protected LiveMigrationFilesVerificationRequest 
extractParamsOrThrow(RoutingContext context)
+    {
+        try
+        {
+            return Json.decodeValue(context.body().buffer(), 
LiveMigrationFilesVerificationRequest.class);
+        }
+        catch (DecodeException decodeException)
+        {
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
+                                    "Failed to parse request body, please 
ensure that the request is valid.",
+                                    decodeException);
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, 
e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected void handleInternal(RoutingContext context,
+                                  HttpServerRequest httpRequest,
+                                  @NotNull String host,
+                                  SocketAddress remoteAddress,
+                                  LiveMigrationFilesVerificationRequest 
request)
+    {
+        LOGGER.debug("Received files verification request for host {} with 
maxConcurrency {}",
+                     host, request.maxConcurrency());
+        InstanceMetadata localInstanceMetadata;
+        try
+        {
+            localInstanceMetadata = metadataFetcher.instance(host);
+        }
+        catch (NoSuchCassandraInstanceException e)
+        {
+            LOGGER.error("Failed to fetch instance metadata for host={}", 
host);
+            context.fail(wrapHttpException(SERVICE_UNAVAILABLE, e));
+            return;
+        }

Review Comment:
   There's no need to handle this exception here; it is already explicitly 
handled in 
`org.apache.cassandra.sidecar.handlers.AbstractHandler#determineHttpException`
   ```suggestion
           InstanceMetadata localInstanceMetadata = 
metadataFetcher.instance(host);
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationGetFilesVerificationTaskHandler.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers.livemigration;
+
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import 
org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException;
+import org.apache.cassandra.sidecar.handlers.AbstractHandler;
+import org.apache.cassandra.sidecar.handlers.AccessProtected;
+import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager;
+import org.apache.cassandra.sidecar.livemigration.LiveMigrationTask;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY;
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * Handler for retrieving a specific live migration files verification task by 
task ID.
+ * Returns the task details if found, or a 404 error if the task does not 
exist on the specified host.
+ */
+public class LiveMigrationGetFilesVerificationTaskHandler extends 
AbstractHandler<String> implements AccessProtected

Review Comment:
   ```suggestion
   @Singleton
   public class LiveMigrationGetFilesVerificationTaskHandler extends 
AbstractHandler<String> implements AccessProtected
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCancelFilesVerificationTaskHandler.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers.livemigration;
+
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import 
org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException;
+import org.apache.cassandra.sidecar.handlers.AbstractHandler;
+import org.apache.cassandra.sidecar.handlers.AccessProtected;
+import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager;
+import org.apache.cassandra.sidecar.livemigration.LiveMigrationTask;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY;
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * Handler for canceling an active live migration files verification task.
+ * Accepts a task ID and cancels the corresponding verification task on the 
specified host.
+ */
+public class LiveMigrationCancelFilesVerificationTaskHandler extends 
AbstractHandler<String> implements AccessProtected

Review Comment:
   Annotate with Singleton
   ```suggestion
   @Singleton
   public class LiveMigrationCancelFilesVerificationTaskHandler extends 
AbstractHandler<String> implements AccessProtected
   ```



##########
client-common/src/test/java/org/apache/cassandra/sidecar/common/response/LiveMigrationFilesVerificationResponseTest.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class LiveMigrationFilesVerificationResponseTest
+{
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    @Test
+    void testSerializationRoundTrip() throws Exception

Review Comment:
   We can simplify this with a `@ParameterizedTest`.



##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/response/LiveMigrationTaskCreationResponse.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Response object returned when a live migration task is created.
+ * Contains the task identifier and a URL to query the task status.
+ */
+public class LiveMigrationTaskCreationResponse
+{
+    private final String taskId;
+    private final String statusUrl;
+
+    @JsonCreator
+    public LiveMigrationTaskCreationResponse(@JsonProperty("taskId") String 
taskId,
+                                             @JsonProperty("statusUrl") String 
statusUrl)
+    {
+        Objects.requireNonNull(taskId, "taskId cannot be null");
+        Objects.requireNonNull(statusUrl, "statusUrl cannot be null");
+
+        this.taskId = taskId;
+        this.statusUrl = statusUrl;

Review Comment:
   Small NIT, feel free to ignore
   ```suggestion
           this.taskId = Objects.requireNonNull(taskId, "taskId cannot be 
null");
           this.statusUrl = Objects.requireNonNull(statusUrl, "statusUrl cannot 
be null");
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateFilesVerificationTaskHandler.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers.livemigration;
+
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.DecodeException;
+import io.vertx.core.json.Json;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import 
org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest;
+import 
org.apache.cassandra.sidecar.common.response.LiveMigrationTaskCreationResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskInProgressException;
+import 
org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException;
+import org.apache.cassandra.sidecar.handlers.AbstractHandler;
+import org.apache.cassandra.sidecar.handlers.AccessProtected;
+import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT;
+import static 
io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE;
+import static 
org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY;
+import static 
org.apache.cassandra.sidecar.common.ApiEndpointsV1.LIVE_MIGRATION_FILES_VERIFICATION_TASKS_ROUTE;
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * HTTP handler for creating file digest verification tasks during live 
migration.
+ * Manages concurrent verification tasks per instance and orchestrates the 
verification process.
+ */
+public class LiveMigrationCreateFilesVerificationTaskHandler extends 
AbstractHandler<LiveMigrationFilesVerificationRequest> implements 
AccessProtected

Review Comment:
   ```suggestion
   @Singleton
   public class LiveMigrationCreateFilesVerificationTaskHandler extends 
AbstractHandler<LiveMigrationFilesVerificationRequest> implements 
AccessProtected
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCancelFilesVerificationTaskHandler.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers.livemigration;
+
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import 
org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException;
+import org.apache.cassandra.sidecar.handlers.AbstractHandler;
+import org.apache.cassandra.sidecar.handlers.AccessProtected;
+import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager;
+import org.apache.cassandra.sidecar.livemigration.LiveMigrationTask;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY;
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * Handler for canceling an active live migration files verification task.
+ * Accepts a task ID and cancels the corresponding verification task on the 
specified host.
+ */
+public class LiveMigrationCancelFilesVerificationTaskHandler extends 
AbstractHandler<String> implements AccessProtected
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LiveMigrationCancelFilesVerificationTaskHandler.class);
+
+    private final FilesVerificationTaskManager taskManager;
+
+    /**
+     * Constructs a handler with the provided {@code metadataFetcher}
+     *
+     * @param metadataFetcher the interface to retrieve instance metadata
+     * @param executorPools   the executor pools for blocking executions
+     * @param validator       a validator instance to validate 
Cassandra-specific input
+     */
+    @Inject
+    public 
LiveMigrationCancelFilesVerificationTaskHandler(InstanceMetadataFetcher 
metadataFetcher,
+                                                           ExecutorPools 
executorPools,
+                                                           
CassandraInputValidator validator,
+                                                           
FilesVerificationTaskManager taskManager)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.taskManager = taskManager;
+    }
+
+    @Override
+    protected String extractParamsOrThrow(RoutingContext context)
+    {
+        return context.pathParam("taskId");

Review Comment:
   Can we do some validation of this parameter here? I don't think it can be 
null, but can it be empty? Is there any other parameter validation we can do?



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationConcurrencyLimitHandler.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers.livemigration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Handler;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.concurrent.ConcurrencyLimiter;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * Handler that enforces concurrency limits for live migration file operations.
+ * Returns HTTP 503 (SERVICE_UNAVAILABLE) when the maximum concurrent file 
requests limit is exceeded.
+ */
+@Singleton
+public class LiveMigrationConcurrencyLimitHandler implements 
Handler<RoutingContext>
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LiveMigrationConcurrencyLimitHandler.class);
+
+    private final ConcurrencyLimiter concurrencyLimiter;
+
+    @Inject
+    public LiveMigrationConcurrencyLimitHandler(SidecarConfiguration 
sidecarConfiguration)
+    {
+        this.concurrencyLimiter =
+        new ConcurrencyLimiter(() -> 
sidecarConfiguration.liveMigrationConfiguration().maxConcurrentFileRequests());
+    }
+
+    @Override
+    public void handle(RoutingContext rc)
+    {
+        if (!concurrencyLimiter.tryAcquire())
+        {
+            LOGGER.warn("Too many concurrent live migration file requests. 
Path={}", rc.request().path());
+            rc.fail(wrapHttpException(HttpResponseStatus.SERVICE_UNAVAILABLE,

Review Comment:
   Should the status code be a 429 instead? Maybe we should follow the 
implementation of `org.apache.cassandra.sidecar.utils.FileStreamer#acquire`?



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationDigestHandlerWrapper.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers.livemigration;
+
+import com.google.inject.Inject;
+import io.vertx.core.Handler;
+import io.vertx.ext.web.RoutingContext;
+import 
org.apache.cassandra.sidecar.common.request.LiveMigrationFileDigestRequest;
+
+import static 
org.apache.cassandra.sidecar.common.request.LiveMigrationFileDigestRequest.DIGEST_ALGORITHM_PARAM;
+
+/**
+ * Wrapper handler that conditionally delegates to {@link 
LiveMigrationFileDigestHandler} based on the presence
+ * of the {@link LiveMigrationFileDigestRequest#DIGEST_ALGORITHM_PARAM} query 
parameter, otherwise
+ * passes control to the next handler in the chain.
+ */
+public class LiveMigrationDigestHandlerWrapper implements 
Handler<RoutingContext>

Review Comment:
   ```suggestion
   @Singleton
   public class LiveMigrationDigestHandlerWrapper implements 
Handler<RoutingContext>
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFilesVerificationTaskFactory.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import 
org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest;
+import 
org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.utils.DigestVerifierFactory;
+import org.apache.cassandra.sidecar.utils.SidecarClientProvider;
+
+/**
+ * Factory for creating {@link LiveMigrationFilesVerificationTask} instances 
used to verify file digests
+ * between source and destination instances during live migration operation.
+ */
+@Singleton
+public class LiveMigrationFilesVerificationTaskFactory
+{
+    private final Vertx vertx;
+    private final SidecarClientProvider sidecarClientProvider;
+    private final ExecutorPools executorPools;
+    private final DigestVerifierFactory digestVerifierFactory;
+    private final SidecarConfiguration sidecarConfiguration;
+
+    @Inject
+    public LiveMigrationFilesVerificationTaskFactory(Vertx vertx,
+                                                     ExecutorPools 
executorPools,
+                                                     SidecarConfiguration 
sidecarConfiguration,
+                                                     SidecarClientProvider 
sidecarClientProvider,
+                                                     DigestVerifierFactory 
digestVerifierFactory)
+    {
+        this.vertx = vertx;
+        this.sidecarClientProvider = sidecarClientProvider;
+        this.executorPools = executorPools;
+        this.digestVerifierFactory = digestVerifierFactory;
+        this.sidecarConfiguration = sidecarConfiguration;
+    }
+
+    public LiveMigrationTask<LiveMigrationFilesVerificationResponse> 
create(String id,
+                                                                            
String source,
+                                                                            
int port,
+                                                                            
LiveMigrationFilesVerificationRequest request,
+                                                                            
InstanceMetadata localInstanceMetadata)
+    {
+        return LiveMigrationFilesVerificationTask.builder()
+                                                 .id(id)
+                                                 .source(source)
+                                                 .port(port)
+                                                 .vertx(vertx)
+                                                 .executorPools(executorPools)
+                                                 
.sidecarClient(sidecarClientProvider.get())

Review Comment:
   What's the side-effect of calling `sidecarClientProvider.get()`? I don't 
remember off the top of my head. Can you ensure that calling 
`sidecarClientProvider.get()` does not create a new client? We should reuse a 
single client throughout the application.



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateFilesVerificationTaskHandler.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers.livemigration;
+
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.DecodeException;
+import io.vertx.core.json.Json;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import 
org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest;
+import 
org.apache.cassandra.sidecar.common.response.LiveMigrationTaskCreationResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskInProgressException;
+import 
org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException;
+import org.apache.cassandra.sidecar.handlers.AbstractHandler;
+import org.apache.cassandra.sidecar.handlers.AccessProtected;
+import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT;
+import static 
io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE;
+import static 
org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY;
+import static 
org.apache.cassandra.sidecar.common.ApiEndpointsV1.LIVE_MIGRATION_FILES_VERIFICATION_TASKS_ROUTE;
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * HTTP handler for creating file digest verification tasks during live 
migration.
+ * Manages concurrent verification tasks per instance and orchestrates the 
verification process.
+ */
+public class LiveMigrationCreateFilesVerificationTaskHandler extends 
AbstractHandler<LiveMigrationFilesVerificationRequest> implements 
AccessProtected
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LiveMigrationCreateFilesVerificationTaskHandler.class);
+
+    private final FilesVerificationTaskManager filesVerificationTaskManager;
+    private final LiveMigrationMap liveMigrationMap;
+
+    /**
+     * Constructs a handler with the provided {@code metadataFetcher}
+     *
+     * @param metadataFetcher the interface to retrieve instance metadata
+     * @param executorPools   the executor pools for blocking executions
+     * @param validator       a validator instance to validate 
Cassandra-specific input
+     */
+    @Inject
+    protected 
LiveMigrationCreateFilesVerificationTaskHandler(InstanceMetadataFetcher 
metadataFetcher,
+                                                              ExecutorPools 
executorPools,
+                                                              
CassandraInputValidator validator,
+                                                              LiveMigrationMap 
liveMigrationMap,
+                                                              
FilesVerificationTaskManager filesVerificationTaskManager)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.liveMigrationMap = liveMigrationMap;
+        this.filesVerificationTaskManager = filesVerificationTaskManager;
+    }
+
+    @Override
+    protected LiveMigrationFilesVerificationRequest 
extractParamsOrThrow(RoutingContext context)
+    {
+        try
+        {
+            return Json.decodeValue(context.body().buffer(), 
LiveMigrationFilesVerificationRequest.class);
+        }
+        catch (DecodeException decodeException)
+        {
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
+                                    "Failed to parse request body, please 
ensure that the request is valid.",
+                                    decodeException);
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, 
e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected void handleInternal(RoutingContext context,
+                                  HttpServerRequest httpRequest,
+                                  @NotNull String host,
+                                  SocketAddress remoteAddress,
+                                  LiveMigrationFilesVerificationRequest 
request)
+    {
+        LOGGER.debug("Received files verification request for host {} with 
maxConcurrency {}",

Review Comment:
   This log entry might be redundant, as we already log the request in 
`org.apache.cassandra.sidecar.handlers.AbstractHandler#handle`. I guess we just 
need to make sure that the `request` has a `toString()` method.



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationFileDigestHandler.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers.livemigration;
+
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
+import 
org.apache.cassandra.sidecar.common.request.LiveMigrationFileDigestRequest;
+import org.apache.cassandra.sidecar.common.response.DigestResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.handlers.AbstractHandler;
+import org.apache.cassandra.sidecar.handlers.AccessProtected;
+import org.apache.cassandra.sidecar.handlers.FileStreamHandler;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.DigestAlgorithm;
+import org.apache.cassandra.sidecar.utils.DigestAlgorithmFactory;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+import static 
io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static 
org.apache.cassandra.sidecar.common.request.LiveMigrationFileDigestRequest.DIGEST_ALGORITHM_PARAM;
+import static 
org.apache.cassandra.sidecar.utils.AsyncFileDigestCalculator.calculateDigest;
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * Handler for calculating and returning file digests during live migration.
+ * Supported digest algorithms are determined by {@link 
DigestAlgorithmFactory} and specified
+ * via the {@link LiveMigrationFileDigestRequest#DIGEST_ALGORITHM_PARAM} query 
parameter.
+ */
+public class LiveMigrationFileDigestHandler extends 
AbstractHandler<DigestAlgorithm> implements AccessProtected
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LiveMigrationFileDigestHandler.class);
+
+    private final Vertx vertx;
+    private final DigestAlgorithmFactory digestAlgorithmFactory;
+
+    @Inject
+    public LiveMigrationFileDigestHandler(InstanceMetadataFetcher 
metadataFetcher,
+                                          ExecutorPools executorPools,
+                                          CassandraInputValidator validator,
+                                          Vertx vertx,
+                                          DigestAlgorithmFactory 
digestAlgorithmFactory)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.vertx = vertx;
+        this.digestAlgorithmFactory = digestAlgorithmFactory;
+    }
+
+    @Override
+    protected DigestAlgorithm extractParamsOrThrow(RoutingContext context)
+    {
+        String digestAlgorithmParam = getDigestAlgorithmParam(context);
+        try
+        {
+            return 
digestAlgorithmFactory.getDigestAlgorithm(digestAlgorithmParam, 0);
+        }
+        catch (IllegalArgumentException e)
+        {
+            LOGGER.error("Unexpected error while getting digest algorithm for 
{}", digestAlgorithmParam, e);
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, 
e.getMessage());
+        }
+    }
+
+    @Override
+    protected void handleInternal(RoutingContext context,
+                                  HttpServerRequest httpRequest,
+                                  @NotNull String host,
+                                  SocketAddress remoteAddress,
+                                  DigestAlgorithm digestAlgorithm)
+    {
+        String file = context.get(FileStreamHandler.FILE_PATH_CONTEXT_KEY);
+
+        if (file == null)
+        {
+            LOGGER.error("File path not found in context");
+            context.fail(wrapHttpException(INTERNAL_SERVER_ERROR, "File path 
not available"));
+            return;
+        }
+        calculateDigest(vertx, file, digestAlgorithm)
+        .onComplete(ar -> {

Review Comment:
   Why don't we reuse the existing machinery for digest verification? It feels 
like we are redoing the same work instead of adapting the existing 
implementation.
   
   ```
           DigestVerifier digestVerifier = 
digestVerifierFactory.verifier(httpRequest.headers());
           digestVerifier.verify(file)
   ```



##########
client-common/src/test/java/org/apache/cassandra/sidecar/common/response/LiveMigrationFilesVerificationResponseTest.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class LiveMigrationFilesVerificationResponseTest
+{
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    @Test
+    void testSerializationRoundTrip() throws Exception
+    {
+        LiveMigrationFilesVerificationResponse original = new 
LiveMigrationFilesVerificationResponse(
+        "test-id-123",
+        "MD5",
+        "COMPLETED",
+        "192.168.1.100",
+        9042,
+        5,
+        10,
+        100,
+        15,
+        20,
+        3,
+        85
+        );
+
+        // Serialize to JSON
+        String json = mapper.writeValueAsString(original);
+
+        // Deserialize back to object
+        LiveMigrationFilesVerificationResponse deserialized =
+        mapper.readValue(json, LiveMigrationFilesVerificationResponse.class);
+
+        // Verify all fields match
+        assertThat(deserialized.id()).isEqualTo(original.id());
+        
assertThat(deserialized.digestAlgorithm()).isEqualTo(original.digestAlgorithm());
+        assertThat(deserialized.state()).isEqualTo(original.state());
+        assertThat(deserialized.source()).isEqualTo(original.source());
+        assertThat(deserialized.port()).isEqualTo(original.port());
+        
assertThat(deserialized.filesNotFoundAtSource()).isEqualTo(original.filesNotFoundAtSource());
+        
assertThat(deserialized.filesNotFoundAtDestination()).isEqualTo(original.filesNotFoundAtDestination());
+        
assertThat(deserialized.metadataMatched()).isEqualTo(original.metadataMatched());
+        
assertThat(deserialized.metadataMismatches()).isEqualTo(original.metadataMismatches());
+        
assertThat(deserialized.digestMismatches()).isEqualTo(original.digestMismatches());
+        
assertThat(deserialized.digestVerificationFailures()).isEqualTo(original.digestVerificationFailures());
+        
assertThat(deserialized.filesMatched()).isEqualTo(original.filesMatched());
+        
assertThat(deserialized.isVerificationSuccessful()).isEqualTo(original.isVerificationSuccessful());

Review Comment:
   Let's move these assertions to a helper method and reuse it in the other tests.



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationFileDigestHandler.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers.livemigration;
+
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
+import 
org.apache.cassandra.sidecar.common.request.LiveMigrationFileDigestRequest;
+import org.apache.cassandra.sidecar.common.response.DigestResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.handlers.AbstractHandler;
+import org.apache.cassandra.sidecar.handlers.AccessProtected;
+import org.apache.cassandra.sidecar.handlers.FileStreamHandler;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.DigestAlgorithm;
+import org.apache.cassandra.sidecar.utils.DigestAlgorithmFactory;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+import static 
io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static 
org.apache.cassandra.sidecar.common.request.LiveMigrationFileDigestRequest.DIGEST_ALGORITHM_PARAM;
+import static 
org.apache.cassandra.sidecar.utils.AsyncFileDigestCalculator.calculateDigest;
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * Handler for calculating and returning file digests during live migration.
+ * Supported digest algorithms are determined by {@link 
DigestAlgorithmFactory} and specified
+ * via the {@link LiveMigrationFileDigestRequest#DIGEST_ALGORITHM_PARAM} query 
parameter.
+ */
+public class LiveMigrationFileDigestHandler extends 
AbstractHandler<DigestAlgorithm> implements AccessProtected
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LiveMigrationFileDigestHandler.class);
+
+    private final Vertx vertx;
+    private final DigestAlgorithmFactory digestAlgorithmFactory;
+
+    @Inject
+    public LiveMigrationFileDigestHandler(InstanceMetadataFetcher 
metadataFetcher,
+                                          ExecutorPools executorPools,
+                                          CassandraInputValidator validator,
+                                          Vertx vertx,
+                                          DigestAlgorithmFactory 
digestAlgorithmFactory)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.vertx = vertx;
+        this.digestAlgorithmFactory = digestAlgorithmFactory;
+    }
+
+    @Override
+    protected DigestAlgorithm extractParamsOrThrow(RoutingContext context)
+    {
+        String digestAlgorithmParam = getDigestAlgorithmParam(context);
+        try
+        {
+            return 
digestAlgorithmFactory.getDigestAlgorithm(digestAlgorithmParam, 0);
+        }
+        catch (IllegalArgumentException e)
+        {
+            LOGGER.error("Unexpected error while getting digest algorithm for 
{}", digestAlgorithmParam, e);
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, 
e.getMessage());
+        }

Review Comment:
   I think it's redundant to handle the `IllegalArgumentException` here. The 
exception will already be handled correctly by 
`org.apache.cassandra.sidecar.handlers.AbstractHandler#processFailure`.
   ```suggestion
           String digestAlgorithmParam = getDigestAlgorithmParam(context);
           return 
digestAlgorithmFactory.getDigestAlgorithm(digestAlgorithmParam, 0);
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskManager.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Centralized singleton manager for all live migration tasks across Cassandra 
instances.
+ * Enforces the constraint that only one {@link LiveMigrationTask} (of any 
type) can be active per instance at a time,
+ * preventing concurrent data copy and files verification operations that 
could lead to resource conflicts or data integrity issues.
+ * This singleton is shared across {@link DataCopyTaskManager} and {@link 
FilesVerificationTaskManager} to ensure
+ * proper coordination and mutual exclusion of tasks per instance.
+ */
+@Singleton
+public class LiveMigrationTaskManager
+{
+    @VisibleForTesting
+    final ConcurrentHashMap<Integer, LiveMigrationTask<?>> currentTasks = new 
ConcurrentHashMap<>();
+
+    private final InstancesMetadata instancesMetadata;
+
+    @Inject
+    public LiveMigrationTaskManager(InstancesMetadata instancesMetadata)
+    {
+        this.instancesMetadata = instancesMetadata;
+    }
+
+    /**
+     * Attempts to submit a new task for the specified instance.
+     * Only one task (of any type) can be active per instance at a time.
+     *
+     * @param instanceId the instance ID
+     * @param newTask    the task to submit
+     * @return true if the task was accepted, false if another task is already 
in progress
+     */
+    public boolean submitTask(int instanceId, LiveMigrationTask<?> newTask)
+    {
+        return currentTasks.compute(instanceId, (integer, taskInMap) -> {

Review Comment:
   NIT
   ```suggestion
           return currentTasks.compute(instanceId, (ignored, taskInMap) -> {
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskManager.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Centralized singleton manager for all live migration tasks across Cassandra 
instances.
+ * Enforces the constraint that only one {@link LiveMigrationTask} (of any 
type) can be active per instance at a time,
+ * preventing concurrent data copy and files verification operations that 
could lead to resource conflicts or data integrity issues.
+ * This singleton is shared across {@link DataCopyTaskManager} and {@link 
FilesVerificationTaskManager} to ensure
+ * proper coordination and mutual exclusion of tasks per instance.
+ */
+@Singleton
+public class LiveMigrationTaskManager
+{
+    @VisibleForTesting
+    final ConcurrentHashMap<Integer, LiveMigrationTask<?>> currentTasks = new 
ConcurrentHashMap<>();
+
+    private final InstancesMetadata instancesMetadata;
+
+    @Inject
+    public LiveMigrationTaskManager(InstancesMetadata instancesMetadata)
+    {
+        this.instancesMetadata = instancesMetadata;
+    }
+
+    /**
+     * Attempts to submit a new task for the specified instance.
+     * Only one task (of any type) can be active per instance at a time.
+     *
+     * @param instanceId the instance ID
+     * @param newTask    the task to submit
+     * @return true if the task was accepted, false if another task is already 
in progress
+     */
+    public boolean submitTask(int instanceId, LiveMigrationTask<?> newTask)
+    {
+        return currentTasks.compute(instanceId, (integer, taskInMap) -> {
+            if (taskInMap == null)
+            {
+                return newTask;
+            }
+
+            if (!taskInMap.isCompleted())
+            {
+                // Reject new task if existing task is still in progress
+                return taskInMap;
+            }
+            else
+            {
+                // Accept new task if existing task has completed
+                return newTask;
+            }
+        }) == newTask;
+    }
+
+    /**
+     * Returns all live migration tasks for given currentHost.
+     * This includes both active and completed tasks that haven't been 
replaced.
+     *
+     * @param currentHost the host where sidecar is running
+     * @return list containing at most one task (empty if no task has ever 
been submitted for this host)
+     */
+    @SuppressWarnings("ConstantValue")
+    public List<LiveMigrationTask<?>> getAllTasks(@NotNull String currentHost)
+    {
+        InstanceMetadata localInstance = 
instancesMetadata.instanceFromHost(currentHost);
+        if (localInstance == null)
+        {
+            throw new IllegalStateException("No instance found for host: " + 
currentHost);
+        }
+
+        LiveMigrationTask<?> task = currentTasks.get(localInstance.id());
+        return task == null ? Collections.emptyList() : 
Collections.singletonList(task);
+    }
+
+    /**
+     * Returns the live migration task with the specified task ID.
+     *
+     * @param taskId      ID of the task to retrieve
+     * @param currentHost the host where sidecar is running
+     * @return the LiveMigrationTask matching the given taskId
+     * @throws LiveMigrationExceptions.LiveMigrationTaskNotFoundException if 
no task found with the given ID
+     */
+    public LiveMigrationTask<?> getTask(@NotNull String taskId,
+                                        @NotNull String currentHost) throws 
LiveMigrationExceptions.LiveMigrationTaskNotFoundException
+    {
+        return getLiveMigrationTask(taskId, currentHost);
+    }
+
+    /**
+     * Cancels the live migration task with the specified task ID.
+     *
+     * @param taskId      ID of the task to cancel
+     * @param currentHost the host where sidecar is running
+     * @return the cancelled LiveMigrationTask
+     * @throws LiveMigrationExceptions.LiveMigrationTaskNotFoundException if 
no task found with the given ID
+     */
+    public LiveMigrationTask<?> cancelTask(@NotNull String taskId,
+                                           @NotNull String currentHost) throws 
LiveMigrationExceptions.LiveMigrationTaskNotFoundException
+    {
+        LiveMigrationTask<?> taskInProgress = getLiveMigrationTask(taskId, 
currentHost);
+
+        // Cancelling the task
+        taskInProgress.cancel();
+
+        return taskInProgress;
+    }
+
+    @SuppressWarnings("ConstantValue")
+    private LiveMigrationTask<?> getLiveMigrationTask(@NotNull String taskId, 
@NotNull String currentHost)
+    {
+        InstanceMetadata localInstance = 
instancesMetadata.instanceFromHost(currentHost);
+        if (localInstance == null)
+        {
+            throw new IllegalStateException("No instance found for host: " + 
currentHost);
+        }

Review Comment:
   It is not necessary to do a null check here; `instanceFromHost` returns a non-null result.
   ```suggestion
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationGetAllFilesVerificationTasksHandler.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers.livemigration;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.inject.Inject;
+import io.vertx.core.Handler;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import 
org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse;
+import org.apache.cassandra.sidecar.handlers.AccessProtected;
+import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager;
+import org.apache.cassandra.sidecar.livemigration.LiveMigrationTask;
+
+import static 
org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY;
+import static 
org.apache.cassandra.sidecar.handlers.AbstractHandler.extractHostAddressWithoutPort;
+
+/**
+ * Handler for retrieving all active live migration files verification tasks.
+ * Returns a list of all verification tasks running on the local host.
+ */
+public class LiveMigrationGetAllFilesVerificationTasksHandler implements 
Handler<RoutingContext>, AccessProtected

Review Comment:
   ```suggestion
   @Singleton
   public class LiveMigrationGetAllFilesVerificationTasksHandler implements 
Handler<RoutingContext>, AccessProtected
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationGetFilesVerificationTaskHandler.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers.livemigration;
+
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import 
org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException;
+import org.apache.cassandra.sidecar.handlers.AbstractHandler;
+import org.apache.cassandra.sidecar.handlers.AccessProtected;
+import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager;
+import org.apache.cassandra.sidecar.livemigration.LiveMigrationTask;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY;
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * Handler for retrieving a specific live migration files verification task by 
task ID.
+ * Returns the task details if found, or a 404 error if the task does not 
exist on the specified host.
+ */
+public class LiveMigrationGetFilesVerificationTaskHandler extends 
AbstractHandler<String> implements AccessProtected
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LiveMigrationGetFilesVerificationTaskHandler.class);
+
+    private final FilesVerificationTaskManager taskManager;
+
+    /**
+     * Constructs a handler with the provided {@code metadataFetcher}
+     *
+     * @param metadataFetcher the interface to retrieve instance metadata
+     * @param executorPools   the executor pools for blocking executions
+     * @param validator       a validator instance to validate 
Cassandra-specific input
+     */
+    @Inject
+    public 
LiveMigrationGetFilesVerificationTaskHandler(InstanceMetadataFetcher 
metadataFetcher,
+                                                        ExecutorPools 
executorPools,
+                                                        
CassandraInputValidator validator,
+                                                        
FilesVerificationTaskManager taskManager)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.taskManager = taskManager;
+    }
+
+    @Override
+    protected String extractParamsOrThrow(RoutingContext context)
+    {
+        String taskId = context.pathParam("taskId");
+        if (taskId == null || taskId.isEmpty())
+        {
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "taskId is 
required");
+        }
+
+        return taskId;
+    }
+
+    @Override
+    protected void handleInternal(RoutingContext context,
+                                  HttpServerRequest httpRequest,
+                                  @NotNull String host,
+                                  SocketAddress remoteAddress,
+                                  String taskId)
+    {
+        try
+        {
+            LiveMigrationTask<LiveMigrationFilesVerificationResponse> task = 
taskManager.getTask(taskId, host);
+            LOGGER.info("Found live migration task with taskId={} on host={}", 
taskId, host);

Review Comment:
   Logging this at INFO level seems too much; can we lower it to DEBUG?



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskManager.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Centralized singleton manager for all live migration tasks across Cassandra 
instances.
+ * Enforces the constraint that only one {@link LiveMigrationTask} (of any 
type) can be active per instance at a time,
+ * preventing concurrent data copy and files verification operations that 
could lead to resource conflicts or data integrity issues.
+ * This singleton is shared across {@link DataCopyTaskManager} and {@link 
FilesVerificationTaskManager} to ensure
+ * proper coordination and mutual exclusion of tasks per instance.
+ */
+@Singleton
+public class LiveMigrationTaskManager
+{
+    @VisibleForTesting
+    final ConcurrentHashMap<Integer, LiveMigrationTask<?>> currentTasks = new 
ConcurrentHashMap<>();
+
+    private final InstancesMetadata instancesMetadata;
+
+    @Inject
+    public LiveMigrationTaskManager(InstancesMetadata instancesMetadata)
+    {
+        this.instancesMetadata = instancesMetadata;
+    }
+
+    /**
+     * Attempts to submit a new task for the specified instance.
+     * Only one task (of any type) can be active per instance at a time.
+     *
+     * @param instanceId the instance ID
+     * @param newTask    the task to submit
+     * @return true if the task was accepted, false if another task is already 
in progress
+     */
+    public boolean submitTask(int instanceId, LiveMigrationTask<?> newTask)
+    {
+        return currentTasks.compute(instanceId, (integer, taskInMap) -> {
+            if (taskInMap == null)
+            {
+                return newTask;
+            }
+
+            if (!taskInMap.isCompleted())
+            {
+                // Reject new task if existing task is still in progress
+                return taskInMap;
+            }
+            else
+            {
+                // Accept new task if existing task has completed
+                return newTask;
+            }
+        }) == newTask;
+    }
+
+    /**
+     * Returns all live migration tasks for given currentHost.
+     * This includes both active and completed tasks that haven't been 
replaced.
+     *
+     * @param currentHost the host where sidecar is running
+     * @return list containing at most one task (empty if no task has ever 
been submitted for this host)
+     */
+    @SuppressWarnings("ConstantValue")
+    public List<LiveMigrationTask<?>> getAllTasks(@NotNull String currentHost)
+    {
+        InstanceMetadata localInstance = 
instancesMetadata.instanceFromHost(currentHost);
+        if (localInstance == null)
+        {
+            throw new IllegalStateException("No instance found for host: " + 
currentHost);
+        }
+

Review Comment:
   This check is not required: `instanceFromHost` returns a non-null result.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to