debajyoti-truefoundry commented on issue #16620:
URL: https://github.com/apache/datafusion/issues/16620#issuecomment-3375880735

   ```rust
   use arrow::array::{ArrayRef, StringArray};
   use arrow::datatypes::{DataType, Field, Schema};
   use arrow::record_batch::RecordBatch;
   use datafusion::prelude::*;
   use parquet::arrow::ArrowWriter;
   use parquet::file::properties::WriterProperties;
   use rand::Rng;
   use std::fs::File;
   use std::sync::Arc;
   
   fn generate_random_8byte_hex() -> String {
       let mut rng = rand::thread_rng();
       let mut bytes = [0u8; 8];
       rng.fill(&mut bytes);
       hex::encode(bytes)
   }
   
   #[tokio::main]
   async fn main() -> Result<(), Box<dyn std::error::Error>> {
       let schema = Arc::new(Schema::new(vec![
           Field::new("span_id", DataType::Utf8, false),
           Field::new("data", DataType::Utf8, false),
       ]));
   
       let temp_file = tempfile::Builder::new()
           .suffix(".parquet")
           .tempfile()?;
       let temp_path = temp_file.path().to_string_lossy().to_string();
   
       let file = File::create(&temp_path)?;
       let props = WriterProperties::builder().build();
       let mut writer = ArrowWriter::try_new(file, schema.clone(), 
Some(props))?;
   
       let batch_size = 10_000;
       let total_rows = 1_000_000;
       let num_batches = total_rows / batch_size;
   
       for _batch_idx in 0..num_batches {
           let mut span_ids = Vec::with_capacity(batch_size);
           let mut data_values = Vec::with_capacity(batch_size);
   
           for _ in 0..batch_size {
               span_ids.push(generate_random_8byte_hex());
               data_values.push(generate_random_8byte_hex());
           }
   
           let span_id_array: ArrayRef = Arc::new(StringArray::from(span_ids));
           let data_array: ArrayRef = Arc::new(StringArray::from(data_values));
   
           let batch = RecordBatch::try_new(
               schema.clone(),
               vec![span_id_array, data_array],
           )?;
   
           writer.write(&batch)?;
       }
   
       writer.close()?;
   
       let ctx = SessionContext::new();
   
       ctx.register_parquet("spans", &temp_path, ParquetReadOptions::default())
           .await?;
   
       let sql = "explain analyze SELECT DISTINCT ON (span_id) span_id, data 
FROM spans ORDER BY span_id LIMIT 10";
   
       let df = ctx.sql(sql).await?;
       let results = df.collect().await?;
   
       for batch in results {
           println!("{}", 
arrow::util::pretty::pretty_format_batches(&[batch])?);
       }
   
       let sql2 = "explain analyze SELECT DISTINCT span_id, data FROM spans 
ORDER BY span_id LIMIT 10";
   
       let df2 = ctx.sql(sql2).await?;
       let results2 = df2.collect().await?;
   
       for batch in results2 {
           println!("{}", 
arrow::util::pretty::pretty_format_batches(&[batch])?);
       }
   
       Ok(())
   }
   ```
   
   ```
                    |     AggregateExec: mode=FinalPartitioned, gby=[span_id@0 
as span_id, data@1 as data], aggr=[], metrics=[output_rows=1000000, 
elapsed_compute=74.551957ms, spill_count=0, spilled_bytes=0.0 B, 
spilled_rows=0, peak_mem_used=115344288]  
   ```
   
   vs
   
   ```
   AggregateExec: mode=FinalPartitioned, gby=[span_id@0 as span_id], 
aggr=[first_value(spans.span_id) ORDER BY [spans.span_id ASC NULLS LAST], 
first_value(spans.data) ORDER BY [spans.span_id ASC NULLS LAST]], 
metrics=[output_rows=1000000, elapsed_compute=28.630642585s, spill_count=0, 
spilled_bytes=0.0 B, spilled_rows=0, peak_mem_used=1170265952]   
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to