sarankk commented on code in PR #346:
URL: https://github.com/apache/cassandra-sidecar/pull/346#discussion_r3269368286


##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandler.java:
##########
@@ -147,4 +154,22 @@ protected UpdateRestoreJobRequestPayload 
extractParamsOrThrow(RoutingContext con
             throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid 
request payload", decodeException);
         }
     }
+
+    private void notifyPhaseSignalMaybe(RestoreJob updatedJob)
+    {
+        if (updatedJob.status != RestoreJobStatus.IMPORT_READY && 
updatedJob.status != RestoreJobStatus.STAGE_READY)

Review Comment:
   Do we want to process`STAGED` status update similarly as well? @yifan-c I 
see you included this status as well 



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/restore/UpdateRestoreJobHandler.java:
##########
@@ -87,36 +91,39 @@ protected void handleInternal(RoutingContext context,
     {
         RoutingContextUtils
         .getAsFuture(context, SC_RESTORE_JOB)
-        .compose(job -> {
-            if (job.status.isFinal())
+        .compose(existingJob -> {
+            if (existingJob.status.isFinal())
             {
                 // skip the update, since the job is in the final state already
-                logger.debug("The job has completed already. job={}", job);
+                logger.debug("The job has completed already. job={}", 
existingJob);
                 return 
Future.failedFuture(wrapHttpException(HttpResponseStatus.CONFLICT,
-                                                             "Job is already 
in final state: " + job.status));
+                                                             "Job is already 
in final state: " + existingJob.status));
             }
 
             return executorPools.service()
-                                .executeBlocking(() -> 
restoreJobDatabaseAccessor.update(requestPayload, job.jobId));
+                                .executeBlocking(() -> 
restoreJobDatabaseAccessor.update(requestPayload, existingJob));
         })
-        .onSuccess(job -> {
+        .onSuccess(updatedJob -> {
             logger.info("Successfully updated restore job. job={}, request={}, 
remoteAddress={}, instance={}",
-                        job, requestPayload, remoteAddress, host);
-            if (job.status == RestoreJobStatus.SUCCEEDED)
+                        updatedJob, requestPayload, remoteAddress, host);
+            if (updatedJob.status == RestoreJobStatus.SUCCEEDED)
             {
                 metrics.successfulJobs.metric.update(1);
-                long startMillis = UUIDs.unixTimestamp(job.jobId);
+                long startMillis = UUIDs.unixTimestamp(updatedJob.jobId);
                 long durationMillis = System.currentTimeMillis() - startMillis;
                 // toNanos does not overflow. Nanos in `long` can at most 
represent 106,751 days.
                 metrics.jobCompletionTime.metric.update(durationMillis, 
TimeUnit.MILLISECONDS);
             }
 
-            if (job.secrets != null)
+            if (updatedJob.secrets != null)
             {
                 metrics.tokenRefreshed.metric.update(1);
             }
 
             
context.response().setStatusCode(HttpResponseStatus.OK.code()).end();
+            // Fire-and-forget on a worker thread — notifying the restore 
system should not
+            // block the event loop or delay the HTTP response.
+            executorPools.service().runBlocking(() -> 
notifyPhaseSignalMaybe(updatedJob));

Review Comment:
   Let's use `executorPools.internal()` instead? since this is internal 
handling and not client side facing 
   ```suggestion
               executorPools.internal().runBlocking(() -> 
notifyPhaseSignalMaybe(updatedJob));
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java:
##########
@@ -628,6 +628,22 @@ Map<Integer, Map<UUID, RestoreJobStatus>> jobsByDay()
         }
     }
 
+    /**
+     * Immediately processes a restore job without waiting for the next 
discovery loop iteration.
+     * Called by UpdateRestoreJobHandler after a phase signal (STAGE_READY) is 
written to DB.
+     * This is safe to call concurrently with the discovery loop — the DB 
write is the durable
+     * source of truth, and duplicate processing is deduplicated by existing 
idempotency checks.
+     *
+     * @param restoreJob the restore job to process immediately
+     */
+    public void processJobNow(RestoreJob restoreJob)
+    {
+        initLocalDatacenterMaybe();
+        RestoreJobManagerGroup restoreJobManagers = 
restoreJobManagerGroupSingleton.get();
+        restoreJobManagers.updateRestoreJob(restoreJob);
+        processSidecarManagedJobMaybe(restoreJob);

Review Comment:
   fields within `JobIdsByDay` are updated from 2 different threads now. Also 
we seem to have `isExecuting` check to avoid concurrent executions even within 
`RestoreJobDiscoverer`. We should make `JobIdsByDay` thread safe. @yifan-c can 
confirm if this is a concern



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to