kosiew commented on code in PR #20047:
URL: https://github.com/apache/datafusion/pull/20047#discussion_r3063779630


##########
datafusion/core/src/execution/context/mod.rs:
##########
@@ -1418,8 +1427,11 @@ impl SessionContext {
             schema.deregister_table(&table)?;
             if table_type == TableType::Base
                 && let Some(lfc) = self.runtime_env().cache_manager.get_list_files_cache()

Review Comment:
   I think this invalidation logic now only runs when both caches are enabled, 
because `get_list_files_cache()` and `get_file_statistic_cache()` are chained 
in the same `if let`.
   
   That creates a cleanup gap. For example, if a session disables the 
list-files cache but keeps the file-statistics cache enabled, deregistering a 
table will leave the statistics entries behind. That leaves stale 
session-scoped state around and can leak memory if the table is re-registered.
   
   Can we make these invalidations independent so each cache is cleaned up 
whenever it is present?
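   Something along these lines, sketched with plain `Option`s as a toy stand-in for the real cache handles (`get_list_files_cache()` / `get_file_statistic_cache()` are the actual DataFusion accessors, but the types below are illustrative only):

```rust
// Toy stand-in illustrating independent cache invalidation: each cache is
// cleaned up whenever it is present, so disabling one cache no longer
// prevents cleanup of the other. The `Vec<String>` "caches" and the
// `deregister` helper are hypothetical, not DataFusion types.
fn deregister(
    list_files_cache: Option<&mut Vec<String>>,
    statistics_cache: Option<&mut Vec<String>>,
    table: &str,
) {
    // Independent `if let` blocks instead of one chained condition.
    if let Some(lfc) = list_files_cache {
        lfc.retain(|t| t.as_str() != table);
    }
    if let Some(stats) = statistics_cache {
        stats.retain(|t| t.as_str() != table);
    }
}

fn main() {
    // List-files cache disabled (None), statistics cache enabled.
    let mut stats = vec!["t1".to_string(), "t2".to_string()];
    deregister(None, Some(&mut stats), "t1");
    // The statistics entries for "t1" are still cleaned up.
    assert_eq!(stats, vec!["t2".to_string()]);
}
```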



##########
datafusion/execution/src/cache/file_statistics_cache.rs:
##########
@@ -0,0 +1,733 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::cache::cache_manager::{
+    CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry,
+};
+use crate::cache::{CacheAccessor, TableScopedPath};
+use object_store::path::Path;
+use std::collections::HashMap;
+use std::sync::Mutex;
+
+pub use crate::cache::DefaultFilesMetadataCache;
+use crate::cache::lru_queue::LruQueue;
+use datafusion_common::TableReference;
+use datafusion_common::heap_size::DFHeapSize;
+
+/// Default implementation of [`FileStatisticsCache`]
+///
+/// Stores cached file metadata (statistics and orderings) for files.
+///
+/// The typical usage pattern is:
+/// 1. Call `get(path)` to check for cached value
+/// 2. If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)`
+/// 3. If invalid or missing, compute new value and call `put(path, new_value)`
+///
+/// # Internal details
+///
+/// The `memory_limit` controls the maximum size of the cache, which uses a
+/// Least Recently Used eviction algorithm. When adding a new entry, if the
+/// total size of the cached entries exceeds `memory_limit`, the least recently
+/// used entries are evicted until the total size is lower than `memory_limit`.
+///
+///
+/// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache
+#[derive(Default)]
+pub struct DefaultFileStatisticsCache {
+    state: Mutex<DefaultFileStatisticsCacheState>,
+}
+
+impl DefaultFileStatisticsCache {
+    pub fn new(memory_limit: usize) -> Self {
+        Self {
+            state: Mutex::new(DefaultFileStatisticsCacheState::new(memory_limit)),
+        }
+    }
+
+    /// Returns the size of the cached memory, in bytes.
+    pub fn memory_used(&self) -> usize {
+        let state = self.state.lock().unwrap();
+        state.memory_used
+    }
+}
+
+struct DefaultFileStatisticsCacheState {
+    lru_queue: LruQueue<TableScopedPath, CachedFileMetadata>,
+    memory_limit: usize,
+    memory_used: usize,
+}
+
+pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB
+
+impl Default for DefaultFileStatisticsCacheState {
+    fn default() -> Self {
+        Self {
+            lru_queue: LruQueue::new(),
+            memory_limit: DEFAULT_FILE_STATISTICS_MEMORY_LIMIT,
+            memory_used: 0,
+        }
+    }
+}
+
+impl DefaultFileStatisticsCacheState {
+    fn new(memory_limit: usize) -> Self {
+        Self {
+            lru_queue: LruQueue::new(),
+            memory_limit,
+            memory_used: 0,
+        }
+    }
+    fn get(&mut self, key: &TableScopedPath) -> Option<CachedFileMetadata> {
+        self.lru_queue.get(key).cloned()
+    }
+
+    fn put(
+        &mut self,
+        key: &TableScopedPath,
+        value: CachedFileMetadata,
+    ) -> Option<CachedFileMetadata> {
+        let key_size = key.path.as_ref().heap_size();
+        let entry_size = value.heap_size();
+
+        if entry_size + key_size > self.memory_limit {
+            // Remove potential stale entry
+            self.remove(key);
+            return None;
+        }
+
+        let old_value = self.lru_queue.put(key.clone(), value);

Review Comment:
   Nice change overall, but I think there is a bug in the replacement path for 
`memory_used`.
   
   When an existing cache entry is overwritten, this path adds the new key size 
and value size, but if `old_value` is present it only subtracts 
`old_value.heap_size()`. It does not subtract the old key's heap usage.
   
   That means repeatedly refreshing the same file will slowly inflate 
`memory_used`, which can trigger eviction earlier than expected and make the 
memory bound inaccurate.
   
   Could you update the overwrite path to subtract the previous key 
contribution as well? Please also add a regression test that does repeated 
`put` calls on the same `TableScopedPath` and verifies `memory_used` stays 
stable.
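   A minimal sketch of the accounting fix, using toy types rather than the real `DefaultFileStatisticsCacheState` (the `Cache` struct and `String`/`Vec<u8>` sizes below are illustrative stand-ins for `TableScopedPath` keys and `heap_size()`):

```rust
use std::collections::HashMap;

// Toy cache demonstrating the suggested fix: on overwrite, subtract the old
// key's contribution as well as the old value's, so repeated puts on the
// same key keep `memory_used` stable.
struct Cache {
    map: HashMap<String, Vec<u8>>,
    memory_used: usize,
}

impl Cache {
    fn put(&mut self, key: &str, value: Vec<u8>) {
        let key_size = key.len();
        let entry_size = value.len();
        if let Some(old) = self.map.insert(key.to_string(), value) {
            // Overwrite path: remove both the old value and the old key
            // from the accounting, not just the old value.
            self.memory_used -= old.len() + key_size;
        }
        self.memory_used += entry_size + key_size;
    }
}

fn main() {
    let mut cache = Cache { map: HashMap::new(), memory_used: 0 };
    // Regression-style check: refreshing the same file repeatedly does not
    // inflate `memory_used`.
    for _ in 0..10 {
        cache.put("part-0.parquet", vec![0u8; 100]);
        assert_eq!(cache.memory_used, "part-0.parquet".len() + 100);
    }
}
```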



##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -91,9 +95,31 @@ impl CachedFileMetadata {
 /// 3. If invalid or missing, compute new value and call `put(path, new_value)`
 ///
 /// See [`crate::runtime_env::RuntimeEnv`] for more details
-pub trait FileStatisticsCache: CacheAccessor<Path, CachedFileMetadata> {
+pub trait FileStatisticsCache:
+    CacheAccessor<TableScopedPath, CachedFileMetadata>
+{
+    /// Cache memory limit in bytes.
+    fn cache_limit(&self) -> usize;
+
+    /// Updates the cache with a new memory limit in bytes.
+    fn update_cache_limit(&self, limit: usize);
+
     /// Retrieves the information about the entries currently cached.

Review Comment:
   Non-blocking thought: now that the file statistics cache key is 
`TableScopedPath`, `list_entries() -> HashMap<Path, FileStatisticsCacheEntry>` 
seems to flatten away the table dimension again.
   
   That can make observability a bit confusing for helpers like 
`statistics_cache()`, especially if two tables point at the same object-store 
path.
   
   It may be worth exposing `TableScopedPath` here, or otherwise carrying the 
table reference through the reported entry, so the debug and introspection 
surface matches the actual cache semantics.
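   To illustrate the collision, here is a toy sketch where `TableScopedPath` is a hypothetical tuple-like stand-in (not the real DataFusion type): keying by `Path` alone lets one table's entry shadow another's when both point at the same object-store path.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for a table-scoped cache key; the real
// `TableScopedPath` lives in datafusion/execution.
#[derive(Hash, PartialEq, Eq, Debug)]
struct TableScopedPath {
    table: String,
    path: String,
}

fn main() {
    let shared = "bucket/data.parquet".to_string();

    // Keyed by path only: the second table's entry overwrites the first,
    // so introspection reports a single entry for two cached tables.
    let mut by_path: HashMap<String, &str> = HashMap::new();
    by_path.insert(shared.clone(), "stats for t1");
    by_path.insert(shared.clone(), "stats for t2");
    assert_eq!(by_path.len(), 1);

    // Keyed by (table, path): both entries stay visible, matching the
    // cache's actual semantics.
    let mut by_scoped: HashMap<TableScopedPath, &str> = HashMap::new();
    by_scoped.insert(
        TableScopedPath { table: "t1".into(), path: shared.clone() },
        "stats for t1",
    );
    by_scoped.insert(
        TableScopedPath { table: "t2".into(), path: shared.clone() },
        "stats for t2",
    );
    assert_eq!(by_scoped.len(), 2);
}
```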



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

