findepi commented on code in PR #12289:
URL: https://github.com/apache/datafusion/pull/12289#discussion_r1741180122


##########
datafusion/physical-plan/src/aggregates/topk/priority_map.rs:
##########
@@ -25,17 +25,12 @@ use datafusion_common::Result;
 
 /// A `Map<K, V>` / `PriorityQueue` combo that evicts the worst values after reaching `capacity`
 pub struct PriorityMap {
-    map: Box<dyn ArrowHashTable>,
-    heap: Box<dyn ArrowHeap>,
+    map: Box<dyn ArrowHashTable + Send>,
+    heap: Box<dyn ArrowHeap + Send>,
     capacity: usize,
     mapper: Vec<(usize, usize)>,
 }
 
-// JUSTIFICATION

Review Comment:
   Removing the `unsafe impl Send for PriorityMap {}` line alone causes a compilation error, because rustc cannot infer that `PriorityMap` is `Send`:
   
   ```
    main *$ cargo build
      Compiling datafusion-physical-plan v41.0.0 (/Users/findepi/repos/datafusion/datafusion/physical-plan)
   error[E0277]: `(dyn ArrowHashTable + 'static)` cannot be sent between threads safely
      --> datafusion/physical-plan/src/aggregates/mod.rs:249:9
       |
   249 | /         match stream {
   250 | |             StreamType::AggregateStream(stream) => Box::pin(stream),
   251 | |             StreamType::GroupedHash(stream) => Box::pin(stream),
   252 | |             StreamType::GroupedPriorityQueue(stream) => Box::pin(stream),
   253 | |         }
       | |_________^ `(dyn ArrowHashTable + 'static)` cannot be sent between threads safely
       |
       = help: the trait `std::marker::Send` is not implemented for `(dyn ArrowHashTable + 'static)`, which is required by `GroupedTopKAggregateStream: std::marker::Send`
       = note: required for `std::ptr::Unique<(dyn ArrowHashTable + 'static)>` to implement `std::marker::Send`
   note: required because it appears within the type `Box<(dyn ArrowHashTable + 'static)>`
      --> /Users/findepi/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/alloc/src/boxed.rs:237:12
       |
   237 | pub struct Box<
       |            ^^^
   note: required because it appears within the type `PriorityMap`
      --> datafusion/physical-plan/src/aggregates/topk/priority_map.rs:27:12
       |
   27  | pub struct PriorityMap {
       |            ^^^^^^^^^^^
   note: required because it appears within the type `GroupedTopKAggregateStream`
      --> datafusion/physical-plan/src/aggregates/topk_stream.rs:39:12
       |
   39  | pub struct GroupedTopKAggregateStream {
       |            ^^^^^^^^^^^^^^^^^^^^^^^^^^
       = note: required for the cast from `Pin<Box<GroupedTopKAggregateStream>>` to `Pin<Box<dyn RecordBatchStream + std::marker::Send>>`
   
   error[E0277]: `(dyn ArrowHeap + 'static)` cannot be sent between threads safely
      --> datafusion/physical-plan/src/aggregates/mod.rs:249:9
       |
   249 | /         match stream {
   250 | |             StreamType::AggregateStream(stream) => Box::pin(stream),
   251 | |             StreamType::GroupedHash(stream) => Box::pin(stream),
   252 | |             StreamType::GroupedPriorityQueue(stream) => Box::pin(stream),
   253 | |         }
       | |_________^ `(dyn ArrowHeap + 'static)` cannot be sent between threads safely
       |
       = help: the trait `std::marker::Send` is not implemented for `(dyn ArrowHeap + 'static)`, which is required by `GroupedTopKAggregateStream: std::marker::Send`
       = note: required for `std::ptr::Unique<(dyn ArrowHeap + 'static)>` to implement `std::marker::Send`
   note: required because it appears within the type `Box<(dyn ArrowHeap + 'static)>`
      --> /Users/findepi/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/alloc/src/boxed.rs:237:12
       |
   237 | pub struct Box<
       |            ^^^
   note: required because it appears within the type `PriorityMap`
      --> datafusion/physical-plan/src/aggregates/topk/priority_map.rs:27:12
       |
   27  | pub struct PriorityMap {
       |            ^^^^^^^^^^^
   note: required because it appears within the type `GroupedTopKAggregateStream`
      --> datafusion/physical-plan/src/aggregates/topk_stream.rs:39:12
       |
   39  | pub struct GroupedTopKAggregateStream {
       |            ^^^^^^^^^^^^^^^^^^^^^^^^^^
       = note: required for the cast from `Pin<Box<GroupedTopKAggregateStream>>` to `Pin<Box<dyn RecordBatchStream + std::marker::Send>>`
   
   For more information about this error, try `rustc --explain E0277`.
   error: could not compile `datafusion-physical-plan` (lib) due to 2 previous errors
   ```
   
   However, removing `unsafe impl Send for PriorityMap {}` **plus** making the other changes in this PR (adding `+ Send` to the boxed `ArrowHashTable` and `ArrowHeap` trait objects) keeps `PriorityMap` `Send`, so the code works exactly as it does on current main.
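
To illustrate the point above outside DataFusion, here is a minimal sketch. `Table`, `VecTable`, `Map`, `assert_send`, and `len_on_other_thread` are hypothetical stand-ins made up for this example, not the real `ArrowHashTable` / `PriorityMap` code. It shows that once the boxed trait object carries a `+ Send` bound, the compiler derives `Send` for the containing struct automatically, so no `unsafe impl Send` is needed.

```rust
use std::thread;

// Hypothetical stand-in for a trait such as `ArrowHashTable`.
trait Table {
    fn len(&self) -> usize;
}

// Hypothetical concrete implementation backed by a plain Vec.
struct VecTable(Vec<u64>);

impl Table for VecTable {
    fn len(&self) -> usize {
        self.0.len()
    }
}

// With a plain `Box<dyn Table>` field this struct would NOT be `Send`,
// because a bare trait object makes no promise about auto traits.
// Adding `+ Send` lets the compiler derive `Send` for `Map` on its own.
struct Map {
    table: Box<dyn Table + Send>,
    capacity: usize,
}

// Compile-time check: this call only type-checks if `T: Send`.
fn assert_send<T: Send>() {}

// Moves the map into another thread, which requires `Map: Send`.
fn len_on_other_thread(map: Map) -> usize {
    thread::spawn(move || map.table.len().min(map.capacity))
        .join()
        .unwrap()
}

fn main() {
    assert_send::<Map>();
    let map = Map {
        table: Box::new(VecTable(vec![1, 2, 3])),
        capacity: 10,
    };
    println!("{}", len_on_other_thread(map));
}
```

Changing the field type to `Box<dyn Table>` in this sketch should reproduce an E0277 error of the same shape as the one quoted above.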



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

