This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 08657484 CASSSIDECAR-220: Add CdcRawDirectorySpaceCleaner for tracking 
and cleaning up the cdc_… (#203)
08657484 is described below

commit 08657484d8d280721e557612bc6ab1af7a2a0424
Author: jberragan <jberra...@gmail.com>
AuthorDate: Wed Apr 9 15:41:15 2025 -0700

    CASSSIDECAR-220: Add CdcRawDirectorySpaceCleaner for tracking and cleaning 
up the cdc_… (#203)
    
    Patch by James Berragan; Reviewed by Bernardo Botella, Jyothsna Konisa, 
Yifan Cai for CASSSIDECAR-220
---
 .../cassandra/sidecar/config/CdcConfiguration.java |  40 +-
 .../sidecar/config/yaml/CdcConfigurationImpl.java  | 188 ++++++++-
 .../sidecar/db/SystemViewsDatabaseAccessor.java    |  91 ++++
 .../sidecar/db/schema/SystemViewsSchema.java       |  68 +++
 .../cassandra/sidecar/metrics/CdcMetrics.java      |  66 +++
 .../cassandra/sidecar/metrics/ServerMetrics.java   |   5 +
 .../sidecar/metrics/ServerMetricsImpl.java         |   8 +
 .../cassandra/sidecar/modules/CdcModule.java       |  16 +
 .../modules/multibindings/PeriodicTaskMapKeys.java |   1 +
 .../modules/multibindings/TableSchemaMapKeys.java  |   1 +
 .../sidecar/tasks/CdcRawDirectorySpaceCleaner.java | 469 +++++++++++++++++++++
 .../apache/cassandra/sidecar/utils/CdcUtil.java    |  36 +-
 .../apache/cassandra/sidecar/utils/FileUtils.java  |  67 +++
 .../cassandra/sidecar/utils/TimeProvider.java      |  21 +-
 .../db/SystemViewsDatabaseAccessorIntTest.java     |  45 ++
 .../cassandra/sidecar/db/SidecarSchemaTest.java    |   4 +-
 .../tasks/CdcRawDirectorySpaceCleanerTest.java     | 188 +++++++++
 .../cassandra/sidecar/utils/CdcUtilTest.java       |  84 ++++
 .../cassandra/sidecar/utils/FileUtilsTest.java     |  24 ++
 19 files changed, 1416 insertions(+), 6 deletions(-)

diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/CdcConfiguration.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/CdcConfiguration.java
index a18b7735..314b1f15 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/config/CdcConfiguration.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/CdcConfiguration.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.sidecar.config;
 
 import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+
 import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
 
 /**
@@ -31,14 +32,49 @@ public interface CdcConfiguration
     SecondBoundConfiguration segmentHardLinkCacheExpiry();
 
     /**
-     *
      * @return true if cdc feature is enabled
      */
     boolean isEnabled();
 
     /**
-     *
      * @return how frequently CDC configs are to be refreshed
      */
     MillisecondBoundConfiguration cdcConfigRefreshTime();
+
+    /* CdcRawDirectorySpaceCleaner Configuration */
+
+    /**
+     * @return the cadence at which the CdcRawDirectorySpaceCleaner periodic task should run to check and
+     * clean up old {@code cdc_raw} log segments.
+     */
+    SecondBoundConfiguration cdcRawDirectorySpaceCleanerFrequency();
+
+    /**
+     * @return {@code true} if CdcRawDirectorySpaceCleaner should monitor the {@code cdc_raw} directory and
+     * clean up the oldest commit log segments.
+     */
+    boolean enableCdcRawDirectoryRoutineCleanUp();
+
+    /**
+     * @return fallback value for the maximum size, in bytes, of the {@code cdc_raw} directory, used when
+     * the value cannot be read from the {@code system_views.settings} table.
+     */
+    long fallbackCdcRawDirectoryMaxSizeBytes();
+
+    /**
+     * @return max percent usage of the {@code cdc_raw} directory before CdcRawDirectorySpaceCleaner starts
+     * removing the oldest segments.
+     */
+    float cdcRawDirectoryMaxPercentUsage();
+
+    /**
+     * @return the critical time window: the {@code cdc_raw} directory is considered critically short of
+     * space when it is not large enough to buffer this window of mutations.
+     */
+    SecondBoundConfiguration cdcRawDirectoryCriticalBufferWindow();
+
+    /**
+     * @return the low time window: the {@code cdc_raw} directory is considered low on space when it is
+     * not large enough to buffer this window of mutations.
+     */
+    SecondBoundConfiguration cdcRawDirectoryLowBufferWindow();
+
+    /**
+     * @return how long CdcRawDirectorySpaceCleaner should cache the {@code cdc_total_space} value before
+     * refreshing it.
+     */
+    SecondBoundConfiguration cacheMaxUsage();
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java
index 0121e4ec..089a7ac0 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java
@@ -41,6 +41,27 @@ public class CdcConfigurationImpl implements CdcConfiguration
             MillisecondBoundConfiguration.parse("30s");
     public static final SecondBoundConfiguration 
DEFAULT_SEGMENT_HARD_LINK_CACHE_EXPIRY =
             SecondBoundConfiguration.parse("5m");
+    public static final String CDC_RAW_CLEANER_FREQUENCY_PROPERTY = "cdc_raw_cleaner_frequency";
+    public static final SecondBoundConfiguration DEFAULT_CDC_RAW_CLEANER_FREQUENCY =
+            SecondBoundConfiguration.parse("1m");
+
+    public static final String ENABLE_CDC_RAW_CLEANER_PROPERTY = "enable_cdc_raw_cleaner";
+    public static final boolean DEFAULT_ENABLE_CDC_RAW_CLEANER_PROPERTY = true;
+
+    public static final String FALLBACK_CDC_RAW_MAX_DIRECTORY_SIZE_BYTES = "fallback_cdc_raw_max_directory_size_bytes";
+    public static final long DEFAULT_FALLBACK_CDC_RAW_MAX_DIRECTORY_SIZE_BYTES = 1L << 31; // 2 GiB
+
+    public static final String CDC_RAW_MAX_DIRECTORY_MAX_PERCENT = "cdc_raw_max_directory_max_percent";
+    public static final float DEFAULT_CDC_RAW_MAX_DIRECTORY_MAX_PERCENT = 1.0f;
+
+    public static final String CDC_RAW_MAX_CRITICAL_BUFFER_WINDOW = "cdc_raw_critical_buffer_window";
+    public static final SecondBoundConfiguration DEFAULT_CDC_RAW_MAX_CRITICAL_BUFFER_WINDOW = SecondBoundConfiguration.parse("15m");
+
+    public static final String CDC_RAW_MAX_LOW_BUFFER_WINDOW = "cdc_raw_low_buffer_window";
+    public static final SecondBoundConfiguration DEFAULT_CDC_RAW_MAX_LOW_BUFFER_WINDOW = SecondBoundConfiguration.parse("60m");
+
+    public static final String CDC_CACHE_MAX_USAGE_DURATION = "cdc_raw_cache_max_usage_duration";
+    public static final SecondBoundConfiguration DEFAULT_CDC_CACHE_MAX_USAGE_DURATION = SecondBoundConfiguration.parse("15m");
 
     @JsonProperty(value = IS_ENABLED_PROPERTY)
     private final boolean isEnabled;
@@ -48,22 +69,74 @@ public class CdcConfigurationImpl implements CdcConfiguration
     private final MillisecondBoundConfiguration cdcConfigRefreshTime;
     @JsonProperty(value = SEGMENT_HARD_LINK_CACHE_EXPIRY_PROPERTY)
     private SecondBoundConfiguration segmentHardLinkCacheExpiry;
-
+    @JsonProperty(value = CDC_RAW_CLEANER_FREQUENCY_PROPERTY)
+    private SecondBoundConfiguration cdcRawCleanerFrequency;
+    @JsonProperty(value = ENABLE_CDC_RAW_CLEANER_PROPERTY)
+    private boolean enableCdcRawCleaner;
+    @JsonProperty(value = FALLBACK_CDC_RAW_MAX_DIRECTORY_SIZE_BYTES)
+    private long fallbackCdcRawMaxDirectorySize;
+    @JsonProperty(value = CDC_RAW_MAX_DIRECTORY_MAX_PERCENT)
+    private float cdcRawMaxPercent;
+    @JsonProperty(value = CDC_RAW_MAX_CRITICAL_BUFFER_WINDOW)
+    private SecondBoundConfiguration cdcRawCriticalBufferWindow;
+    @JsonProperty(value = CDC_RAW_MAX_LOW_BUFFER_WINDOW)
+    private SecondBoundConfiguration cdcRawLowBufferWindow;
+    @JsonProperty(value = CDC_CACHE_MAX_USAGE_DURATION)
+    private SecondBoundConfiguration cacheMaxUsage;
 
     public CdcConfigurationImpl()
     {
         this.segmentHardLinkCacheExpiry = DEFAULT_SEGMENT_HARD_LINK_CACHE_EXPIRY;
         this.cdcConfigRefreshTime = DEFAULT_CDC_CONFIG_REFRESH_TIME;
         this.isEnabled = DEFAULT_IS_ENABLED;
+        this.cdcRawCleanerFrequency = DEFAULT_CDC_RAW_CLEANER_FREQUENCY;
+        this.enableCdcRawCleaner = DEFAULT_ENABLE_CDC_RAW_CLEANER_PROPERTY;
+        this.fallbackCdcRawMaxDirectorySize = DEFAULT_FALLBACK_CDC_RAW_MAX_DIRECTORY_SIZE_BYTES;
+        this.cdcRawMaxPercent = DEFAULT_CDC_RAW_MAX_DIRECTORY_MAX_PERCENT;
+        this.cdcRawCriticalBufferWindow = DEFAULT_CDC_RAW_MAX_CRITICAL_BUFFER_WINDOW;
+        this.cdcRawLowBufferWindow = DEFAULT_CDC_RAW_MAX_LOW_BUFFER_WINDOW;
+        this.cacheMaxUsage = DEFAULT_CDC_CACHE_MAX_USAGE_DURATION;
     }
 
     public CdcConfigurationImpl(boolean isEnabled,
                                 MillisecondBoundConfiguration cdcConfigRefreshTime,
                                 SecondBoundConfiguration segmentHardLinkCacheExpiry)
+    {
+        // all CdcRawDirectorySpaceCleaner settings fall back to their defaults
+        this(
+        isEnabled,
+        cdcConfigRefreshTime,
+        segmentHardLinkCacheExpiry,
+        DEFAULT_CDC_RAW_CLEANER_FREQUENCY,
+        DEFAULT_ENABLE_CDC_RAW_CLEANER_PROPERTY,
+        DEFAULT_FALLBACK_CDC_RAW_MAX_DIRECTORY_SIZE_BYTES,
+        DEFAULT_CDC_RAW_MAX_DIRECTORY_MAX_PERCENT,
+        DEFAULT_CDC_RAW_MAX_CRITICAL_BUFFER_WINDOW,
+        DEFAULT_CDC_RAW_MAX_LOW_BUFFER_WINDOW,
+        DEFAULT_CDC_CACHE_MAX_USAGE_DURATION
+        );
+    }
+
+    /**
+     * Creates a configuration with explicit values for every setting, including those used by the
+     * {@code CdcRawDirectorySpaceCleaner}.
+     *
+     * @param isEnabled                      whether the CDC feature is enabled
+     * @param cdcConfigRefreshTime           how frequently CDC configs are refreshed
+     * @param segmentHardLinkCacheExpiry     expiry of the segment hard-link cache
+     * @param cdcRawCleanerFrequency         how often the cdc_raw cleaner task runs
+     * @param enableCdcRawCleaner            whether routine clean-up of the {@code cdc_raw} directory is enabled
+     * @param fallbackCdcRawMaxDirectorySize max {@code cdc_raw} directory size in bytes, used when it cannot be
+     *                                       read from {@code system_views.settings}
+     * @param cdcRawMaxPercent               max percent usage of {@code cdc_raw} before the oldest segments are removed
+     * @param cdcRawCriticalBufferWindow     critical buffer time window
+     * @param cdcRawLowBufferWindow          low buffer time window
+     * @param cacheMaxUsage                  how long the {@code cdc_total_space} value is cached before refreshing
+     */
+    public CdcConfigurationImpl(boolean isEnabled,
+                                MillisecondBoundConfiguration cdcConfigRefreshTime,
+                                SecondBoundConfiguration segmentHardLinkCacheExpiry,
+                                SecondBoundConfiguration cdcRawCleanerFrequency,
+                                boolean enableCdcRawCleaner,
+                                long fallbackCdcRawMaxDirectorySize,
+                                float cdcRawMaxPercent,
+                                SecondBoundConfiguration cdcRawCriticalBufferWindow,
+                                SecondBoundConfiguration cdcRawLowBufferWindow,
+                                SecondBoundConfiguration cacheMaxUsage)
     {
         this.isEnabled = isEnabled;
         this.cdcConfigRefreshTime = cdcConfigRefreshTime;
         this.segmentHardLinkCacheExpiry = segmentHardLinkCacheExpiry;
+        this.cdcRawCleanerFrequency = cdcRawCleanerFrequency;
+        this.enableCdcRawCleaner = enableCdcRawCleaner;
+        this.fallbackCdcRawMaxDirectorySize = fallbackCdcRawMaxDirectorySize;
+        this.cdcRawMaxPercent = cdcRawMaxPercent;
+        this.cdcRawCriticalBufferWindow = cdcRawCriticalBufferWindow;
+        this.cdcRawLowBufferWindow = cdcRawLowBufferWindow;
+        this.cacheMaxUsage = cacheMaxUsage;
     }
 
     @Override
@@ -100,4 +173,117 @@ public class CdcConfigurationImpl implements 
CdcConfiguration
         LOGGER.warn("'segment_hardlink_cache_expiry_in_secs' is deprecated, 
use 'segment_hardlink_cache_expiry' instead");
         this.segmentHardLinkCacheExpiry = new 
SecondBoundConfiguration(segmentHardlinkCacheExpiryInSecs, TimeUnit.SECONDS);
     }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    @JsonProperty(value = CDC_RAW_CLEANER_FREQUENCY_PROPERTY)
+    public SecondBoundConfiguration cdcRawDirectorySpaceCleanerFrequency()
+    {
+        return cdcRawCleanerFrequency;
+    }
+
+    @JsonProperty(value = CDC_RAW_CLEANER_FREQUENCY_PROPERTY)
+    public void 
setCdcRawDirectorySpaceCleanerFrequency(SecondBoundConfiguration 
cdcRawCleanerFrequency)
+    {
+        this.cdcRawCleanerFrequency = cdcRawCleanerFrequency;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    @JsonProperty(value = ENABLE_CDC_RAW_CLEANER_PROPERTY)
+    public boolean enableCdcRawDirectoryRoutineCleanUp()
+    {
+        return enableCdcRawCleaner;
+    }
+
+    @JsonProperty(value = ENABLE_CDC_RAW_CLEANER_PROPERTY)
+    public void setEnableCdcRawCleaner(boolean enableCdcRawCleaner)
+    {
+        this.enableCdcRawCleaner = enableCdcRawCleaner;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    @JsonProperty(value = FALLBACK_CDC_RAW_MAX_DIRECTORY_SIZE_BYTES)
+    public long fallbackCdcRawDirectoryMaxSizeBytes()
+    {
+        return fallbackCdcRawMaxDirectorySize;
+    }
+
+    @JsonProperty(value = FALLBACK_CDC_RAW_MAX_DIRECTORY_SIZE_BYTES)
+    public void setFallbackCdcRawDirectoryMaxSizeBytes(long 
fallbackCdcRawMaxDirectorySize)
+    {
+        this.fallbackCdcRawMaxDirectorySize = fallbackCdcRawMaxDirectorySize;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    @JsonProperty(value = CDC_RAW_MAX_DIRECTORY_MAX_PERCENT)
+    public float cdcRawDirectoryMaxPercentUsage()
+    {
+        return cdcRawMaxPercent;
+    }
+
+    @JsonProperty(value = CDC_RAW_MAX_DIRECTORY_MAX_PERCENT)
+    public void setCdcRawDirectoryMaxPercentUsage(long cdcRawMaxPercent)
+    {
+        this.cdcRawMaxPercent = cdcRawMaxPercent;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    @JsonProperty(value = CDC_RAW_MAX_CRITICAL_BUFFER_WINDOW)
+    public SecondBoundConfiguration cdcRawDirectoryCriticalBufferWindow()
+    {
+        return cdcRawCriticalBufferWindow;
+    }
+
+    @JsonProperty(value = CDC_RAW_MAX_CRITICAL_BUFFER_WINDOW)
+    public void 
setCdcRawDirectoryCriticalBufferWindow(SecondBoundConfiguration 
cdcRawCriticalBufferWindow)
+    {
+        this.cdcRawCriticalBufferWindow = cdcRawCriticalBufferWindow;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    @JsonProperty(value = CDC_RAW_MAX_LOW_BUFFER_WINDOW)
+    public SecondBoundConfiguration cdcRawDirectoryLowBufferWindow()
+    {
+        return cdcRawLowBufferWindow;
+    }
+
+    @JsonProperty(value = CDC_RAW_MAX_LOW_BUFFER_WINDOW)
+    public void setCdcRawDirectoryLowBufferWindow(SecondBoundConfiguration 
cdcRawLowBufferWindow)
+    {
+        this.cdcRawLowBufferWindow = cdcRawLowBufferWindow;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @JsonProperty(value = CDC_CACHE_MAX_USAGE_DURATION)
+    @Override
+    public SecondBoundConfiguration cacheMaxUsage()
+    {
+        return cacheMaxUsage;
+    }
+
+    @JsonProperty(value = CDC_CACHE_MAX_USAGE_DURATION)
+    public void setCacheMaxUsage(SecondBoundConfiguration cacheMaxUsage)
+    {
+        this.cacheMaxUsage = cacheMaxUsage;
+    }
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java
new file mode 100644
index 00000000..60280c26
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ResultSet;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.SystemViewsSchema;
+import org.apache.cassandra.sidecar.exceptions.SchemaUnavailableException;
+import org.apache.cassandra.sidecar.utils.FileUtils;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Database accessor that queries Cassandra for setting values exposed through the
+ * {@code system_views.settings} table.
+ */
+@Singleton
+public class SystemViewsDatabaseAccessor extends DatabaseAccessor<SystemViewsSchema>
+{
+    private static final String YAML_PROP_IN_MB = "cdc_total_space_in_mb";
+    private static final String YAML_PROP_WITH_UNIT = "cdc_total_space"; // expects value with units e.g. "5MiB"
+
+    @Inject
+    public SystemViewsDatabaseAccessor(SystemViewsSchema systemViewsSchema,
+                                       CQLSessionProvider sessionProvider)
+    {
+        super(systemViewsSchema, sessionProvider);
+    }
+
+    /**
+     * Reads the CDC total space setting. The legacy {@code cdc_total_space_in_mb} setting is checked
+     * first, then {@code cdc_total_space}.
+     *
+     * @return the CDC total space in bytes, or {@code null} when neither setting has a value
+     * @throws SchemaUnavailableException if the {@code system_views.settings} schema is not available
+     */
+    @Nullable
+    public Long getCdcTotalSpaceSetting() throws SchemaUnavailableException
+    {
+        Map<String, String> settings = getSettings(YAML_PROP_IN_MB, YAML_PROP_WITH_UNIT);
+
+        // attempt to parse Cassandra v4.0 'cdc_total_space_in_mb' yaml prop
+        String cdcTotalSpaceInMb = settings.get(YAML_PROP_IN_MB);
+        if (cdcTotalSpaceInMb != null)
+        {
+            return FileUtils.mbStringToBytes(cdcTotalSpaceInMb);
+        }
+
+        // otherwise parse current (v5.0+) 'cdc_total_space' yaml prop
+        String cdcTotalSpace = settings.get(YAML_PROP_WITH_UNIT);
+        if (cdcTotalSpace != null)
+        {
+            return FileUtils.storageStringToBytes(cdcTotalSpace);
+        }
+
+        return null;
+    }
+
+    /**
+     * Loads setting values from the {@code system_views.settings} table.
+     *
+     * @param names names of the settings to load
+     * @return map of setting values keyed on {@code name}; settings that do not exist, or whose value is
+     *         {@code null}, are absent from the map
+     * @throws SchemaUnavailableException if the {@code system_views.settings} schema is not available
+     */
+    @NotNull
+    public Map<String, String> getSettings(String... names) throws SchemaUnavailableException
+    {
+        BoundStatement statement = tableSchema.selectSettings().bind(Arrays.asList(names));
+        ResultSet result = execute(statement);
+        return result.all()
+                     .stream()
+                     // Collectors.toMap throws NullPointerException on null values, so skip unset settings
+                     .filter(row -> row.getString(1) != null)
+                     .collect(Collectors.toMap(row -> row.getString(0),
+                                               row -> row.getString(1)));
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SystemViewsSchema.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SystemViewsSchema.java
new file mode 100644
index 00000000..e640f42a
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SystemViewsSchema.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db.schema;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.exceptions.SchemaUnavailableException;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Schema for querying the {@code settings} table in the {@code system_views} keyspace.
+ */
+@Singleton
+public class SystemViewsSchema extends CassandraSystemTableSchema
+{
+    protected static final String SYSTEM_VIEWS_KEYSPACE_NAME = "system_views";
+    protected static final String SYSTEM_VIEWS_SETTINGS_TABLE_NAME = "settings";
+    // Derived from the constants above so the query cannot drift from the declared keyspace/table names
+    private static final String SELECT_SETTINGS_QUERY = "SELECT name, value FROM "
+                                                        + SYSTEM_VIEWS_KEYSPACE_NAME + "."
+                                                        + SYSTEM_VIEWS_SETTINGS_TABLE_NAME
+                                                        + " WHERE name IN ?";
+    private PreparedStatement selectSettings;
+
+    @Override
+    protected String keyspaceName()
+    {
+        return SYSTEM_VIEWS_KEYSPACE_NAME;
+    }
+
+    @Override
+    protected String tableName()
+    {
+        return SYSTEM_VIEWS_SETTINGS_TABLE_NAME;
+    }
+
+    /**
+     * @return prepared statement selecting {@code name, value} for a bound list of setting names
+     * @throws SchemaUnavailableException if the statement has not been prepared yet
+     */
+    public PreparedStatement selectSettings() throws SchemaUnavailableException
+    {
+        ensureSchemaAvailable();
+        return selectSettings;
+    }
+
+    @Override
+    protected void prepareStatements(@NotNull Session session)
+    {
+        this.selectSettings = session.prepare(SELECT_SETTINGS_QUERY);
+    }
+
+    /**
+     * @throws SchemaUnavailableException if {@link #prepareStatements(Session)} has not prepared the
+     *                                    select statement yet
+     */
+    protected void ensureSchemaAvailable() throws SchemaUnavailableException
+    {
+        if (selectSettings == null)
+        {
+            throw new SchemaUnavailableException(keyspaceName(), tableName());
+        }
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/CdcMetrics.java 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/CdcMetrics.java
new file mode 100644
index 00000000..ac847d49
--- /dev/null
+++ b/server/src/main/java/org/apache/cassandra/sidecar/metrics/CdcMetrics.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.metrics;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+import com.codahale.metrics.DefaultSettableGauge;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricRegistry;
+
+import static org.apache.cassandra.sidecar.metrics.ServerMetrics.SERVER_PREFIX;
+
+/**
+ * Tracks metrics related to the CDC functionality provided by Sidecar. Every metric is obtained from the
+ * supplied {@link MetricRegistry} and named within the {@link #DOMAIN} domain.
+ */
+public class CdcMetrics
+{
+    public static final String DOMAIN = SERVER_PREFIX + ".Cdc";
+    protected final MetricRegistry metricRegistry;
+    public final NamedMetric<DeltaGauge> cdcRawCleanerFailed;
+    public final NamedMetric<DeltaGauge> orphanedIdx;
+    public final NamedMetric<DefaultSettableGauge<Integer>> oldestSegmentAge;
+    public final NamedMetric<DeltaGauge> totalConsumedCdcBytes;
+    public final NamedMetric<DefaultSettableGauge<Long>> totalCdcSpaceUsed;
+    public final NamedMetric<DeltaGauge> deletedSegment;
+    public final NamedMetric<DeltaGauge> lowCdcRawSpace;
+    public final NamedMetric<DeltaGauge> criticalCdcRawSpace;
+
+    public CdcMetrics(MetricRegistry metricRegistry)
+    {
+        this.metricRegistry = Objects.requireNonNull(metricRegistry, "Metric registry can not be null");
+        // metrics are obtained from the registry in exactly this order
+        this.cdcRawCleanerFailed = deltaGauge("CleanerFailed");
+        this.totalConsumedCdcBytes = deltaGauge("TotalConsumedBytes");
+        this.totalCdcSpaceUsed = settableGauge("TotalSpaceUsed", 0L);
+        this.orphanedIdx = deltaGauge("OrphanedIdxFile");
+        this.deletedSegment = deltaGauge("DeletedSegment");
+        this.oldestSegmentAge = settableGauge("OldestSegmentAgeSeconds", 0);
+        this.lowCdcRawSpace = deltaGauge("LowSpace");
+        this.criticalCdcRawSpace = deltaGauge("CriticalSpace");
+    }
+
+    /**
+     * Obtains a {@link DeltaGauge} via {@link MetricRegistry#gauge} under the given simple name.
+     */
+    private NamedMetric<DeltaGauge> deltaGauge(String simpleName)
+    {
+        return createMetric(simpleName, fullName -> metricRegistry.gauge(fullName, DeltaGauge::new));
+    }
+
+    /**
+     * Obtains a {@link DefaultSettableGauge} via {@link MetricRegistry#gauge}, initialised to {@code initialValue}.
+     */
+    private <V> NamedMetric<DefaultSettableGauge<V>> settableGauge(String simpleName, V initialValue)
+    {
+        return createMetric(simpleName,
+                            fullName -> metricRegistry.gauge(fullName, () -> new DefaultSettableGauge<>(initialValue)));
+    }
+
+    /**
+     * Builds a {@link NamedMetric} in {@link #DOMAIN}, creating the underlying metric with {@code metricCreator}.
+     */
+    private <T extends Metric> NamedMetric<T> createMetric(String simpleName, Function<String, T> metricCreator)
+    {
+        return NamedMetric.builder(metricCreator)
+                          .withDomain(DOMAIN)
+                          .withName(simpleName)
+                          .build();
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java
index ea8d7857..fb8be759 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java
@@ -64,4 +64,9 @@ public interface ServerMetrics
      * @return metrics related to coordination functionality that are tracked
      */
     CoordinationMetrics coordination();
+
+    /**
+     * @return metrics tracked by the server for CDC functionality, e.g. {@code cdc_raw} directory space
+     * usage and cleaner failures
+     */
+    CdcMetrics cdc();
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java
index 15e31e6a..2d4e7da1 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java
@@ -36,6 +36,7 @@ public class ServerMetricsImpl implements ServerMetrics
     protected final SchemaReportingMetrics schemaReportingMetrics;
     protected final CacheMetrics cacheMetrics;
     protected final CoordinationMetrics coordinationMetrics;
+    protected final CdcMetrics cdcMetrics;
 
     public ServerMetricsImpl(MetricRegistry metricRegistry)
     {
@@ -48,6 +49,7 @@ public class ServerMetricsImpl implements ServerMetrics
         this.schemaReportingMetrics = new 
SchemaReportingMetrics(metricRegistry);
         this.cacheMetrics = new CacheMetrics(metricRegistry);
         this.coordinationMetrics = new CoordinationMetrics(metricRegistry);
+        this.cdcMetrics = new CdcMetrics(metricRegistry);
     }
 
     @Override
@@ -91,4 +93,10 @@ public class ServerMetricsImpl implements ServerMetrics
     {
         return coordinationMetrics;
     }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public CdcMetrics cdc()
+    {
+        return cdcMetrics;
+    }
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
index 3b6da2c4..322b80bf 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
@@ -35,6 +35,7 @@ import 
org.apache.cassandra.sidecar.coordination.SidecarPeerHealthMonitorTask;
 import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthProvider;
 import org.apache.cassandra.sidecar.coordination.SidecarPeerProvider;
 import org.apache.cassandra.sidecar.db.schema.ConfigsSchema;
+import org.apache.cassandra.sidecar.db.schema.SystemViewsSchema;
 import org.apache.cassandra.sidecar.db.schema.TableSchema;
 import org.apache.cassandra.sidecar.handlers.cdc.AllServiceConfigHandler;
 import org.apache.cassandra.sidecar.handlers.cdc.DeleteServiceConfigHandler;
@@ -47,6 +48,7 @@ import 
org.apache.cassandra.sidecar.modules.multibindings.TableSchemaMapKeys;
 import org.apache.cassandra.sidecar.modules.multibindings.VertxRouteMapKeys;
 import org.apache.cassandra.sidecar.routes.RouteBuilder;
 import org.apache.cassandra.sidecar.routes.VertxRoute;
+import org.apache.cassandra.sidecar.tasks.CdcRawDirectorySpaceCleaner;
 import org.apache.cassandra.sidecar.tasks.PeriodicTask;
 import org.apache.cassandra.sidecar.utils.SidecarClientProvider;
 
@@ -63,6 +65,13 @@ public class CdcModule extends AbstractModule
         return task;
     }
 
+    /**
+     * Registers the {@link CdcRawDirectorySpaceCleaner} in the {@link PeriodicTask} map binding under
+     * {@code CdcRawDirectorySpaceCleanerTaskKey}.
+     * NOTE(review): the method name contains a stray 'c' ("Cleanerc"); it looks like only Guice references it
+     * via the annotations, so renaming should be safe — confirm there are no direct callers first.
+     */
+    @ProvidesIntoMap
+    
@KeyClassMapKey(PeriodicTaskMapKeys.CdcRawDirectorySpaceCleanerTaskKey.class)
+    PeriodicTask 
cdcRawDirectorySpaceCleanercPeriodicTask(CdcRawDirectorySpaceCleaner 
cleanerTask)
+    {
+        return cleanerTask;
+    }
+    }
+
     @ProvidesIntoMap
     @KeyClassMapKey(TableSchemaMapKeys.ConfigsSchemaKey.class)
     TableSchema configsSchema(ServiceConfiguration serviceConfiguration)
@@ -70,6 +79,13 @@ public class CdcModule extends AbstractModule
         return new ConfigsSchema(serviceConfiguration);
     }
 
+    /**
+     * Registers {@link SystemViewsSchema} in the {@link TableSchema} map binding under
+     * {@code SystemViewsSchemaKey}.
+     * NOTE(review): the method name has a doubled 's' ("systemViewss"); confirm there are no direct callers
+     * before renaming.
+     */
+    @ProvidesIntoMap
+    @KeyClassMapKey(TableSchemaMapKeys.SystemViewsSchemaKey.class)
+    TableSchema systemViewssSchema(SystemViewsSchema schema)
+    {
+        return schema;
+    }
+
     @ProvidesIntoMap
     @KeyClassMapKey(VertxRouteMapKeys.ListCdcSegmentsRouteKey.class)
     VertxRoute listCdcSegmentsRoute(RouteBuilder.Factory factory,
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/PeriodicTaskMapKeys.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/PeriodicTaskMapKeys.java
index 7099c7dc..57a4545b 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/PeriodicTaskMapKeys.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/PeriodicTaskMapKeys.java
@@ -33,4 +33,5 @@ public interface PeriodicTaskMapKeys
     interface SchemaReportingTaskKey extends ClassKey {}
     interface SidecarPeerHealthMonitorTaskKey extends ClassKey {}
     interface SidecarSchemaInitializerTaskKey extends ClassKey {}
+    /** Map key for the {@code CdcRawDirectorySpaceCleaner} periodic task binding. */
+    interface CdcRawDirectorySpaceCleanerTaskKey extends ClassKey {}
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/TableSchemaMapKeys.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/TableSchemaMapKeys.java
index fdf2ee44..c3251e33 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/TableSchemaMapKeys.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/TableSchemaMapKeys.java
@@ -30,4 +30,5 @@ public interface TableSchemaMapKeys
     interface SidecarLeaseSchemaKey extends ClassKey {}
     interface SidecarRolePermissionsSchemaKey extends ClassKey {}
     interface SystemAuthSchemaKey extends ClassKey {}
+    /** Map key for the {@code SystemViewsSchema} table schema binding. */
+    interface SystemViewsSchemaKey extends ClassKey {}
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java
new file mode 100644
index 00000000..ba5bb3ed
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import org.apache.cassandra.sidecar.config.CdcConfiguration;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.db.SystemViewsDatabaseAccessor;
+import org.apache.cassandra.sidecar.exceptions.SchemaUnavailableException;
+import org.apache.cassandra.sidecar.metrics.CdcMetrics;
+import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.FileUtils;
+import org.apache.cassandra.sidecar.utils.TimeProvider;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.cassandra.sidecar.utils.CdcUtil.isLogFile;
+import static org.apache.cassandra.sidecar.utils.CdcUtil.parseSegmentId;
+
+/**
+ * {@link PeriodicTask} that monitors the `cdc_raw` directory under every data directory of every instance
+ * known to {@link InstancesMetadata}, and removes the oldest commit log segments (together with their index
+ * files) when the space used exceeds the configured limit.
+ * <p>
+ * The limit is the `cdc_total_space` setting read from {@code system_views.settings}, cached for
+ * {@link CdcConfiguration#cacheMaxUsage()}. When the setting cannot be read,
+ * {@link CdcConfiguration#fallbackCdcRawDirectoryMaxSizeBytes()} is used instead. The limit is scaled by
+ * {@link CdcConfiguration#cdcRawDirectoryMaxPercentUsage()} before being compared with the directory size.
+ */
+@Singleton
+public class CdcRawDirectorySpaceCleaner implements PeriodicTask
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(CdcRawDirectorySpaceCleaner.class);
+
+    public static final String CDC_DIR_NAME = "cdc_raw";
+
+    private final TimeProvider timeProvider;
+    private final SystemViewsDatabaseAccessor systemViewsDatabaseAccessor;
+    private final CdcConfiguration cdcConfiguration;
+    private final InstancesMetadata instancesMetadata;
+    private final CdcMetrics cdcMetrics;
+
+    // The mutable state below is intentionally non-volatile: the PeriodicTaskExecutor is expected to
+    // ensure memory visibility between consecutive executions of this task.
+
+    // cdc_total_space in bytes, lazily loaded from system_views.settings; null until the first successful read
+    @Nullable
+    private Long maxUsageBytes = null;
+    // timeProvider.nanoTime() at the last successful read of cdc_total_space; null until the first successful read
+    @Nullable
+    private Long maxUsageLastReadNanos = null;
+    // cdc_raw directory -> (cdc segment file -> file size in bytes) as seen the last time that directory was checked.
+    // Keyed by directory so that, with several data directories, the files of one directory are never
+    // mistaken for consumed files while another directory is being processed.
+    private final Map<File, Map<CdcRawSegmentFile, Long>> priorCdcFilesByDirectory = new HashMap<>();
+
+    @Inject
+    public CdcRawDirectorySpaceCleaner(TimeProvider timeProvider,
+                                       SystemViewsDatabaseAccessor systemViewsDatabaseAccessor,
+                                       ServiceConfiguration serviceConfiguration,
+                                       InstancesMetadata instancesMetadata,
+                                       SidecarMetrics metrics)
+    {
+        this.timeProvider = timeProvider;
+        this.systemViewsDatabaseAccessor = systemViewsDatabaseAccessor;
+        this.cdcConfiguration = serviceConfiguration.cdcConfiguration();
+        this.instancesMetadata = instancesMetadata;
+        this.cdcMetrics = metrics.server().cdc();
+    }
+
+    /**
+     * @return the delay between executions, see {@link CdcConfiguration#cdcRawDirectorySpaceCleanerFrequency()}
+     */
+    @Override
+    public DurationSpec delay()
+    {
+        return cdcConfiguration.cdcRawDirectorySpaceCleanerFrequency();
+    }
+
+    /**
+     * Runs one clean-up pass over all cdc_raw directories. Any failure is logged, counted in
+     * {@code cdcRawCleanerFailed} and reported through {@code promise}.
+     */
+    @Override
+    public void execute(Promise<Void> promise)
+    {
+        try
+        {
+            routineCleanUp();
+            promise.tryComplete();
+        }
+        catch (Throwable t)
+        {
+            LOGGER.warn("Failed to perform routine clean-up of cdc_raw directory", t);
+            cdcMetrics.cdcRawCleanerFailed.metric.update(1L);
+            promise.fail(t);
+        }
+    }
+
+    /**
+     * @return true if we need to refresh the cached `cdc_total_space` value, i.e. it has never been read
+     * successfully or it was last read at least {@link CdcConfiguration#cacheMaxUsage()} ago.
+     */
+    protected boolean shouldRefreshCachedMaxUsage()
+    {
+        return maxUsageLastReadNanos == null ||
+               (timeProvider.nanoTime() - maxUsageLastReadNanos) >=
+               TimeUnit.MILLISECONDS.toNanos(cdcConfiguration.cacheMaxUsage().toMillis());
+    }
+
+    /**
+     * Returns the `cdc_total_space` limit in bytes. The value read from {@code system_views.settings} is cached;
+     * when it cannot be read, {@link CdcConfiguration#fallbackCdcRawDirectoryMaxSizeBytes()} is returned.
+     *
+     * @return the maximum space, in bytes, that a cdc_raw directory may use
+     */
+    protected long maxUsage()
+    {
+        if (!shouldRefreshCachedMaxUsage())
+        {
+            return Objects.requireNonNull(maxUsageBytes,
+                                          "maxUsageBytes cannot be null if maxUsageLastReadNanos is non-null");
+        }
+
+        try
+        {
+            Long newValue = systemViewsDatabaseAccessor.getCdcTotalSpaceSetting();
+            if (newValue != null)
+            {
+                if (!newValue.equals(maxUsageBytes))
+                {
+                    LOGGER.info("Change in cdc_total_space from system_views.settings prev={} latest={}",
+                                maxUsageBytes, newValue);
+                    this.maxUsageBytes = newValue;
+                }
+                // Refresh the read timestamp and return on every successful read, not only when the value
+                // changes; otherwise an unchanged value would fall through to the fallback below.
+                this.maxUsageLastReadNanos = timeProvider.nanoTime();
+                return newValue;
+            }
+        }
+        catch (SchemaUnavailableException e)
+        {
+            LOGGER.debug("Could not read cdc_total_space from system_views.settings", e);
+        }
+        catch (Throwable t)
+        {
+            LOGGER.warn("Error reading cdc_total_space from system_views.settings", t);
+        }
+
+        LOGGER.warn("Could not read cdc_total_space from system_views.settings, falling back to props");
+        return cdcConfiguration.fallbackCdcRawDirectoryMaxSizeBytes();
+    }
+
+    /**
+     * @return {@link ScheduleDecision#EXECUTE} when routine clean-up of the cdc_raw directory is enabled,
+     * otherwise {@link ScheduleDecision#SKIP}
+     */
+    @Override
+    public ScheduleDecision scheduleDecision()
+    {
+        if (cdcConfiguration.enableCdcRawDirectoryRoutineCleanUp())
+        {
+            return ScheduleDecision.EXECUTE;
+        }
+        LOGGER.debug("Skipping CdcRawDirectorySpaceCleaner: feature is disabled");
+        return ScheduleDecision.SKIP;
+    }
+
+    /**
+     * Cleans up the {@value #CDC_DIR_NAME} directory under each data directory of every instance.
+     */
+    protected void routineCleanUp()
+    {
+        for (InstanceMetadata instanceMetadata : instancesMetadata.instances())
+        {
+            List<String> dataDirectories = instanceMetadata.dataDirs();
+            dataDirectories.stream()
+                           .map(dir -> new File(dir, CDC_DIR_NAME))
+                           .forEach(this::cleanUpCdcRawDirectory);
+        }
+    }
+
+    /**
+     * Publishes cdc stats for {@code cdcRawDirectory} and, when the directory uses more than the allowed space,
+     * deletes the oldest commit log segments (by segment id) until it fits again. The newest segment is always
+     * kept. Orphaned index files are removed afterwards.
+     *
+     * @param cdcRawDirectory the cdc_raw directory to clean up; skipped when it does not exist
+     */
+    protected void cleanUpCdcRawDirectory(File cdcRawDirectory)
+    {
+        if (!cdcRawDirectory.exists() || !cdcRawDirectory.isDirectory())
+        {
+            LOGGER.debug("Skipping CdcRawDirectorySpaceCleaner: CDC directory does not exist: {}", cdcRawDirectory);
+            return;
+        }
+
+        // only log segments whose index file exists are considered
+        List<CdcRawSegmentFile> segmentFiles = Optional.ofNullable(cdcRawDirectory.listFiles(this::validSegmentFilter))
+                                                       .map(files -> Arrays.stream(files)
+                                                                           .map(CdcRawSegmentFile::new)
+                                                                           .filter(CdcRawSegmentFile::indexExists)
+                                                                           .collect(Collectors.toList()))
+                                                       .orElseGet(List::of);
+        publishCdcStats(cdcRawDirectory, segmentFiles);
+        if (segmentFiles.size() < 2)
+        {
+            LOGGER.debug("Skipping cdc data cleaner routine cleanup: No cdc data or only one single cdc segment is found.");
+            return;
+        }
+
+        long directorySize = FileUtils.directorySize(cdcRawDirectory);
+        long upperLimitBytes = (long) (maxUsage() * cdcConfiguration.cdcRawDirectoryMaxPercentUsage());
+        // Sort the files by segmentId to delete commit log segments in write order.
+        // The latest file is the current active segment, but it could be created before the retention duration,
+        // e.g. slow data ingress.
+        Collections.sort(segmentFiles);
+        long nowInMillis = timeProvider.currentTimeMillis();
+
+        // track the age of the oldest commit log segment to give indication of the time-window buffer available
+        cdcMetrics.oldestSegmentAge.metric.setValue(
+        (int) MILLISECONDS.toSeconds(nowInMillis - segmentFiles.get(0).lastModified()));
+
+        if (directorySize > upperLimitBytes)
+        {
+            if (segmentFiles.get(0).segmentId > segmentFiles.get(1).segmentId)
+            {
+                LOGGER.error("Cdc segments sorted incorrectly {} before {}",
+                             segmentFiles.get(0).segmentId, segmentFiles.get(1).segmentId);
+            }
+
+            long criticalMillis = cdcConfiguration.cdcRawDirectoryCriticalBufferWindow().toMillis();
+            long lowMillis = cdcConfiguration.cdcRawDirectoryLowBufferWindow().toMillis();
+
+            // we keep the last commit log segment as it may still be actively written to
+            int i = 0;
+            while (i < segmentFiles.size() - 1 && directorySize > upperLimitBytes)
+            {
+                CdcRawSegmentFile segment = segmentFiles.get(i);
+                long ageMillis = nowInMillis - segment.lastModified();
+
+                if (ageMillis < criticalMillis)
+                {
+                    LOGGER.error("Insufficient Cdc buffer size to maintain {}-minute window segment={} maxSize={} ageMinutes={}",
+                                 MILLISECONDS.toMinutes(criticalMillis), segment, upperLimitBytes,
+                                 MILLISECONDS.toMinutes(ageMillis));
+                    cdcMetrics.criticalCdcRawSpace.metric.update(1);
+                }
+                else if (ageMillis < lowMillis)
+                {
+                    LOGGER.warn("Insufficient Cdc buffer size to maintain {}-minute window segment={} maxSize={} ageMinutes={}",
+                                MILLISECONDS.toMinutes(lowMillis), segment, upperLimitBytes,
+                                MILLISECONDS.toMinutes(ageMillis));
+                    cdcMetrics.lowCdcRawSpace.metric.update(1);
+                }
+                long length = 0;
+                try
+                {
+                    length = deleteSegment(segment);
+                    cdcMetrics.deletedSegment.metric.update(length);
+                }
+                catch (IOException e)
+                {
+                    LOGGER.warn("Failed to delete cdc segment", e);
+                }
+                directorySize -= length;
+                i++;
+            }
+        }
+
+        try
+        {
+            cleanupOrphanedIdxFiles(cdcRawDirectory);
+        }
+        catch (IOException e)
+        {
+            LOGGER.warn("Failed to clean up orphaned idx files", e);
+        }
+    }
+
+    /**
+     * @param file candidate file in the cdc_raw directory
+     * @return true if {@code file} is a regular file named like a commit log segment
+     */
+    protected boolean validSegmentFilter(File file)
+    {
+        return file.isFile() && isLogFile(file.getName());
+    }
+
+    /**
+     * Deletes a commit log segment and its index file, if they still exist.
+     *
+     * @param segment the segment to delete
+     * @return the number of bytes of the segment (as captured when listed) plus its index file
+     * @throws IOException if either file cannot be deleted
+     */
+    protected long deleteSegment(CdcRawSegmentFile segment) throws IOException
+    {
+        final long numBytes = segment.length() + segment.indexLength();
+        LOGGER.info("Deleting Cdc segment path={} lastModified={} numBytes={}", segment,
+                    segment.lastModified(), numBytes);
+        Files.deleteIfExists(segment.path());
+        Files.deleteIfExists(segment.indexPath());
+        return numBytes;
+    }
+
+    /**
+     * Deletes index files in {@code cdcDir} that have no corresponding commit log segment.
+     *
+     * @param cdcDir the cdc_raw directory
+     * @throws IOException if an orphaned index file cannot be deleted
+     */
+    private void cleanupOrphanedIdxFiles(File cdcDir) throws IOException
+    {
+        final File[] indexFiles =
+        cdcDir.listFiles(f -> f.isFile() && CdcUtil.isValidIdxFile(f.getName()));
+        if (indexFiles == null || indexFiles.length == 0)
+            return; // exit early when finding no index files
+
+        final File[] cdcSegments =
+        cdcDir.listFiles(f -> f.isFile() && CdcUtil.isLogFile(f.getName()));
+        Set<String> cdcFileNames = Set.of();
+        if (cdcSegments != null)
+        {
+            cdcFileNames = new HashSet<>(cdcSegments.length);
+            for (File f : cdcSegments)
+            {
+                cdcFileNames.add(f.getName());
+            }
+        }
+
+        // now, delete all index files that have no corresponding log file (regardless of their age).
+        for (File idxFile : indexFiles)
+        {
+            final String cdcFileName = CdcUtil.idxToLogFileName(idxFile.getName());
+            if (!cdcFileNames.contains(cdcFileName))
+            {  // found an orphaned index file
+                LOGGER.warn("Orphaned Cdc idx file found with no corresponding Cdc segment path={}",
+                            idxFile.toPath());
+                cdcMetrics.orphanedIdx.metric.update(1L);
+                Files.deleteIfExists(idxFile.toPath());
+            }
+        }
+    }
+
+    /**
+     * Publishes the cdc space metrics for one cdc_raw directory: the bytes of segments that disappeared since the
+     * previous check of the same directory ({@code totalConsumedCdcBytes}) and the bytes currently used by its
+     * segments ({@code totalCdcSpaceUsed}).
+     * <p>
+     * NOTE(review): {@code totalCdcSpaceUsed} is set once per directory, so with several cdc_raw directories it
+     * reflects the directory processed last.
+     *
+     * @param cdcRawDirectory the directory the files were listed from
+     * @param cdcFiles        the segment files currently present; may be null or empty
+     */
+    private void publishCdcStats(File cdcRawDirectory, @Nullable List<CdcRawSegmentFile> cdcFiles)
+    {
+        Map<CdcRawSegmentFile, Long> priorCdcFiles =
+        priorCdcFilesByDirectory.getOrDefault(cdcRawDirectory, Collections.emptyMap());
+
+        // no cdc data consumed or exist
+        boolean noCdcFiles = cdcFiles == null || cdcFiles.isEmpty();
+        if (noCdcFiles && priorCdcFiles.isEmpty())
+            return;
+
+        Map<CdcRawSegmentFile, Long> currentFiles;
+        long totalCurrentBytes = 0L;
+        if (noCdcFiles)
+        {
+            currentFiles = new HashMap<>();
+        }
+        else
+        {
+            currentFiles = new HashMap<>(cdcFiles.size());
+            for (CdcRawSegmentFile segment : cdcFiles)
+            {
+                if (segment.exists())
+                {
+                    long len = segment.length();
+                    currentFiles.put(segment, len);
+                    totalCurrentBytes += len;
+                }
+            }
+        }
+
+        // skip publishing. there is no cdc data consumed and no data exist.
+        if (totalCurrentBytes == 0L && priorCdcFiles.isEmpty())
+        {
+            priorCdcFilesByDirectory.put(cdcRawDirectory, currentFiles);
+            return;
+        }
+
+        // consumed files are the files that existed in the prior round of this directory but are now deleted.
+        Set<CdcRawSegmentFile> consumedFiles = Sets.difference(priorCdcFiles.keySet(), currentFiles.keySet());
+        long totalConsumedBytes = consumedFiles.stream().map(priorCdcFiles::get).reduce(0L, Long::sum);
+        priorCdcFilesByDirectory.put(cdcRawDirectory, currentFiles);
+        cdcMetrics.totalConsumedCdcBytes.metric.update(totalConsumedBytes);
+        cdcMetrics.totalCdcSpaceUsed.metric.setValue(totalCurrentBytes);
+    }
+
+    /**
+     * Helper class for the CdcRawDirectorySpaceCleaner to track log segment files and associated idx file in the
+     * cdc_raw directory. Instances are ordered by segment id, while equality and hash code are based on the log
+     * file only. The log file length is captured when the instance is created.
+     */
+    protected static class CdcRawSegmentFile implements Comparable<CdcRawSegmentFile>
+    {
+        private final File file;
+        private final File indexFile;
+        private final long segmentId;
+        private final long len;
+
+        CdcRawSegmentFile(File logFile)
+        {
+            this.file = logFile;
+            final String name = logFile.getName();
+            this.segmentId = parseSegmentId(name);
+            this.len = logFile.length();
+            this.indexFile = CdcUtil.getIdxFile(logFile);
+        }
+
+        public boolean exists()
+        {
+            return file.exists();
+        }
+
+        public boolean indexExists()
+        {
+            return indexFile.exists();
+        }
+
+        /**
+         * @return the log file length captured at construction time
+         */
+        public long length()
+        {
+            return len;
+        }
+
+        public long indexLength()
+        {
+            return indexFile.length();
+        }
+
+        public long lastModified()
+        {
+            return file.lastModified();
+        }
+
+        public Path path()
+        {
+            return file.toPath();
+        }
+
+        public Path indexPath()
+        {
+            return indexFile.toPath();
+        }
+
+        @Override
+        public int compareTo(@NotNull CdcRawSegmentFile o)
+        {
+            return Long.compare(segmentId, o.segmentId);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return file.hashCode();
+        }
+
+        @Override
+        public boolean equals(Object other)
+        {
+            if (this == other)
+            {
+                return true;
+            }
+            if (other == null || this.getClass() != other.getClass())
+            {
+                return false;
+            }
+
+            CdcRawSegmentFile that = (CdcRawSegmentFile) other;
+            return file.equals(that.file);
+        }
+
+        @Override
+        public String toString()
+        {
+            return file.getAbsolutePath();
+        }
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java 
b/server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java
index 91aa3029..d7ca8d9d 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.List;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 /**
@@ -37,7 +38,8 @@ public final class CdcUtil
     private static final int IDX_FILE_EXTENSION_LENGTH = 
IDX_FILE_EXTENSION.length();
     private static final String LOG_FILE_COMPLETE_INDICATOR = "COMPLETED";
     private static final String FILENAME_EXTENSION = "(" + IDX_FILE_EXTENSION 
+ "|" + LOG_FILE_EXTENSION + ")";
-    private static final Pattern SEGMENT_PATTERN = 
Pattern.compile(FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)" + 
FILENAME_EXTENSION);
+    // Matches cdc commit log segment and index file names: FILENAME_PREFIX, an optional non-capturing
+    // "<digits>SEPARATOR" component, the segment id (group 1) and one of FILENAME_EXTENSION's extensions.
+    // NOTE(review): the extensions are concatenated into the regex unquoted, so if they contain '.' it matches
+    // any character; consider Pattern.quote(...) if they are plain literals.
+    static final Pattern SEGMENT_PATTERN = Pattern.compile(FILENAME_PREFIX + "(?:\\d+" + SEPARATOR + ")?" + "(\\d+)" + FILENAME_EXTENSION);
+    // Matches cdc index file names only; group 1 captures the segment id.
+    public static final Pattern IDX_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "(?:\\d+" + SEPARATOR + ")?" + "(\\d+)" + IDX_FILE_EXTENSION);
 
     private static final int READ_INDEX_FILE_MAX_RETRY = 5;
 
@@ -102,6 +104,28 @@ public final class CdcUtil
         return new CdcIndex(latestPosition, isCompleted);
     }
 
+    /**
+     * @param idxFileName Commit log segment idx filename
+     * @return log segment filename for associated idx file
+     */
+    public static String idxToLogFileName(String idxFileName)
+    {
+        return idxFileName.substring(0, idxFileName.length() - 
IDX_FILE_EXTENSION.length()) + LOG_FILE_EXTENSION;
+    }
+
+    public static long parseSegmentId(String name)
+    {
+        final Matcher matcher = SEGMENT_PATTERN.matcher(name);
+        if (matcher.matches())
+        {
+            return Long.parseLong(matcher.group(1));
+        }
+        else
+        {
+            throw new IllegalStateException("Invalid CommitLog name: " + name);
+        }
+    }
+
     /**
      * Class representing Cdc index
      */
@@ -119,6 +143,7 @@ public final class CdcUtil
 
     /**
      * Validate for the cdc (log or index) file name.see {@link 
SEGMENT_PATTERN} for the format
+     *
      * @param fileName name of the file
      * @return true if the name is valid; otherwise, false
      */
@@ -132,6 +157,15 @@ public final class CdcUtil
         return isValid(fileName) && fileName.endsWith(LOG_FILE_EXTENSION);
     }
 
+    /**
+     * Checks whether a file name is a cdc_raw commit log segment index file name, i.e. whether it fully
+     * matches {@link #IDX_FILE_PATTERN}.
+     *
+     * @param idxFileName name of the file.
+     * @return true if the filename is a valid cdc_raw log segment idx file; false otherwise
+     */
+    public static boolean isValidIdxFile(String idxFileName)
+    {
+        Matcher idxMatcher = IDX_FILE_PATTERN.matcher(idxFileName);
+        return idxMatcher.matches();
+    }
+
     public static boolean isIndexFile(String fileName)
     {
         return isValid(fileName) && matchIndexExtension(fileName);
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/utils/FileUtils.java 
b/server/src/main/java/org/apache/cassandra/sidecar/utils/FileUtils.java
index 51172e83..d8bddb75 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/utils/FileUtils.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/FileUtils.java
@@ -18,11 +18,19 @@
 
 package org.apache.cassandra.sidecar.utils;
 
+import java.io.File;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.jetbrains.annotations.Nullable;
+
 /**
  * Encompasses utilities for files
  */
 public class FileUtils
 {
+    // Data storage strings such as "4096MiB": group 1 is the numeric amount, group 2 the optional unit
+    // (null when absent, meaning bytes).
+    private static final Pattern STORAGE_UNIT_PATTERN = Pattern.compile("(\\d+)(GiB|MiB|KiB|B)?");
+
     /**
      * Resolves the home directory from the input {@code directory} string 
when the string begins with {@code ~}.
      *
@@ -35,4 +43,63 @@ public class FileUtils
             return directory;
         return System.getProperty("user.home") + directory.substring(1);
     }
+
+    /**
+     * Sums the lengths of the regular files directly inside {@code directory}; subdirectories and their
+     * contents are not counted.
+     *
+     * @param directory the directory
+     * @return the size in bytes of all files in a directory, non-recursively; 0 if the directory cannot be listed
+     */
+    public static long directorySize(File directory)
+    {
+        File[] entries = directory.listFiles();
+        if (entries == null)
+        {
+            return 0;
+        }
+
+        long totalBytes = 0;
+        for (File entry : entries)
+        {
+            if (!entry.isFile())
+            {
+                continue;
+            }
+            totalBytes += entry.length();
+        }
+        return totalBytes;
+    }
+
+    public static long mbStringToBytes(String str)
+    {
+        return Long.parseLong(str) * (1 << 20);  // note the prop name uses 
'mb' but Cassandra parses as MiB
+    }
+
+    /**
+     * Parses a data storage string such as {@code 4096MiB}, {@code 1GiB}, {@code 512KiB}, {@code 10B}, or a plain
+     * number of bytes such as {@code 100}, into a number of bytes.
+     * <p>
+     * The whole (trimmed) string must match. Strings with unsupported units or extra characters, e.g.
+     * {@code 10TiB} or {@code 5 MiB}, are rejected rather than partially parsed; a partial parse would
+     * silently treat them as 10 and 5 bytes respectively.
+     *
+     * @param str the storage string, may be null
+     * @return the number of bytes, or null if {@code str} is null or not a supported storage string
+     * @throws NumberFormatException if the numeric amount does not fit in a long
+     * @throws ArithmeticException   if the number of bytes overflows a long
+     */
+    @Nullable
+    public static Long storageStringToBytes(String str)
+    {
+        if (str == null)
+        {
+            return null;
+        }
+        final Matcher matcher = STORAGE_UNIT_PATTERN.matcher(str.trim());
+        if (!matcher.matches())
+        {
+            return null;
+        }
+        return Math.multiplyExact(Long.parseLong(matcher.group(1)), storageUnitToBytes(matcher.group(2)));
+    }
+
+    /**
+     * Returns the number of bytes represented by one unit of the given data storage unit.
+     *
+     * @param unit one of {@code B}, {@code KiB}, {@code MiB}, {@code GiB}; null or empty means bytes
+     * @return the multiplier, in bytes, for {@code unit}
+     * @throws IllegalStateException if the unit is not supported
+     */
+    public static long storageUnitToBytes(String unit)
+    {
+        // a missing unit means the amount is already expressed in bytes
+        if (unit == null || unit.isEmpty() || "B".equals(unit))
+        {
+            return 1L;
+        }
+        if ("KiB".equals(unit))
+        {
+            return 1L << 10;
+        }
+        if ("MiB".equals(unit))
+        {
+            return 1L << 20;
+        }
+        if ("GiB".equals(unit))
+        {
+            return 1L << 30;
+        }
+        throw new IllegalStateException("Unexpected data storage unit: " + unit);
+    }
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/utils/TimeProvider.java 
b/server/src/main/java/org/apache/cassandra/sidecar/utils/TimeProvider.java
index 9f01cbe2..ac59bf7f 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/utils/TimeProvider.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/TimeProvider.java
@@ -23,10 +23,29 @@ package org.apache.cassandra.sidecar.utils;
  */
 public interface TimeProvider
 {
-    TimeProvider DEFAULT_TIME_PROVIDER = System::currentTimeMillis;
+    /**
+     * Provider backed by {@link System#currentTimeMillis()} and {@link System#nanoTime()}.
+     */
+    TimeProvider DEFAULT_TIME_PROVIDER = new TimeProvider()
+    {
+        @Override
+        public long currentTimeMillis()
+        {
+            return System.currentTimeMillis();
+        }
+
+        @Override
+        public long nanoTime()
+        {
+            return System.nanoTime();
+        }
+    };
 
     /**
      * @return the current time in milliseconds
      */
     long currentTimeMillis();
+
+    /**
+     * Returns the current value of the high-resolution time source, in nanoseconds.
+     * <p>
+     * The default implementation throws, so implementations that only supply {@link #currentTimeMillis()}
+     * (for example a lambda) must override this method before it can be used.
+     *
+     * @return the current value of the high-resolution time source in nanoseconds
+     * @throws UnsupportedOperationException if the implementation does not override this method
+     */
+    default long nanoTime()
+    {
+        throw new UnsupportedOperationException("Nano time is unsupported");
+    }
 }
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessorIntTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessorIntTest.java
new file mode 100644
index 00000000..3ddb384b
--- /dev/null
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessorIntTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test reading the cdc_total_space setting through {@link SystemViewsDatabaseAccessor}.
+ */
+class SystemViewsDatabaseAccessorIntTest extends IntegrationTestBase
+{
+    @CassandraIntegrationTest(buildCluster = false)
+    void testReadSettings(ConfigurableCassandraTestContext cassandraTestContext)
+    {
+        // start the cluster with a small cdc_total_space, configured in MiB
+        long cdcTotalSpaceMiB = 5;
+        cassandraTestContext.configureAndStartCluster(builder -> builder.appendConfig(
+        config -> config.set("cdc_total_space_in_mb", String.valueOf(cdcTotalSpaceMiB))));
+        waitForSchemaReady(10, TimeUnit.SECONDS);
+
+        // the accessor is expected to report the setting in bytes
+        long expectedBytes = cdcTotalSpaceMiB * 1024 * 1024;
+        SystemViewsDatabaseAccessor systemViews = injector.getInstance(SystemViewsDatabaseAccessor.class);
+        Long actualBytes = systemViews.getCdcTotalSpaceSetting();
+        assertThat(actualBytes).isNotNull().isEqualTo(expectedBytes);
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java 
b/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java
index e0271b77..7e8ed080 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java
@@ -203,7 +203,9 @@ public class SidecarSchemaTest
 
             "INSERT INTO sidecar_internal.configs (service, config) VALUES (?, 
?) IF NOT EXISTS",
 
-            "DELETE FROM sidecar_internal.configs WHERE service=?"
+            "DELETE FROM sidecar_internal.configs WHERE service=?",
+
+            "SELECT name, value FROM system_views.settings WHERE name IN ?"
             );
 
             assertThat(interceptedPrepStmts).as("Intercepted statements match 
expected statements")
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java
new file mode 100644
index 00000000..b0366569
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.commons.lang3.RandomUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import com.codahale.metrics.MetricRegistry;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadataImpl;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
+import org.apache.cassandra.sidecar.config.CdcConfiguration;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.config.yaml.CdcConfigurationImpl;
+import org.apache.cassandra.sidecar.db.SystemViewsDatabaseAccessor;
+import org.apache.cassandra.sidecar.metrics.CdcMetrics;
+import org.apache.cassandra.sidecar.metrics.ServerMetrics;
+import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.TimeProvider;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the {@link CdcRawDirectorySpaceCleaner}
+ */
+public class CdcRawDirectorySpaceCleanerTest
+{
+    private static final MetricRegistry METRIC_REGISTRY = new MetricRegistry();
+    private static final String TEST_SEGMENT_FILE_NAME_1 = 
"CommitLog-2-1250512736956320000.log";
+    private static final String TEST_SEGMENT_FILE_NAME_2 = 
"CommitLog-2-1260512736956320000.log";
+    private static final String TEST_SEGMENT_FILE_NAME_3 = 
"CommitLog-2-1340512736956320000.log";
+    private static final String TEST_ORPHANED_SEGMENT_FILE_NAME = 
"CommitLog-2-1240512736956320000.log";
+    private static final String TEST_INTACT_SEGMENT_FILE_NAME = 
"CommitLog-2-1340512736959990000.log";
+
+    @Test
+    public void testCdcRawDirectorySpaceCleaner(@TempDir Path tempDir) throws 
IOException
+    {
+        TimeProvider timeProvider = TimeProvider.DEFAULT_TIME_PROVIDER;
+        SystemViewsDatabaseAccessor systemViewsDatabaseAccessor = 
mock(SystemViewsDatabaseAccessor.class);
+        when(systemViewsDatabaseAccessor.getSettings(any()))
+        .thenAnswer((Answer<Map<String, String>>) invocation -> 
Map.of("cdc_total_space", "1MiB"));
+        
when(systemViewsDatabaseAccessor.getCdcTotalSpaceSetting()).thenCallRealMethod();
+        CdcConfiguration cdcConfiguration = new CdcConfigurationImpl();
+        ServiceConfiguration serviceConfiguration = 
mock(ServiceConfiguration.class);
+        
when(serviceConfiguration.cdcConfiguration()).thenReturn(cdcConfiguration);
+
+        InstancesMetadata instancesMetadata = mockInstanceMetadata(tempDir);
+        SidecarMetrics sidecarMetrics = mock(SidecarMetrics.class);
+        ServerMetrics serverMetrics = mock(ServerMetrics.class);
+        CdcMetrics cdcMetrics = new CdcMetrics(METRIC_REGISTRY);
+        when(sidecarMetrics.server()).thenReturn(serverMetrics);
+        when(serverMetrics.cdc()).thenReturn(cdcMetrics);
+        CdcRawDirectorySpaceCleaner cleaner = new CdcRawDirectorySpaceCleaner(
+        timeProvider,
+        systemViewsDatabaseAccessor,
+        serviceConfiguration,
+        instancesMetadata,
+        sidecarMetrics
+        );
+
+        checkExists(tempDir, TEST_ORPHANED_SEGMENT_FILE_NAME, true, false);
+        checkExists(tempDir, TEST_SEGMENT_FILE_NAME_1);
+        checkExists(tempDir, TEST_SEGMENT_FILE_NAME_2);
+        checkExists(tempDir, TEST_SEGMENT_FILE_NAME_3);
+        checkExists(tempDir, TEST_INTACT_SEGMENT_FILE_NAME, false, true);
+
+        assertEquals(0L, cdcMetrics.criticalCdcRawSpace.metric.getValue());
+        assertEquals(0L, cdcMetrics.orphanedIdx.metric.getValue());
+        assertEquals(0L, cdcMetrics.deletedSegment.metric.getValue());
+
+        cleaner.routineCleanUp();
+
+        // earliest cdc segment should be deleted along with orphaned idx file
+        checkNotExists(tempDir, TEST_ORPHANED_SEGMENT_FILE_NAME);
+        checkNotExists(tempDir, TEST_SEGMENT_FILE_NAME_1);
+
+        // latest cdc segments should still exist as long as we have free 
buffer space
+        checkExists(tempDir, TEST_SEGMENT_FILE_NAME_2);
+        checkExists(tempDir, TEST_SEGMENT_FILE_NAME_3);
+        checkExists(tempDir, TEST_INTACT_SEGMENT_FILE_NAME, false, true);
+
+        // verify metrics match expected
+        assertEquals(1L, cdcMetrics.criticalCdcRawSpace.metric.getValue());
+        assertEquals(1L, cdcMetrics.orphanedIdx.metric.getValue());
+        assertTrue(cdcMetrics.totalCdcSpaceUsed.metric.getValue() > 2097152L);
+        assertTrue(cdcMetrics.deletedSegment.metric.getValue() > 2097152L);
+        assertEquals(0, cdcMetrics.oldestSegmentAge.metric.getValue());
+
+        // delete all cdc files, in order to test the scenario that we do not 
have current cdc file, but have cdc file in the prior round.
+        // We do not expect all CDC file to be cleaned up in a running system. 
But test it for robustness.
+        Files.deleteIfExists(Paths.get(tempDir.toString(), 
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, TEST_INTACT_SEGMENT_FILE_NAME));
+        cleaner.routineCleanUp(); // it should run fine.
+    }
+
+    /* test utils */
+
+    private static InstancesMetadata mockInstanceMetadata(Path tempDir) throws 
IOException
+    {
+        InstanceMetadata instanceMetadata = mock(InstanceMetadata.class);
+
+        File cdcDir = 
Files.createDirectory(tempDir.resolve(CdcRawDirectorySpaceCleaner.CDC_DIR_NAME)).toFile();
+        writeCdcSegment(cdcDir, TEST_ORPHANED_SEGMENT_FILE_NAME, 67108864, 
true, true, false);
+        writeCdcSegment(cdcDir, TEST_SEGMENT_FILE_NAME_1, 2097152, true);
+        writeCdcSegment(cdcDir, TEST_SEGMENT_FILE_NAME_2, 524288, true);
+        writeCdcSegment(cdcDir, TEST_SEGMENT_FILE_NAME_3, 1024, false);
+
+        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+        writeCdcSegment(cdcDir, TEST_INTACT_SEGMENT_FILE_NAME, 
RandomUtils.nextInt(128, 256), false, false, true);
+
+        
when(instanceMetadata.dataDirs()).thenReturn(List.of(cdcDir.getParent()));
+        return new InstancesMetadataImpl(instanceMetadata, 
DnsResolver.DEFAULT);
+    }
+
+    private static void writeCdcSegment(File cdcDir, String filename, int 
size, boolean complete) throws IOException
+    {
+        writeCdcSegment(cdcDir, filename, size, complete, false, false);
+    }
+
+    private static void writeCdcSegment(File cdcDir, String filename, int 
size, boolean complete, boolean orphaned, boolean intact) throws IOException
+    {
+        if (!orphaned)
+        {
+            final File f1 = new File(cdcDir, filename);
+            assertTrue(f1.createNewFile());
+            Files.write(f1.toPath(), RandomUtils.nextBytes(size));
+        }
+
+        if (!intact)
+        {
+            final File f2 = new File(cdcDir, CdcUtil.getIdxFileName(filename));
+            assertTrue(f2.createNewFile());
+            Files.write(f2.toPath(), (size + (complete ? "\nCOMPLETED" : 
"")).getBytes(StandardCharsets.UTF_8));
+        }
+    }
+
+    private void checkExists(Path tempDir, String logFileName)
+    {
+        checkExists(tempDir, logFileName, false, false);
+    }
+
+    private void checkExists(Path tempDir, String logFileName, boolean 
orphaned, boolean intact)
+    {
+        assertEquals(!orphaned, Files.exists(Paths.get(tempDir.toString(), 
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, logFileName)));
+        assertEquals(!intact, Files.exists(Paths.get(tempDir.toString(), 
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, 
CdcUtil.getIdxFileName(logFileName))));
+    }
+
+    private void checkNotExists(Path tempDir, String logFileName)
+    {
+        assertFalse(Files.exists(Paths.get(tempDir.toString(), 
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, logFileName)));
+        assertFalse(Files.exists(Paths.get(tempDir.toString(), 
CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, 
CdcUtil.getIdxFileName(logFileName))));
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/utils/CdcUtilTest.java 
b/server/src/test/java/org/apache/cassandra/sidecar/utils/CdcUtilTest.java
new file mode 100644
index 00000000..4645b930
--- /dev/null
+++ b/server/src/test/java/org/apache/cassandra/sidecar/utils/CdcUtilTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for the {@link CdcUtil}
+ */
+public class CdcUtilTest
+{
+    @Test
+    public void testMatcher()
+    {
+        
assertTrue(CdcUtil.SEGMENT_PATTERN.matcher("CommitLog-7-1689642717704.log").matches());
+        
assertTrue(CdcUtil.SEGMENT_PATTERN.matcher("CommitLog-12345.log").matches());
+        
assertTrue(CdcUtil.SEGMENT_PATTERN.matcher("CommitLog-2-1340512736956320000.log").matches());
+        
assertTrue(CdcUtil.SEGMENT_PATTERN.matcher("CommitLog-2-1340512736959990000.log").matches());
+        assertTrue(CdcUtil.isValid("CommitLog-7-1689642717704.log"));
+        assertTrue(CdcUtil.isValid("CommitLog-12345.log"));
+        assertTrue(CdcUtil.isValid("CommitLog-2-1340512736956320000.log"));
+        assertTrue(CdcUtil.isValid("CommitLog-2-1340512736959990000.log"));
+        assertTrue(CdcUtil.isLogFile("CommitLog-7-1689642717704.log"));
+        assertTrue(CdcUtil.isLogFile("CommitLog-12345.log"));
+        assertTrue(CdcUtil.isLogFile("CommitLog-2-1340512736956320000.log"));
+        assertTrue(CdcUtil.isLogFile("CommitLog-2-1340512736959990000.log"));
+        
assertTrue(CdcUtil.IDX_FILE_PATTERN.matcher("CommitLog-7-1689642717704_cdc.idx").matches());
+        
assertTrue(CdcUtil.IDX_FILE_PATTERN.matcher("CommitLog-12345_cdc.idx").matches());
+        
assertTrue(CdcUtil.IDX_FILE_PATTERN.matcher("CommitLog-2-1240512736956320000_cdc.idx").matches());
+        
assertTrue(CdcUtil.IDX_FILE_PATTERN.matcher("CommitLog-2-1340512736956320000_cdc.idx").matches());
+
+        assertFalse(CdcUtil.isValid("CommitLog-abc.log"));
+        assertFalse(CdcUtil.isValid("abc-7-1689642717704.log"));
+        assertFalse(CdcUtil.isValid("abc-1689642717704.log"));
+        assertFalse(CdcUtil.isLogFile("CommitLog-abc.log"));
+        assertFalse(CdcUtil.isLogFile("abc-7-1689642717704.log"));
+        assertFalse(CdcUtil.isLogFile("abc-1689642717704.log"));
+        assertFalse(CdcUtil.isLogFile("CommitLog-7-1689642717704"));
+        assertFalse(CdcUtil.isLogFile("CommitLog-12345"));
+        assertFalse(CdcUtil.isLogFile("CommitLog-2-1340512736956320000"));
+        assertFalse(CdcUtil.isLogFile("CommitLog-2-1340512736959990000"));
+    }
+
+    @Test
+    public void testExtractSegmentIdMatcher()
+    {
+        assertEquals(12345L, CdcUtil.parseSegmentId("CommitLog-12345.log"));
+        assertEquals(1689642717704L, 
CdcUtil.parseSegmentId("CommitLog-7-1689642717704.log"));
+        assertEquals(1340512736956320000L, 
CdcUtil.parseSegmentId("CommitLog-2-1340512736956320000.log"));
+        assertEquals(1340512736959990000L, 
CdcUtil.parseSegmentId("CommitLog-2-1340512736959990000.log"));
+        assertEquals(12345L, CdcUtil.parseSegmentId("CommitLog-6-12345.log"));
+        assertEquals(1646094405659L, 
CdcUtil.parseSegmentId("CommitLog-7-1646094405659.log"));
+        assertEquals(1646094405659L, 
CdcUtil.parseSegmentId("CommitLog-1646094405659.log"));
+    }
+
+    @Test
+    public void testIdxToLogFileName()
+    {
+        assertEquals("CommitLog-7-1689642717704.log", 
CdcUtil.idxToLogFileName("CommitLog-7-1689642717704_cdc.idx"));
+        assertEquals("CommitLog-12345.log", 
CdcUtil.idxToLogFileName("CommitLog-12345_cdc.idx"));
+        assertEquals("CommitLog-2-1240512736956320000.log", 
CdcUtil.idxToLogFileName("CommitLog-2-1240512736956320000_cdc.idx"));
+        assertEquals("CommitLog-2-1340512736956320000.log", 
CdcUtil.idxToLogFileName("CommitLog-2-1340512736956320000_cdc.idx"));
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/utils/FileUtilsTest.java 
b/server/src/test/java/org/apache/cassandra/sidecar/utils/FileUtilsTest.java
index eba7be16..f545d529 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/utils/FileUtilsTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/utils/FileUtilsTest.java
@@ -18,8 +18,11 @@
 
 package org.apache.cassandra.sidecar.utils;
 
+import java.util.Objects;
+
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
@@ -37,4 +40,25 @@ class FileUtilsTest
         
assertThat(FileUtils.maybeResolveHomeDirectory("~/.ccm")).isEqualTo(System.getProperty("user.home")
 + "/.ccm");
         
assertThat(FileUtils.maybeResolveHomeDirectory("/dev/null")).isEqualTo("/dev/null");
     }
+
+    @Test
+    public void testStorageStringPatterns()
+    {
+        assertEquals(1, FileUtils.storageUnitToBytes("B"));
+        assertEquals(1024, FileUtils.storageUnitToBytes("KiB"));
+        assertEquals(1048576, FileUtils.storageUnitToBytes("MiB"));
+        assertEquals(1073741824, FileUtils.storageUnitToBytes("GiB"));
+
+        assertEquals(1048576L, FileUtils.mbStringToBytes("1"));
+        assertEquals(524288000L, FileUtils.mbStringToBytes("500"));
+
+        assertEquals(1L, 
Objects.requireNonNull(FileUtils.storageStringToBytes("1")).longValue());
+        assertEquals(1L, 
Objects.requireNonNull(FileUtils.storageStringToBytes("1B")).longValue());
+        assertEquals(500L, 
Objects.requireNonNull(FileUtils.storageStringToBytes("500B")).longValue());
+        assertEquals(1024, 
Objects.requireNonNull(FileUtils.storageStringToBytes("1KiB")).longValue());
+        assertEquals(1048576, 
Objects.requireNonNull(FileUtils.storageStringToBytes("1024KiB")).longValue());
+        assertEquals(1048576, 
Objects.requireNonNull(FileUtils.storageStringToBytes("1MiB")).longValue());
+        assertEquals(4294967296L, 
Objects.requireNonNull(FileUtils.storageStringToBytes("4096MiB")).longValue());
+        assertEquals(536870912000L, 
Objects.requireNonNull(FileUtils.storageStringToBytes("500GiB")).longValue());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to