ethan-tyler commented on code in PR #17702:
URL: https://github.com/apache/datafusion/pull/17702#discussion_r2608310624


##########
datafusion/core/src/datasource/listing_table_factory.rs:
##########
@@ -519,4 +574,55 @@ mod tests {
         let listing_options = listing_table.options();
         assert!(listing_options.table_partition_cols.is_empty());
     }
+
+    #[tokio::test]
+    async fn test_create_using_multiple_locations() {
+        let dir = tempfile::tempdir().unwrap();
+        let mut path = PathBuf::from(dir.path());
+        path.push("folder-1");
+        path.push("folder-2");
+        fs::create_dir_all(&path).unwrap();

Review Comment:
   Think there's a bug here - this creates <tmp>/folder-1/folder-2 (nested) but then tries to register <tmp>/folder-1 and <tmp>/folder-2 as separate paths. folder-2 doesn't exist as a sibling.
   
   Should probably be:
   ```rust
   fs::create_dir_all(dir.path().join("folder-1")).unwrap();
   fs::create_dir_all(dir.path().join("folder-2")).unwrap();
   ```
   
   Neither folder has any files so schema inference has nothing to work with.
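   
   Something like this would also give schema inference something to read (untested; the CSV file names and contents below are just placeholders for whatever format the test actually registers):
   ```rust
   // Create the two sibling folders and drop one tiny file into each location
   // so schema inference has data to look at (contents here are made up).
   let folder1 = dir.path().join("folder-1");
   let folder2 = dir.path().join("folder-2");
   fs::create_dir_all(&folder1).unwrap();
   fs::create_dir_all(&folder2).unwrap();
   fs::write(folder1.join("part-0.csv"), "a,b\n1,x\n").unwrap();
   fs::write(folder2.join("part-0.csv"), "a,b\n2,y\n").unwrap();
   ```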



##########
datafusion/core/src/datasource/listing_table_factory.rs:
##########
@@ -63,137 +65,190 @@ impl TableProviderFactory for ListingTableFactory {
             ))?
             .create(session_state, &cmd.options)?;
 
-        let mut table_path = ListingTableUrl::parse(&cmd.location)?;
-        let file_extension = match table_path.is_collection() {
-            // Setting the extension to be empty instead of allowing the default extension seems
-            // odd, but was done to ensure existing behavior isn't modified. It seems like this
-            // could be refactored to either use the default extension or set the fully expected
-            // extension when compression is included (e.g. ".csv.gz")
-            true => "",
-            false => &get_extension(cmd.location.as_str()),
+        let file_extension = match cmd.locations.len() {
+            1 => {
+                let table_path = ListingTableUrl::parse(&cmd.locations[0])?;
+                match table_path.is_collection() {
+                    // Setting the extension to be empty instead of allowing the default extension seems
+                    // odd, but was done to ensure existing behavior isn't modified. It seems like this
+                    // could be refactored to either use the default extension or set the fully expected
+                    // extension when compression is included (e.g. ".csv.gz").
+                    // We do the same if there are multiple locations provided for the table.
+                    true => "",
+                    false => &get_extension(cmd.locations[0].as_str()),
+                }
+            }
+            _ => "",
         };
+
         let mut options = ListingOptions::new(file_format)
             .with_session_config_options(session_state.config())
             .with_file_extension(file_extension);
 
-        let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() {
-            let infer_parts = session_state
-                .config_options()
-                .execution
-                .listing_table_factory_infer_partitions;
-            let part_cols = if cmd.table_partition_cols.is_empty() && infer_parts {
-                options
-                    .infer_partitions(session_state, &table_path)
-                    .await?
-                    .into_iter()
-            } else {
-                cmd.table_partition_cols.clone().into_iter()
-            };
-
-            (
-                None,
-                part_cols
-                    .map(|p| {
-                        (
-                            p,
-                            DataType::Dictionary(
-                                Box::new(DataType::UInt16),
-                                Box::new(DataType::Utf8),
-                            ),
-                        )
-                    })
-                    .collect::<Vec<_>>(),
+        let table_paths: Vec<ListingTableUrl> = cmd
+            .locations
+            .iter()
+            .map(|loc| ListingTableUrl::parse(loc))
+            .collect::<Result<Vec<_>>>()?;
+
+        // We use the first location to infer the partition columns,
+        // primarily for performance and simplicity reasons.
+        let partition_columns = infer_partition_columns(
+            &options,
+            session_state,
+            &table_paths[0],
+            &cmd.table_partition_cols,
+        )
+        .await?;
+
+        let infer_schemas = table_paths.into_iter().map(|listing_url| {
+            infer_schema(
+                &options,
+                &session_state,
+                listing_url,
+                &partition_columns,
+                &cmd.order_exprs,
+                &cmd.schema,
+                &cmd.file_type,
             )
-        } else {
-            let schema = Arc::clone(cmd.schema.inner());
-            let table_partition_cols = cmd
-                .table_partition_cols
-                .iter()
-                .map(|col| {
-                    schema
-                        .field_with_name(col)
-                        .map_err(|e| arrow_datafusion_err!(e))
-                })
-                .collect::<Result<Vec<_>>>()?
-                .into_iter()
-                .map(|f| (f.name().to_owned(), f.data_type().to_owned()))
-                .collect();
-            // exclude partition columns to support creating partitioned external table
-            // with a specified column definition like
-            // `create external table a(c0 int, c1 int) stored as csv partitioned by (c1)...`
-            let mut project_idx = Vec::new();
-            for i in 0..schema.fields().len() {
-                if !cmd.table_partition_cols.contains(schema.field(i).name()) {
-                    project_idx.push(i);
-                }
-            }
-            let schema = Arc::new(schema.project(&project_idx)?);
-            (Some(schema), table_partition_cols)
-        };
-
-        options = options.with_table_partition_cols(table_partition_cols);
-
-        options
-            .validate_partitions(session_state, &table_path)
-            .await?;
-
-        let resolved_schema = match provided_schema {
-            // We will need to check the table columns against the schema
-            // this is done so that we can do an ORDER BY for external table creation
-            // specifically for parquet file format.
-            // See: https://github.com/apache/datafusion/issues/7317
-            None => {
-                // if the folder then rewrite a file path as 'path/*.parquet'
-                // to only read the files the reader can understand
-                if table_path.is_folder() && table_path.get_glob().is_none() {
-                    // Since there are no files yet to infer an actual extension,
-                    // derive the pattern based on compression type.
-                    // So for gzipped CSV the pattern is `*.csv.gz`
-                    let glob = match options.format.compression_type() {
-                        Some(compression) => {
-                            match options.format.get_ext_with_compression(&compression) {
-                                // Use glob based on `FileFormat` extension
-                                Ok(ext) => format!("*.{ext}"),
-                                // Fallback to `file_type`, if not supported by `FileFormat`
-                                Err(_) => format!("*.{}", cmd.file_type.to_lowercase()),
-                            }
-                        }
-                        None => format!("*.{}", cmd.file_type.to_lowercase()),
-                    };
-                    table_path = table_path.with_glob(glob.as_ref())?;
-                }
-                let schema = options.infer_schema(session_state, &table_path).await?;
-                let df_schema = Arc::clone(&schema).to_dfschema()?;
-                let column_refs: HashSet<_> = cmd
-                    .order_exprs
-                    .iter()
-                    .flat_map(|sort| sort.iter())
-                    .flat_map(|s| s.expr.column_refs())
-                    .collect();
-
-                for column in &column_refs {
-                    if !df_schema.has_column(column) {
-                        return plan_err!("Column {column} is not in schema");
-                    }
-                }
-
-                schema
-            }
-            Some(s) => s,
-        };
-        let config = ListingTableConfig::new(table_path)
+        });
+        let results = join_all(infer_schemas).await;
+
+        let mut merged_schema = DFSchema::empty();
+        let mut listing_urls = Vec::new();
+        for result in results {
+            let (resolved_table_path, resolved_schema) = result?;
+            listing_urls.push(resolved_table_path);
+            merged_schema.merge(&resolved_schema.to_dfschema()?);
+        }
+
+        options = options.with_table_partition_cols(partition_columns);
+        let config = ListingTableConfig::new_with_multi_paths(listing_urls)
             .with_listing_options(options.with_file_sort_order(cmd.order_exprs.clone()))
-            .with_schema(resolved_schema);
+            .with_schema(merged_schema.inner().to_owned());
+
         let provider = ListingTable::try_new(config)?
             .with_cache(state.runtime_env().cache_manager.get_file_statistic_cache());
+
         let table = provider
             .with_definition(cmd.definition.clone())
             .with_constraints(cmd.constraints.clone())
             .with_column_defaults(cmd.column_defaults.clone());
+
         Ok(Arc::new(table))
     }
 }
 
+async fn infer_schema(
+    options: &ListingOptions,
+    session_state: &SessionState,
+    mut table_path: ListingTableUrl,
+    partition_cols: &Vec<(String, DataType)>,
+    order_exprs: &Vec<Vec<Sort>>,
+    schema: &DFSchema,
+    file_type: &String,
+) -> Result<(ListingTableUrl, SchemaRef), DataFusionError> {
+    let provided_schema = if schema.fields().len() == 0 {
+        None
+    } else {
+        let schema = Arc::clone(schema.inner());
+        let partitions_cols_set: HashSet<&String> =
+            HashSet::from_iter(partition_cols.iter().map(|(k, _)| k));
+        // exclude partition columns to support creating a partitioned external table
+        // with a specified column definition like
+        // `create external table a(c0 int, c1 int) stored as csv partitioned by (c1)...`
+        let mut project_idx = Vec::new();
+        for i in 0..schema.fields().len() {
+            if !partitions_cols_set.contains(schema.field(i).name()) {
+                project_idx.push(i);
+            }
+        }
+        let schema = Arc::new(schema.project(&project_idx)?);
+        Some(schema)
+    };
+
+    options
+        .validate_partitions(session_state, &table_path)
+        .await?;
+
+    let resolved_schema = match provided_schema {
+        // We will need to check the table columns against the schema
+        // this is done so that we can do an ORDER BY for external table creation
+        // specifically for parquet file format.
+        // See: https://github.com/apache/datafusion/issues/7317
+        None => {
+            // if the folder then rewrite a file path as 'path/*.parquet'
+            // to only read the files the reader can understand
+            if table_path.is_folder() && table_path.get_glob().is_none() {
+                // Since there are no files yet to infer an actual extension,
+                // derive the pattern based on compression type.
+                // So for gzipped CSV the pattern is `*.csv.gz`
+                let glob = match options.format.compression_type() {
+                    Some(compression) => {
+                        match options.format.get_ext_with_compression(&compression) {
+                            // Use glob based on `FileFormat` extension
+                            Ok(ext) => format!("*.{ext}"),
+                            // Fallback to `file_type`, if not supported by `FileFormat`
+                            Err(_) => format!("*.{}", file_type.to_lowercase()),
+                        }
+                    }
+                    None => format!("*.{}", file_type.to_lowercase()),
+                };
+                table_path = table_path.with_glob(&glob)?;
+            }
+            let schema = options.infer_schema(session_state, &table_path).await?;
+            let df_schema = Arc::clone(&schema).to_dfschema()?;
+            let column_refs: HashSet<_> = order_exprs
+                .iter()
+                .flat_map(|sort| sort.iter())
+                .flat_map(|s| s.expr.column_refs())
+                .collect();
+
+            for column in &column_refs {
+                if !df_schema.has_column(column) {
+                    return plan_err!("Column {column} is not in schema");
+                }
+            }
+
+            schema
+        }
+        Some(s) => s,
+    };
+    Ok((table_path, resolved_schema))
+}
+
+async fn infer_partition_columns(
+    options: &ListingOptions,
+    session_state: &SessionState,
+    table_path: &ListingTableUrl,
+    provided_cols: &Vec<String>,
+) -> Result<Vec<(String, DataType)>, DataFusionError> {
+    let infer_parts = session_state
+        .config_options()
+        .execution
+        .listing_table_factory_infer_partitions;
+    let part_cols = if provided_cols.is_empty() && infer_parts {
+        options
+            .infer_partitions(session_state, &table_path)
+            .await?
+            .into_iter()
+    } else {
+        provided_cols.clone().into_iter()
+    };
+
+    Ok(part_cols
+        .map(|p| {
+            (
+                p,
+                DataType::Dictionary(
+                    Box::new(DataType::UInt16),
+                    Box::new(DataType::Utf8),
+                ),
+            )
+        })
+        .collect::<Vec<_>>())

Review Comment:
   The old code checked the PARTITIONED BY columns against the provided schema and used the schema's types. The new `infer_partition_columns` always returns Dictionary(UInt16, Utf8) regardless of what the user specified.
   
   It also means typos in PARTITIONED BY silently succeed now instead of erroring.
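   
   Roughly what I'd expect here - untested sketch, assuming a `schema: &DFSchema` parameter (i.e. `cmd.schema`) gets threaded into `infer_partition_columns`; the existing Dictionary fallback would stay for the no-schema path:
   ```rust
   // Hypothetical extra parameter: schema: &DFSchema (cmd.schema from the caller).
   // Resolve each PARTITIONED BY name against the declared schema so typos still
   // error out, and keep the user's declared type instead of forcing Dictionary.
   if !schema.fields().is_empty() {
       return provided_cols
           .iter()
           .map(|col| {
               schema
                   .inner()
                   .field_with_name(col)
                   .map_err(|e| arrow_datafusion_err!(e))
                   .map(|f| (f.name().to_owned(), f.data_type().to_owned()))
           })
           .collect::<Result<Vec<_>>>();
   }
   // ...fall through to the existing inference + Dictionary(UInt16, Utf8) path...
   ```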



##########
datafusion/core/src/datasource/listing_table_factory.rs:
##########
@@ -63,137 +65,190 @@ impl TableProviderFactory for ListingTableFactory {
             ))?
             .create(session_state, &cmd.options)?;
 
-        let mut table_path = ListingTableUrl::parse(&cmd.location)?;
-        let file_extension = match table_path.is_collection() {
-            // Setting the extension to be empty instead of allowing the default extension seems
-            // odd, but was done to ensure existing behavior isn't modified. It seems like this
-            // could be refactored to either use the default extension or set the fully expected
-            // extension when compression is included (e.g. ".csv.gz")
-            true => "",
-            false => &get_extension(cmd.location.as_str()),
+        let file_extension = match cmd.locations.len() {
+            1 => {
+                let table_path = ListingTableUrl::parse(&cmd.locations[0])?;
+                match table_path.is_collection() {
+                    // Setting the extension to be empty instead of allowing the default extension seems
+                    // odd, but was done to ensure existing behavior isn't modified. It seems like this
+                    // could be refactored to either use the default extension or set the fully expected
+                    // extension when compression is included (e.g. ".csv.gz").
+                    // We do the same if there are multiple locations provided for the table.
+                    true => "",
+                    false => &get_extension(cmd.locations[0].as_str()),
+                }
+            }
+            _ => "",
         };
+
         let mut options = ListingOptions::new(file_format)
             .with_session_config_options(session_state.config())
             .with_file_extension(file_extension);
 
-        let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() {
-            let infer_parts = session_state
-                .config_options()
-                .execution
-                .listing_table_factory_infer_partitions;
-            let part_cols = if cmd.table_partition_cols.is_empty() && infer_parts {
-                options
-                    .infer_partitions(session_state, &table_path)
-                    .await?
-                    .into_iter()
-            } else {
-                cmd.table_partition_cols.clone().into_iter()
-            };
-
-            (
-                None,
-                part_cols
-                    .map(|p| {
-                        (
-                            p,
-                            DataType::Dictionary(
-                                Box::new(DataType::UInt16),
-                                Box::new(DataType::Utf8),
-                            ),
-                        )
-                    })
-                    .collect::<Vec<_>>(),
+        let table_paths: Vec<ListingTableUrl> = cmd
+            .locations
+            .iter()
+            .map(|loc| ListingTableUrl::parse(loc))
+            .collect::<Result<Vec<_>>>()?;
+
+        // We use the first location to infer the partition columns,
+        // primarily for performance and simplicity reasons.
+        let partition_columns = infer_partition_columns(
+            &options,
+            session_state,
+            &table_paths[0],
+            &cmd.table_partition_cols,
+        )
+        .await?;
+
+        let infer_schemas = table_paths.into_iter().map(|listing_url| {
+            infer_schema(
+                &options,
+                &session_state,
+                listing_url,
+                &partition_columns,
+                &cmd.order_exprs,
+                &cmd.schema,
+                &cmd.file_type,
             )
-        } else {
-            let schema = Arc::clone(cmd.schema.inner());
-            let table_partition_cols = cmd
-                .table_partition_cols
-                .iter()
-                .map(|col| {
-                    schema
-                        .field_with_name(col)
-                        .map_err(|e| arrow_datafusion_err!(e))
-                })
-                .collect::<Result<Vec<_>>>()?
-                .into_iter()
-                .map(|f| (f.name().to_owned(), f.data_type().to_owned()))
-                .collect();
-            // exclude partition columns to support creating partitioned external table
-            // with a specified column definition like
-            // `create external table a(c0 int, c1 int) stored as csv partitioned by (c1)...`
-            let mut project_idx = Vec::new();
-            for i in 0..schema.fields().len() {
-                if !cmd.table_partition_cols.contains(schema.field(i).name()) {
-                    project_idx.push(i);
-                }
-            }
-            let schema = Arc::new(schema.project(&project_idx)?);
-            (Some(schema), table_partition_cols)
-        };
-
-        options = options.with_table_partition_cols(table_partition_cols);
-
-        options
-            .validate_partitions(session_state, &table_path)
-            .await?;
-
-        let resolved_schema = match provided_schema {
-            // We will need to check the table columns against the schema
-            // this is done so that we can do an ORDER BY for external table creation
-            // specifically for parquet file format.
-            // See: https://github.com/apache/datafusion/issues/7317
-            None => {
-                // if the folder then rewrite a file path as 'path/*.parquet'
-                // to only read the files the reader can understand
-                if table_path.is_folder() && table_path.get_glob().is_none() {
-                    // Since there are no files yet to infer an actual extension,
-                    // derive the pattern based on compression type.
-                    // So for gzipped CSV the pattern is `*.csv.gz`
-                    let glob = match options.format.compression_type() {
-                        Some(compression) => {
-                            match options.format.get_ext_with_compression(&compression) {
-                                // Use glob based on `FileFormat` extension
-                                Ok(ext) => format!("*.{ext}"),
-                                // Fallback to `file_type`, if not supported by `FileFormat`
-                                Err(_) => format!("*.{}", cmd.file_type.to_lowercase()),
-                            }
-                        }
-                        None => format!("*.{}", cmd.file_type.to_lowercase()),
-                    };
-                    table_path = table_path.with_glob(glob.as_ref())?;
-                }
-                let schema = options.infer_schema(session_state, &table_path).await?;
-                let df_schema = Arc::clone(&schema).to_dfschema()?;
-                let column_refs: HashSet<_> = cmd
-                    .order_exprs
-                    .iter()
-                    .flat_map(|sort| sort.iter())
-                    .flat_map(|s| s.expr.column_refs())
-                    .collect();
-
-                for column in &column_refs {
-                    if !df_schema.has_column(column) {
-                        return plan_err!("Column {column} is not in schema");
-                    }
-                }
-
-                schema
-            }
-            Some(s) => s,
-        };
-        let config = ListingTableConfig::new(table_path)
+        });
+        let results = join_all(infer_schemas).await;
+
+        let mut merged_schema = DFSchema::empty();
+        let mut listing_urls = Vec::new();
+        for result in results {
+            let (resolved_table_path, resolved_schema) = result?;
+            listing_urls.push(resolved_table_path);
+            merged_schema.merge(&resolved_schema.to_dfschema()?);
+        }
+
+        options = options.with_table_partition_cols(partition_columns);
+        let config = ListingTableConfig::new_with_multi_paths(listing_urls)
             .with_listing_options(options.with_file_sort_order(cmd.order_exprs.clone()))
-            .with_schema(resolved_schema);
+            .with_schema(merged_schema.inner().to_owned());
+
         let provider = ListingTable::try_new(config)?
             .with_cache(state.runtime_env().cache_manager.get_file_statistic_cache());
+
         let table = provider
             .with_definition(cmd.definition.clone())
             .with_constraints(cmd.constraints.clone())
             .with_column_defaults(cmd.column_defaults.clone());
+
         Ok(Arc::new(table))
     }
 }
 
+async fn infer_schema(
+    options: &ListingOptions,
+    session_state: &SessionState,
+    mut table_path: ListingTableUrl,
+    partition_cols: &Vec<(String, DataType)>,
+    order_exprs: &Vec<Vec<Sort>>,
+    schema: &DFSchema,
+    file_type: &String,
+) -> Result<(ListingTableUrl, SchemaRef), DataFusionError> {
+    let provided_schema = if schema.fields().len() == 0 {
+        None
+    } else {
+        let schema = Arc::clone(schema.inner());
+        let partitions_cols_set: HashSet<&String> =
+            HashSet::from_iter(partition_cols.iter().map(|(k, _)| k));
+        // exclude partition columns to support creating a partitioned external table
+        // with a specified column definition like
+        // `create external table a(c0 int, c1 int) stored as csv partitioned by (c1)...`
+        let mut project_idx = Vec::new();
+        for i in 0..schema.fields().len() {
+            if !partitions_cols_set.contains(schema.field(i).name()) {
+                project_idx.push(i);
+            }
+        }
+        let schema = Arc::new(schema.project(&project_idx)?);
+        Some(schema)
+    };
+
+    options
+        .validate_partitions(session_state, &table_path)
+        .await?;

Review Comment:
   This runs before partition cols are set on options (that happens at line 125, after all infer_schema calls complete). So validate_partitions always sees an empty partition list and never catches mismatches.
   
   The old code set partition cols first, then validated.
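   
   I think just moving the `with_table_partition_cols` call above the per-path loop restores the old order - untested sketch of the caller side:
   ```rust
   // Set the partition columns on `options` first, so validate_partitions
   // (called inside infer_schema for each path) actually sees them.
   // Clone because `partition_columns` is still borrowed by the closures below.
   options = options.with_table_partition_cols(partition_columns.clone());
   
   let infer_schemas = table_paths.into_iter().map(|listing_url| {
       infer_schema(
           &options,
           &session_state,
           listing_url,
           &partition_columns,
           &cmd.order_exprs,
           &cmd.schema,
           &cmd.file_type,
       )
   });
   ```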


