alamb commented on code in PR #15613: URL: https://github.com/apache/datafusion/pull/15613#discussion_r2035556569
########## datafusion/execution/src/memory_pool/pool.rs: ########## @@ -261,7 +268,7 @@ fn insufficient_capacity_err( pub struct TrackConsumersPool<I> { inner: I, top: NonZeroUsize, - tracked_consumers: Mutex<HashMap<MemoryConsumer, AtomicU64>>, + tracked_consumers: Mutex<HashMap<usize, TrackedConsumer>>, Review Comment: ```suggestion /// Maps consumer_id --> TrackedConsumer tracked_consumers: Mutex<HashMap<usize, TrackedConsumer>>, ``` ########## datafusion/execution/src/memory_pool/pool.rs: ########## @@ -501,12 +507,12 @@ mod tests { // Test: reports if new reservation causes error // using the previously set sizes for other consumers let mut r5 = MemoryConsumer::new("r5").register(&pool); - let expected = "Additional allocation failed with top memory consumers (across reservations) as: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated for this reservation - 5 bytes remain available for the total pool"; + let expected = format!("Additional allocation failed with top memory consumers (across reservations) as: r1#{}(can spill: false) consumed 50 bytes, r3#{}(can spill: false) consumed 20 bytes, r2#{}(can spill: false) consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated for this reservation - 5 bytes remain available for the total pool", r1.consumer().id(), r3.consumer().id(), r2.consumer().id()); Review Comment: this is a nice change ########## datafusion/execution/src/memory_pool/pool.rs: ########## @@ -277,40 +284,29 @@ impl<I: MemoryPool> TrackConsumersPool<I> { } } - /// Determine if there are multiple [`MemoryConsumer`]s registered - /// which have the same name. - /// - /// This is very tied to the implementation of the memory consumer. 
- fn has_multiple_consumers(&self, name: &String) -> bool { - let consumer = MemoryConsumer::new(name); - let consumer_with_spill = consumer.clone().with_can_spill(true); - let guard = self.tracked_consumers.lock(); - guard.contains_key(&consumer) && guard.contains_key(&consumer_with_spill) - } - /// The top consumers in a report string. pub fn report_top(&self, top: usize) -> String { let mut consumers = self .tracked_consumers .lock() .iter() - .map(|(consumer, reserved)| { + .map(|(consumer_id, tracked_consumer)| { ( - (consumer.name().to_owned(), consumer.can_spill()), - reserved.load(Ordering::Acquire), + ( + *consumer_id, + tracked_consumer.name.to_owned(), + tracked_consumer.can_spill, + ), + tracked_consumer.reserved.load(Ordering::Acquire), Review Comment: The same thing goes for the other updates further down. ########## datafusion/execution/src/memory_pool/mod.rs: ########## @@ -149,21 +150,65 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug { /// For help with allocation accounting, see the [`proxy`] module. /// /// [proxy]: datafusion_common::utils::proxy -#[derive(Debug, PartialEq, Eq, Hash, Clone)] +#[derive(Debug)] Review Comment: Can you also please add a note to this docstring saying that each MemoryConsumer gets an automatically assigned, globally unique id, and that to clone a memory consumer one should use [`MemoryConsumer::clone_with_new_id`]? ########## datafusion/execution/src/memory_pool/pool.rs: ########## @@ -277,40 +284,29 @@ impl<I: MemoryPool> TrackConsumersPool<I> { } } - /// Determine if there are multiple [`MemoryConsumer`]s registered - /// which have the same name. - /// - /// This is very tied to the implementation of the memory consumer. 
- fn has_multiple_consumers(&self, name: &String) -> bool { - let consumer = MemoryConsumer::new(name); - let consumer_with_spill = consumer.clone().with_can_spill(true); - let guard = self.tracked_consumers.lock(); - guard.contains_key(&consumer) && guard.contains_key(&consumer_with_spill) - } - /// The top consumers in a report string. pub fn report_top(&self, top: usize) -> String { let mut consumers = self .tracked_consumers .lock() .iter() - .map(|(consumer, reserved)| { + .map(|(consumer_id, tracked_consumer)| { ( - (consumer.name().to_owned(), consumer.can_spill()), - reserved.load(Ordering::Acquire), + ( + *consumer_id, + tracked_consumer.name.to_owned(), + tracked_consumer.can_spill, + ), + tracked_consumer.reserved.load(Ordering::Acquire), Review Comment: As a follow-on cleanup, it would be possible to refactor this code into methods, like this (which might also make the code easier to read): ```suggestion tracked_consumer.reserved(), ``` ########## datafusion/execution/src/memory_pool/mod.rs: ########## @@ -149,21 +150,65 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug { /// For help with allocation accounting, see the [`proxy`] module. 
/// /// [proxy]: datafusion_common::utils::proxy -#[derive(Debug, PartialEq, Eq, Hash, Clone)] +#[derive(Debug)] pub struct MemoryConsumer { name: String, can_spill: bool, + id: usize, +} + +impl PartialEq for MemoryConsumer { + fn eq(&self, other: &Self) -> bool { + let is_same_id = self.id == other.id; + + #[cfg(debug_assertions)] + if is_same_id { + assert_eq!(self.name, other.name); + assert_eq!(self.can_spill, other.can_spill); + } + + is_same_id + } +} + +impl Eq for MemoryConsumer {} + +impl Hash for MemoryConsumer { + fn hash<H: Hasher>(&self, state: &mut H) { + self.id.hash(state); + self.name.hash(state); + self.can_spill.hash(state); + } } impl MemoryConsumer { + fn new_unique_id() -> usize { + static ID: atomic::AtomicUsize = atomic::AtomicUsize::new(0); + ID.fetch_add(1, atomic::Ordering::Relaxed) + } + /// Create a new empty [`MemoryConsumer`] that can be grown using [`MemoryReservation`] pub fn new(name: impl Into<String>) -> Self { Self { name: name.into(), can_spill: false, + id: Self::new_unique_id(), + } + } + + pub fn clone_with_new_id(&self) -> Self { Review Comment: ```suggestion /// Return a clone of the MemoryConsumer that has a new /// unique ID assigned. pub fn clone_with_new_id(&self) -> Self { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org