alamb commented on code in PR #15613:
URL: https://github.com/apache/datafusion/pull/15613#discussion_r2035556569


##########
datafusion/execution/src/memory_pool/pool.rs:
##########
@@ -261,7 +268,7 @@ fn insufficient_capacity_err(
 pub struct TrackConsumersPool<I> {
     inner: I,
     top: NonZeroUsize,
-    tracked_consumers: Mutex<HashMap<MemoryConsumer, AtomicU64>>,
+    tracked_consumers: Mutex<HashMap<usize, TrackedConsumer>>,

Review Comment:
   ```suggestion
       /// Maps consumer_id --> TrackedConsumer
       tracked_consumers: Mutex<HashMap<usize, TrackedConsumer>>,
   ```



##########
datafusion/execution/src/memory_pool/pool.rs:
##########
@@ -501,12 +507,12 @@ mod tests {
         // Test: reports if new reservation causes error
         // using the previously set sizes for other consumers
         let mut r5 = MemoryConsumer::new("r5").register(&pool);
-        let expected = "Additional allocation failed with top memory consumers (across reservations) as: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated for this reservation - 5 bytes remain available for the total pool";
+        let expected = format!("Additional allocation failed with top memory consumers (across reservations) as: r1#{}(can spill: false) consumed 50 bytes, r3#{}(can spill: false) consumed 20 bytes, r2#{}(can spill: false) consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated for this reservation - 5 bytes remain available for the total pool", r1.consumer().id(), r3.consumer().id(), r2.consumer().id());

Review Comment:
   This is a nice change.



##########
datafusion/execution/src/memory_pool/pool.rs:
##########
@@ -277,40 +284,29 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
         }
     }
 
-    /// Determine if there are multiple [`MemoryConsumer`]s registered
-    /// which have the same name.
-    ///
-    /// This is very tied to the implementation of the memory consumer.
-    fn has_multiple_consumers(&self, name: &String) -> bool {
-        let consumer = MemoryConsumer::new(name);
-        let consumer_with_spill = consumer.clone().with_can_spill(true);
-        let guard = self.tracked_consumers.lock();
-        guard.contains_key(&consumer) && guard.contains_key(&consumer_with_spill)
-    }
-
     /// The top consumers in a report string.
     pub fn report_top(&self, top: usize) -> String {
         let mut consumers = self
             .tracked_consumers
             .lock()
             .iter()
-            .map(|(consumer, reserved)| {
+            .map(|(consumer_id, tracked_consumer)| {
                 (
-                    (consumer.name().to_owned(), consumer.can_spill()),
-                    reserved.load(Ordering::Acquire),
+                    (
+                        *consumer_id,
+                        tracked_consumer.name.to_owned(),
+                        tracked_consumer.can_spill,
+                    ),
+                    tracked_consumer.reserved.load(Ordering::Acquire),

Review Comment:
   The same applies to the other updates further down.



##########
datafusion/execution/src/memory_pool/mod.rs:
##########
@@ -149,21 +150,65 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug {
 /// For help with allocation accounting, see the [`proxy`] module.
 ///
 /// [proxy]: datafusion_common::utils::proxy
-#[derive(Debug, PartialEq, Eq, Hash, Clone)]
+#[derive(Debug)]

Review Comment:
   Can you also please add a note to this doc string saying that each
`MemoryConsumer` is automatically assigned a globally unique id, and that to
clone a memory consumer you should use [`MemoryConsumer::clone_with_new_id`]?



##########
datafusion/execution/src/memory_pool/pool.rs:
##########
@@ -277,40 +284,29 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
         }
     }
 
-    /// Determine if there are multiple [`MemoryConsumer`]s registered
-    /// which have the same name.
-    ///
-    /// This is very tied to the implementation of the memory consumer.
-    fn has_multiple_consumers(&self, name: &String) -> bool {
-        let consumer = MemoryConsumer::new(name);
-        let consumer_with_spill = consumer.clone().with_can_spill(true);
-        let guard = self.tracked_consumers.lock();
-        guard.contains_key(&consumer) && guard.contains_key(&consumer_with_spill)
-    }
-
     /// The top consumers in a report string.
     pub fn report_top(&self, top: usize) -> String {
         let mut consumers = self
             .tracked_consumers
             .lock()
             .iter()
-            .map(|(consumer, reserved)| {
+            .map(|(consumer_id, tracked_consumer)| {
                 (
-                    (consumer.name().to_owned(), consumer.can_spill()),
-                    reserved.load(Ordering::Acquire),
+                    (
+                        *consumer_id,
+                        tracked_consumer.name.to_owned(),
+                        tracked_consumer.can_spill,
+                    ),
+                    tracked_consumer.reserved.load(Ordering::Acquire),

Review Comment:
   As a follow-on cleanup, this code could be refactored into methods, like
this (which might also make it easier to read):
   
   ```suggestion
                       tracked_consumer.reserved(),
   ```



##########
datafusion/execution/src/memory_pool/mod.rs:
##########
@@ -149,21 +150,65 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug {
 /// For help with allocation accounting, see the [`proxy`] module.
 ///
 /// [proxy]: datafusion_common::utils::proxy
-#[derive(Debug, PartialEq, Eq, Hash, Clone)]
+#[derive(Debug)]
 pub struct MemoryConsumer {
     name: String,
     can_spill: bool,
+    id: usize,
+}
+
+impl PartialEq for MemoryConsumer {
+    fn eq(&self, other: &Self) -> bool {
+        let is_same_id = self.id == other.id;
+
+        #[cfg(debug_assertions)]
+        if is_same_id {
+            assert_eq!(self.name, other.name);
+            assert_eq!(self.can_spill, other.can_spill);
+        }
+
+        is_same_id
+    }
+}
+
+impl Eq for MemoryConsumer {}
+
+impl Hash for MemoryConsumer {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.id.hash(state);
+        self.name.hash(state);
+        self.can_spill.hash(state);
+    }
 }
 
 impl MemoryConsumer {
+    fn new_unique_id() -> usize {
+        static ID: atomic::AtomicUsize = atomic::AtomicUsize::new(0);
+        ID.fetch_add(1, atomic::Ordering::Relaxed)
+    }
+
     /// Create a new empty [`MemoryConsumer`] that can be grown using [`MemoryReservation`]
     pub fn new(name: impl Into<String>) -> Self {
         Self {
             name: name.into(),
             can_spill: false,
+            id: Self::new_unique_id(),
+        }
+    }
+
+    pub fn clone_with_new_id(&self) -> Self {

Review Comment:
   ```suggestion
       /// Return a clone of the MemoryConsumer that has a new
       /// unique ID assigned. 
       pub fn clone_with_new_id(&self) -> Self {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to