yifan-c commented on code in PR #346: URL: https://github.com/apache/cassandra-sidecar/pull/346#discussion_r3252320203
########## integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererPhaseSignalIntTest.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.restore; + +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.junit.jupiter.api.Test; + +import com.google.inject.AbstractModule; +import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.sidecar.common.data.RestoreJobStatus; +import org.apache.cassandra.sidecar.common.request.data.CreateSliceRequestPayload; +import org.apache.cassandra.sidecar.common.request.data.UpdateRestoreJobRequestPayload; +import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; +import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; +import org.apache.cassandra.sidecar.config.yaml.RestoreJobConfigurationImpl; +import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl; +import 
org.apache.cassandra.sidecar.db.RestoreRange; +import org.apache.cassandra.sidecar.db.RestoreRangeDatabaseAccessor; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase; + +import static org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.createJob; +import static org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.disableRestoreProcessor; +import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Verifies that {@code STAGE_READY} / {@code IMPORT_READY} phase signals delivered via + * the {@code UpdateRestoreJobHandler} REST endpoint trigger immediate processing on the + * receiving Sidecar instance, without waiting for the next discovery loop cycle. + * + * <p>Both discovery loop delays are pinned to 1h via {@link #configurationOverrides()}, + * so any successful observation of restore ranges within seconds can only be the work of + * the wake-up path added by CASSSIDECAR-454 — not the periodic discovery loop. + */ +class RestoreJobDiscovererPhaseSignalIntTest extends SharedClusterSidecarIntegrationTestBase Review Comment: 👍 ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandler.java: ########## @@ -87,36 +91,40 @@ protected void handleInternal(RoutingContext context, { RoutingContextUtils .getAsFuture(context, SC_RESTORE_JOB) - .compose(job -> { - if (job.status.isFinal()) + .compose(existingJob -> { + if (existingJob.status.isFinal()) { // skip the update, since the job is in the final state already - logger.debug("The job has completed already. job={}", job); + logger.debug("The job has completed already. 
job={}", existingJob); return Future.failedFuture(wrapHttpException(HttpResponseStatus.CONFLICT, - "Job is already in final state: " + job.status)); + "Job is already in final state: " + existingJob.status)); } return executorPools.service() - .executeBlocking(() -> restoreJobDatabaseAccessor.update(requestPayload, job.jobId)); - }) - .onSuccess(job -> { - logger.info("Successfully updated restore job. job={}, request={}, remoteAddress={}, instance={}", - job, requestPayload, remoteAddress, host); - if (job.status == RestoreJobStatus.SUCCEEDED) - { - metrics.successfulJobs.metric.update(1); - long startMillis = UUIDs.unixTimestamp(job.jobId); - long durationMillis = System.currentTimeMillis() - startMillis; - // toNanos does not overflow. Nanos in `long` can at most represent 106,751 days. - metrics.jobCompletionTime.metric.update(durationMillis, TimeUnit.MILLISECONDS); - } + .executeBlocking(() -> restoreJobDatabaseAccessor.update(requestPayload, existingJob.jobId)) + .onSuccess(updatedJob -> { + logger.info("Successfully updated restore job. job={}, request={}, remoteAddress={}, instance={}", + updatedJob, requestPayload, remoteAddress, host); + if (updatedJob.status == RestoreJobStatus.SUCCEEDED) + { + metrics.successfulJobs.metric.update(1); + long startMillis = UUIDs.unixTimestamp(updatedJob.jobId); + long durationMillis = System.currentTimeMillis() - startMillis; + // toNanos does not overflow. Nanos in `long` can at most represent 106,751 days. 
+ metrics.jobCompletionTime.metric.update(durationMillis, TimeUnit.MILLISECONDS); + } - if (job.secrets != null) - { - metrics.tokenRefreshed.metric.update(1); - } + if (updatedJob.secrets != null) + { + metrics.tokenRefreshed.metric.update(1); + } - context.response().setStatusCode(HttpResponseStatus.OK.code()).end(); + context.response().setStatusCode(HttpResponseStatus.OK.code()).end(); + // Fire-and-forget on a worker thread — notifying the restore system should not + // block the event loop or delay the HTTP response. + executorPools.service() + .runBlocking(() -> notifyPhaseSignalMaybe(existingJob, updatedJob.status)); + }); Review Comment: The refactoring should work, but I'd like to suggest a change in `RestoreJobDatabaseAccessor` (see below): add a new `update` overload that takes `RestoreJob existingJob`. The method returns an updated `existingJob` after the update has been successfully persisted to the database. Please give it a try and see if it looks cleaner. ```java /** * Update fields in the restore job and persist * * @param payload fields to be updated * @param jobId job ID * @return the restore job object with only the updated fields * @throws DataObjectMappingException when secrets json cannot be serialized */ public RestoreJob update(UpdateRestoreJobRequestPayload payload, UUID jobId) throws DataObjectMappingException { sidecarSchema.ensureInitialized(); return update(payload, jobId, RestoreJob.builder()); } /** * Update fields in the restore job and persist * * @param payload fields to be updated * @param existingJob existing restore job to be updated * @return the restore job object built from the existingJob and the updated fields * @throws DataObjectMappingException when secrets json cannot be serialized */ public RestoreJob update(UpdateRestoreJobRequestPayload payload, RestoreJob existingJob) throws DataObjectMappingException { sidecarSchema.ensureInitialized(); return update(payload, existingJob.jobId, existingJob.unbuild()); } private 
RestoreJob update(UpdateRestoreJobRequestPayload payload, UUID jobId, RestoreJob.Builder updateBuilder) throws DataObjectMappingException { LocalDate createdAt = RestoreJob.toLocalDate(jobId); updateBuilder.createdAt(createdAt) .jobId(jobId); RestoreJobSecrets secrets = payload.secrets(); RestoreJobStatus status = payload.status(); String jobAgent = payload.jobAgent(); Date expireAt = payload.expireAtAsDate(); Long sliceCount = payload.sliceCount(); // all updates are going to the same partition. We use unlogged explicitly. // Cassandra internally combine those updates into the same mutation. BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.UNLOGGED); ByteBuffer wrappedSecrets; if (secrets != null) { try { byte[] secretBytes = MAPPER.writeValueAsBytes(secrets); wrappedSecrets = ByteBuffer.wrap(secretBytes); batchStatement.add(tableSchema.updateBlobSecrets() .bind(createdAt, jobId, wrappedSecrets)); } catch (JsonProcessingException e) { throw new DataObjectMappingException("Failed to serialize secrets", e); } updateBuilder.jobSecrets(secrets); } if (status != null) { batchStatement.add(tableSchema.updateStatus().bind(createdAt, jobId, status.name())); updateBuilder.jobStatus(status); } if (jobAgent != null) { batchStatement.add(tableSchema.updateJobAgent().bind(createdAt, jobId, jobAgent)); updateBuilder.jobAgent(jobAgent); } if (expireAt != null) { batchStatement.add(tableSchema.updateExpireAt().bind(createdAt, jobId, expireAt)); updateBuilder.expireAt(expireAt); } if (sliceCount != null) { batchStatement.add(tableSchema.updateSliceCount().bind(createdAt, jobId, sliceCount)); updateBuilder.sliceCount(sliceCount); } execute(batchStatement); return updateBuilder.build(); } ``` ########## integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererPhaseSignalIntTest.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.restore; + +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.junit.jupiter.api.Test; + +import com.google.inject.AbstractModule; +import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.sidecar.common.data.RestoreJobStatus; +import org.apache.cassandra.sidecar.common.request.data.CreateSliceRequestPayload; +import org.apache.cassandra.sidecar.common.request.data.UpdateRestoreJobRequestPayload; +import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; +import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; +import org.apache.cassandra.sidecar.config.yaml.RestoreJobConfigurationImpl; +import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl; +import org.apache.cassandra.sidecar.db.RestoreRange; +import org.apache.cassandra.sidecar.db.RestoreRangeDatabaseAccessor; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase; + +import static 
org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.createJob; +import static org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.disableRestoreProcessor; +import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Verifies that {@code STAGE_READY} / {@code IMPORT_READY} phase signals delivered via + * the {@code UpdateRestoreJobHandler} REST endpoint trigger immediate processing on the + * receiving Sidecar instance, without waiting for the next discovery loop cycle. + * + * <p>Both discovery loop delays are pinned to 1h via {@link #configurationOverrides()}, + * so any successful observation of restore ranges within seconds can only be the work of + * the wake-up path added by CASSSIDECAR-454 — not the periodic discovery loop. + */ +class RestoreJobDiscovererPhaseSignalIntTest extends SharedClusterSidecarIntegrationTestBase +{ + private static final QualifiedName USER_KEYSPACE_TABLE = new QualifiedName("restore_phase_signal_ks", "t"); + private static final QualifiedTableName SIDECAR_QUALIFIED_TABLE = + new QualifiedTableName(USER_KEYSPACE_TABLE.keyspace(), USER_KEYSPACE_TABLE.table()); + + @Override + protected Function<SidecarConfigurationImpl.Builder, SidecarConfigurationImpl.Builder> configurationOverrides() + { + return builder -> builder.restoreJobConfiguration( + RestoreJobConfigurationImpl.builder() + // Pin both discovery loops to 1h. Any range created within seconds + // is necessarily the wake-up path and not the discovery loop. + .jobDiscoveryActiveLoopDelay(MillisecondBoundConfiguration.parse("1h")) + .jobDiscoveryIdleLoopDelay(MillisecondBoundConfiguration.parse("1h")) + .build()); + } + + @Override + protected void startSidecar(ICluster<? 
extends IInstance> cluster) throws InterruptedException + { + // Disable the RestoreProcessor so range submission stops at the database write, + // letting the test assert on RestoreRangeDatabaseAccessor without S3/import side effects. + serverWrapper = startSidecarWithInstances(cluster, (AbstractModule) disableRestoreProcessor()); + } + + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace(USER_KEYSPACE_TABLE, Map.of("datacenter1", 1)); + createTestTable(USER_KEYSPACE_TABLE, "CREATE TABLE %s (id text PRIMARY KEY, name text);"); + } + + @Override + protected void beforeTestStart() + { + waitForSchemaReady(30, TimeUnit.SECONDS); + } + + @Test + void testStageReadyImmediatelySubmitsSlices() + { + RestoreJobTestUtils.RestoreJobClient testClient = restoreJobClient(); + UUID jobId = createJob(testClient, SIDECAR_QUALIFIED_TABLE); + short bucketId = 0; + CreateSliceRequestPayload slicePayload = new CreateSliceRequestPayload( + "sliceId-stage", bucketId, "bucket", "key", "checksum", + BigInteger.valueOf(1L), BigInteger.valueOf(1500L), 100L, 100L); + testClient.createRestoreSlice(SIDECAR_QUALIFIED_TABLE, jobId, slicePayload); + + RestoreRangeDatabaseAccessor rangeAccessor = + serverWrapper.injector.getInstance(RestoreRangeDatabaseAccessor.class); + assertThat(rangeAccessor.findAll(jobId, bucketId)).isEmpty(); + + testClient.updateRestoreJob(SIDECAR_QUALIFIED_TABLE, jobId, + UpdateRestoreJobRequestPayload.builder() + .withStatus(RestoreJobStatus.STAGE_READY) + .build()); + + loopAssert(10, 500, () -> assertThat(rangeAccessor.findAll(jobId, bucketId)) + .describedAs("STAGE_READY should immediately create restore ranges via the wake-up path") + .isNotEmpty()); + } + + @Test + void testStageReadyCreatesCorrectTokenRanges() + { + RestoreJobTestUtils.RestoreJobClient testClient = restoreJobClient(); + UUID jobId = createJob(testClient, SIDECAR_QUALIFIED_TABLE); + short bucketId = 0; + CreateSliceRequestPayload slicePayload = new 
CreateSliceRequestPayload( + "sliceId-tokens", bucketId, "bucket", "key", "checksum", + BigInteger.valueOf(500L), BigInteger.valueOf(1500L), 100L, 100L); + testClient.createRestoreSlice(SIDECAR_QUALIFIED_TABLE, jobId, slicePayload); + + testClient.updateRestoreJob(SIDECAR_QUALIFIED_TABLE, jobId, + UpdateRestoreJobRequestPayload.builder() + .withStatus(RestoreJobStatus.STAGE_READY) + .build()); + + RestoreRangeDatabaseAccessor rangeAccessor = + serverWrapper.injector.getInstance(RestoreRangeDatabaseAccessor.class); + loopAssert(10, 500, () -> { + List<RestoreRange> ranges = rangeAccessor.findAll(jobId, bucketId); + assertThat(ranges).describedAs("STAGE_READY should produce trimmed ranges").isNotEmpty(); + for (RestoreRange range : ranges) + { + assertThat(range.startToken()).isNotNull(); + assertThat(range.endToken()).isNotNull(); Review Comment: Assert on the actual values instead of only checking for non-null. The start token should be 499 (ranges have an exclusive start), and the end token should be 1500. ########## integration-tests/src/integrationTest/org/apache/cassandra/sidecar/restore/RestoreJobDiscovererPhaseSignalIntTest.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.restore; + +import java.math.BigInteger; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.junit.jupiter.api.Test; + +import com.google.inject.AbstractModule; +import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.sidecar.common.data.RestoreJobStatus; +import org.apache.cassandra.sidecar.common.request.data.CreateSliceRequestPayload; +import org.apache.cassandra.sidecar.common.request.data.UpdateRestoreJobRequestPayload; +import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; +import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; +import org.apache.cassandra.sidecar.config.yaml.RestoreJobConfigurationImpl; +import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl; +import org.apache.cassandra.sidecar.db.RestoreRange; +import org.apache.cassandra.sidecar.db.RestoreRangeDatabaseAccessor; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase; + +import static org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.createJob; +import static org.apache.cassandra.sidecar.restore.RestoreJobTestUtils.disableRestoreProcessor; +import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Verifies that {@code STAGE_READY} / {@code IMPORT_READY} phase signals delivered via + * the {@code UpdateRestoreJobHandler} REST endpoint trigger immediate processing on the + * receiving Sidecar instance, without waiting for the next discovery loop cycle. 
+ * + * <p>Both discovery loop delays are pinned to 1h via {@link #configurationOverrides()}, + * so any successful observation of restore ranges within seconds can only be the work of + * the wake-up path added by CASSSIDECAR-454 — not the periodic discovery loop. + */ +class RestoreJobDiscovererPhaseSignalIntTest extends SharedClusterSidecarIntegrationTestBase +{ + private static final QualifiedName USER_KEYSPACE_TABLE = new QualifiedName("restore_phase_signal_ks", "t"); + private static final QualifiedTableName SIDECAR_QUALIFIED_TABLE = + new QualifiedTableName(USER_KEYSPACE_TABLE.keyspace(), USER_KEYSPACE_TABLE.table()); + + @Override + protected Function<SidecarConfigurationImpl.Builder, SidecarConfigurationImpl.Builder> configurationOverrides() + { + return builder -> builder.restoreJobConfiguration( + RestoreJobConfigurationImpl.builder() + // Pin both discovery loops to 1h. Any range created within seconds + // is necessarily the wake-up path and not the discovery loop. + .jobDiscoveryActiveLoopDelay(MillisecondBoundConfiguration.parse("1h")) + .jobDiscoveryIdleLoopDelay(MillisecondBoundConfiguration.parse("1h")) + .build()); + } + + @Override + protected void startSidecar(ICluster<? extends IInstance> cluster) throws InterruptedException + { + // Disable the RestoreProcessor so range submission stops at the database write, + // letting the test assert on RestoreRangeDatabaseAccessor without S3/import side effects. 
+ serverWrapper = startSidecarWithInstances(cluster, (AbstractModule) disableRestoreProcessor()); + } + + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace(USER_KEYSPACE_TABLE, Map.of("datacenter1", 1)); + createTestTable(USER_KEYSPACE_TABLE, "CREATE TABLE %s (id text PRIMARY KEY, name text);"); + } + + @Override + protected void beforeTestStart() + { + waitForSchemaReady(30, TimeUnit.SECONDS); + } + + @Test + void testStageReadyImmediatelySubmitsSlices() + { + RestoreJobTestUtils.RestoreJobClient testClient = restoreJobClient(); + UUID jobId = createJob(testClient, SIDECAR_QUALIFIED_TABLE); + short bucketId = 0; + CreateSliceRequestPayload slicePayload = new CreateSliceRequestPayload( + "sliceId-stage", bucketId, "bucket", "key", "checksum", + BigInteger.valueOf(1L), BigInteger.valueOf(1500L), 100L, 100L); + testClient.createRestoreSlice(SIDECAR_QUALIFIED_TABLE, jobId, slicePayload); + + RestoreRangeDatabaseAccessor rangeAccessor = + serverWrapper.injector.getInstance(RestoreRangeDatabaseAccessor.class); + assertThat(rangeAccessor.findAll(jobId, bucketId)).isEmpty(); + + testClient.updateRestoreJob(SIDECAR_QUALIFIED_TABLE, jobId, + UpdateRestoreJobRequestPayload.builder() + .withStatus(RestoreJobStatus.STAGE_READY) + .build()); + + loopAssert(10, 500, () -> assertThat(rangeAccessor.findAll(jobId, bucketId)) + .describedAs("STAGE_READY should immediately create restore ranges via the wake-up path") + .isNotEmpty()); + } + + @Test + void testStageReadyCreatesCorrectTokenRanges() Review Comment: This test seems to be largely the same as `testStageReadyImmediatelySubmitsSlices`. Can we merge them and add the token value assertion in the `loopAssert` lambda? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

