pauloricardomg commented on code in PR #339:
URL: https://github.com/apache/cassandra-sidecar/pull/339#discussion_r3237449717


##########
server/src/main/java/org/apache/cassandra/sidecar/job/storage/StorageProvider.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job.storage;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A provider-agnostic storage abstraction for durable operational job state.
+ * Each {@code StorageProvider} instance is scoped to a single cluster. The cluster
+ * identity is configuration, not a per-call parameter.
+ * <p>
+ * This interface defines a data access pattern for persisting, modifying, and querying OperationalJobs.
+ * Higher-level coordination logic, such as clearing the active operation lock when a job reaches
+ * a terminal state, belongs in the layers that depend on this interface.
+ */
+public interface StorageProvider extends Closeable
+{
+    // --- Operational Job Storage ---
+
+    /**
+     * Persist a new operational job record.
+     * <p>
+     * Implementations must provide upsert semantics as a retry safety net: if called again with the
+     * same job ID (e.g., due to a network timeout where the caller does not know if the first write
+     * succeeded), the existing record should be overwritten rather than throwing.
+     * <p>
+     * Implementations should throw {@link StorageProviderException} on write failure.
+     *
+     * @param job the job record to store
+     */
+    void persistJob(OperationalJobRecord job);
+
+    /**
+     * Find a job by its ID.
+     *
+     * @param jobId the job identifier
+     * @return the job record, or {@code null} if not found
+     */
+    @Nullable
+    OperationalJobRecord findJob(UUID jobId);
+
+    /**
+     * Update the status of an existing job.
+     * <p>
+     * Implementations should throw {@link StorageProviderException} on write 
failure.
+     *
+     * @param jobId         the job identifier
+     * @param operationType the operation type (e.g. "restart")
+     * @param status        the new status
+     */
+    void updateJobStatus(UUID jobId, String operationType, 
OperationalJobStatus status);
+
+    /**
+     * Retrieve stored job records, up to the specified limit.

Review Comment:
   nit: Retrieve stored job records in descending time order



##########
server/src/main/java/org/apache/cassandra/sidecar/db/ActiveClusterOpsDatabaseAccessor.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.ActiveClusterOpsSchema;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Database accessor for the {@code active_cluster_ops} table.
+ * Provides mutual exclusion for active operations via lightweight 
transactions (LWT).
+ */
+@Singleton
+public class ActiveClusterOpsDatabaseAccessor extends 
DatabaseAccessor<ActiveClusterOpsSchema>
+{
+    @Inject
+    public ActiveClusterOpsDatabaseAccessor(SidecarSchema sidecarSchema, 
CQLSessionProvider sessionProvider)
+    {
+        this(sidecarSchema.tableSchema(ActiveClusterOpsSchema.class), 
sessionProvider);
+    }
+
+    @VisibleForTesting
+    public ActiveClusterOpsDatabaseAccessor(ActiveClusterOpsSchema schema, 
CQLSessionProvider sessionProvider)
+    {
+        super(schema, sessionProvider);
+    }
+
+    public boolean trySetActiveOperation(String clusterName, String 
operationType, UUID operationId)
+    {
+        BoundStatement statement = 
tableSchema.trySetActive().bind(clusterName, operationType, operationId);
+        statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+        statement.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL);
+        ResultSet resultSet = execute(statement);
+        return resultSet.wasApplied();
+    }
+
+    @Nullable
+    public UUID getActiveOperation(String clusterName, String operationType)
+    {
+        BoundStatement statement = 
tableSchema.getActiveByType().bind(clusterName, operationType);
+        statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+        ResultSet resultSet = execute(statement);
+        Row row = resultSet.one();
+        return row == null ? null : row.getUUID("operation_id");
+    }
+
+    @NotNull
+    public Map<String, UUID> getActiveOperations(String clusterName)
+    {
+        BoundStatement statement = tableSchema.getActive().bind(clusterName);
+        statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+        ResultSet resultSet = execute(statement);
+        Map<String, UUID> activeOps = new HashMap<>();
+        for (Row row : resultSet)
+        {
+            activeOps.put(row.getString("operation_type"), 
row.getUUID("operation_id"));
+        }
+        return activeOps;
+    }
+
+    public boolean clearActiveOperation(String clusterName, String 
operationType, UUID operationId)
+    {
+        BoundStatement statement = tableSchema.clearActive().bind(clusterName, 
operationType, operationId);
+        statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+        statement.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL);

Review Comment:
   Shouldn't we be using global `SERIAL` (instead of `LOCAL_SERIAL`) to ensure the active operation will be consistent across all DCs?



##########
server/src/main/java/org/apache/cassandra/sidecar/db/ActiveClusterOpsDatabaseAccessor.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.ActiveClusterOpsSchema;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Database accessor for the {@code active_cluster_ops} table.
+ * Provides mutual exclusion for active operations via lightweight 
transactions (LWT).
+ */
+@Singleton
+public class ActiveClusterOpsDatabaseAccessor extends 
DatabaseAccessor<ActiveClusterOpsSchema>
+{
+    @Inject
+    public ActiveClusterOpsDatabaseAccessor(SidecarSchema sidecarSchema, 
CQLSessionProvider sessionProvider)
+    {
+        this(sidecarSchema.tableSchema(ActiveClusterOpsSchema.class), 
sessionProvider);
+    }
+
+    @VisibleForTesting
+    public ActiveClusterOpsDatabaseAccessor(ActiveClusterOpsSchema schema, 
CQLSessionProvider sessionProvider)
+    {
+        super(schema, sessionProvider);
+    }
+
+    public boolean trySetActiveOperation(String clusterName, String 
operationType, UUID operationId)
+    {
+        BoundStatement statement = 
tableSchema.trySetActive().bind(clusterName, operationType, operationId);
+        statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+        statement.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL);

Review Comment:
   Shouldn't we be using global `SERIAL` (instead of `LOCAL_SERIAL`) to ensure the active operation will be consistent across all DCs?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to