bharath-techie opened a new issue, #19386:
URL: https://github.com/apache/datafusion/issues/19386

   ### Is your feature request related to a problem or challenge?
   
   More context in 
https://github.com/apache/datafusion/issues/19216#issuecomment-3633472687
   How to reproduce:
   
   Take the ClickBench workload:
   ```
   datafusion-cli -m 8G
   DataFusion CLI v51.0.0
   > SET datafusion.execution.target_partitions=1;
   0 row(s) fetched. 
   Elapsed 0.001 seconds.
   
   > CREATE EXTERNAL TABLE hits 
   STORED AS PARQUET 
   LOCATION '/home/ec2-user/clickdata/partitioned/hits/*.parquet';
   0 row(s) fetched. 
   Elapsed 0.054 seconds.
   
   > SELECT "URL", COUNT(*) AS c FROM hits GROUP BY "URL" ORDER BY c DESC LIMIT 
10;
   Resources exhausted: Additional allocation failed for TopK[0] with top 
memory consumers (across reservations) as:
     TopK[0]#4(can spill: false) consumed 7.8 GB, peak 7.8 GB,
     GroupedHashAggregateStream[0] (count(1))#3(can spill: true) consumed 80.4 
KB, peak 4.6 GB,
     DataFusion-Cli#2(can spill: false) consumed 0.0 B, peak 0.0 B.
   Error: Failed to allocate additional 3.9 GB for TopK[0] with 7.8 GB already 
allocated for this reservation - 187.0 MB remain available for the total pool
   > 
   ```
   
   With 8 GB of memory and a single partition, the query `SELECT "URL", COUNT(*) AS c FROM hits GROUP BY "URL" ORDER BY c DESC LIMIT 10;` fails because the hash aggregate on the `URL` column is memory-heavy.
   
   For each batch that gets inserted and referenced in TopK, the entire batch size gets added during size estimation (as mentioned in https://github.com/apache/datafusion/issues/9562).
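   The accounting above can be illustrated with a minimal sketch (illustrative types only, not DataFusion's actual API): every batch with at least one surviving row is charged its full size, so three ~4 GB batches each contributing a single row to the top-K heap still account for ~12.6 GB.

   ```rust
   // Hypothetical model of the TopK store's size accounting.
   // `Entry` stands in for DataFusion's `RecordBatchEntry`.
   struct Entry {
       batch_bytes: usize, // full size of the underlying RecordBatch
       uses: usize,        // rows from this batch still referenced by the heap
   }

   struct Store {
       batches_size: usize,
       entries: Vec<Entry>,
   }

   impl Store {
       fn insert(&mut self, entry: Entry) {
           if entry.uses > 0 {
               // the ENTIRE batch size is added, no matter how few rows are used
               self.batches_size += entry.batch_bytes;
               self.entries.push(entry);
           }
       }
   }

   fn main() {
       let mut store = Store { batches_size: 0, entries: Vec::new() };
       // three ~4 GB batches, each with only one row in the top-K heap
       for _ in 0..3 {
           store.insert(Entry { batch_bytes: 4_196_909_056, uses: 1 });
       }
       // prints 12590727168 - the full 12.6 GB, for 3 referenced rows
       println!("{}", store.batches_size);
   }
   ```

   This matches the accumulation seen in the debug output below.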
   
   ```rust
   // Path: datafusion/physical-plan/src/topk/mod.rs
   // (printlns added locally to trace the size accounting)

   /// Insert a record batch entry into this store, tracking its
   /// memory use, if it has any uses
   pub fn insert(&mut self, entry: RecordBatchEntry) {
       // uses of 0 means that none of the rows in the batch were stored in the topk
       if entry.uses > 0 {
           let size = get_record_batch_memory_size(&entry.batch);
           self.batches_size += size;
           println!("Record batch size during insert : {size}");
           self.batches.insert(entry.id, entry);
       }
   }

   /// returns the size of memory used by this store, including all
   /// referenced `RecordBatch`es, in bytes
   pub fn size(&self) -> usize {
       let size_of_self = size_of::<Self>();
       let capacity = self.batches.capacity();
       let entry_size = size_of::<u32>() + size_of::<RecordBatchEntry>();
       let batches_size = self.batches_size;

       let size = size_of_self + capacity * entry_size + batches_size;
       println!(
           "self size : {size_of_self} , capacity : {capacity} , heap size : {entry_size}, batch size : {batches_size}"
       );
       println!("size during get : {size}");
       size
   }
   ```
   
   
   ```
   Record batch size during insert : 4196909056
   self size : 48 , capacity : 3 , heap size : 60, batch size : 4196909056
   size during get : 4196909284

   Record batch size during insert : 4196909056
   self size : 48 , capacity : 3 , heap size : 60, batch size : 8393818112
   size during get : 8393818340

   Record batch size during insert : 4196909056
   self size : 48 , capacity : 3 , heap size : 60, batch size : 12590727168
   size during get : 12590727396
   ```
   
   ### Describe the solution you'd like
   
   Implement force compaction in TopK once we hit the memory limit, as mentioned in https://github.com/apache/datafusion/issues/9417#issuecomment-2431943283.

   @alamb also mentioned https://github.com/apache/datafusion/issues/9417#issuecomment-2432677248 and https://github.com/apache/datafusion/issues/16841#issuecomment-3113393092 - potentially there are other places where we could apply a similar optimization?
   
   ### Describe alternatives you've considered
   
   _No response_
   
   ### Additional context
   
   The main problem is the multiple counting of batch memory; that is why this particular query fails with a memory limit error.
   - https://github.com/apache/datafusion/pull/18928 - will this help?
   - https://github.com/apache/datafusion/issues/16841 - I see multiple approaches being discussed for fixing the double counting issue.

