notfilippo commented on code in PR #18928:
URL: https://github.com/apache/datafusion/pull/18928#discussion_r2558937086


##########
datafusion/execution/Cargo.toml:
##########
@@ -46,10 +46,14 @@ default = ["sql"]
 parquet_encryption = [
     "parquet/encryption",
 ]
+arrow_buffer_pool = [

Review Comment:
   The feature-flag name is fairly descriptive, but maybe we could change it 
to something better. I'm open to suggestions!



##########
datafusion/execution/src/memory_pool/arrow.rs:
##########
@@ -0,0 +1,104 @@
+//! Adapter for integrating DataFusion's [`MemoryPool`] with Arrow's memory tracking APIs.
+
+use crate::memory_pool::{MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation};
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// An adapter that implements Arrow's [`arrow_buffer::MemoryPool`] trait
+/// by wrapping a DataFusion [`MemoryPool`].
+///
+/// This allows DataFusion's memory management system to be used with Arrow's
+/// memory allocation APIs. Each reservation made through this pool will be
+/// tracked using the provided [`MemoryConsumer`], enabling DataFusion to
+/// monitor and limit memory usage across Arrow operations.
+///
+/// This is useful when you want Arrow operations (such as array builders
+/// or compute kernels) to participate in DataFusion's memory management
+/// and respect the same memory limits as DataFusion operators.
+#[derive(Debug)]
+pub struct ArrowMemoryPool {
+    inner: Arc<dyn MemoryPool>,
+    consumer: MemoryConsumer,
+}
+
+impl ArrowMemoryPool {
+    /// Creates a new [`ArrowMemoryPool`] that wraps the given DataFusion [`MemoryPool`]
+    /// and tracks allocations under the specified [`MemoryConsumer`].
+    pub fn new(inner: Arc<dyn MemoryPool>, consumer: MemoryConsumer) -> Self {
+        Self { inner, consumer }
+    }
+}
+
+impl arrow_buffer::MemoryReservation for MemoryReservation {
+    fn size(&self) -> usize {
+        MemoryReservation::size(self)
+    }
+
+    fn resize(&mut self, new_size: usize) {
+        MemoryReservation::resize(self, new_size)
+    }
+}
+
+impl arrow_buffer::MemoryPool for ArrowMemoryPool {
+    fn reserve(&self, size: usize) -> Box<dyn arrow_buffer::MemoryReservation> {
+        let consumer = self.consumer.clone_with_new_id();
+        let mut reservation = consumer.register(&self.inner);
+        reservation.grow(size);
+
+        Box::new(reservation)
+    }
+
+    fn available(&self) -> isize {
+        // The pool may be overfilled, so this method might return a negative value.
+        (self.capacity() as i128 - self.used() as i128)
+            .try_into()
+            .unwrap_or(isize::MIN)
+    }
+
+    fn used(&self) -> usize {
+        self.inner.reserved()
+    }
+
+    fn capacity(&self) -> usize {
+        match self.inner.memory_limit() {
+            MemoryLimit::Infinite | MemoryLimit::Unknown => usize::MAX,
+            MemoryLimit::Finite(capacity) => capacity,
+        }
+    }

Review Comment:
   I'm not really sure these methods (`available`, `used`, and `capacity`) 
have a use case. I'm open to removing them from the upstream trait.
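
   For context, here is a minimal standalone sketch of the arithmetic in 
`available()` above, written over plain integers rather than the real pool 
types. `available_as_in_diff` copies the expression from the diff; 
`available_clamped` is a hypothetical alternative and is not part of the PR. 
One thing the sketch makes visible: when `capacity()` is `usize::MAX` (the 
`Infinite`/`Unknown` case), the difference does not fit in an `isize`, so the 
`unwrap_or(isize::MIN)` fallback is taken even though the pool is not 
overfilled.

```rust
// Only needed on editions before 2021, where `TryInto` is not in the prelude.
use std::convert::TryInto;

/// Same expression as `available()` in the diff, on plain integers.
fn available_as_in_diff(capacity: usize, used: usize) -> isize {
    (capacity as i128 - used as i128)
        .try_into()
        .unwrap_or(isize::MIN)
}

/// Hypothetical variant (an assumption, not from the PR): clamp the
/// difference into the `isize` range instead of falling back to `MIN`.
fn available_clamped(capacity: usize, used: usize) -> isize {
    let diff = capacity as i128 - used as i128;
    diff.clamp(isize::MIN as i128, isize::MAX as i128) as isize
}
```

   With a finite limit both functions agree, including the overfilled case 
where the result is negative. They differ only for an unbounded pool, where 
the clamped variant saturates at `isize::MAX`.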


