This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new e3a6dd01 CASSSIDECAR-255: Fix restore ranges of failed sidecar-managed restore jobs do not persist (#221) e3a6dd01 is described below commit e3a6dd010b1dbe347c6d1e19a14f5fc99b64fa84 Author: Yifan Cai <y...@apache.org> AuthorDate: Fri Jun 6 12:47:18 2025 -0700 CASSSIDECAR-255: Fix restore ranges of failed sidecar-managed restore jobs do not persist (#221) Patch by Yifan Cai; Reviewed by Francisco Guerrero, Saranya Krishnakumar for CASSSIDECAR-255 --- CHANGES.txt | 1 + .../apache/cassandra/sidecar/db/RestoreRange.java | 7 +++ .../restore/RestoreJobConsistencyChecker.java | 1 + .../sidecar/restore/RestoreJobDiscoverer.java | 6 +- .../sidecar/restore/RestoreJobProgressTracker.java | 22 +++++-- .../AuthenticationHandlerFactoryRegistryTest.java | 3 +- .../cassandra/sidecar/db/RestoreSliceTest.java | 5 +- .../sidecar/restore/RestoreJobDiscovererTest.java | 68 +++++++++++++++++++++- .../sidecar/restore/RestoreRangeTest.java | 12 ++++ .../sidecar/utils/SidecarClientProviderTest.java | 33 ++++++----- 10 files changed, 131 insertions(+), 27 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 68144490..b7b76cb9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.2.0 ----- + * Fix restore ranges of failed sidecar-managed restore jobs do not persist (CASSSIDECAR-255) * Added endpoint for listing files for Live Migration (CASSSIDECAR-222) * Use List roles statement to check for super user status (CASSSIDECAR-252) * remove file:// from sidecar.logdir property (CASSSIDECAR-251) diff --git a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java index dcf21813..4b93b65c 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java @@ -312,6 +312,13 @@ public class RestoreRange this, null), this); } + 
RestoreJobFatalException failure = tracker.failureCause(); + if (failure != null) + { + return RestoreRangeTask.failed(RestoreJobExceptions.ofFatal("Restore job has already failed due to prior failure", + this, failure), this); + } + if (tracker.restoreJob().hasExpired(System.currentTimeMillis())) { return RestoreRangeTask.failed(RestoreJobExceptions.ofFatal("Restore job expired on " + tracker.restoreJob().expireAt.toInstant(), diff --git a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobConsistencyChecker.java b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobConsistencyChecker.java index b330f3d7..ae73dd73 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobConsistencyChecker.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobConsistencyChecker.java @@ -100,6 +100,7 @@ public class RestoreJobConsistencyChecker RestoreJobProgressCollector collector = RestoreJobProgressCollectors.create(restoreJob, fetchPolicy); RestoreRangeStatus successCriteria = restoreJob.expectedNextRangeStatus(); ConsistencyVerifier verifier = ConsistencyVerifiers.forConsistencyLevel(restoreJob.consistencyLevel, restoreJob.localDatacenter); + LOGGER.info("Checking restore job progress. 
jobId={} fetchPolicy={} successCriteria={}", restoreJob.jobId, fetchPolicy, successCriteria); Future<RestoreJobProgress> future = ringTopologyRefresher .replicaByTokenRangeAsync(restoreJob) .compose(topology -> findRangesAndConclude(restoreJob, successCriteria, topology, verifier, collector)); diff --git a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java index a2fe92d2..15e8673d 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java @@ -474,10 +474,8 @@ public class RestoreJobDiscoverer implements PeriodicTask, RingTopologyChangeLis } catch (RestoreJobFatalException e) { - LOGGER.error("Restore range failed. startToken={} endToken={} instance={}", - range.startToken(), range.endToken(), range.owner().host(), e); - range.fail(e); - restoreRangeDatabaseAccessor.updateStatus(range); + LOGGER.error("The restore job has already failed. 
jobId={} startToken={} endToken={} instance={}", + job.jobId, range.startToken(), range.endToken(), range.owner().host(), e); return RestoreJobProgressTracker.Status.FAILED; } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobProgressTracker.java b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobProgressTracker.java index ee38c23f..5cb4d862 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobProgressTracker.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobProgressTracker.java @@ -39,6 +39,7 @@ import org.apache.cassandra.sidecar.db.RestoreRange; import org.apache.cassandra.sidecar.db.RestoreSlice; import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.VisibleForTesting; /** @@ -70,9 +71,12 @@ public class RestoreJobProgressTracker */ Status trySubmit(RestoreRange range) throws RestoreJobFatalException { - // The job fails early, prevents further submissions - if (failureRef.get() != null) - throw failureRef.get(); + // If the job has failed, prevent further range submissions for non-sidecar-managed jobs + // For sidecar-managed jobs, we want to submit the restore range, then fail. + // Such submitted ranges will fail and be handled in RestoreProcessor + RestoreJobFatalException failure = failureRef.get(); + if (failure != null && !restoreJob.isManagedBySidecar()) + throw failure; RestoreRange rangeWithTracker = range.unbuild() .restoreJobProgressTracker(this) @@ -82,6 +86,10 @@ public class RestoreJobProgressTracker if (status == null) { processor.submit(rangeWithTracker); + // The failure exists.
The range belongs to a sidecar-managed job; + // Still re-throw the exception to be compliant with the method signature + if (failure != null) + throw failure; return Status.CREATED; } @@ -142,7 +150,13 @@ public class RestoreJobProgressTracker public boolean isFailed() { - return failureRef.get() != null; + return failureCause() != null; + } + + @Nullable + public RestoreJobFatalException failureCause() + { + return failureRef.get(); } public void requestOutOfRangeDataCleanup() diff --git a/server/src/test/java/org/apache/cassandra/sidecar/acl/authentication/AuthenticationHandlerFactoryRegistryTest.java b/server/src/test/java/org/apache/cassandra/sidecar/acl/authentication/AuthenticationHandlerFactoryRegistryTest.java index 31435e6b..10b938aa 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/acl/authentication/AuthenticationHandlerFactoryRegistryTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/acl/authentication/AuthenticationHandlerFactoryRegistryTest.java @@ -45,7 +45,8 @@ class AuthenticationHandlerFactoryRegistryTest AuthenticationHandlerFactoryRegistry registry = new AuthenticationHandlerFactoryRegistry(); IdentityToRoleCache identityToRoleCache = mock(IdentityToRoleCache.class); AdminIdentityResolver mockAdminIdentityResolver = mock(AdminIdentityResolver.class); - MutualTlsAuthenticationHandlerFactory mutualTlsAuthenticationHandlerFactory = new MutualTlsAuthenticationHandlerFactory(identityToRoleCache, mockAdminIdentityResolver); + MutualTlsAuthenticationHandlerFactory mutualTlsAuthenticationHandlerFactory = new MutualTlsAuthenticationHandlerFactory(identityToRoleCache, + mockAdminIdentityResolver); registry.register(mutualTlsAuthenticationHandlerFactory); assertThat(registry.getFactory(MutualTlsAuthenticationHandlerFactory.class.getName())).isNotNull(); } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/db/RestoreSliceTest.java b/server/src/test/java/org/apache/cassandra/sidecar/db/RestoreSliceTest.java index
9ef82178..dd8a1fd3 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/db/RestoreSliceTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/db/RestoreSliceTest.java @@ -33,7 +33,10 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -class RestoreSliceTest +/** + * Tests RestoreSlice and exposes helpers to create RestoreSlice instances for testing + */ +public class RestoreSliceTest { @Test void testEquals() diff --git a/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java b/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java index ba6c15fe..d4c3c384 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererTest.java @@ -36,15 +36,21 @@ import com.datastax.driver.core.LocalDate; import com.datastax.driver.core.utils.UUIDs; import io.vertx.core.Promise; import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.data.ConsistencyLevel; import org.apache.cassandra.sidecar.common.data.RestoreJobStatus; +import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange; import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration; import org.apache.cassandra.sidecar.config.RestoreJobConfiguration; import org.apache.cassandra.sidecar.db.RestoreJob; import org.apache.cassandra.sidecar.db.RestoreJobDatabaseAccessor; +import org.apache.cassandra.sidecar.db.RestoreRange; import org.apache.cassandra.sidecar.db.RestoreRangeDatabaseAccessor; import org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor; +import
org.apache.cassandra.sidecar.db.RestoreSliceTest; import org.apache.cassandra.sidecar.db.schema.SidecarSchema; +import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions; import org.apache.cassandra.sidecar.metrics.MetricRegistryFactory; import org.apache.cassandra.sidecar.metrics.SidecarMetrics; import org.apache.cassandra.sidecar.metrics.SidecarMetricsImpl; @@ -54,14 +60,21 @@ import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.mockito.ArgumentCaptor; import static org.apache.cassandra.sidecar.db.RestoreJobTest.createNewTestingJob; +import static org.apache.cassandra.sidecar.db.RestoreJobTest.createTestingJob; import static org.apache.cassandra.sidecar.db.RestoreJobTest.createUpdatedJob; import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyShort; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; class RestoreJobDiscovererTest @@ -75,6 +88,8 @@ class RestoreJobDiscovererTest private final RestoreJobManagerGroup mockManagers = mock(RestoreJobManagerGroup.class); private final PeriodicTaskExecutor executor = mock(PeriodicTaskExecutor.class); private final SidecarSchema sidecarSchema = mock(SidecarSchema.class); + private final RingTopologyRefresher ringTopologyRefresher = mock(RingTopologyRefresher.class); + private final InstanceMetadataFetcher instanceMetadataFetcher = mock(InstanceMetadataFetcher.class); private SidecarMetrics metrics; private 
RestoreJobDiscoverer loop; @@ -91,8 +106,8 @@ class RestoreJobDiscovererTest mockSliceAccessor, mockRangeAccessor, () -> mockManagers, - null, - null, + instanceMetadataFetcher, + ringTopologyRefresher, executor, metrics); } @@ -252,6 +267,55 @@ class RestoreJobDiscovererTest assertThat(abortedJobs.getAllValues()).isEqualTo(expectedAbortedJobs); } + @Test + void testDiscoverSidecarManagedJob() throws Exception + { + UUID jobId = discoverSidecarManagedJob(false); + + ArgumentCaptor<RestoreRange> restoreRangeCaptor = ArgumentCaptor.forClass(RestoreRange.class); + verify(mockRangeAccessor).create(restoreRangeCaptor.capture()); + assertThat(restoreRangeCaptor.getAllValues()).hasSize(1); + RestoreRange captured = restoreRangeCaptor.getValue(); + assertThat(captured.jobId()).isEqualTo(jobId); + } + + @Test + void testDisciverAlreadyFailedSidecarManagedJob() throws Exception + { + discoverSidecarManagedJob(true); + verify(mockRangeAccessor, never()).create(any()); + } + + private UUID discoverSidecarManagedJob(boolean isJobFailed) throws Exception + { + when(sidecarSchema.isInitialized()).thenReturn(true); + UUID jobId = UUIDs.timeBased(); + RestoreJob sidecarManagedJob = createTestingJob(jobId, RestoreJobStatus.STAGE_READY, ConsistencyLevel.QUORUM); + assertThat(sidecarManagedJob.isManagedBySidecar()).isTrue(); + + when(mockJobAccessor.findAllRecent(anyLong(), anyInt())) + .thenReturn(Collections.singletonList(sidecarManagedJob)); + when(ringTopologyRefresher.localTokenRanges(any(), anyBoolean())) + .thenReturn(Collections.singletonMap(1, Collections.singleton(new TokenRange(0, 100)))); + InstanceMetadata instance = mock(InstanceMetadata.class); + when(instance.stagingDir()).thenReturn("stagingDir"); + when(instanceMetadataFetcher.instance(anyInt())).thenReturn(instance); + when(mockSliceAccessor.selectByJobByBucketByTokenRange(any(), anyShort(), any())) + .thenReturn(Collections.singletonList(RestoreSliceTest.createTestingSlice(sidecarManagedJob, "sliceId", 0, 10))); + 
if (isJobFailed) + { + doThrow(RestoreJobExceptions.ofFatal("Job failed", mock(RestoreRange.class), null)) + .when(mockManagers).trySubmit(any(), any(), any()); + } + else + { + when(mockManagers.trySubmit(any(), any(), any())).thenReturn(RestoreJobProgressTracker.Status.CREATED); + } + + executeBlocking(); + return jobId; + } + @Test void testWhenJobShouldBeLogged() { diff --git a/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTest.java b/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTest.java index 092fc1e0..5440e3bc 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTest.java @@ -39,6 +39,7 @@ import org.apache.cassandra.sidecar.db.RestoreJob; import org.apache.cassandra.sidecar.db.RestoreJobTest; import org.apache.cassandra.sidecar.db.RestoreRange; import org.apache.cassandra.sidecar.db.RestoreSlice; +import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions; import org.apache.cassandra.sidecar.metrics.SidecarMetrics; import static org.apache.cassandra.sidecar.common.server.data.RestoreRangeStatus.DISCARDED; @@ -112,6 +113,17 @@ public class RestoreRangeTest "RestoreRange{sliceId='sliceId-123', sliceKey='myKey', sliceBucket='myBucket'}"); } + @Test + void testCreateTaskFailsWhenJobFailsAlready() throws Exception + { + RestoreRange range = createTestRange(); + // simulate that the job this range belongs to has already failed + range.trackerUnsafe().fail(RestoreJobExceptions.ofFatal("Job fails", range, null)); + RestoreRangeHandler handler = createRestoreRangeHandler(range); + assertFailedHandler(range, handler, + "Restore job has already failed due to prior failure"); + } + @Test void testCreateTaskFailsWhenMissingSourceSlice() throws Exception { diff --git a/server/src/test/java/org/apache/cassandra/sidecar/utils/SidecarClientProviderTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/utils/SidecarClientProviderTest.java index 9926a7d8..6ef55976 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/utils/SidecarClientProviderTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/utils/SidecarClientProviderTest.java @@ -84,29 +84,32 @@ class SidecarClientProviderTest @BeforeAll static void configureCertificates() throws Exception { - CertificateBundle certificateAuthority = new CertificateBuilder().subject("CN=Apache Cassandra Root CA, OU=Certification Authority, O=Unknown, C=Unknown") - .alias("fakerootca") - .isCertificateAuthority(true) - .buildSelfSigned(); + CertificateBundle certificateAuthority = new CertificateBuilder() + .subject("CN=Apache Cassandra Root CA, OU=Certification Authority, O=Unknown, C=Unknown") + .alias("fakerootca") + .isCertificateAuthority(true) + .buildSelfSigned(); truststorePath = certificateAuthority.toTempKeyStorePath(secretsPath, EMPTY_PASSWORD, EMPTY_PASSWORD); - CertificateBuilder serverKeyStoreBuilder = - new CertificateBuilder().subject("CN=Apache Cassandra, OU=mtls_test, O=Unknown, L=Unknown, ST=Unknown, C=Unknown") - .addSanDnsName("localhost"); + CertificateBuilder serverKeyStoreBuilder = new CertificateBuilder() + .subject("CN=Apache Cassandra, OU=mtls_test, O=Unknown, L=Unknown, ST=Unknown, C=Unknown") + .addSanDnsName("localhost"); CertificateBundle serverKeyStore = serverKeyStoreBuilder.buildIssuedBy(certificateAuthority); serverKeyStorePath = serverKeyStore.toTempKeyStorePath(secretsPath, EMPTY_PASSWORD, EMPTY_PASSWORD); - CertificateBundle expiredClientKeyStore = new CertificateBuilder().subject("CN=Apache Cassandra, OU=mtls_test, O=Unknown, L=Unknown, ST=Unknown, C=Unknown") - .addSanDnsName("localhost") - .notBefore(Instant.now().minus(7, ChronoUnit.DAYS)) - .notAfter(Instant.now().minus(1, ChronoUnit.DAYS)) - .buildIssuedBy(certificateAuthority); + CertificateBundle expiredClientKeyStore = new CertificateBuilder() + 
.subject("CN=Apache Cassandra, OU=mtls_test, O=Unknown, L=Unknown, ST=Unknown, C=Unknown") + .addSanDnsName("localhost") + .notBefore(Instant.now().minus(7, ChronoUnit.DAYS)) + .notAfter(Instant.now().minus(1, ChronoUnit.DAYS)) + .buildIssuedBy(certificateAuthority); // Assign the expired client cert to the cert path clientCertPath = expiredClientKeyStore.toTempKeyStorePath(secretsPath, EMPTY_PASSWORD, EMPTY_PASSWORD); - CertificateBundle validClientKeyStore = new CertificateBuilder().subject("CN=Apache Cassandra, OU=mtls_test, O=Unknown, L=Unknown, ST=Unknown, C=Unknown") - .addSanDnsName("localhost") - .buildIssuedBy(certificateAuthority); + CertificateBundle validClientKeyStore = new CertificateBuilder() + .subject("CN=Apache Cassandra, OU=mtls_test, O=Unknown, L=Unknown, ST=Unknown, C=Unknown") + .addSanDnsName("localhost") + .buildIssuedBy(certificateAuthority); validClientCertPath = validClientKeyStore.toTempKeyStorePath(secretsPath, EMPTY_PASSWORD, EMPTY_PASSWORD); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org