sarankk commented on code in PR #346:
URL: https://github.com/apache/cassandra-sidecar/pull/346#discussion_r3269368286
##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandler.java:
##########
@@ -147,4 +154,22 @@ protected UpdateRestoreJobRequestPayload
extractParamsOrThrow(RoutingContext con
throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid
request payload", decodeException);
}
}
+
+ private void notifyPhaseSignalMaybe(RestoreJob updatedJob)
+ {
+ if (updatedJob.status != RestoreJobStatus.IMPORT_READY &&
updatedJob.status != RestoreJobStatus.STAGE_READY)
Review Comment:
Do we want to process the `STAGED` status update the same way? @yifan-c, I
see you included this status as well.
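
For illustration only, here is a minimal sketch of how the guard could be written as a set-membership check if `STAGED` were included. `JobStatus` and `PhaseSignals` are hypothetical stand-ins, not the sidecar's `RestoreJobStatus` or any class in this PR, and including `STAGED` is an assumption that still needs confirmation.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for RestoreJobStatus; values are illustrative only.
enum JobStatus { CREATED, STAGE_READY, STAGED, IMPORT_READY, SUCCEEDED }

final class PhaseSignals
{
    // Statuses that would trigger an immediate notification.
    // STAGED is listed only to illustrate the question above (assumption).
    static final Set<JobStatus> NOTIFY_ON =
        EnumSet.of(JobStatus.STAGE_READY, JobStatus.STAGED, JobStatus.IMPORT_READY);

    // Returns true when the updated status is one of the phase signals.
    static boolean shouldNotify(JobStatus status)
    {
        return status != null && NOTIFY_ON.contains(status);
    }
}
```

A set keeps the early-return condition on a single line as more statuses are added, instead of chaining `!=` comparisons.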
##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandler.java:
##########
@@ -87,36 +91,39 @@ protected void handleInternal(RoutingContext context,
{
RoutingContextUtils
.getAsFuture(context, SC_RESTORE_JOB)
- .compose(job -> {
- if (job.status.isFinal())
+ .compose(existingJob -> {
+ if (existingJob.status.isFinal())
{
// skip the update, since the job is in the final state already
- logger.debug("The job has completed already. job={}", job);
+ logger.debug("The job has completed already. job={}",
existingJob);
return
Future.failedFuture(wrapHttpException(HttpResponseStatus.CONFLICT,
- "Job is already
in final state: " + job.status));
+ "Job is already
in final state: " + existingJob.status));
}
return executorPools.service()
- .executeBlocking(() ->
restoreJobDatabaseAccessor.update(requestPayload, job.jobId));
+ .executeBlocking(() ->
restoreJobDatabaseAccessor.update(requestPayload, existingJob));
})
- .onSuccess(job -> {
+ .onSuccess(updatedJob -> {
logger.info("Successfully updated restore job. job={}, request={},
remoteAddress={}, instance={}",
- job, requestPayload, remoteAddress, host);
- if (job.status == RestoreJobStatus.SUCCEEDED)
+ updatedJob, requestPayload, remoteAddress, host);
+ if (updatedJob.status == RestoreJobStatus.SUCCEEDED)
{
metrics.successfulJobs.metric.update(1);
- long startMillis = UUIDs.unixTimestamp(job.jobId);
+ long startMillis = UUIDs.unixTimestamp(updatedJob.jobId);
long durationMillis = System.currentTimeMillis() - startMillis;
// toNanos does not overflow. Nanos in `long` can at most
represent 106,751 days.
metrics.jobCompletionTime.metric.update(durationMillis,
TimeUnit.MILLISECONDS);
}
- if (job.secrets != null)
+ if (updatedJob.secrets != null)
{
metrics.tokenRefreshed.metric.update(1);
}
context.response().setStatusCode(HttpResponseStatus.OK.code()).end();
+ // Fire-and-forget on a worker thread — notifying the restore
system should not
+ // block the event loop or delay the HTTP response.
+ executorPools.service().runBlocking(() ->
notifyPhaseSignalMaybe(updatedJob));
Review Comment:
Let's use `executorPools.internal()` instead, since this is internal
handling and not client-facing.
```suggestion
executorPools.internal().runBlocking(() ->
notifyPhaseSignalMaybe(updatedJob));
```
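
To make the intent concrete, here is a rough sketch of the separation using plain `java.util.concurrent` executors. `Pools` and `UpdateFlow` are hypothetical names, not the sidecar's `ExecutorPools` API; the sketch only shows that background notification work lands on a pool that is distinct from the one serving client requests.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a pair of pools: one for client-facing work, one for internal work.
final class Pools
{
    final ExecutorService service = Executors.newSingleThreadExecutor(r -> new Thread(r, "service-pool"));
    final ExecutorService internal = Executors.newSingleThreadExecutor(r -> new Thread(r, "internal-pool"));

    // Stops accepting new tasks and waits briefly for queued tasks to finish.
    void shutdownAndWait()
    {
        service.shutdown();
        internal.shutdown();
        try
        {
            service.awaitTermination(5, TimeUnit.SECONDS);
            internal.awaitTermination(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
        }
    }
}

final class UpdateFlow
{
    // Records which thread ran each notification; for demonstration only.
    static final Queue<String> notifiedOn = new ConcurrentLinkedQueue<>();

    // Decides the response first, then schedules the notification on the internal pool.
    static int respondThenNotify(Pools pools, String jobId)
    {
        int statusCode = 200;
        pools.internal.execute(() -> notifiedOn.add(Thread.currentThread().getName() + ":" + jobId));
        return statusCode;
    }
}
```

With this split, a slow notification can only queue behind other internal tasks and does not take a thread away from request handling.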
##########
server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java:
##########
@@ -628,6 +628,22 @@ Map<Integer, Map<UUID, RestoreJobStatus>> jobsByDay()
}
}
+ /**
+ * Immediately processes a restore job without waiting for the next
discovery loop iteration.
+ * Called by UpdateRestoreJobHandler after a phase signal (STAGE_READY) is
written to DB.
+ * This is safe to call concurrently with the discovery loop — the DB
write is the durable
+ * source of truth, and duplicate processing is deduplicated by existing
idempotency checks.
+ *
+ * @param restoreJob the restore job to process immediately
+ */
+ public void processJobNow(RestoreJob restoreJob)
+ {
+ initLocalDatacenterMaybe();
+ RestoreJobManagerGroup restoreJobManagers =
restoreJobManagerGroupSingleton.get();
+ restoreJobManagers.updateRestoreJob(restoreJob);
+ processSidecarManagedJobMaybe(restoreJob);
Review Comment:
Fields within `JobIdsByDay` are now updated from two different threads. We
also seem to have the `isExecuting` check to avoid concurrent executions, even
within `RestoreJobDiscoverer`. We should make `JobIdsByDay` thread-safe.
@yifan-c can confirm whether this is a concern.
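
One possible direction, sketched under assumptions: back the per-day map with `ConcurrentHashMap` so that concurrent writers cannot corrupt it. `ConcurrentJobIdsByDay` below is a hypothetical class with `String` statuses, not the actual `JobIdsByDay` implementation, and it does not cover any other fields that class may hold.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical thread-safe variant of a day -> (jobId -> status) index.
final class ConcurrentJobIdsByDay
{
    private final Map<Integer, Map<UUID, String>> jobsByDay = new ConcurrentHashMap<>();

    // Records a job's status under its day; computeIfAbsent creates the inner map atomically.
    void put(int day, UUID jobId, String status)
    {
        jobsByDay.computeIfAbsent(day, d -> new ConcurrentHashMap<>()).put(jobId, status);
    }

    // Returns the recorded status, or null when the day or job is unknown.
    String statusOf(int day, UUID jobId)
    {
        Map<UUID, String> jobs = jobsByDay.get(day);
        return jobs == null ? null : jobs.get(jobId);
    }

    // Returns the number of jobs recorded for the given day.
    int jobCount(int day)
    {
        Map<UUID, String> jobs = jobsByDay.get(day);
        return jobs == null ? 0 : jobs.size();
    }
}
```

Note that concurrent maps only make individual operations safe. Any multi-step sequence, such as reading a status and then deciding whether to process the job, would still need its own guard.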
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]