This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new 08657484 CASSSIDECAR-220: Add CdcRawDirectorySpaceCleaner for tracking and cleaning up the cdc_… (#203) 08657484 is described below commit 08657484d8d280721e557612bc6ab1af7a2a0424 Author: jberragan <jberra...@gmail.com> AuthorDate: Wed Apr 9 15:41:15 2025 -0700 CASSSIDECAR-220: Add CdcRawDirectorySpaceCleaner for tracking and cleaning up the cdc_… (#203) Patch by James Berragan; Reviewed by Bernardo Botella, Jyothsna Konisa, Yifan Cai for CASSSIDECAR-220 --- .../cassandra/sidecar/config/CdcConfiguration.java | 40 +- .../sidecar/config/yaml/CdcConfigurationImpl.java | 188 ++++++++- .../sidecar/db/SystemViewsDatabaseAccessor.java | 91 ++++ .../sidecar/db/schema/SystemViewsSchema.java | 68 +++ .../cassandra/sidecar/metrics/CdcMetrics.java | 66 +++ .../cassandra/sidecar/metrics/ServerMetrics.java | 5 + .../sidecar/metrics/ServerMetricsImpl.java | 8 + .../cassandra/sidecar/modules/CdcModule.java | 16 + .../modules/multibindings/PeriodicTaskMapKeys.java | 1 + .../modules/multibindings/TableSchemaMapKeys.java | 1 + .../sidecar/tasks/CdcRawDirectorySpaceCleaner.java | 469 +++++++++++++++++++++ .../apache/cassandra/sidecar/utils/CdcUtil.java | 36 +- .../apache/cassandra/sidecar/utils/FileUtils.java | 67 +++ .../cassandra/sidecar/utils/TimeProvider.java | 21 +- .../db/SystemViewsDatabaseAccessorIntTest.java | 45 ++ .../cassandra/sidecar/db/SidecarSchemaTest.java | 4 +- .../tasks/CdcRawDirectorySpaceCleanerTest.java | 188 +++++++++ .../cassandra/sidecar/utils/CdcUtilTest.java | 84 ++++ .../cassandra/sidecar/utils/FileUtilsTest.java | 24 ++ 19 files changed, 1416 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/CdcConfiguration.java b/server/src/main/java/org/apache/cassandra/sidecar/config/CdcConfiguration.java index a18b7735..314b1f15 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/config/CdcConfiguration.java +++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/CdcConfiguration.java @@ -18,6 +18,7 @@ package org.apache.cassandra.sidecar.config; import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; + import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration; /** @@ -31,14 +32,49 @@ public interface CdcConfiguration SecondBoundConfiguration segmentHardLinkCacheExpiry(); /** - * * @return true if cdc feature is enabled */ boolean isEnabled(); /** - * * @return how frequently CDC configs are to be refreshed */ MillisecondBoundConfiguration cdcConfigRefreshTime(); + + /* CdcRawDirectorySpaceCleaner Configuration */ + + /** + * @return the cadence at which the CdcRawDirectorySpaceCleaner periodic task should run to check and clean up old `cdc_raw` log segments. + */ + SecondBoundConfiguration cdcRawDirectorySpaceCleanerFrequency(); + + /** + * @return `true` if CdcRawDirectorySpaceCleaner should monitor the `cdc_raw` directory and clean up the oldest commit log segments. + */ + boolean enableCdcRawDirectoryRoutineCleanUp(); + + /** + * @return fallback value for maximum directory size in bytes for the `cdc_raw` directory when it can't be read from the `system_views.settings` table. + */ + long fallbackCdcRawDirectoryMaxSizeBytes(); + + /** + * @return max percent usage of the cdc_raw directory before CdcRawDirectorySpaceCleaner starts removing the oldest segments. + */ + float cdcRawDirectoryMaxPercentUsage(); + + /** + * @return the critical time period in seconds that indicates the `cdc_raw` directory is not large enough to buffer this time-window of mutations. + */ + SecondBoundConfiguration cdcRawDirectoryCriticalBufferWindow(); + + /** + * @return the low time period in seconds that indicates the `cdc_raw` directory is not large enough to buffer this time-window of mutations. 
+ */ + SecondBoundConfiguration cdcRawDirectoryLowBufferWindow(); + + /** + * @return the time period which the CdcRawDirectorySpaceCleaner should cache the cdc_total_space before refreshing. + */ + SecondBoundConfiguration cacheMaxUsage(); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java index 0121e4ec..089a7ac0 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java @@ -41,6 +41,27 @@ public class CdcConfigurationImpl implements CdcConfiguration MillisecondBoundConfiguration.parse("30s"); public static final SecondBoundConfiguration DEFAULT_SEGMENT_HARD_LINK_CACHE_EXPIRY = SecondBoundConfiguration.parse("5m"); + public static final String CDC_RAW_CLEANER_FREQUENCY_PROPERTY = "cdc_raw_cleaner_frequency"; + public static final SecondBoundConfiguration DEFAULT_CDC_RAW_CLEANER_FREQUENCY = + SecondBoundConfiguration.parse("1m"); + + public static final String ENABLE_CDC_RAW_CLEANER_PROPERTY = "enable_cdc_raw_cleaner"; + public static final boolean DEFAULT_ENABLE_CDC_RAW_CLEANER_PROPERTY = true; + + public static final String FALLBACK_CDC_RAW_MAX_DIRECTORY_SIZE_BYTES = "fallback_cdc_raw_max_directory_size_bytes"; + public static final long DEFAULT_FALLBACK_CDC_RAW_MAX_DIRECTORY_SIZE_BYTES = 1L << 31; // 2 GiB + + public static final String CDC_RAW_MAX_DIRECTORY_MAX_PERCENT = "cdc_raw_max_directory_max_percent"; + public static final float DEFAULT_CDC_RAW_MAX_DIRECTORY_MAX_PERCENT = 1.0f; + + public static final String CDC_RAW_MAX_CRITICAL_BUFFER_WINDOW = "cdc_raw_critical_buffer_window"; + public static final SecondBoundConfiguration DEFAULT_CDC_RAW_MAX_CRITICAL_BUFFER_WINDOW = SecondBoundConfiguration.parse("15m"); + + public static final String CDC_RAW_MAX_LOW_BUFFER_WINDOW = 
"cdc_raw_low_buffer_window"; + public static final SecondBoundConfiguration DEFAULT_CDC_RAW_MAX_LOW_BUFFER_WINDOW = SecondBoundConfiguration.parse("60m"); + + public static final String CDC_CACHE_MAX_USAGE_DURATION = "cdc_raw_cache_max_usage_duration"; + public static final SecondBoundConfiguration DEFAULT_CDC_CACHE_MAX_USAGE_DURATION = SecondBoundConfiguration.parse("15m"); @JsonProperty(value = IS_ENABLED_PROPERTY) private final boolean isEnabled; @@ -48,22 +69,74 @@ public class CdcConfigurationImpl implements CdcConfiguration private final MillisecondBoundConfiguration cdcConfigRefreshTime; @JsonProperty(value = SEGMENT_HARD_LINK_CACHE_EXPIRY_PROPERTY) private SecondBoundConfiguration segmentHardLinkCacheExpiry; - + @JsonProperty(value = CDC_RAW_CLEANER_FREQUENCY_PROPERTY) + private SecondBoundConfiguration cdcRawCleanerFrequency; + @JsonProperty(value = ENABLE_CDC_RAW_CLEANER_PROPERTY) + private boolean enableCdcRawCleaner; + @JsonProperty(value = FALLBACK_CDC_RAW_MAX_DIRECTORY_SIZE_BYTES) + private long fallbackCdcRawMaxDirectorySize; + @JsonProperty(value = CDC_RAW_MAX_DIRECTORY_MAX_PERCENT) + private float cdcRawMaxPercent; + @JsonProperty(value = CDC_RAW_MAX_CRITICAL_BUFFER_WINDOW) + private SecondBoundConfiguration cdcRawCriticalBufferWindow; + @JsonProperty(value = CDC_RAW_MAX_LOW_BUFFER_WINDOW) + private SecondBoundConfiguration cdcRawLowBufferWindow; + @JsonProperty(value = CDC_CACHE_MAX_USAGE_DURATION) + private SecondBoundConfiguration cacheMaxUsage; public CdcConfigurationImpl() { this.segmentHardLinkCacheExpiry = DEFAULT_SEGMENT_HARD_LINK_CACHE_EXPIRY; this.cdcConfigRefreshTime = DEFAULT_CDC_CONFIG_REFRESH_TIME; this.isEnabled = DEFAULT_IS_ENABLED; + this.cdcRawCleanerFrequency = DEFAULT_CDC_RAW_CLEANER_FREQUENCY; + this.enableCdcRawCleaner = DEFAULT_ENABLE_CDC_RAW_CLEANER_PROPERTY; + this.fallbackCdcRawMaxDirectorySize = DEFAULT_FALLBACK_CDC_RAW_MAX_DIRECTORY_SIZE_BYTES; + this.cdcRawMaxPercent = DEFAULT_CDC_RAW_MAX_DIRECTORY_MAX_PERCENT; + 
this.cdcRawCriticalBufferWindow = DEFAULT_CDC_RAW_MAX_CRITICAL_BUFFER_WINDOW; + this.cdcRawLowBufferWindow = DEFAULT_CDC_RAW_MAX_LOW_BUFFER_WINDOW; + this.cacheMaxUsage = DEFAULT_CDC_CACHE_MAX_USAGE_DURATION; } public CdcConfigurationImpl(boolean isEnabled, MillisecondBoundConfiguration cdcConfigRefreshTime, SecondBoundConfiguration segmentHardLinkCacheExpiry) + { + this( + isEnabled, + cdcConfigRefreshTime, + segmentHardLinkCacheExpiry, + DEFAULT_CDC_RAW_CLEANER_FREQUENCY, + DEFAULT_ENABLE_CDC_RAW_CLEANER_PROPERTY, + DEFAULT_FALLBACK_CDC_RAW_MAX_DIRECTORY_SIZE_BYTES, + DEFAULT_CDC_RAW_MAX_DIRECTORY_MAX_PERCENT, + DEFAULT_CDC_RAW_MAX_CRITICAL_BUFFER_WINDOW, + DEFAULT_CDC_RAW_MAX_LOW_BUFFER_WINDOW, + DEFAULT_CDC_CACHE_MAX_USAGE_DURATION + ); + } + + public CdcConfigurationImpl(boolean isEnabled, + MillisecondBoundConfiguration cdcConfigRefreshTime, + SecondBoundConfiguration segmentHardLinkCacheExpiry, + SecondBoundConfiguration cdcRawCleanerFrequency, + boolean enableCdcRawCleaner, + long cdcRawMaxDirectorySize, + float cdcRawMaxPercent, + SecondBoundConfiguration cdcRawCriticalBufferWindow, + SecondBoundConfiguration cdcRawLowBufferWindow, + SecondBoundConfiguration cacheMaxUsage) { this.isEnabled = isEnabled; this.cdcConfigRefreshTime = cdcConfigRefreshTime; this.segmentHardLinkCacheExpiry = segmentHardLinkCacheExpiry; + this.cdcRawCleanerFrequency = cdcRawCleanerFrequency; + this.enableCdcRawCleaner = enableCdcRawCleaner; + this.fallbackCdcRawMaxDirectorySize = cdcRawMaxDirectorySize; + this.cdcRawMaxPercent = cdcRawMaxPercent; + this.cdcRawCriticalBufferWindow = cdcRawCriticalBufferWindow; + this.cdcRawLowBufferWindow = cdcRawLowBufferWindow; + this.cacheMaxUsage = cacheMaxUsage; } @Override @@ -100,4 +173,117 @@ public class CdcConfigurationImpl implements CdcConfiguration LOGGER.warn("'segment_hardlink_cache_expiry_in_secs' is deprecated, use 'segment_hardlink_cache_expiry' instead"); this.segmentHardLinkCacheExpiry = new 
SecondBoundConfiguration(segmentHardlinkCacheExpiryInSecs, TimeUnit.SECONDS); } + + /** + * {@inheritDoc} + */ + @Override + @JsonProperty(value = CDC_RAW_CLEANER_FREQUENCY_PROPERTY) + public SecondBoundConfiguration cdcRawDirectorySpaceCleanerFrequency() + { + return cdcRawCleanerFrequency; + } + + @JsonProperty(value = CDC_RAW_CLEANER_FREQUENCY_PROPERTY) + public void setCdcRawDirectorySpaceCleanerFrequency(SecondBoundConfiguration cdcRawCleanerFrequency) + { + this.cdcRawCleanerFrequency = cdcRawCleanerFrequency; + } + + /** + * {@inheritDoc} + */ + @Override + @JsonProperty(value = ENABLE_CDC_RAW_CLEANER_PROPERTY) + public boolean enableCdcRawDirectoryRoutineCleanUp() + { + return enableCdcRawCleaner; + } + + @JsonProperty(value = ENABLE_CDC_RAW_CLEANER_PROPERTY) + public void setEnableCdcRawCleaner(boolean enableCdcRawCleaner) + { + this.enableCdcRawCleaner = enableCdcRawCleaner; + } + + + /** + * {@inheritDoc} + */ + @Override + @JsonProperty(value = FALLBACK_CDC_RAW_MAX_DIRECTORY_SIZE_BYTES) + public long fallbackCdcRawDirectoryMaxSizeBytes() + { + return fallbackCdcRawMaxDirectorySize; + } + + @JsonProperty(value = FALLBACK_CDC_RAW_MAX_DIRECTORY_SIZE_BYTES) + public void setFallbackCdcRawDirectoryMaxSizeBytes(long fallbackCdcRawMaxDirectorySize) + { + this.fallbackCdcRawMaxDirectorySize = fallbackCdcRawMaxDirectorySize; + } + + /** + * {@inheritDoc} + */ + @Override + @JsonProperty(value = CDC_RAW_MAX_DIRECTORY_MAX_PERCENT) + public float cdcRawDirectoryMaxPercentUsage() + { + return cdcRawMaxPercent; + } + + @JsonProperty(value = CDC_RAW_MAX_DIRECTORY_MAX_PERCENT) + public void setCdcRawDirectoryMaxPercentUsage(long cdcRawMaxPercent) + { + this.cdcRawMaxPercent = cdcRawMaxPercent; + } + + /** + * {@inheritDoc} + */ + @Override + @JsonProperty(value = CDC_RAW_MAX_CRITICAL_BUFFER_WINDOW) + public SecondBoundConfiguration cdcRawDirectoryCriticalBufferWindow() + { + return cdcRawCriticalBufferWindow; + } + + @JsonProperty(value = 
CDC_RAW_MAX_CRITICAL_BUFFER_WINDOW) + public void setCdcRawDirectoryCriticalBufferWindow(SecondBoundConfiguration cdcRawCriticalBufferWindow) + { + this.cdcRawCriticalBufferWindow = cdcRawCriticalBufferWindow; + } + + /** + * {@inheritDoc} + */ + @Override + @JsonProperty(value = CDC_RAW_MAX_LOW_BUFFER_WINDOW) + public SecondBoundConfiguration cdcRawDirectoryLowBufferWindow() + { + return cdcRawLowBufferWindow; + } + + @JsonProperty(value = CDC_RAW_MAX_LOW_BUFFER_WINDOW) + public void setCdcRawDirectoryLowBufferWindow(SecondBoundConfiguration cdcRawLowBufferWindow) + { + this.cdcRawLowBufferWindow = cdcRawLowBufferWindow; + } + + /** + * {@inheritDoc} + */ + @JsonProperty(value = CDC_CACHE_MAX_USAGE_DURATION) + @Override + public SecondBoundConfiguration cacheMaxUsage() + { + return cacheMaxUsage; + } + + @JsonProperty(value = CDC_CACHE_MAX_USAGE_DURATION) + public void setCacheMaxUsage(SecondBoundConfiguration cacheMaxUsage) + { + this.cacheMaxUsage = cacheMaxUsage; + } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java b/server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java new file mode 100644 index 00000000..60280c26 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.db; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.ResultSet; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.db.schema.SystemViewsSchema; +import org.apache.cassandra.sidecar.exceptions.SchemaUnavailableException; +import org.apache.cassandra.sidecar.utils.FileUtils; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Database Accessor that queries cassandra to get information maintained under system_views keyspace. + */ +@Singleton +public class SystemViewsDatabaseAccessor extends DatabaseAccessor<SystemViewsSchema> +{ + private static final String YAML_PROP_IN_MB = "cdc_total_space_in_mb"; + private static final String YAML_PROP_WITH_UNIT = "cdc_total_space"; // expects value with units e.g. 
"5MiB" + + @Inject + public SystemViewsDatabaseAccessor(SystemViewsSchema systemViewsSchema, + CQLSessionProvider sessionProvider) + { + super(systemViewsSchema, sessionProvider); + } + + @Nullable + public Long getCdcTotalSpaceSetting() throws SchemaUnavailableException + { + // attempt to parse Cassandra v4.0 'cdc_total_space_in_mb' yaml prop + Map<String, String> settings = getSettings(YAML_PROP_IN_MB, YAML_PROP_WITH_UNIT); + String cdcTotalSpaceInMb = settings.get(YAML_PROP_IN_MB); + if (cdcTotalSpaceInMb != null) + { + return FileUtils.mbStringToBytes(cdcTotalSpaceInMb); + } + + // otherwise parse current (v5.0+) 'cdc_total_space' yaml prop + String storageStringToBytes = settings.get(YAML_PROP_WITH_UNIT); + if (storageStringToBytes != null) + { + return FileUtils.storageStringToBytes(storageStringToBytes); + } + + return null; + } + + /** + * Load a setting values from the `system_views.settings` table. + * + * @param names names of settings + * @return map of setting values keyed on `name` loaded from the `system_views.settings` table. + */ + @NotNull + public Map<String, String> getSettings(String... names) throws SchemaUnavailableException + { + BoundStatement statement = tableSchema.selectSettings().bind(Arrays.asList(names)); + ResultSet result = execute(statement); + return result.all() + .stream() + .collect(Collectors.toMap( + row -> row.getString(0), + row -> row.getString(1)) + ); + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SystemViewsSchema.java b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SystemViewsSchema.java new file mode 100644 index 00000000..e640f42a --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SystemViewsSchema.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.db.schema; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.exceptions.SchemaUnavailableException; +import org.jetbrains.annotations.NotNull; + +/** + * Schema for getting information stored in system_views keyspace. + */ +@Singleton +public class SystemViewsSchema extends CassandraSystemTableSchema +{ + protected static final String SYSTEM_VIEWS_KEYSPACE_NAME = "system_views"; + protected static final String SYSTEM_VIEWS_SETTINGS_TABLE_NAME = "settings"; + private PreparedStatement selectSettings; + + @Override + protected String keyspaceName() + { + return SYSTEM_VIEWS_KEYSPACE_NAME; + } + + @Override + protected String tableName() + { + return SYSTEM_VIEWS_SETTINGS_TABLE_NAME; + } + + public PreparedStatement selectSettings() throws SchemaUnavailableException + { + ensureSchemaAvailable(); + return selectSettings; + } + + @Override + protected void prepareStatements(@NotNull Session session) + { + this.selectSettings = session.prepare("SELECT name, value FROM system_views.settings WHERE name IN ?"); + } + + protected void ensureSchemaAvailable() throws SchemaUnavailableException + { + if (selectSettings == null) + { + throw new SchemaUnavailableException(keyspaceName(), tableName()); + } + } +} diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/CdcMetrics.java b/server/src/main/java/org/apache/cassandra/sidecar/metrics/CdcMetrics.java new file mode 100644 index 00000000..ac847d49 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/metrics/CdcMetrics.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.metrics; + +import java.util.Objects; +import java.util.function.Function; + +import com.codahale.metrics.DefaultSettableGauge; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricRegistry; + +import static org.apache.cassandra.sidecar.metrics.ServerMetrics.SERVER_PREFIX; + +/** + * Tracks metrics related to cdc functionality provided by Sidecar. 
+ */ +public class CdcMetrics +{ + public static final String DOMAIN = SERVER_PREFIX + ".Cdc"; + protected final MetricRegistry metricRegistry; + public final NamedMetric<DeltaGauge> cdcRawCleanerFailed; + public final NamedMetric<DeltaGauge> orphanedIdx; + public final NamedMetric<DefaultSettableGauge<Integer>> oldestSegmentAge; + public final NamedMetric<DeltaGauge> totalConsumedCdcBytes; + public final NamedMetric<DefaultSettableGauge<Long>> totalCdcSpaceUsed; + public final NamedMetric<DeltaGauge> deletedSegment; + public final NamedMetric<DeltaGauge> lowCdcRawSpace; + public final NamedMetric<DeltaGauge> criticalCdcRawSpace; + + public CdcMetrics(MetricRegistry metricRegistry) + { + this.metricRegistry = Objects.requireNonNull(metricRegistry, "Metric registry can not be null"); + this.cdcRawCleanerFailed = createMetric("CleanerFailed", name -> metricRegistry.gauge(name, DeltaGauge::new)); + this.totalConsumedCdcBytes = createMetric("TotalConsumedBytes", name -> metricRegistry.gauge(name, DeltaGauge::new)); + this.totalCdcSpaceUsed = createMetric("TotalSpaceUsed", name -> metricRegistry.gauge(name, () -> new DefaultSettableGauge<>(0L))); + this.orphanedIdx = createMetric("OrphanedIdxFile", name -> metricRegistry.gauge(name, DeltaGauge::new)); + this.deletedSegment = createMetric("DeletedSegment", name -> metricRegistry.gauge(name, DeltaGauge::new)); + this.oldestSegmentAge = createMetric("OldestSegmentAgeSeconds", name -> metricRegistry.gauge(name, () -> new DefaultSettableGauge<>(0))); + this.lowCdcRawSpace = createMetric("LowSpace", name -> metricRegistry.gauge(name, DeltaGauge::new)); + this.criticalCdcRawSpace = createMetric("CriticalSpace", name -> metricRegistry.gauge(name, DeltaGauge::new)); + } + + private <T extends Metric> NamedMetric<T> createMetric(String simpleName, Function<String, T> metricCreator) + { + return NamedMetric.builder(metricCreator) + .withDomain(DOMAIN) + .withName(simpleName) + .build(); + } +} diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java index ea8d7857..fb8be759 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java @@ -64,4 +64,9 @@ public interface ServerMetrics * @return metrics related to coordination functionality that are tracked */ CoordinationMetrics coordination(); + + /** + * @return metrics tracked by server for cdc functionality. + */ + CdcMetrics cdc(); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java index 15e31e6a..2d4e7da1 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java @@ -36,6 +36,7 @@ public class ServerMetricsImpl implements ServerMetrics protected final SchemaReportingMetrics schemaReportingMetrics; protected final CacheMetrics cacheMetrics; protected final CoordinationMetrics coordinationMetrics; + protected final CdcMetrics cdcMetrics; public ServerMetricsImpl(MetricRegistry metricRegistry) { @@ -48,6 +49,7 @@ public class ServerMetricsImpl implements ServerMetrics this.schemaReportingMetrics = new SchemaReportingMetrics(metricRegistry); this.cacheMetrics = new CacheMetrics(metricRegistry); this.coordinationMetrics = new CoordinationMetrics(metricRegistry); + this.cdcMetrics = new CdcMetrics(metricRegistry); } @Override @@ -91,4 +93,10 @@ public class ServerMetricsImpl implements ServerMetrics { return coordinationMetrics; } + + @Override + public CdcMetrics cdc() + { + return cdcMetrics; + } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java index 3b6da2c4..322b80bf 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java @@ -35,6 +35,7 @@ import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthMonitorTask; import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthProvider; import org.apache.cassandra.sidecar.coordination.SidecarPeerProvider; import org.apache.cassandra.sidecar.db.schema.ConfigsSchema; +import org.apache.cassandra.sidecar.db.schema.SystemViewsSchema; import org.apache.cassandra.sidecar.db.schema.TableSchema; import org.apache.cassandra.sidecar.handlers.cdc.AllServiceConfigHandler; import org.apache.cassandra.sidecar.handlers.cdc.DeleteServiceConfigHandler; @@ -47,6 +48,7 @@ import org.apache.cassandra.sidecar.modules.multibindings.TableSchemaMapKeys; import org.apache.cassandra.sidecar.modules.multibindings.VertxRouteMapKeys; import org.apache.cassandra.sidecar.routes.RouteBuilder; import org.apache.cassandra.sidecar.routes.VertxRoute; +import org.apache.cassandra.sidecar.tasks.CdcRawDirectorySpaceCleaner; import org.apache.cassandra.sidecar.tasks.PeriodicTask; import org.apache.cassandra.sidecar.utils.SidecarClientProvider; @@ -63,6 +65,13 @@ public class CdcModule extends AbstractModule return task; } + @ProvidesIntoMap + @KeyClassMapKey(PeriodicTaskMapKeys.CdcRawDirectorySpaceCleanerTaskKey.class) + PeriodicTask cdcRawDirectorySpaceCleanercPeriodicTask(CdcRawDirectorySpaceCleaner cleanerTask) + { + return cleanerTask; + } + @ProvidesIntoMap @KeyClassMapKey(TableSchemaMapKeys.ConfigsSchemaKey.class) TableSchema configsSchema(ServiceConfiguration serviceConfiguration) @@ -70,6 +79,13 @@ public class CdcModule extends AbstractModule return new ConfigsSchema(serviceConfiguration); } + @ProvidesIntoMap + @KeyClassMapKey(TableSchemaMapKeys.SystemViewsSchemaKey.class) + TableSchema 
systemViewssSchema(SystemViewsSchema schema) + { + return schema; + } + @ProvidesIntoMap @KeyClassMapKey(VertxRouteMapKeys.ListCdcSegmentsRouteKey.class) VertxRoute listCdcSegmentsRoute(RouteBuilder.Factory factory, diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/PeriodicTaskMapKeys.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/PeriodicTaskMapKeys.java index 7099c7dc..57a4545b 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/PeriodicTaskMapKeys.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/PeriodicTaskMapKeys.java @@ -33,4 +33,5 @@ public interface PeriodicTaskMapKeys interface SchemaReportingTaskKey extends ClassKey {} interface SidecarPeerHealthMonitorTaskKey extends ClassKey {} interface SidecarSchemaInitializerTaskKey extends ClassKey {} + interface CdcRawDirectorySpaceCleanerTaskKey extends ClassKey {} } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/TableSchemaMapKeys.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/TableSchemaMapKeys.java index fdf2ee44..c3251e33 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/TableSchemaMapKeys.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/TableSchemaMapKeys.java @@ -30,4 +30,5 @@ public interface TableSchemaMapKeys interface SidecarLeaseSchemaKey extends ClassKey {} interface SidecarRolePermissionsSchemaKey extends ClassKey {} interface SystemAuthSchemaKey extends ClassKey {} + interface SystemViewsSchemaKey extends ClassKey {} } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java b/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java new file mode 100644 index 00000000..ba5bb3ed --- /dev/null +++ 
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.tasks; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Promise; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; +import org.apache.cassandra.sidecar.config.CdcConfiguration; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import 
org.apache.cassandra.sidecar.db.SystemViewsDatabaseAccessor; +import org.apache.cassandra.sidecar.exceptions.SchemaUnavailableException; +import org.apache.cassandra.sidecar.metrics.CdcMetrics; +import org.apache.cassandra.sidecar.metrics.SidecarMetrics; +import org.apache.cassandra.sidecar.utils.CdcUtil; +import org.apache.cassandra.sidecar.utils.FileUtils; +import org.apache.cassandra.sidecar.utils.TimeProvider; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.cassandra.sidecar.utils.CdcUtil.isLogFile; +import static org.apache.cassandra.sidecar.utils.CdcUtil.parseSegmentId; + +/** + * PeriodicTask to monitor and remove the oldest commit log segments in the `cdc_raw` directory + * when the space used hits the `cdc_total_space` limit set in the yaml file. + */ +@Singleton +public class CdcRawDirectorySpaceCleaner implements PeriodicTask +{ + private static final Logger LOGGER = LoggerFactory.getLogger(CdcRawDirectorySpaceCleaner.class); + + public static final String CDC_DIR_NAME = "cdc_raw"; + + private final TimeProvider timeProvider; + private final SystemViewsDatabaseAccessor systemViewsDatabaseAccessor; + private final CdcConfiguration cdcConfiguration; + private final InstancesMetadata instancesMetadata; + private final CdcMetrics cdcMetrics; + + // non-volatile variables, PeriodicTaskExecutor should ensure memory visibility + @Nullable + private Long maxUsageBytes = null; // lazily loaded from system_views.settings if available + private Long maxUsageLastReadNanos = null; + // cdc file -> file size in bytes. It remembers the file set from the last time the checker ran. 
+ private Map<CdcRawSegmentFile, Long> priorCdcFiles = new HashMap<>(); + + @Inject + public CdcRawDirectorySpaceCleaner(TimeProvider timeProvider, + SystemViewsDatabaseAccessor systemViewsDatabaseAccessor, + ServiceConfiguration serviceConfiguration, + InstancesMetadata instancesMetadata, + SidecarMetrics metrics) + { + this.timeProvider = timeProvider; + this.systemViewsDatabaseAccessor = systemViewsDatabaseAccessor; + this.cdcConfiguration = serviceConfiguration.cdcConfiguration(); + this.instancesMetadata = instancesMetadata; + this.cdcMetrics = metrics.server().cdc(); + } + + @Override + public DurationSpec delay() + { + return cdcConfiguration.cdcRawDirectorySpaceCleanerFrequency(); + } + + @Override + public void execute(Promise<Void> promise) + { + try + { + routineCleanUp(); + promise.tryComplete(); + } + catch (Throwable t) + { + LOGGER.warn("Failed to perform routine clean-up of cdc_raw directory", t); + cdcMetrics.cdcRawCleanerFailed.metric.update(1L); + promise.fail(t); + } + } + + /** + * @return true if we need to refresh the cached `cdc_total_space` value. 
+ */ + protected boolean shouldRefreshCachedMaxUsage() + { + return maxUsageLastReadNanos == null || + (timeProvider.nanoTime() - maxUsageLastReadNanos) >= + TimeUnit.MILLISECONDS.toNanos(cdcConfiguration.cacheMaxUsage().toMillis()); + } + + protected long maxUsage() + { + if (!shouldRefreshCachedMaxUsage()) + { + return Objects.requireNonNull(maxUsageBytes, + "maxUsageBytes cannot be null if maxUsageLastReadNanos is non-null"); + } + + try + { + Long newValue = systemViewsDatabaseAccessor.getCdcTotalSpaceSetting(); + if (newValue != null) + { + if (!newValue.equals(maxUsageBytes)) + { + LOGGER.info( + "Change in cdc_total_space from system_views.settings prev={} latest={}", + maxUsageBytes, newValue); + this.maxUsageBytes = newValue; + this.maxUsageLastReadNanos = timeProvider.nanoTime(); + return newValue; + } + } + } + catch (SchemaUnavailableException e) + { + LOGGER.debug("Could not read cdc_total_space from system_views.settings", e); + } + catch (Throwable t) + { + LOGGER.warn("Error reading cdc_total_space from system_views.settings", t); + } + + LOGGER.warn("Could not read cdc_total_space from system_views.settings, falling back to props"); + return cdcConfiguration.fallbackCdcRawDirectoryMaxSizeBytes(); + } + + @Override + public ScheduleDecision scheduleDecision() + { + if (cdcConfiguration.enableCdcRawDirectoryRoutineCleanUp()) + { + return ScheduleDecision.EXECUTE; + } + LOGGER.debug("Skipping CdcRawDirectorySpaceCleaner: feature is disabled"); + return ScheduleDecision.SKIP; + } + + protected void routineCleanUp() + { + for (InstanceMetadata instanceMetadata : instancesMetadata.instances()) + { + List<String> dataDirectories = instanceMetadata.dataDirs(); + dataDirectories.stream() + .map(dir -> new File(dir, CDC_DIR_NAME)) + .forEach(this::cleanUpCdcRawDirectory); + } + } + + protected void cleanUpCdcRawDirectory(File cdcRawDirectory) + { + if (!cdcRawDirectory.exists() || !cdcRawDirectory.isDirectory()) + { + LOGGER.debug("Skipping 
CdcRawDirectorySpaceCleaner: CDC directory does not exist: {}", cdcRawDirectory); + return; + } + + List<CdcRawSegmentFile> segmentFiles = Optional + .ofNullable( + cdcRawDirectory.listFiles(this::validSegmentFilter)) + .map(files -> Arrays.stream(files) + .map(CdcRawSegmentFile::new) + .filter( + CdcRawSegmentFile::indexExists) + .collect(Collectors.toList()) + ) + .orElseGet(List::of); + publishCdcStats(segmentFiles); + if (segmentFiles.size() < 2) + { + LOGGER.debug("Skipping cdc data cleaner routine cleanup: No cdc data or only one single cdc segment is found."); + return; + } + + long directorySize = FileUtils.directorySize(cdcRawDirectory); + long upperLimitBytes = + (long) (maxUsage() * cdcConfiguration.cdcRawDirectoryMaxPercentUsage()); + // Sort the files by segmentId to delete commit log segments in write order + // The latest file is the current active segment, but it could be created before the retention duration, e.g. slow data ingress + Collections.sort(segmentFiles); + long nowInMillis = timeProvider.currentTimeMillis(); + + // track the age of the oldest commit log segment to give indication of the time-window buffer available + cdcMetrics.oldestSegmentAge.metric.setValue( + (int) MILLISECONDS.toSeconds(nowInMillis - segmentFiles.get(0).lastModified())); + + if (directorySize > upperLimitBytes) + { + if (segmentFiles.get(0).segmentId > segmentFiles.get(1).segmentId) + { + LOGGER.error("Cdc segments sorted incorrectly {} before {}", + segmentFiles.get(0).segmentId, segmentFiles.get(1).segmentId); + } + + long criticalMillis = cdcConfiguration.cdcRawDirectoryCriticalBufferWindow().toMillis(); + long lowMillis = cdcConfiguration.cdcRawDirectoryLowBufferWindow().toMillis(); + + // we keep the last commit log segment as it may still be actively written to + int i = 0; + while (i < segmentFiles.size() - 1 && directorySize > upperLimitBytes) + { + CdcRawSegmentFile segment = segmentFiles.get(i); + long ageMillis = nowInMillis - segment.lastModified(); + + 
if (ageMillis < criticalMillis) + { + LOGGER.error("Insufficient Cdc buffer size to maintain {}-minute window segment={} maxSize={} ageMinutes={}", + MILLISECONDS.toMinutes(criticalMillis), segment, upperLimitBytes, + MILLISECONDS.toMinutes(ageMillis)); + cdcMetrics.criticalCdcRawSpace.metric.update(1); + } + else if (ageMillis < lowMillis) + { + LOGGER.warn("Insufficient Cdc buffer size to maintain {}-minute window segment={} maxSize={} ageMinutes={}", + MILLISECONDS.toMinutes(lowMillis), segment, upperLimitBytes, + MILLISECONDS.toMinutes(ageMillis)); + cdcMetrics.lowCdcRawSpace.metric.update(1); + } + long length = 0; + try + { + length = deleteSegment(segment); + cdcMetrics.deletedSegment.metric.update(length); + } + catch (IOException e) + { + LOGGER.warn("Failed to delete cdc segment", e); + } + directorySize -= length; + i++; + } + } + + try + { + cleanupOrphanedIdxFiles(cdcRawDirectory); + } + catch (IOException e) + { + LOGGER.warn("Failed to clean up orphaned idx files", e); + } + } + + protected boolean validSegmentFilter(File file) + { + return file.isFile() && isLogFile(file.getName()); + } + + protected long deleteSegment(CdcRawSegmentFile segment) throws IOException + { + final long numBytes = segment.length() + segment.indexLength(); + LOGGER.info("Deleting Cdc segment path={} lastModified={} numBytes={}", segment, + segment.lastModified(), numBytes); + Files.deleteIfExists(segment.path()); + Files.deleteIfExists(segment.indexPath()); + return numBytes; + } + + // runs optionally if detects orphaned and old index files + private void cleanupOrphanedIdxFiles(File cdcDir) throws IOException + { + final File[] indexFiles = + cdcDir.listFiles(f -> f.isFile() && CdcUtil.isValidIdxFile(f.getName())); + if (indexFiles == null || indexFiles.length == 0) + return; // exit early when finding no index files + + final File[] cdcSegments = + cdcDir.listFiles(f -> f.isFile() && CdcUtil.isLogFile(f.getName())); + Set<String> cdcFileNames = Set.of(); + if 
(cdcSegments != null) + { + cdcFileNames = new HashSet<>(cdcSegments.length); + for (File f : cdcSegments) + { + cdcFileNames.add(f.getName()); + } + } + + // now, delete all old index files that have no corresponding log files. + for (File idxFile : indexFiles) + { + final String cdcFileName = CdcUtil.idxToLogFileName(idxFile.getName()); + if (!cdcFileNames.contains(cdcFileName)) + { // found an orphaned index file + LOGGER.warn("Orphaned Cdc idx file found with no corresponding Cdc segment path={}", + idxFile.toPath()); + cdcMetrics.orphanedIdx.metric.update(1L); + Files.deleteIfExists(idxFile.toPath()); + } + } + } + + private void publishCdcStats(@Nullable List<CdcRawSegmentFile> cdcFiles) + { + // no cdc data consumed or exist + boolean noCdcFiles = cdcFiles == null || cdcFiles.isEmpty(); + if (noCdcFiles && priorCdcFiles.isEmpty()) + return; + + Map<CdcRawSegmentFile, Long> currentFiles; + long totalCurrentBytes = 0L; + if (noCdcFiles) + { + currentFiles = new HashMap<>(); + } + else + { + currentFiles = new HashMap<>(cdcFiles.size()); + for (CdcRawSegmentFile segment : cdcFiles) + { + if (segment.exists()) + { + long len = segment.length(); + currentFiles.put(segment, len); + totalCurrentBytes += len; + } + } + } + + // skip publishing. there is no cdc data consumed and no data exist. + if (totalCurrentBytes == 0L && priorCdcFiles.isEmpty()) + { + priorCdcFiles = currentFiles; + return; + } + + // consumed files is the files exist in the prior round but now are deleted. 
+ Set<CdcRawSegmentFile> consumedFiles = + Sets.difference(priorCdcFiles.keySet(), currentFiles.keySet()); + long totalConsumedBytes = + consumedFiles.stream().map(priorCdcFiles::get).reduce(0L, Long::sum); + priorCdcFiles.clear(); + priorCdcFiles = currentFiles; + cdcMetrics.totalConsumedCdcBytes.metric.update(totalConsumedBytes); + cdcMetrics.totalCdcSpaceUsed.metric.setValue(totalCurrentBytes); + } + + /** + * Helper class for the CdcRawDirectorySpaceCleaner to track log segment files and associated idx file in the cdc_raw directory + */ + protected static class CdcRawSegmentFile implements Comparable<CdcRawSegmentFile> + { + private final File file; + private final File indexFile; + private final long segmentId; + private final long len; + + CdcRawSegmentFile(File logFile) + { + this.file = logFile; + final String name = logFile.getName(); + this.segmentId = parseSegmentId(name); + this.len = logFile.length(); + this.indexFile = CdcUtil.getIdxFile(logFile); + } + + public boolean exists() + { + return file.exists(); + } + + public boolean indexExists() + { + return indexFile.exists(); + } + + public long length() + { + return len; + } + + public long indexLength() + { + return indexFile.length(); + } + + public long lastModified() + { + return file.lastModified(); + } + + public Path path() + { + return file.toPath(); + } + + public Path indexPath() + { + return indexFile.toPath(); + } + + @Override + public int compareTo(@NotNull CdcRawSegmentFile o) + { + return Long.compare(segmentId, o.segmentId); + } + + @Override + public int hashCode() + { + return file.hashCode(); + } + + @Override + public boolean equals(Object other) + { + if (this == other) + { + return true; + } + if (other == null || this.getClass() != other.getClass()) + { + return false; + } + + CdcRawSegmentFile that = (CdcRawSegmentFile) other; + return file.equals(that.file); + } + + @Override + public String toString() + { + return file.getAbsolutePath(); + } + } +} diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java b/server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java index 91aa3029..d7ca8d9d 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.List; +import java.util.regex.Matcher; import java.util.regex.Pattern; /** @@ -37,7 +38,8 @@ public final class CdcUtil private static final int IDX_FILE_EXTENSION_LENGTH = IDX_FILE_EXTENSION.length(); private static final String LOG_FILE_COMPLETE_INDICATOR = "COMPLETED"; private static final String FILENAME_EXTENSION = "(" + IDX_FILE_EXTENSION + "|" + LOG_FILE_EXTENSION + ")"; - private static final Pattern SEGMENT_PATTERN = Pattern.compile(FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)" + FILENAME_EXTENSION); + static final Pattern SEGMENT_PATTERN = Pattern.compile(FILENAME_PREFIX + "(?:\\d+" + SEPARATOR + ")?" + "(\\d+)" + FILENAME_EXTENSION); + public static final Pattern IDX_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "(?:\\d+" + SEPARATOR + ")?" 
+ "(\\d+)" + IDX_FILE_EXTENSION); private static final int READ_INDEX_FILE_MAX_RETRY = 5; @@ -102,6 +104,28 @@ public final class CdcUtil return new CdcIndex(latestPosition, isCompleted); } + /** + * @param idxFileName Commit log segment idx filename + * @return log segment filename for associated idx file + */ + public static String idxToLogFileName(String idxFileName) + { + return idxFileName.substring(0, idxFileName.length() - IDX_FILE_EXTENSION.length()) + LOG_FILE_EXTENSION; + } + + public static long parseSegmentId(String name) + { + final Matcher matcher = SEGMENT_PATTERN.matcher(name); + if (matcher.matches()) + { + return Long.parseLong(matcher.group(1)); + } + else + { + throw new IllegalStateException("Invalid CommitLog name: " + name); + } + } + /** * Class representing Cdc index */ @@ -119,6 +143,7 @@ public final class CdcUtil /** * Validate for the cdc (log or index) file name.see {@link SEGMENT_PATTERN} for the format + * * @param fileName name of the file * @return true if the name is valid; otherwise, false */ @@ -132,6 +157,15 @@ public final class CdcUtil return isValid(fileName) && fileName.endsWith(LOG_FILE_EXTENSION); } + /** + * @param idxFileName name of the file. 
+ * @return true if the filename is a valid cdc_raw log segment idx file + */ + public static boolean isValidIdxFile(String idxFileName) + { + return IDX_FILE_PATTERN.matcher(idxFileName).matches(); + } + public static boolean isIndexFile(String fileName) { return isValid(fileName) && matchIndexExtension(fileName); diff --git a/server/src/main/java/org/apache/cassandra/sidecar/utils/FileUtils.java b/server/src/main/java/org/apache/cassandra/sidecar/utils/FileUtils.java index 51172e83..d8bddb75 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/utils/FileUtils.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/FileUtils.java @@ -18,11 +18,19 @@ package org.apache.cassandra.sidecar.utils; +import java.io.File; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.jetbrains.annotations.Nullable; + /** * Encompasses utilities for files */ public class FileUtils { + private static final Pattern STORAGE_UNIT_PATTERN = Pattern.compile("(\\d+)(GiB|MiB|KiB|B)?"); + /** * Resolves the home directory from the input {@code directory} string when the string begins with {@code ~}. * @@ -35,4 +43,63 @@ public class FileUtils return directory; return System.getProperty("user.home") + directory.substring(1); } + + /** + * @param directory the directory + * @return the size in bytes of all files in a directory, non-recursively. 
+ */ + public static long directorySize(File directory) + { + long size = 0; + final File[] files = directory.listFiles(); + if (files != null) + { + for (File file : files) + { + if (file.isFile()) + { + size += file.length(); + } + } + } + return size; + } + + public static long mbStringToBytes(String str) + { + return Long.parseLong(str) * (1 << 20); // note the prop name uses 'mb' but Cassandra parses as MiB + } + + @Nullable + public static Long storageStringToBytes(String str) + { + final Matcher matcher = STORAGE_UNIT_PATTERN.matcher(str); + if (matcher.find()) + { + return Long.parseLong(matcher.group(1)) * storageUnitToBytes(matcher.group(2)); + } + return null; + } + + public static long storageUnitToBytes(String unit) + { + if (unit == null) + { + return 1; + } + + switch (unit) + { + case "GiB": + return 1 << 30; + case "MiB": + return 1 << 20; + case "KiB": + return 1024; + case "": + case "B": + return 1; + } + throw new IllegalStateException("Unexpected data storage unit: " + unit); + } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/utils/TimeProvider.java b/server/src/main/java/org/apache/cassandra/sidecar/utils/TimeProvider.java index 9f01cbe2..ac59bf7f 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/utils/TimeProvider.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/TimeProvider.java @@ -23,10 +23,29 @@ package org.apache.cassandra.sidecar.utils; */ public interface TimeProvider { - TimeProvider DEFAULT_TIME_PROVIDER = System::currentTimeMillis; + TimeProvider DEFAULT_TIME_PROVIDER = new TimeProvider() + { + public long currentTimeMillis() + { + return System.currentTimeMillis(); + } + + public long nanoTime() + { + return System.nanoTime(); + } + }; /** * @return the current time in milliseconds */ long currentTimeMillis(); + + /** + * @return the current value of the high-resolution time source in nanosecond. 
+ */ + default long nanoTime() + { + throw new UnsupportedOperationException("Nano time is unsupported"); + } } diff --git a/server/src/test/integration/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessorIntTest.java b/server/src/test/integration/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessorIntTest.java new file mode 100644 index 00000000..3ddb384b --- /dev/null +++ b/server/src/test/integration/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessorIntTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.db; + +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.sidecar.testing.IntegrationTestBase; +import org.apache.cassandra.testing.CassandraIntegrationTest; +import org.apache.cassandra.testing.ConfigurableCassandraTestContext; + +import static org.assertj.core.api.Assertions.assertThat; + +class SystemViewsDatabaseAccessorIntTest extends IntegrationTestBase +{ + @CassandraIntegrationTest(buildCluster = false) + void testReadSettings(ConfigurableCassandraTestContext cassandraTestContext) + { + long cdcSizeLimitInMiB = 5; + cassandraTestContext.configureAndStartCluster( + builder -> + builder.appendConfig(config -> config.set("cdc_total_space_in_mb", String.valueOf(cdcSizeLimitInMiB))) + ); + waitForSchemaReady(10, TimeUnit.SECONDS); + + SystemViewsDatabaseAccessor accessor = injector.getInstance(SystemViewsDatabaseAccessor.class); + Long cdcTotalSpaceSettings = accessor.getCdcTotalSpaceSetting(); + assertThat(cdcTotalSpaceSettings).isNotNull().isEqualTo(cdcSizeLimitInMiB * 1024 * 1024); + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java b/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java index e0271b77..7e8ed080 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/db/SidecarSchemaTest.java @@ -203,7 +203,9 @@ public class SidecarSchemaTest "INSERT INTO sidecar_internal.configs (service, config) VALUES (?, ?) IF NOT EXISTS", - "DELETE FROM sidecar_internal.configs WHERE service=?" + "DELETE FROM sidecar_internal.configs WHERE service=?", + + "SELECT name, value FROM system_views.settings WHERE name IN ?" 
); assertThat(interceptedPrepStmts).as("Intercepted statements match expected statements") diff --git a/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java new file mode 100644 index 00000000..b0366569 --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.tasks; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.commons.lang3.RandomUtils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import com.codahale.metrics.MetricRegistry; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.InstancesMetadataImpl; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; +import org.apache.cassandra.sidecar.config.CdcConfiguration; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.apache.cassandra.sidecar.config.yaml.CdcConfigurationImpl; +import org.apache.cassandra.sidecar.db.SystemViewsDatabaseAccessor; +import org.apache.cassandra.sidecar.metrics.CdcMetrics; +import org.apache.cassandra.sidecar.metrics.ServerMetrics; +import org.apache.cassandra.sidecar.metrics.SidecarMetrics; +import org.apache.cassandra.sidecar.utils.CdcUtil; +import org.apache.cassandra.sidecar.utils.TimeProvider; +import org.mockito.stubbing.Answer; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for the {@link CdcRawDirectorySpaceCleaner} + */ +public class CdcRawDirectorySpaceCleanerTest +{ + private static final MetricRegistry METRIC_REGISTRY = new MetricRegistry(); + private static final String TEST_SEGMENT_FILE_NAME_1 = 
"CommitLog-2-1250512736956320000.log"; + private static final String TEST_SEGMENT_FILE_NAME_2 = "CommitLog-2-1260512736956320000.log"; + private static final String TEST_SEGMENT_FILE_NAME_3 = "CommitLog-2-1340512736956320000.log"; + private static final String TEST_ORPHANED_SEGMENT_FILE_NAME = "CommitLog-2-1240512736956320000.log"; + private static final String TEST_INTACT_SEGMENT_FILE_NAME = "CommitLog-2-1340512736959990000.log"; + + @Test + public void testCdcRawDirectorySpaceCleaner(@TempDir Path tempDir) throws IOException + { + TimeProvider timeProvider = TimeProvider.DEFAULT_TIME_PROVIDER; + SystemViewsDatabaseAccessor systemViewsDatabaseAccessor = mock(SystemViewsDatabaseAccessor.class); + when(systemViewsDatabaseAccessor.getSettings(any())) + .thenAnswer((Answer<Map<String, String>>) invocation -> Map.of("cdc_total_space", "1MiB")); + when(systemViewsDatabaseAccessor.getCdcTotalSpaceSetting()).thenCallRealMethod(); + CdcConfiguration cdcConfiguration = new CdcConfigurationImpl(); + ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class); + when(serviceConfiguration.cdcConfiguration()).thenReturn(cdcConfiguration); + + InstancesMetadata instancesMetadata = mockInstanceMetadata(tempDir); + SidecarMetrics sidecarMetrics = mock(SidecarMetrics.class); + ServerMetrics serverMetrics = mock(ServerMetrics.class); + CdcMetrics cdcMetrics = new CdcMetrics(METRIC_REGISTRY); + when(sidecarMetrics.server()).thenReturn(serverMetrics); + when(serverMetrics.cdc()).thenReturn(cdcMetrics); + CdcRawDirectorySpaceCleaner cleaner = new CdcRawDirectorySpaceCleaner( + timeProvider, + systemViewsDatabaseAccessor, + serviceConfiguration, + instancesMetadata, + sidecarMetrics + ); + + checkExists(tempDir, TEST_ORPHANED_SEGMENT_FILE_NAME, true, false); + checkExists(tempDir, TEST_SEGMENT_FILE_NAME_1); + checkExists(tempDir, TEST_SEGMENT_FILE_NAME_2); + checkExists(tempDir, TEST_SEGMENT_FILE_NAME_3); + checkExists(tempDir, TEST_INTACT_SEGMENT_FILE_NAME, false, 
true); + + assertEquals(0L, cdcMetrics.criticalCdcRawSpace.metric.getValue()); + assertEquals(0L, cdcMetrics.orphanedIdx.metric.getValue()); + assertEquals(0L, cdcMetrics.deletedSegment.metric.getValue()); + + cleaner.routineCleanUp(); + + // earliest cdc segment should be deleted along with orphaned idx file + checkNotExists(tempDir, TEST_ORPHANED_SEGMENT_FILE_NAME); + checkNotExists(tempDir, TEST_SEGMENT_FILE_NAME_1); + + // latest cdc segments should still exist as long as we have free buffer space + checkExists(tempDir, TEST_SEGMENT_FILE_NAME_2); + checkExists(tempDir, TEST_SEGMENT_FILE_NAME_3); + checkExists(tempDir, TEST_INTACT_SEGMENT_FILE_NAME, false, true); + + // verify metrics match expected + assertEquals(1L, cdcMetrics.criticalCdcRawSpace.metric.getValue()); + assertEquals(1L, cdcMetrics.orphanedIdx.metric.getValue()); + assertTrue(cdcMetrics.totalCdcSpaceUsed.metric.getValue() > 2097152L); + assertTrue(cdcMetrics.deletedSegment.metric.getValue() > 2097152L); + assertEquals(0, cdcMetrics.oldestSegmentAge.metric.getValue()); + + // delete all cdc files, in order to test the scenario that we do not have current cdc file, but have cdc file in the prior round. + // We do not expect all CDC file to be cleaned up in a running system. But test it for robustness. + Files.deleteIfExists(Paths.get(tempDir.toString(), CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, TEST_INTACT_SEGMENT_FILE_NAME)); + cleaner.routineCleanUp(); // it should run fine. 
+ } + + /* test utils */ + + private static InstancesMetadata mockInstanceMetadata(Path tempDir) throws IOException + { + InstanceMetadata instanceMetadata = mock(InstanceMetadata.class); + + File cdcDir = Files.createDirectory(tempDir.resolve(CdcRawDirectorySpaceCleaner.CDC_DIR_NAME)).toFile(); + writeCdcSegment(cdcDir, TEST_ORPHANED_SEGMENT_FILE_NAME, 67108864, true, true, false); + writeCdcSegment(cdcDir, TEST_SEGMENT_FILE_NAME_1, 2097152, true); + writeCdcSegment(cdcDir, TEST_SEGMENT_FILE_NAME_2, 524288, true); + writeCdcSegment(cdcDir, TEST_SEGMENT_FILE_NAME_3, 1024, false); + + Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS); + writeCdcSegment(cdcDir, TEST_INTACT_SEGMENT_FILE_NAME, RandomUtils.nextInt(128, 256), false, false, true); + + when(instanceMetadata.dataDirs()).thenReturn(List.of(cdcDir.getParent())); + return new InstancesMetadataImpl(instanceMetadata, DnsResolver.DEFAULT); + } + + private static void writeCdcSegment(File cdcDir, String filename, int size, boolean complete) throws IOException + { + writeCdcSegment(cdcDir, filename, size, complete, false, false); + } + + private static void writeCdcSegment(File cdcDir, String filename, int size, boolean complete, boolean orphaned, boolean intact) throws IOException + { + if (!orphaned) + { + final File f1 = new File(cdcDir, filename); + assertTrue(f1.createNewFile()); + Files.write(f1.toPath(), RandomUtils.nextBytes(size)); + } + + if (!intact) + { + final File f2 = new File(cdcDir, CdcUtil.getIdxFileName(filename)); + assertTrue(f2.createNewFile()); + Files.write(f2.toPath(), (size + (complete ? 
"\nCOMPLETED" : "")).getBytes(StandardCharsets.UTF_8)); + } + } + + private void checkExists(Path tempDir, String logFileName) + { + checkExists(tempDir, logFileName, false, false); + } + + private void checkExists(Path tempDir, String logFileName, boolean orphaned, boolean intact) + { + assertEquals(!orphaned, Files.exists(Paths.get(tempDir.toString(), CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, logFileName))); + assertEquals(!intact, Files.exists(Paths.get(tempDir.toString(), CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, CdcUtil.getIdxFileName(logFileName)))); + } + + private void checkNotExists(Path tempDir, String logFileName) + { + assertFalse(Files.exists(Paths.get(tempDir.toString(), CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, logFileName))); + assertFalse(Files.exists(Paths.get(tempDir.toString(), CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, CdcUtil.getIdxFileName(logFileName)))); + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/utils/CdcUtilTest.java b/server/src/test/java/org/apache/cassandra/sidecar/utils/CdcUtilTest.java new file mode 100644 index 00000000..4645b930 --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/utils/CdcUtilTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.utils; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit tests for the {@link CdcUtil} + */ +public class CdcUtilTest +{ + @Test + public void testMatcher() + { + assertTrue(CdcUtil.SEGMENT_PATTERN.matcher("CommitLog-7-1689642717704.log").matches()); + assertTrue(CdcUtil.SEGMENT_PATTERN.matcher("CommitLog-12345.log").matches()); + assertTrue(CdcUtil.SEGMENT_PATTERN.matcher("CommitLog-2-1340512736956320000.log").matches()); + assertTrue(CdcUtil.SEGMENT_PATTERN.matcher("CommitLog-2-1340512736959990000.log").matches()); + assertTrue(CdcUtil.isValid("CommitLog-7-1689642717704.log")); + assertTrue(CdcUtil.isValid("CommitLog-12345.log")); + assertTrue(CdcUtil.isValid("CommitLog-2-1340512736956320000.log")); + assertTrue(CdcUtil.isValid("CommitLog-2-1340512736959990000.log")); + assertTrue(CdcUtil.isLogFile("CommitLog-7-1689642717704.log")); + assertTrue(CdcUtil.isLogFile("CommitLog-12345.log")); + assertTrue(CdcUtil.isLogFile("CommitLog-2-1340512736956320000.log")); + assertTrue(CdcUtil.isLogFile("CommitLog-2-1340512736959990000.log")); + assertTrue(CdcUtil.IDX_FILE_PATTERN.matcher("CommitLog-7-1689642717704_cdc.idx").matches()); + assertTrue(CdcUtil.IDX_FILE_PATTERN.matcher("CommitLog-12345_cdc.idx").matches()); + assertTrue(CdcUtil.IDX_FILE_PATTERN.matcher("CommitLog-2-1240512736956320000_cdc.idx").matches()); + assertTrue(CdcUtil.IDX_FILE_PATTERN.matcher("CommitLog-2-1340512736956320000_cdc.idx").matches()); + + assertFalse(CdcUtil.isValid("CommitLog-abc.log")); + assertFalse(CdcUtil.isValid("abc-7-1689642717704.log")); + assertFalse(CdcUtil.isValid("abc-1689642717704.log")); + assertFalse(CdcUtil.isLogFile("CommitLog-abc.log")); + 
assertFalse(CdcUtil.isLogFile("abc-7-1689642717704.log")); + assertFalse(CdcUtil.isLogFile("abc-1689642717704.log")); + assertFalse(CdcUtil.isLogFile("CommitLog-7-1689642717704")); + assertFalse(CdcUtil.isLogFile("CommitLog-12345")); + assertFalse(CdcUtil.isLogFile("CommitLog-2-1340512736956320000")); + assertFalse(CdcUtil.isLogFile("CommitLog-2-1340512736959990000")); + } + + @Test + public void testExtractSegmentIdMatcher() + { + assertEquals(12345L, CdcUtil.parseSegmentId("CommitLog-12345.log")); + assertEquals(1689642717704L, CdcUtil.parseSegmentId("CommitLog-7-1689642717704.log")); + assertEquals(1340512736956320000L, CdcUtil.parseSegmentId("CommitLog-2-1340512736956320000.log")); + assertEquals(1340512736959990000L, CdcUtil.parseSegmentId("CommitLog-2-1340512736959990000.log")); + assertEquals(12345L, CdcUtil.parseSegmentId("CommitLog-6-12345.log")); + assertEquals(1646094405659L, CdcUtil.parseSegmentId("CommitLog-7-1646094405659.log")); + assertEquals(1646094405659L, CdcUtil.parseSegmentId("CommitLog-1646094405659.log")); + } + + @Test + public void testIdxToLogFileName() + { + assertEquals("CommitLog-7-1689642717704.log", CdcUtil.idxToLogFileName("CommitLog-7-1689642717704_cdc.idx")); + assertEquals("CommitLog-12345.log", CdcUtil.idxToLogFileName("CommitLog-12345_cdc.idx")); + assertEquals("CommitLog-2-1240512736956320000.log", CdcUtil.idxToLogFileName("CommitLog-2-1240512736956320000_cdc.idx")); + assertEquals("CommitLog-2-1340512736956320000.log", CdcUtil.idxToLogFileName("CommitLog-2-1340512736956320000_cdc.idx")); + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/utils/FileUtilsTest.java b/server/src/test/java/org/apache/cassandra/sidecar/utils/FileUtilsTest.java index eba7be16..f545d529 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/utils/FileUtilsTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/utils/FileUtilsTest.java @@ -18,8 +18,11 @@ package org.apache.cassandra.sidecar.utils; +import 
java.util.Objects; + import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.assertj.core.api.Assertions.assertThat; /** @@ -37,4 +40,25 @@ class FileUtilsTest assertThat(FileUtils.maybeResolveHomeDirectory("~/.ccm")).isEqualTo(System.getProperty("user.home") + "/.ccm"); assertThat(FileUtils.maybeResolveHomeDirectory("/dev/null")).isEqualTo("/dev/null"); } + + @Test + public void testStorageStringPatterns() + { + assertEquals(1, FileUtils.storageUnitToBytes("B")); + assertEquals(1024, FileUtils.storageUnitToBytes("KiB")); + assertEquals(1048576, FileUtils.storageUnitToBytes("MiB")); + assertEquals(1073741824, FileUtils.storageUnitToBytes("GiB")); + + assertEquals(1048576L, FileUtils.mbStringToBytes("1")); + assertEquals(524288000L, FileUtils.mbStringToBytes("500")); + + assertEquals(1L, Objects.requireNonNull(FileUtils.storageStringToBytes("1")).longValue()); + assertEquals(1L, Objects.requireNonNull(FileUtils.storageStringToBytes("1B")).longValue()); + assertEquals(500L, Objects.requireNonNull(FileUtils.storageStringToBytes("500B")).longValue()); + assertEquals(1024, Objects.requireNonNull(FileUtils.storageStringToBytes("1KiB")).longValue()); + assertEquals(1048576, Objects.requireNonNull(FileUtils.storageStringToBytes("1024KiB")).longValue()); + assertEquals(1048576, Objects.requireNonNull(FileUtils.storageStringToBytes("1MiB")).longValue()); + assertEquals(4294967296L, Objects.requireNonNull(FileUtils.storageStringToBytes("4096MiB")).longValue()); + assertEquals(536870912000L, Objects.requireNonNull(FileUtils.storageStringToBytes("500GiB")).longValue()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org