This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e3a6dd01 CASSSIDECAR-255: Fix restore ranges of failed sidecar-managed 
restore jobs do not persist (#221)
e3a6dd01 is described below

commit e3a6dd010b1dbe347c6d1e19a14f5fc99b64fa84
Author: Yifan Cai <y...@apache.org>
AuthorDate: Fri Jun 6 12:47:18 2025 -0700

    CASSSIDECAR-255: Fix restore ranges of failed sidecar-managed restore jobs 
do not persist (#221)
    
    Patch by Yifan Cai; Reviewed by Francisco Guerrero, Saranya Krishnakumar 
for CASSSIDECAR-255
---
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/sidecar/db/RestoreRange.java  |  7 +++
 .../restore/RestoreJobConsistencyChecker.java      |  1 +
 .../sidecar/restore/RestoreJobDiscoverer.java      |  6 +-
 .../sidecar/restore/RestoreJobProgressTracker.java | 22 +++++--
 .../AuthenticationHandlerFactoryRegistryTest.java  |  3 +-
 .../cassandra/sidecar/db/RestoreSliceTest.java     |  5 +-
 .../sidecar/restore/RestoreJobDiscovererTest.java  | 68 +++++++++++++++++++++-
 .../sidecar/restore/RestoreRangeTest.java          | 12 ++++
 .../sidecar/utils/SidecarClientProviderTest.java   | 33 ++++++-----
 10 files changed, 131 insertions(+), 27 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 68144490..b7b76cb9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.2.0
 -----
+ * Fix restore ranges of failed sidecar-managed restore jobs do not persist 
(CASSSIDECAR-255)
  * Added endpoint for listing files for Live Migration (CASSSIDECAR-222)
  * Use List roles statement to check for super user status (CASSSIDECAR-252)
  * remove file:// from sidecar.logdir property (CASSSIDECAR-251)
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java 
b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java
index dcf21813..4b93b65c 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java
@@ -312,6 +312,13 @@ public class RestoreRange
                                                                         this, 
null), this);
         }
 
+        RestoreJobFatalException failure = tracker.failureCause();
+        if (failure != null)
+        {
+            return 
RestoreRangeTask.failed(RestoreJobExceptions.ofFatal("Restore job has already 
failed due to prior failure",
+                                                                        this, 
failure), this);
+        }
+
         if (tracker.restoreJob().hasExpired(System.currentTimeMillis()))
         {
             return 
RestoreRangeTask.failed(RestoreJobExceptions.ofFatal("Restore job expired on " 
+ tracker.restoreJob().expireAt.toInstant(),
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobConsistencyChecker.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobConsistencyChecker.java
index b330f3d7..ae73dd73 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobConsistencyChecker.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobConsistencyChecker.java
@@ -100,6 +100,7 @@ public class RestoreJobConsistencyChecker
         RestoreJobProgressCollector collector = 
RestoreJobProgressCollectors.create(restoreJob, fetchPolicy);
         RestoreRangeStatus successCriteria = 
restoreJob.expectedNextRangeStatus();
         ConsistencyVerifier verifier = 
ConsistencyVerifiers.forConsistencyLevel(restoreJob.consistencyLevel, 
restoreJob.localDatacenter);
+        LOGGER.info("Checking restore job progress. jobId={} fetchPolicy={} 
successCriteria={}", restoreJob.jobId, fetchPolicy, successCriteria);
         Future<RestoreJobProgress> future = ringTopologyRefresher
                                             
.replicaByTokenRangeAsync(restoreJob)
                                             .compose(topology -> 
findRangesAndConclude(restoreJob, successCriteria, topology, verifier, 
collector));
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
index a2fe92d2..15e8673d 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java
@@ -474,10 +474,8 @@ public class RestoreJobDiscoverer implements PeriodicTask, 
RingTopologyChangeLis
         }
         catch (RestoreJobFatalException e)
         {
-            LOGGER.error("Restore range failed. startToken={} endToken={} 
instance={}",
-                         range.startToken(), range.endToken(), 
range.owner().host(), e);
-            range.fail(e);
-            restoreRangeDatabaseAccessor.updateStatus(range);
+            LOGGER.error("The restore job has already failed. jobId={} 
startToken={} endToken={} instance={}",
+                         job.jobId, range.startToken(), range.endToken(), 
range.owner().host(), e);
             return RestoreJobProgressTracker.Status.FAILED;
         }
     }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobProgressTracker.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobProgressTracker.java
index ee38c23f..5cb4d862 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobProgressTracker.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobProgressTracker.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.sidecar.db.RestoreRange;
 import org.apache.cassandra.sidecar.db.RestoreSlice;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.VisibleForTesting;
 
 /**
@@ -70,9 +71,12 @@ public class RestoreJobProgressTracker
      */
     Status trySubmit(RestoreRange range) throws RestoreJobFatalException
     {
-        // The job fails early, prevents further submissions
-        if (failureRef.get() != null)
-            throw failureRef.get();
+        // The job fails, prevents further range submissions for 
non-sidecar-managed jobs
+        // For sidecar-managed jobs, we want to submit the restore range, then 
fail.
+        // Such submitted ranges will fail and be handled in RestoreProcessor
+        RestoreJobFatalException failure = failureRef.get();
+        if (failure != null && !restoreJob.isManagedBySidecar())
+            throw failure;
 
         RestoreRange rangeWithTracker = range.unbuild()
                                              .restoreJobProgressTracker(this)
@@ -82,6 +86,10 @@ public class RestoreJobProgressTracker
         if (status == null)
         {
             processor.submit(rangeWithTracker);
+            // The failure exists. The range belongs to a sidecar-managed job;
+            // Still re-throw the exception to be compliant with the method 
signature
+            if (failure != null)
+                throw failure;
             return Status.CREATED;
         }
 
@@ -142,7 +150,13 @@ public class RestoreJobProgressTracker
 
     public boolean isFailed()
     {
-        return failureRef.get() != null;
+        return failureCause() != null;
+    }
+
+    @Nullable
+    public RestoreJobFatalException failureCause()
+    {
+        return failureRef.get();
     }
 
     public void requestOutOfRangeDataCleanup()
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/acl/authentication/AuthenticationHandlerFactoryRegistryTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/acl/authentication/AuthenticationHandlerFactoryRegistryTest.java
index 31435e6b..10b938aa 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/acl/authentication/AuthenticationHandlerFactoryRegistryTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/acl/authentication/AuthenticationHandlerFactoryRegistryTest.java
@@ -45,7 +45,8 @@ class AuthenticationHandlerFactoryRegistryTest
         AuthenticationHandlerFactoryRegistry registry = new 
AuthenticationHandlerFactoryRegistry();
         IdentityToRoleCache identityToRoleCache = 
mock(IdentityToRoleCache.class);
         AdminIdentityResolver mockAdminIdentityResolver = 
mock(AdminIdentityResolver.class);
-        MutualTlsAuthenticationHandlerFactory 
mutualTlsAuthenticationHandlerFactory = new 
MutualTlsAuthenticationHandlerFactory(identityToRoleCache, 
mockAdminIdentityResolver);
+        MutualTlsAuthenticationHandlerFactory 
mutualTlsAuthenticationHandlerFactory = new 
MutualTlsAuthenticationHandlerFactory(identityToRoleCache,
+                                                                               
                                                 mockAdminIdentityResolver);
         registry.register(mutualTlsAuthenticationHandlerFactory);
         
assertThat(registry.getFactory(MutualTlsAuthenticationHandlerFactory.class.getName())).isNotNull();
     }
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/db/RestoreSliceTest.java 
b/server/src/test/java/org/apache/cassandra/sidecar/db/RestoreSliceTest.java
index 9ef82178..dd8a1fd3 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/db/RestoreSliceTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/db/RestoreSliceTest.java
@@ -33,7 +33,10 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-class RestoreSliceTest
+/**
+ * Tests RestoreSlice and exposes helpers to create RestoreSlice instances for testing
+ */
+public class RestoreSliceTest
 {
     @Test
     void testEquals()
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
index ba6c15fe..d4c3c384 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java
@@ -36,15 +36,21 @@ import com.datastax.driver.core.LocalDate;
 import com.datastax.driver.core.utils.UUIDs;
 import io.vertx.core.Promise;
 import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.data.ConsistencyLevel;
 import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
 import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
 import org.apache.cassandra.sidecar.config.RestoreJobConfiguration;
 import org.apache.cassandra.sidecar.db.RestoreJob;
 import org.apache.cassandra.sidecar.db.RestoreJobDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.RestoreRange;
 import org.apache.cassandra.sidecar.db.RestoreRangeDatabaseAccessor;
 import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.RestoreSliceTest;
 import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions;
 import org.apache.cassandra.sidecar.metrics.MetricRegistryFactory;
 import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 import org.apache.cassandra.sidecar.metrics.SidecarMetricsImpl;
@@ -54,14 +60,21 @@ import 
org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 import org.mockito.ArgumentCaptor;
 
 import static 
org.apache.cassandra.sidecar.db.RestoreJobTest.createNewTestingJob;
+import static org.apache.cassandra.sidecar.db.RestoreJobTest.createTestingJob;
 import static org.apache.cassandra.sidecar.db.RestoreJobTest.createUpdatedJob;
 import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyShort;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 class RestoreJobDiscovererTest
@@ -75,6 +88,8 @@ class RestoreJobDiscovererTest
     private final RestoreJobManagerGroup mockManagers = 
mock(RestoreJobManagerGroup.class);
     private final PeriodicTaskExecutor executor = 
mock(PeriodicTaskExecutor.class);
     private final SidecarSchema sidecarSchema = mock(SidecarSchema.class);
+    private final RingTopologyRefresher ringTopologyRefresher = 
mock(RingTopologyRefresher.class);
+    private final InstanceMetadataFetcher instanceMetadataFetcher = 
mock(InstanceMetadataFetcher.class);
     private SidecarMetrics metrics;
     private RestoreJobDiscoverer loop;
 
@@ -91,8 +106,8 @@ class RestoreJobDiscovererTest
                                         mockSliceAccessor,
                                         mockRangeAccessor,
                                         () -> mockManagers,
-                                        null,
-                                        null,
+                                        instanceMetadataFetcher,
+                                        ringTopologyRefresher,
                                         executor,
                                         metrics);
     }
@@ -252,6 +267,55 @@ class RestoreJobDiscovererTest
         assertThat(abortedJobs.getAllValues()).isEqualTo(expectedAbortedJobs);
     }
 
+    @Test
+    void testDiscoverSidecarManagedJob() throws Exception
+    {
+        UUID jobId = discoverSidecarManagedJob(false);
+
+        ArgumentCaptor<RestoreRange> restoreRangeCaptor = 
ArgumentCaptor.forClass(RestoreRange.class);
+        verify(mockRangeAccessor).create(restoreRangeCaptor.capture());
+        assertThat(restoreRangeCaptor.getAllValues()).hasSize(1);
+        RestoreRange captured = restoreRangeCaptor.getValue();
+        assertThat(captured.jobId()).isEqualTo(jobId);
+    }
+
+    @Test
+    void testDisciverAlreadyFailedSidecarManagedJob() throws Exception
+    {
+        discoverSidecarManagedJob(true);
+        verify(mockRangeAccessor, never()).create(any());
+    }
+
+    private UUID discoverSidecarManagedJob(boolean isJobFailed) throws 
Exception
+    {
+        when(sidecarSchema.isInitialized()).thenReturn(true);
+        UUID jobId = UUIDs.timeBased();
+        RestoreJob sidecarManagedJob = createTestingJob(jobId, 
RestoreJobStatus.STAGE_READY, ConsistencyLevel.QUORUM);
+        assertThat(sidecarManagedJob.isManagedBySidecar()).isTrue();
+
+        when(mockJobAccessor.findAllRecent(anyLong(), anyInt()))
+        .thenReturn(Collections.singletonList(sidecarManagedJob));
+        when(ringTopologyRefresher.localTokenRanges(any(), anyBoolean()))
+        .thenReturn(Collections.singletonMap(1, Collections.singleton(new 
TokenRange(0, 100))));
+        InstanceMetadata instance = mock(InstanceMetadata.class);
+        when(instance.stagingDir()).thenReturn("stagingDir");
+        when(instanceMetadataFetcher.instance(anyInt())).thenReturn(instance);
+        when(mockSliceAccessor.selectByJobByBucketByTokenRange(any(), 
anyShort(), any()))
+        
.thenReturn(Collections.singletonList(RestoreSliceTest.createTestingSlice(sidecarManagedJob,
 "sliceId", 0, 10)));
+        if (isJobFailed)
+        {
+            doThrow(RestoreJobExceptions.ofFatal("Job failed", 
mock(RestoreRange.class), null))
+            .when(mockManagers).trySubmit(any(), any(), any());
+        }
+        else
+        {
+            when(mockManagers.trySubmit(any(), any(), 
any())).thenReturn(RestoreJobProgressTracker.Status.CREATED);
+        }
+
+        executeBlocking();
+        return jobId;
+    }
+
     @Test
     void testWhenJobShouldBeLogged()
     {
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTest.java
index 092fc1e0..5440e3bc 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTest.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.sidecar.db.RestoreJob;
 import org.apache.cassandra.sidecar.db.RestoreJobTest;
 import org.apache.cassandra.sidecar.db.RestoreRange;
 import org.apache.cassandra.sidecar.db.RestoreSlice;
+import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions;
 import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 
 import static 
org.apache.cassandra.sidecar.common.server.data.RestoreRangeStatus.DISCARDED;
@@ -112,6 +113,17 @@ public class RestoreRangeTest
                             "RestoreRange{sliceId='sliceId-123', 
sliceKey='myKey', sliceBucket='myBucket'}");
     }
 
+    @Test
+    void testCreateTaskFailsWhenJobFailsAlready() throws Exception
+    {
+        RestoreRange range = createTestRange();
+        // simulate that the owning job has already failed
+        range.trackerUnsafe().fail(RestoreJobExceptions.ofFatal("Job fails", 
range, null));
+        RestoreRangeHandler handler = createRestoreRangeHandler(range);
+        assertFailedHandler(range, handler,
+                            "Restore job has already failed due to prior 
failure");
+    }
+
     @Test
     void testCreateTaskFailsWhenMissingSourceSlice() throws Exception
     {
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/utils/SidecarClientProviderTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/utils/SidecarClientProviderTest.java
index 9926a7d8..6ef55976 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/utils/SidecarClientProviderTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/utils/SidecarClientProviderTest.java
@@ -84,29 +84,32 @@ class SidecarClientProviderTest
     @BeforeAll
     static void configureCertificates() throws Exception
     {
-        CertificateBundle certificateAuthority = new 
CertificateBuilder().subject("CN=Apache Cassandra Root CA, OU=Certification 
Authority, O=Unknown, C=Unknown")
-                                                                         
.alias("fakerootca")
-                                                                         
.isCertificateAuthority(true)
-                                                                         
.buildSelfSigned();
+        CertificateBundle certificateAuthority = new CertificateBuilder()
+                                                 .subject("CN=Apache Cassandra 
Root CA, OU=Certification Authority, O=Unknown, C=Unknown")
+                                                 .alias("fakerootca")
+                                                 .isCertificateAuthority(true)
+                                                 .buildSelfSigned();
         truststorePath = certificateAuthority.toTempKeyStorePath(secretsPath, 
EMPTY_PASSWORD, EMPTY_PASSWORD);
 
-        CertificateBuilder serverKeyStoreBuilder =
-        new CertificateBuilder().subject("CN=Apache Cassandra, OU=mtls_test, 
O=Unknown, L=Unknown, ST=Unknown, C=Unknown")
-                                .addSanDnsName("localhost");
+        CertificateBuilder serverKeyStoreBuilder = new CertificateBuilder()
+                                                   .subject("CN=Apache 
Cassandra, OU=mtls_test, O=Unknown, L=Unknown, ST=Unknown, C=Unknown")
+                                                   .addSanDnsName("localhost");
         CertificateBundle serverKeyStore = 
serverKeyStoreBuilder.buildIssuedBy(certificateAuthority);
         serverKeyStorePath = serverKeyStore.toTempKeyStorePath(secretsPath, 
EMPTY_PASSWORD, EMPTY_PASSWORD);
 
-        CertificateBundle expiredClientKeyStore = new 
CertificateBuilder().subject("CN=Apache Cassandra, OU=mtls_test, O=Unknown, 
L=Unknown, ST=Unknown, C=Unknown")
-                                                                          
.addSanDnsName("localhost")
-                                                                          
.notBefore(Instant.now().minus(7, ChronoUnit.DAYS))
-                                                                          
.notAfter(Instant.now().minus(1, ChronoUnit.DAYS))
-                                                                          
.buildIssuedBy(certificateAuthority);
+        CertificateBundle expiredClientKeyStore = new CertificateBuilder()
+                                                  .subject("CN=Apache 
Cassandra, OU=mtls_test, O=Unknown, L=Unknown, ST=Unknown, C=Unknown")
+                                                  .addSanDnsName("localhost")
+                                                  
.notBefore(Instant.now().minus(7, ChronoUnit.DAYS))
+                                                  
.notAfter(Instant.now().minus(1, ChronoUnit.DAYS))
+                                                  
.buildIssuedBy(certificateAuthority);
         // Assign the expired client cert to the cert path
         clientCertPath = expiredClientKeyStore.toTempKeyStorePath(secretsPath, 
EMPTY_PASSWORD, EMPTY_PASSWORD);
 
-        CertificateBundle validClientKeyStore = new 
CertificateBuilder().subject("CN=Apache Cassandra, OU=mtls_test, O=Unknown, 
L=Unknown, ST=Unknown, C=Unknown")
-                                                                        
.addSanDnsName("localhost")
-                                                                        
.buildIssuedBy(certificateAuthority);
+        CertificateBundle validClientKeyStore = new CertificateBuilder()
+                                                .subject("CN=Apache Cassandra, 
OU=mtls_test, O=Unknown, L=Unknown, ST=Unknown, C=Unknown")
+                                                .addSanDnsName("localhost")
+                                                
.buildIssuedBy(certificateAuthority);
         validClientCertPath = 
validClientKeyStore.toTempKeyStorePath(secretsPath, EMPTY_PASSWORD, 
EMPTY_PASSWORD);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to