TheBuilderJR opened a new issue, #14757:
URL: https://github.com/apache/datafusion/issues/14757

   ### Describe the bug
   
   I'd expect that, as I add fields to structs, I should be able to cast one
struct type into another. As the repro below shows, this doesn't seem to be allowed:
   
   ### To Reproduce
   
   ```
   use std::fs;
   use std::sync::Arc;
   use datafusion::prelude::*;
   use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::arrow::array::{Array, StringArray, StructArray, 
TimestampMillisecondArray, Float64Array};
   use datafusion::datasource::listing::{ListingOptions, ListingTable, 
ListingTableConfig, ListingTableUrl};
   use datafusion::datasource::file_format::parquet::ParquetFormat;
   use datafusion::dataframe::DataFrameWriteOptions;
   
   /// Repro for a schema-evolution failure: two Parquet files whose
   /// `additionalInfo` struct differs (the second file adds a nested `reason`
   /// struct) are read through one `ListingTable`, compacted into a single
   /// file, and read back. Expected: both reads succeed and agree. Actual:
   /// panics inside arrow's `StructArray` with
   /// `expected Timestamp(Millisecond, Some("UTC")) got Timestamp(Millisecond, None)`.
   #[tokio::test]
   async fn test_datafusion_schema_evolution_with_compaction(
   ) -> Result<(), Box<dyn std::error::Error>> {
       let ctx = SessionContext::new();

       // The nested "details"/"reason" struct types are spelled out in three
       // places below (schema2, the Field declaration, and the array
       // construction); build them once so the copies cannot drift apart.
       let details_type = DataType::Struct(
           vec![
               Field::new("rurl", DataType::Utf8, true),
               Field::new("s", DataType::Float64, true),
               Field::new("t", DataType::Utf8, true),
           ]
           .into(),
       );
       let reason_type = DataType::Struct(
           vec![
               Field::new("_level", DataType::Float64, true),
               Field::new("details", details_type.clone(), true),
           ]
           .into(),
       );

       // --- File 1: original schema (no `reason` inside `additionalInfo`) ---
       let schema1 = Arc::new(Schema::new(vec![
           Field::new("component", DataType::Utf8, true),
           Field::new("message", DataType::Utf8, true),
           Field::new("stack", DataType::Utf8, true),
           Field::new("timestamp", DataType::Utf8, true),
           Field::new(
               "timestamp_utc",
               DataType::Timestamp(TimeUnit::Millisecond, None),
               true,
           ),
           Field::new(
               "additionalInfo",
               DataType::Struct(
                   vec![
                       Field::new("location", DataType::Utf8, true),
                       Field::new(
                           "timestamp_utc",
                           DataType::Timestamp(TimeUnit::Millisecond, None),
                           true,
                       ),
                   ]
                   .into(),
               ),
               true,
           ),
       ]));

       let batch1 = RecordBatch::try_new(
           schema1.clone(),
           vec![
               Arc::new(StringArray::from(vec![Some("component1")])),
               Arc::new(StringArray::from(vec![Some("message1")])),
               Arc::new(StringArray::from(vec![Some("stack_trace")])),
               Arc::new(StringArray::from(vec![Some("2025-02-18T00:00:00Z")])),
               Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
               Arc::new(StructArray::from(vec![
                   (
                       Arc::new(Field::new("location", DataType::Utf8, true)),
                       Arc::new(StringArray::from(vec![Some("USA")])) as Arc<dyn Array>,
                   ),
                   (
                       Arc::new(Field::new(
                           "timestamp_utc",
                           DataType::Timestamp(TimeUnit::Millisecond, None),
                           true,
                       )),
                       Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
                   ),
               ])),
           ],
       )?;

       let path1 = "test_data1.parquet";
       let _ = fs::remove_file(path1);

       let df1 = ctx.read_batch(batch1)?;
       df1.write_parquet(
           path1,
           DataFrameWriteOptions::default()
               .with_single_file_output(true)
               .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
           None,
       )
       .await?;

       // --- File 2: evolved schema (`additionalInfo` gains a nested `reason`) ---
       let schema2 = Arc::new(Schema::new(vec![
           Field::new("component", DataType::Utf8, true),
           Field::new("message", DataType::Utf8, true),
           Field::new("stack", DataType::Utf8, true),
           Field::new("timestamp", DataType::Utf8, true),
           Field::new(
               "timestamp_utc",
               DataType::Timestamp(TimeUnit::Millisecond, None),
               true,
           ),
           Field::new(
               "additionalInfo",
               DataType::Struct(
                   vec![
                       Field::new("location", DataType::Utf8, true),
                       Field::new(
                           "timestamp_utc",
                           DataType::Timestamp(TimeUnit::Millisecond, None),
                           true,
                       ),
                       Field::new("reason", reason_type.clone(), true),
                   ]
                   .into(),
               ),
               true,
           ),
       ]));

       let batch2 = RecordBatch::try_new(
           schema2.clone(),
           vec![
               Arc::new(StringArray::from(vec![Some("component1")])),
               Arc::new(StringArray::from(vec![Some("message1")])),
               Arc::new(StringArray::from(vec![Some("stack_trace")])),
               Arc::new(StringArray::from(vec![Some("2025-02-18T00:00:00Z")])),
               Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
               Arc::new(StructArray::from(vec![
                   (
                       Arc::new(Field::new("location", DataType::Utf8, true)),
                       Arc::new(StringArray::from(vec![Some("USA")])) as Arc<dyn Array>,
                   ),
                   (
                       Arc::new(Field::new(
                           "timestamp_utc",
                           DataType::Timestamp(TimeUnit::Millisecond, None),
                           true,
                       )),
                       Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
                   ),
                   (
                       Arc::new(Field::new("reason", reason_type.clone(), true)),
                       Arc::new(StructArray::from(vec![
                           (
                               Arc::new(Field::new("_level", DataType::Float64, true)),
                               Arc::new(Float64Array::from(vec![Some(1.5)])) as Arc<dyn Array>,
                           ),
                           (
                               Arc::new(Field::new("details", details_type.clone(), true)),
                               Arc::new(StructArray::from(vec![
                                   (
                                       Arc::new(Field::new("rurl", DataType::Utf8, true)),
                                       // NOTE(review): the archived post had a stray `;`
                                       // inside this literal (`"https://example.com";`) —
                                       // an HTML-escaping artifact that would not compile.
                                       Arc::new(StringArray::from(vec![Some("https://example.com")]))
                                           as Arc<dyn Array>,
                                   ),
                                   (
                                       Arc::new(Field::new("s", DataType::Float64, true)),
                                       Arc::new(Float64Array::from(vec![Some(3.14)])) as Arc<dyn Array>,
                                   ),
                                   (
                                       Arc::new(Field::new("t", DataType::Utf8, true)),
                                       Arc::new(StringArray::from(vec![Some("data")])) as Arc<dyn Array>,
                                   ),
                               ])),
                           ),
                       ])),
                   ),
               ])),
           ],
       )?;

       let path2 = "test_data2.parquet";
       let _ = fs::remove_file(path2);

       let df2 = ctx.read_batch(batch2)?;
       df2.write_parquet(
           path2,
           DataFrameWriteOptions::default()
               .with_single_file_output(true)
               .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
           None,
       )
       .await?;

       // Register both files behind a single ListingTable, declaring the
       // evolved schema and the file sort order.
       let paths_str = vec![path1.to_string(), path2.to_string()];
       let config = ListingTableConfig::new_with_multi_paths(
           paths_str
               .into_iter()
               .map(|p| ListingTableUrl::parse(&p))
               .collect::<Result<Vec<_>, _>>()?,
       )
       .with_schema(schema2.as_ref().clone().into())
       .infer(&ctx.state())
       .await?;

       let config = ListingTableConfig {
           options: Some(ListingOptions {
               file_sort_order: vec![vec![col("timestamp_utc").sort(true, true)]],
               ..config
                   .options
                   .unwrap_or_else(|| ListingOptions::new(Arc::new(ParquetFormat::default())))
           }),
           ..config
       };

       let listing_table = ListingTable::try_new(config)?;
       ctx.register_table("events", Arc::new(listing_table))?;

       let df = ctx.sql("SELECT * FROM events ORDER BY timestamp_utc").await?;
       let results = df.clone().collect().await?;

       assert_eq!(results[0].num_rows(), 2);

       // Compact the two files into one, then verify that a fresh context
       // reads back exactly the same rows.
       let compacted_path = "test_data_compacted.parquet";
       let _ = fs::remove_file(compacted_path);

       df.write_parquet(
           compacted_path,
           DataFrameWriteOptions::default()
               .with_single_file_output(true)
               .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
           None,
       )
       .await?;

       let new_ctx = SessionContext::new();
       let config = ListingTableConfig::new_with_multi_paths(vec![ListingTableUrl::parse(
           compacted_path,
       )?])
       .with_schema(schema2.as_ref().clone().into())
       .infer(&new_ctx.state())
       .await?;

       let listing_table = ListingTable::try_new(config)?;
       new_ctx.register_table("events", Arc::new(listing_table))?;

       let df = new_ctx
           .sql("SELECT * FROM events ORDER BY timestamp_utc")
           .await?;
       let compacted_results = df.collect().await?;

       assert_eq!(compacted_results[0].num_rows(), 2);
       assert_eq!(results, compacted_results);

       // Best-effort cleanup of the temp files.
       let _ = fs::remove_file(path1);
       let _ = fs::remove_file(path2);
       let _ = fs::remove_file(compacted_path);

       Ok(())
   }
   ```
   
   produces
   
   ```
   thread 'test_datafusion_schema_evolution_with_compaction' panicked at 
/Users/bobren/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-array-54.2.0/src/array/struct_array.rs:91:46:
   called `Result::unwrap()` on an `Err` value: InvalidArgumentError("Incorrect 
datatype for StructArray field \"timestamp_utc\", expected 
Timestamp(Millisecond, Some(\"UTC\")) got Timestamp(Millisecond, None)")
   ```
   
   ### Expected behavior
   
   I expected that test to pass.
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to