TheBuilderJR opened a new issue, #14753:
URL: https://github.com/apache/datafusion/issues/14753

   ### Describe the bug
   
   Basically I want
   
   1) Schema evolution so if we add more fields, I don't have to remigrate all 
the old data
   2) Compaction support so I can take multiple parquet files of evolving 
schemas and merge them into one
   3) ListingTable support to query all these various types of evolving schema 
parquet files
   
   I've written a repro below. I think in the past "schema evolution" has been 
too vague and hard to define. Perhaps the following integration test can be 
used as a "we are done with v1 of schema evolution" test? cc @alamb 
   
   ### To Reproduce
   
   ```
   use std::fs;
   use std::sync::Arc;
   use datafusion::prelude::*;
   use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::arrow::array::{Array, StringArray, StructArray, 
TimestampMillisecondArray};
   use datafusion::datasource::listing::{ListingOptions, ListingTable, 
ListingTableConfig, ListingTableUrl};
   use datafusion::datasource::file_format::parquet::ParquetFormat;
   use datafusion::dataframe::DataFrameWriteOptions;
   
   #[tokio::test]
   async fn test_schema_evolution_with_compaction() -> Result<(), Box<dyn 
std::error::Error>> {
       let ctx = SessionContext::new();
   
       let schema1 = Arc::new(Schema::new(vec![
           Field::new("event", DataType::Utf8, false),
           Field::new("timestamp_utc", 
DataType::Timestamp(TimeUnit::Millisecond, None), false),
       ]));
       
       let batch1 = RecordBatch::try_new(
           schema1.clone(),
           vec![
               Arc::new(StringArray::from(vec!["event1"])),
               Arc::new(TimestampMillisecondArray::from(vec![1640995200000]))
           ]
       )?;
   
       let path1 = "test_data1.parquet";
       let _ = fs::remove_file(path1);
       
       let df1 = ctx.read_batch(batch1)?;
       df1.write_parquet(
           path1,
           DataFrameWriteOptions::default()
               .with_single_file_output(true)
               .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
           None
       ).await?;
   
       let schema2 = Arc::new(Schema::new(vec![
           Field::new("event", DataType::Utf8, false),
           Field::new("timestamp_utc", 
DataType::Timestamp(TimeUnit::Millisecond, None), false),
           Field::new("data", DataType::Struct(vec![
               Field::new("some_data", DataType::Utf8, false)
           ].into()), false),
       ]));
   
       let batch2 = RecordBatch::try_new(
           schema2.clone(),
           vec![
               Arc::new(StringArray::from(vec!["event2"])),
               Arc::new(TimestampMillisecondArray::from(vec![1641081600000])),
               Arc::new(StructArray::from(vec![(
                   Arc::new(Field::new("some_data", DataType::Utf8, false)),
                   Arc::new(StringArray::from(vec!["additional_data"])) as 
Arc<dyn Array>
               )]))
           ]
       )?;
   
       let path2 = "test_data2.parquet";
       let _ = fs::remove_file(path2);
       
       let df2 = ctx.read_batch(batch2)?;
       df2.write_parquet(
           path2,
           DataFrameWriteOptions::default()
               .with_single_file_output(true)
               .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
           None
       ).await?;
   
       let paths_str = vec![path1.to_string(), path2.to_string()];
       let config = ListingTableConfig::new_with_multi_paths(
           paths_str
               .into_iter()
               .map(|p| ListingTableUrl::parse(&p))
               .collect::<Result<Vec<_>, _>>()?
       )
           .with_schema(schema2.as_ref().clone().into())
           .infer(&ctx.state()).await?;
   
       let config = ListingTableConfig {
           options: Some(ListingOptions {
               file_sort_order: vec![vec![
                   col("timestamp_utc").sort(true, true),
               ]],
               ..config.options.unwrap_or_else(|| 
ListingOptions::new(Arc::new(ParquetFormat::default())))
           }),
           ..config
       };
   
       let listing_table = ListingTable::try_new(config)?;
       ctx.register_table("events", Arc::new(listing_table))?;
   
       let df = ctx.sql("SELECT * FROM events ORDER BY event").await?;
       let results = df.clone().collect().await?;
       
       assert_eq!(results.len(), 2);
   
       let compacted_path = "test_data_compacted.parquet";
       let _ = fs::remove_file(compacted_path);
   
       df.write_parquet(
           compacted_path,
           DataFrameWriteOptions::default()
               .with_single_file_output(true)
               .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
           None
       ).await?;
   
       let new_ctx = SessionContext::new();
       let config = 
ListingTableConfig::new_with_multi_paths(vec![ListingTableUrl::parse(compacted_path)?])
           .with_schema(schema2.as_ref().clone().into())
           .infer(&new_ctx.state()).await?;
       
       let listing_table = ListingTable::try_new(config)?;
       new_ctx.register_table("compacted_events", Arc::new(listing_table))?;
   
       let df = new_ctx.sql("SELECT * FROM compacted_events ORDER BY 
event").await?;
       let compacted_results = df.collect().await?;
       
       assert_eq!(compacted_results.len(), 2);
       assert_eq!(results, compacted_results);
   
       let _ = fs::remove_file(path1);
       let _ = fs::remove_file(path2);
       let _ = fs::remove_file(compacted_path);
   
       Ok(())
   }
   ```
   
   ### Expected behavior
   
   It gets all the way through
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to