TheBuilderJR opened a new issue, #14755:
URL: https://github.com/apache/datafusion/issues/14755

   ### Describe the bug
   
   Schema inference over multiple files should be agnostic to file order, but in practice it requires the files to be listed in the order the schema evolved. In the reproduction below the paths are listed as 4, 2, 3, 1 and the query fails; reordering them to 1, 2, 3, 4 makes it pass, but not vice versa.
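   
   For reference, the only change needed to make the reproduction below pass appears to be the order of the paths:
   
   ```
   // fails: paths listed out of schema-evolution order
   let paths_str = vec![path4.to_string(), path2.to_string(), path3.to_string(), path1.to_string()];
   
   // works: paths listed in schema-evolution order
   let paths_str = vec![path1.to_string(), path2.to_string(), path3.to_string(), path4.to_string()];
   ```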
   
   ### To Reproduce
   
   ```
   use std::fs;
   use std::sync::Arc;
   use datafusion::prelude::*;
   use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::arrow::array::{Array, StringArray, StructArray, TimestampMillisecondArray};
   use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl};
   use datafusion::datasource::file_format::parquet::ParquetFormat;
   use datafusion::dataframe::DataFrameWriteOptions;
   
   #[tokio::test]
   async fn test_datafusion_schema_evolution_with_compaction() -> Result<(), Box<dyn std::error::Error>> {
       let ctx = SessionContext::new();
   
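       // File 1: flat base schema (event, timestamp_utc).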
       let schema1 = Arc::new(Schema::new(vec![
           Field::new("event", DataType::Utf8, false),
           Field::new("timestamp_utc", DataType::Timestamp(TimeUnit::Millisecond, None), false),
       ]));
       
       let batch1 = RecordBatch::try_new(
           schema1.clone(),
           vec![
               Arc::new(StringArray::from(vec!["event1"])),
               Arc::new(TimestampMillisecondArray::from(vec![1640995200000]))
           ]
       )?;
   
       let path1 = "test_data1.parquet";
       let _ = fs::remove_file(path1);
       
       let df1 = ctx.read_batch(batch1)?;
       df1.write_parquet(
           path1,
           DataFrameWriteOptions::default()
               .with_single_file_output(true)
               .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
           None
       ).await?;
   
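       // File 2: schema adds a "data" struct column with a single Utf8 child.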
       let schema2 = Arc::new(Schema::new(vec![
           Field::new("event", DataType::Utf8, false),
           Field::new("timestamp_utc", DataType::Timestamp(TimeUnit::Millisecond, None), false),
           Field::new("data", DataType::Struct(vec![
               Field::new("some_data", DataType::Utf8, false)
           ].into()), false),
       ]));
   
       let batch2 = RecordBatch::try_new(
           schema2.clone(),
           vec![
               Arc::new(StringArray::from(vec!["event2"])),
               Arc::new(TimestampMillisecondArray::from(vec![1641081600000])),
               Arc::new(StructArray::from(vec![(
                   Arc::new(Field::new("some_data", DataType::Utf8, false)),
                   Arc::new(StringArray::from(vec!["additional_data"])) as Arc<dyn Array>
               )]))
           ]
       )?;
   
       let path2 = "test_data2.parquet";
       let _ = fs::remove_file(path2);
       
       let df2 = ctx.read_batch(batch2)?;
       df2.write_parquet(
           path2,
           DataFrameWriteOptions::default()
               .with_single_file_output(true)
               .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
           None
       ).await?;
   
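       // File 3: the "data" struct gains a nested "even_more_nested_data" level.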
       let schema3 = Arc::new(Schema::new(vec![
           Field::new("event", DataType::Utf8, false),
           Field::new("timestamp_utc", DataType::Timestamp(TimeUnit::Millisecond, None), false),
           Field::new("data", DataType::Struct(vec![
               Field::new("even_more_nested_data", DataType::Struct(vec![
                   Field::new("some_data", DataType::Utf8, false)
               ].into()), false)
           ].into()), false),
       ]));
   
       let batch3 = RecordBatch::try_new(
           schema3.clone(),
           vec![
               Arc::new(StringArray::from(vec!["event3"])),
               Arc::new(TimestampMillisecondArray::from(vec![1641168000000])),
               Arc::new(StructArray::from(vec![(
                   Arc::new(Field::new("even_more_nested_data", DataType::Struct(vec![
                       Field::new("some_data", DataType::Utf8, false)
                   ].into()), false)),
                   Arc::new(StructArray::from(vec![(
                       Arc::new(Field::new("some_data", DataType::Utf8, false)),
                       Arc::new(StringArray::from(vec!["deeply_nested_value"])) as Arc<dyn Array>
                   )])) as Arc<dyn Array>
               )]))
           ]
       )?;
   
       let path3 = "test_data3.parquet";
       let _ = fs::remove_file(path3);
       
       let df3 = ctx.read_batch(batch3)?;
       df3.write_parquet(
           path3,
           DataFrameWriteOptions::default()
               .with_single_file_output(true)
               .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
           None
       ).await?;
   
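       // File 4: deepest evolution; "some_data" becomes a struct holding "deepest_data".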
       let schema4 = Arc::new(Schema::new(vec![
           Field::new("event", DataType::Utf8, false),
           Field::new("timestamp_utc", DataType::Timestamp(TimeUnit::Millisecond, None), false),
           Field::new("data", DataType::Struct(vec![
               Field::new("even_more_nested_data", DataType::Struct(vec![
                   Field::new("some_data", DataType::Struct(vec![
                       Field::new("deepest_data", DataType::Utf8, false)
                   ].into()), false)
               ].into()), false)
           ].into()), false),
       ]));
   
       let batch4 = RecordBatch::try_new(
           schema4.clone(),
           vec![
               Arc::new(StringArray::from(vec!["event4"])),
               Arc::new(TimestampMillisecondArray::from(vec![1641254400000])),
               Arc::new(StructArray::from(vec![(
                   Arc::new(Field::new("even_more_nested_data", DataType::Struct(vec![
                       Field::new("some_data", DataType::Struct(vec![
                           Field::new("deepest_data", DataType::Utf8, false)
                       ].into()), false)
                   ].into()), false)),
                   Arc::new(StructArray::from(vec![(
                       Arc::new(Field::new("some_data", DataType::Struct(vec![
                           Field::new("deepest_data", DataType::Utf8, false)
                       ].into()), false)),
                       Arc::new(StructArray::from(vec![(
                           Arc::new(Field::new("deepest_data", DataType::Utf8, false)),
                           Arc::new(StringArray::from(vec!["super_deeply_nested_value"])) as Arc<dyn Array>
                       )])) as Arc<dyn Array>
                   )])) as Arc<dyn Array>
               )]))
           ]
       )?;
   
       let path4 = "test_data4.parquet";
       let _ = fs::remove_file(path4);
       
       let df4 = ctx.read_batch(batch4)?;
       df4.write_parquet(
           path4,
           DataFrameWriteOptions::default()
               .with_single_file_output(true)
               .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
           None
       ).await?;
   
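       // Build one listing table over all four files, deliberately listed out of schema-evolution order (4, 2, 3, 1).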
       let paths_str = vec![path4.to_string(), path2.to_string(), path3.to_string(), path1.to_string()];
       let config = ListingTableConfig::new_with_multi_paths(
           paths_str
               .into_iter()
               .map(|p| ListingTableUrl::parse(&p))
               .collect::<Result<Vec<_>, _>>()?
       )
           .with_schema(schema4.as_ref().clone().into())
           .infer(&ctx.state()).await?;
   
       let config = ListingTableConfig {
           options: Some(ListingOptions {
               file_sort_order: vec![vec![
                   col("timestamp_utc").sort(true, true),
               ]],
                ..config.options.unwrap_or_else(|| ListingOptions::new(Arc::new(ParquetFormat::default())))
           }),
           ..config
       };
   
       let listing_table = ListingTable::try_new(config)?;
       ctx.register_table("events", Arc::new(listing_table))?;
   
       let df = ctx.sql("SELECT * FROM events ORDER BY event").await?;
       let results = df.clone().collect().await?;
   
       assert_eq!(results[0].num_rows(), 4);
   
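       // Compact the four files into a single parquet file.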
       let compacted_path = "test_data_compacted.parquet";
       let _ = fs::remove_file(compacted_path);
   
       df.write_parquet(
           compacted_path,
           DataFrameWriteOptions::default()
               .with_single_file_output(true)
               .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
           None
       ).await?;
   
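       // Re-read the compacted file with a fresh context and verify it matches the pre-compaction results.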
       let new_ctx = SessionContext::new();
       let config = ListingTableConfig::new_with_multi_paths(vec![ListingTableUrl::parse(compacted_path)?])
           .with_schema(schema4.as_ref().clone().into())
           .infer(&new_ctx.state()).await?;
       
       let listing_table = ListingTable::try_new(config)?;
       new_ctx.register_table("events", Arc::new(listing_table))?;
   
       let df = new_ctx.sql("SELECT * FROM events ORDER BY event").await?;
       let compacted_results = df.collect().await?;
       
       assert_eq!(compacted_results[0].num_rows(), 4);
       assert_eq!(results, compacted_results);
   
       let _ = fs::remove_file(path1);
       let _ = fs::remove_file(path2);
       let _ = fs::remove_file(path3);
       let _ = fs::remove_file(path4);
       let _ = fs::remove_file(compacted_path);
   
       Ok(())
   }
   ```
   
   produces
   
   ```
   Error: Plan("Cannot cast file schema field data of type Struct([Field { name: \"some_data\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]) to table schema field of type Struct([Field { name: \"even_more_nested_data\", data_type: Struct([Field { name: \"some_data\", data_type: Struct([Field { name: \"deepest_data\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }])")
   ```
   
   ### Expected behavior
   
   It should work irrespective of the order in which the files are listed.
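   
   As a stopgap sketch (assuming the file names sort lexicographically in schema-evolution order, which happens to hold for the test files above), sorting the paths before building the `ListingTableConfig` avoids the error:
   
   ```
   // Workaround sketch: ensure the most-evolved file is not listed first.
   // Only valid when lexicographic order matches schema-evolution order.
   let mut paths_str = vec![path4.to_string(), path2.to_string(), path3.to_string(), path1.to_string()];
   paths_str.sort();
   assert_eq!(paths_str[0], "test_data1.parquet");
   ```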
   
   ### Additional context
   
   _No response_

