TheBuilderJR commented on issue #13270: URL: https://github.com/apache/datafusion/issues/13270#issuecomment-2666296334
@zhuqi-lucas here's a concrete repro for what I expect DataFusion to be able to do but currently can't:

```rust
use std::fs;
use std::sync::Arc;
use datafusion::prelude::*;
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::array::{Array, StringArray, StructArray, TimestampMillisecondArray};
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::dataframe::DataFrameWriteOptions;

#[tokio::test]
async fn test_schema_evolution_with_compaction() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();

    let schema1 = Arc::new(Schema::new(vec![
        Field::new("event", DataType::Utf8, false),
        Field::new("timestamp_utc", DataType::Timestamp(TimeUnit::Millisecond, None), false),
    ]));

    let batch1 = RecordBatch::try_new(
        schema1.clone(),
        vec![
            Arc::new(StringArray::from(vec!["event1"])),
            Arc::new(TimestampMillisecondArray::from(vec![1640995200000]))
        ]
    )?;

    let path1 = "test_data1.parquet";
    let _ = fs::remove_file(path1);

    let df1 = ctx.read_batch(batch1)?;
    df1.write_parquet(
        path1,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None
    ).await?;

    let schema2 = Arc::new(Schema::new(vec![
        Field::new("event", DataType::Utf8, false),
        Field::new("timestamp_utc", DataType::Timestamp(TimeUnit::Millisecond, None), false),
        Field::new("data", DataType::Struct(vec![
            Field::new("some_data", DataType::Utf8, false)
        ].into()), false),
    ]));

    let batch2 = RecordBatch::try_new(
        schema2.clone(),
        vec![
            Arc::new(StringArray::from(vec!["event2"])),
            Arc::new(TimestampMillisecondArray::from(vec![1641081600000])),
            Arc::new(StructArray::from(vec![(
                Arc::new(Field::new("some_data", DataType::Utf8, false)),
                Arc::new(StringArray::from(vec!["additional_data"])) as Arc<dyn Array>
            )]))
        ]
    )?;

    let path2 = "test_data2.parquet";
    let _ = fs::remove_file(path2);

    let df2 = ctx.read_batch(batch2)?;
    df2.write_parquet(
        path2,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None
    ).await?;

    let paths_str = vec![path1.to_string(), path2.to_string()];
    let config = ListingTableConfig::new_with_multi_paths(
        paths_str
            .into_iter()
            .map(|p| ListingTableUrl::parse(&p))
            .collect::<Result<Vec<_>, _>>()?
    )
        .with_schema(schema2.as_ref().clone().into())
        .infer(&ctx.state()).await?;

    let config = ListingTableConfig {
        options: Some(ListingOptions {
            file_sort_order: vec![vec![
                col("timestamp_utc").sort(true, true),
            ]],
            ..config.options.unwrap_or_else(|| ListingOptions::new(Arc::new(ParquetFormat::default())))
        }),
        ..config
    };

    let listing_table = ListingTable::try_new(config)?;
    ctx.register_table("events", Arc::new(listing_table))?;

    let df = ctx.sql("SELECT * FROM events ORDER BY event").await?;
    let results = df.clone().collect().await?;

    assert_eq!(results.len(), 2);

    let compacted_path = "test_data_compacted.parquet";
    let _ = fs::remove_file(compacted_path);

    df.write_parquet(
        compacted_path,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None
    ).await?;

    let new_ctx = SessionContext::new();
    let config = ListingTableConfig::new_with_multi_paths(vec![ListingTableUrl::parse(compacted_path)?])
        .with_schema(schema2.as_ref().clone().into())
        .infer(&new_ctx.state()).await?;

    let listing_table = ListingTable::try_new(config)?;
    new_ctx.register_table("compacted_events", Arc::new(listing_table))?;

    let df = new_ctx.sql("SELECT * FROM compacted_events ORDER BY event").await?;
    let compacted_results = df.collect().await?;

    assert_eq!(compacted_results.len(), 2);
    assert_eq!(results, compacted_results);

    let _ = fs::remove_file(path1);
    let _ = fs::remove_file(path2);
    let _ = fs::remove_file(compacted_path);

    Ok(())
}
```

--
This is an automated message from the
Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org