nuno-faria commented on code in PR #20047:
URL: https://github.com/apache/datafusion/pull/20047#discussion_r2807893733


##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -36,37 +36,167 @@ pub use crate::cache::DefaultFilesMetadataCache;
 /// 2. If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)`
 /// 3. If invalid or missing, compute new value and call `put(path, new_value)`
 ///
-/// Uses DashMap for lock-free concurrent access.
+/// # Internal details
+///
+/// The `memory_limit` controls the maximum size of the cache, which uses a
+/// Least Recently Used eviction algorithm. When adding a new entry, if the total
+/// size of the cached entries exceeds `memory_limit`, the least recently used entries
+/// are evicted until the total size is lower than `memory_limit`.
+///
 ///
 /// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache
 #[derive(Default)]
 pub struct DefaultFileStatisticsCache {

Review Comment:
   Should this now be moved (or renamed) to its own `file_statistics_cache.rs`, similar to the other caches?



##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -36,37 +36,167 @@ pub use crate::cache::DefaultFilesMetadataCache;
 /// 2. If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)`
 /// 3. If invalid or missing, compute new value and call `put(path, new_value)`
 ///
-/// Uses DashMap for lock-free concurrent access.
+/// # Internal details
+///
+/// The `memory_limit` controls the maximum size of the cache, which uses a
+/// Least Recently Used eviction algorithm. When adding a new entry, if the total
+/// size of the cached entries exceeds `memory_limit`, the least recently used entries
+/// are evicted until the total size is lower than `memory_limit`.
+///
 ///
 /// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache
 #[derive(Default)]
 pub struct DefaultFileStatisticsCache {
-    cache: DashMap<Path, CachedFileMetadata>,
+    state: Mutex<DefaultFileStatisticsCacheState>,
+}
+
+impl DefaultFileStatisticsCache {
+    pub fn new(memory_limit: usize) -> Self {
+        Self {
+            state: Mutex::new(DefaultFileStatisticsCacheState::new(memory_limit)),
+        }
+    }
+
+    /// Returns the size of the cached memory, in bytes.
+    pub fn memory_used(&self) -> usize {
+        let state = self.state.lock().unwrap();
+        state.memory_used
+    }
+}
+
+struct DefaultFileStatisticsCacheState {
+    lru_queue: LruQueue<Path, CachedFileMetadata>,
+    memory_limit: usize,
+    memory_used: usize,
+}
+
+pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB
+
+impl Default for DefaultFileStatisticsCacheState {
+    fn default() -> Self {
+        Self {
+            lru_queue: LruQueue::new(),
+            memory_limit: DEFAULT_FILE_STATISTICS_MEMORY_LIMIT,
+            memory_used: 0,
+        }
+    }
 }
 
+impl DefaultFileStatisticsCacheState {

Review Comment:
   Also there are now 3 similar "LRU + memory limit" cache implementations 
(metadata, list files, files statistics). Maybe one day they could be merged 
into a generic one.
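   
   As a rough illustration, a shared version might look something like the sketch below (hypothetical names and a stand-in `HeapSize` trait, not the PR's actual `LruQueue` API):
   ```rust
   use std::collections::{HashMap, VecDeque};
   use std::hash::Hash;
   
   /// Stand-in for the heap-size estimation the PR's caches already use.
   trait HeapSize {
       fn heap_size(&self) -> usize;
   }
   
   /// Sketch of a generic "LRU + memory limit" cache.
   struct LruMemoryCache<K, V> {
       map: HashMap<K, V>,
       order: VecDeque<K>, // front = least recently used
       memory_limit: usize,
       memory_used: usize,
   }
   
   impl<K: Eq + Hash + Clone + HeapSize, V: HeapSize> LruMemoryCache<K, V> {
       fn new(memory_limit: usize) -> Self {
           Self {
               map: HashMap::new(),
               order: VecDeque::new(),
               memory_limit,
               memory_used: 0,
           }
       }
   
       fn get(&mut self, key: &K) -> Option<&V> {
           // Refresh recency: move the key to the back of the queue.
           if let Some(pos) = self.order.iter().position(|k| k == key) {
               let k = self.order.remove(pos).unwrap();
               self.order.push_back(k);
           }
           self.map.get(key)
       }
   
       fn put(&mut self, key: K, value: V) {
           let added = key.heap_size() + value.heap_size();
           if added > self.memory_limit {
               return; // can never fit; mirrors the early return in the PR
           }
           if let Some(old) = self.map.remove(&key) {
               // Replacing: give back the old entry's accounted size.
               self.memory_used -= key.heap_size() + old.heap_size();
               if let Some(pos) = self.order.iter().position(|k| *k == key) {
                   self.order.remove(pos);
               }
           }
           self.memory_used += added;
           self.order.push_back(key.clone());
           self.map.insert(key, value);
           // Evict least recently used entries until under the limit.
           while self.memory_used > self.memory_limit {
               let Some(lru) = self.order.pop_front() else { break };
               if let Some(v) = self.map.remove(&lru) {
                   self.memory_used -= lru.heap_size() + v.heap_size();
               }
           }
       }
   }
   ```
   The three caches seem to differ mainly in their key/value types and validity checks, so the shared part would be the accounting and eviction above.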



##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -36,37 +36,167 @@ pub use crate::cache::DefaultFilesMetadataCache;
 /// 2. If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)`
 /// 3. If invalid or missing, compute new value and call `put(path, new_value)`
 ///
-/// Uses DashMap for lock-free concurrent access.
+/// # Internal details
+///
+/// The `memory_limit` controls the maximum size of the cache, which uses a
+/// Least Recently Used eviction algorithm. When adding a new entry, if the total
+/// size of the cached entries exceeds `memory_limit`, the least recently used entries
+/// are evicted until the total size is lower than `memory_limit`.
+///
 ///
 /// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache
 #[derive(Default)]
 pub struct DefaultFileStatisticsCache {
-    cache: DashMap<Path, CachedFileMetadata>,
+    state: Mutex<DefaultFileStatisticsCacheState>,
+}
+
+impl DefaultFileStatisticsCache {
+    pub fn new(memory_limit: usize) -> Self {
+        Self {
+            state: Mutex::new(DefaultFileStatisticsCacheState::new(memory_limit)),
+        }
+    }
+
+    /// Returns the size of the cached memory, in bytes.
+    pub fn memory_used(&self) -> usize {
+        let state = self.state.lock().unwrap();
+        state.memory_used
+    }
+}
+
+struct DefaultFileStatisticsCacheState {
+    lru_queue: LruQueue<Path, CachedFileMetadata>,
+    memory_limit: usize,
+    memory_used: usize,
+}
+
+pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB
+
+impl Default for DefaultFileStatisticsCacheState {
+    fn default() -> Self {
+        Self {
+            lru_queue: LruQueue::new(),
+            memory_limit: DEFAULT_FILE_STATISTICS_MEMORY_LIMIT,
+            memory_used: 0,
+        }
+    }
 }
 
+impl DefaultFileStatisticsCacheState {
+    fn new(memory_limit: usize) -> Self {
+        Self {
+            lru_queue: LruQueue::new(),
+            memory_limit,
+            memory_used: 0,
+        }
+    }
+    fn get(&mut self, key: &Path) -> Option<CachedFileMetadata> {
+        self.lru_queue.get(key).cloned()
+    }
+
+    fn put(
+        &mut self,
+        key: &Path,
+        value: CachedFileMetadata,
+    ) -> Option<CachedFileMetadata> {
+        let key_size = key.as_ref().heap_size();
+        let entry_size = value.heap_size();
+
+        if entry_size + key_size > self.memory_limit {
+            // Remove potential stale entry
+            self.remove(key);
+            return None;
+        }
+
+        let old_value = self.lru_queue.put(key.clone(), value);
+        self.memory_used += entry_size;
+
+        if let Some(old_entry) = &old_value {
+            self.memory_used -= old_entry.heap_size();
+        } else {
+            self.memory_used += key.as_ref().heap_size();

Review Comment:
   nit: I think the code would be easier to read if `key.as_ref().heap_size()` was added in the first `self.memory_used += entry_size;` and then, if an older entry exists, removed at `self.memory_used -= old_entry.heap_size();`, removing the `else`.
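   
   One possible shape of that change, reusing the names from the snippet above (an untested sketch of the suggestion, not a patch from the PR):
   ```rust
   // Account for the key and the new entry up front.
   self.memory_used += key_size + entry_size;
   
   let old_value = self.lru_queue.put(key.clone(), value);
   
   // If an entry was replaced, its key was already counted when it was first
   // inserted, so give back the old value's size plus one key_size.
   if let Some(old_entry) = &old_value {
       self.memory_used -= key_size + old_entry.heap_size();
   }
   ```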



##########
datafusion-cli/src/main.rs:
##########
@@ -647,9 +644,9 @@ mod tests {
        +-----------------------------------+-----------------+---------------------+------+------------------+
        | filename                          | file_size_bytes | metadata_size_bytes | hits | extra            |
        +-----------------------------------+-----------------+---------------------+------+------------------+
-        | alltypes_plain.parquet            | 1851            | 8882                | 5    | page_index=false |
+        | alltypes_plain.parquet            | 1851            | 8882                | 8    | page_index=false |
        | alltypes_tiny_pages.parquet       | 454233          | 269266              | 2    | page_index=true  |
-        | lz4_raw_compressed_larger.parquet | 380836          | 1347                | 3    | page_index=false |
+        | lz4_raw_compressed_larger.parquet | 380836          | 1347                | 4    | page_index=false |
        +-----------------------------------+-----------------+---------------------+------+------------------+

Review Comment:
   Isn't this a regression? Each scan now appears to require two reads of the metadata cache. I did a quick check and noticed that the extra read is caused by the `list_files_for_scan` function in `datafusion_catalog_listing` not having access to the ordering information, meaning it needs to call `do_collect_statistics_and_ordering` every time.
   
   https://github.com/apache/datafusion/blob/53b0ffb93d5fb662506439b6f1ffc34f2d7684ec/datafusion/catalog-listing/src/table.rs#L734-L735



##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -75,20 +205,30 @@ impl CacheAccessor<Path, CachedFileMetadata> for DefaultFileStatisticsCache {
 }
 
 impl FileStatisticsCache for DefaultFileStatisticsCache {
+    fn cache_limit(&self) -> usize {
+        let state = self.state.lock().unwrap();
+        state.memory_limit
+    }
+
+    fn update_cache_limit(&self, limit: usize) {
+        let mut state = self.state.lock().unwrap();
+        state.memory_limit = limit;
+        state.evict_entries();
+    }
+
     fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry> {
         let mut entries = HashMap::<Path, FileStatisticsCacheEntry>::new();
-
-        for entry in self.cache.iter() {
-            let path = entry.key();
-            let cached = entry.value();
+        for entry in self.state.lock().unwrap().lru_queue.list_entries() {
+            let path = entry.0.clone();
+            let cached = entry.1.clone();
             entries.insert(
-                path.clone(),
+                path,
                 FileStatisticsCacheEntry {
                     object_meta: cached.meta.clone(),
                     num_rows: cached.statistics.num_rows,
                     num_columns: cached.statistics.column_statistics.len(),
                     table_size_bytes: cached.statistics.total_byte_size,
-                    statistics_size_bytes: 0, // TODO: set to the real size in the future
+                    statistics_size_bytes: cached.statistics.heap_size(),
                     has_ordering: cached.ordering.is_some(),
                 },

Review Comment:
   I think the clone on `cached` is not necessary.
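   
   For example (assuming `list_entries()` yields pairs the loop can borrow from, which the `path` clone suggests; untested sketch):
   ```rust
   for entry in self.state.lock().unwrap().lru_queue.list_entries() {
       let path = entry.0.clone();
       // Borrow instead of cloning the whole CachedFileMetadata; the fields
       // below clone only the pieces that need to be owned.
       let cached = &entry.1;
       entries.insert(
           path,
           FileStatisticsCacheEntry {
               object_meta: cached.meta.clone(),
               num_rows: cached.statistics.num_rows.clone(),
               num_columns: cached.statistics.column_statistics.len(),
               table_size_bytes: cached.statistics.total_byte_size.clone(),
               statistics_size_bytes: cached.statistics.heap_size(),
               has_ordering: cached.ordering.is_some(),
           },
       );
   }
   ```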



##########
datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt:
##########
@@ -177,6 +177,10 @@ physical_plan
 statement ok
 DROP TABLE test_table;
 
+# Disable file statistics cache because file statistics have been previously created
+statement ok
+set datafusion.runtime.file_statistics_cache_limit = "0K";
+

Review Comment:
   I don't understand the need to disable the cache in this test, as well as in the other two (`parquet_filter_pushdown.slt`, `array.slt`).
   
   I tried commenting this out and the plan changed in one query:
   ```diff
   -02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, bigint_col, partition_col], output_ordering=[partition_col@2 ASC NULLS LAST, int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST], file_type=parquet
   +02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[int_col, bigint_col, partition_col], file_type=parquet
   ```
   
   The `output_ordering` information is missing, which might be related to this comment: https://github.com/apache/datafusion/pull/20047/changes#r2807891883.


