kosiew commented on code in PR #16305: URL: https://github.com/apache/datafusion/pull/16305#discussion_r2134364184
########## datafusion/core/src/datasource/listing/table.rs: ########## @@ -2452,4 +2178,382 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn infer_preserves_provided_schema() -> Result<()> { + let ctx = SessionContext::new(); + + let testdata = datafusion_test_data(); + let filename = format!("{testdata}/aggregate_simple.csv"); + let table_path = ListingTableUrl::parse(filename).unwrap(); + + let provided_schema = create_test_schema(); + + let config = + ListingTableConfig::new(table_path).with_schema(Arc::clone(&provided_schema)); + + let config = config.infer(&ctx.state()).await?; + + assert_eq!(*config.file_schema.unwrap(), *provided_schema); + + Ok(()) + } + + #[tokio::test] + async fn test_listing_table_config_with_multiple_files_comprehensive() -> Result<()> { + let ctx = SessionContext::new(); + + // Create test files with different schemas + let tmp_dir = TempDir::new()?; + let file_path1 = tmp_dir.path().join("file1.csv"); + let file_path2 = tmp_dir.path().join("file2.csv"); + + // File 1: c1,c2,c3 + let mut file1 = std::fs::File::create(&file_path1)?; + writeln!(file1, "c1,c2,c3")?; + writeln!(file1, "1,2,3")?; + writeln!(file1, "4,5,6")?; + + // File 2: c1,c2,c3,c4 + let mut file2 = std::fs::File::create(&file_path2)?; + writeln!(file2, "c1,c2,c3,c4")?; + writeln!(file2, "7,8,9,10")?; + writeln!(file2, "11,12,13,14")?; + + // Parse paths + let table_path1 = ListingTableUrl::parse(file_path1.to_str().unwrap())?; + let table_path2 = ListingTableUrl::parse(file_path2.to_str().unwrap())?; + + // Create format and options + let format = CsvFormat::default().with_has_header(true); + let options = ListingOptions::new(Arc::new(format)); + + // Test case 1: Infer schema using first file's schema + let config1 = ListingTableConfig::new_with_multi_paths(vec![ + table_path1.clone(), + table_path2.clone(), + ]) + .with_listing_options(options.clone()); + let config1 = config1.infer_schema(&ctx.state()).await?; + assert_eq!(*config1.schema_source(), SchemaSource::Inferred); + + // Verify schema matches first file + let schema1 = config1.file_schema.as_ref().unwrap().clone(); + assert_eq!(schema1.fields().len(), 3); + assert_eq!(schema1.field(0).name(), "c1"); + assert_eq!(schema1.field(1).name(), "c2"); + assert_eq!(schema1.field(2).name(), "c3"); + + // Test case 2: Use specified schema with 3 columns + let schema_3cols = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Utf8, true), + Field::new("c2", DataType::Utf8, true), + Field::new("c3", DataType::Utf8, true), + ])); + + let config2 = ListingTableConfig::new_with_multi_paths(vec![ + table_path1.clone(), + table_path2.clone(), + ]) + .with_schema(schema_3cols) + .with_listing_options(options.clone()); + let config2 = config2.infer_schema(&ctx.state()).await?; + assert_eq!(*config2.schema_source(), SchemaSource::Specified); + + // Verify that the schema is still the one we specified (3 columns) + let schema2 = config2.file_schema.as_ref().unwrap().clone(); + assert_eq!(schema2.fields().len(), 3); + assert_eq!(schema2.field(0).name(), "c1"); + assert_eq!(schema2.field(1).name(), "c2"); + assert_eq!(schema2.field(2).name(), "c3"); + + // Test case 3: Use specified schema with 4 columns + let schema_4cols = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Utf8, true), + Field::new("c2", DataType::Utf8, true), + Field::new("c3", DataType::Utf8, true), + Field::new("c4", DataType::Utf8, true), + ])); + + let config3 = ListingTableConfig::new_with_multi_paths(vec![ + table_path1.clone(), + table_path2.clone(), + ]) + .with_schema(schema_4cols) + .with_listing_options(options.clone()); + let config3 = config3.infer_schema(&ctx.state()).await?; + assert_eq!(*config3.schema_source(), SchemaSource::Specified); + + // Verify that the schema is still the one we specified (4 columns) + let schema3 = config3.file_schema.as_ref().unwrap().clone(); + assert_eq!(schema3.fields().len(), 4); + assert_eq!(schema3.field(0).name(), "c1"); + assert_eq!(schema3.field(1).name(), "c2"); + assert_eq!(schema3.field(2).name(), "c3"); + assert_eq!(schema3.field(3).name(), "c4"); + + // Test case 4: Verify order matters when inferring schema + let config4 = ListingTableConfig::new_with_multi_paths(vec![ + table_path2.clone(), + table_path1.clone(), + ]) + .with_listing_options(options); + let config4 = config4.infer_schema(&ctx.state()).await?; + + // Should use first file's schema, which now has 4 columns + let schema4 = config4.file_schema.as_ref().unwrap().clone(); + assert_eq!(schema4.fields().len(), 4); + assert_eq!(schema4.field(0).name(), "c1"); + assert_eq!(schema4.field(1).name(), "c2"); + assert_eq!(schema4.field(2).name(), "c3"); + assert_eq!(schema4.field(3).name(), "c4"); + + Ok(()) + } + + #[tokio::test] + async fn test_list_files_configurations() -> Result<()> { + // Define common test cases as (description, files, paths, target_partitions, expected_partitions, file_ext) + let test_cases = vec![ + // Single path cases + ( + "Single path, more partitions than files", + generate_test_files("bucket/key-prefix", 5), + vec!["test:///bucket/key-prefix/"], + 12, + 5, + Some(""), + ), + ( + "Single path, equal partitions and files", + generate_test_files("bucket/key-prefix", 4), + vec!["test:///bucket/key-prefix/"], + 4, + 4, + Some(""), + ), + ( + "Single path, more files than partitions", + generate_test_files("bucket/key-prefix", 5), + vec!["test:///bucket/key-prefix/"], + 2, + 2, + Some(""), + ), + // Multi path cases + ( + "Multi path, more partitions than files", + { + let mut files = generate_test_files("bucket/key1", 3); + files.extend(generate_test_files_with_start("bucket/key2", 2, 3)); + files.extend(generate_test_files_with_start("bucket/key3", 1, 5)); + files + }, + vec!["test:///bucket/key1/", "test:///bucket/key2/"], + 12, + 5, + Some(""), + ), + // No files case + ( + "No files", + vec![], + vec!["test:///bucket/key-prefix/"], + 2, + 0, + Some(""), + ), + // Exact path cases + ( + "Exact paths test", + { + let mut files = generate_test_files("bucket/key1", 3); + files.extend(generate_test_files_with_start("bucket/key2", 2, 3)); + files + }, + vec![ + "test:///bucket/key1/file0", + "test:///bucket/key1/file1", + "test:///bucket/key1/file2", + "test:///bucket/key2/file3", + "test:///bucket/key2/file4", + ], + 12, + 5, + Some(""), + ), + ]; + + // Run each test case + for (test_name, files, paths, target_partitions, expected_partitions, file_ext) in + test_cases + { + println!("Running test: {test_name}"); + + if files.is_empty() { + // Test empty files case + assert_list_files_for_multi_paths( + &[], + &paths, + target_partitions, + expected_partitions, + file_ext, + ) + .await?; + } else if paths.len() == 1 { + // Test using single path API + let file_refs: Vec<&str> = files.iter().map(|s| s.as_str()).collect(); + assert_list_files_for_scan_grouping( + &file_refs, + paths[0], + target_partitions, + expected_partitions, + file_ext, + ) + .await?; + } else if paths[0].contains("test:///bucket/key") { + // Test using multi path API + let file_refs: Vec<&str> = files.iter().map(|s| s.as_str()).collect(); + assert_list_files_for_multi_paths( + &file_refs, + &paths, + target_partitions, + expected_partitions, + file_ext, + ) + .await?; + } else { + // Test using exact path API for specific cases + let file_refs: Vec<&str> = files.iter().map(|s| s.as_str()).collect(); + assert_list_files_for_exact_paths( + &file_refs, + target_partitions, + expected_partitions, + file_ext, + ) + .await?; + } + } + + Ok(()) + } + + #[cfg(feature = "parquet")] + #[tokio::test] + async fn test_table_stats_behaviors() -> Result<()> { + use crate::datasource::file_format::parquet::ParquetFormat; + + let testdata = crate::test_util::parquet_test_data(); + let filename = format!("{}/{}", testdata, "alltypes_plain.parquet"); + let table_path = ListingTableUrl::parse(filename).unwrap(); + + let ctx = SessionContext::new(); + let state = ctx.state(); + + // Test 1: Default behavior - stats not collected + let opt_default = ListingOptions::new(Arc::new(ParquetFormat::default())); + let schema_default = opt_default.infer_schema(&state, &table_path).await?; + let config_default = ListingTableConfig::new(table_path.clone()) + .with_listing_options(opt_default) + .with_schema(schema_default); + let table_default = ListingTable::try_new(config_default)?; + + let exec_default = table_default.scan(&state, None, &[], None).await?; + assert_eq!( + exec_default.partition_statistics(None)?.num_rows, + Precision::Absent + ); + + // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 + assert_eq!( + exec_default.partition_statistics(None)?.total_byte_size, + Precision::Absent + ); Review Comment: Replace this excerpt from previous do_not_load_table_stats_by_default ```rust let opt = ListingOptions::new(Arc::new(ParquetFormat::default())); let schema = opt.infer_schema(&state, &table_path).await?; let config = ListingTableConfig::new(table_path.clone()) .with_listing_options(opt) .with_schema(schema); let table = ListingTable::try_new(config)?; let exec = table.scan(&state, None, &[], None).await?; assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent); // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 assert_eq!( exec.partition_statistics(None)?.total_byte_size, Precision::Absent ); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org