ethan-tyler commented on code in PR #17702:
URL: https://github.com/apache/datafusion/pull/17702#discussion_r2608310624
##########
datafusion/core/src/datasource/listing_table_factory.rs:
##########
@@ -519,4 +574,55 @@ mod tests {
let listing_options = listing_table.options();
assert!(listing_options.table_partition_cols.is_empty());
}
+
+ #[tokio::test]
+ async fn test_create_using_multiple_locations() {
+ let dir = tempfile::tempdir().unwrap();
+ let mut path = PathBuf::from(dir.path());
+ path.push("folder-1");
+ path.push("folder-2");
+ fs::create_dir_all(&path).unwrap();
Review Comment:
Think there's a bug here: this creates <tmp>/folder-1/folder-2 (nested), but the test
then registers <tmp>/folder-1 and <tmp>/folder-2 as separate paths, so folder-2
doesn't exist as a sibling. Should probably be:
```rust
fs::create_dir_all(dir.path().join("folder-1")).unwrap();
fs::create_dir_all(dir.path().join("folder-2")).unwrap();
```
Neither folder contains any files, so schema inference has nothing to work with.
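If the test is also meant to exercise schema inference across both locations, one option
(a sketch only; the file names and contents here are hypothetical) is to drop a small file
into each sibling folder:
```rust
fs::create_dir_all(dir.path().join("folder-1")).unwrap();
fs::create_dir_all(dir.path().join("folder-2")).unwrap();
// Hypothetical test data so schema inference has something to read.
fs::write(dir.path().join("folder-1").join("part.csv"), "a,b\n1,2\n").unwrap();
fs::write(dir.path().join("folder-2").join("part.csv"), "a,b\n3,4\n").unwrap();
```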
##########
datafusion/core/src/datasource/listing_table_factory.rs:
##########
@@ -63,137 +65,190 @@ impl TableProviderFactory for ListingTableFactory {
))?
.create(session_state, &cmd.options)?;
- let mut table_path = ListingTableUrl::parse(&cmd.location)?;
- let file_extension = match table_path.is_collection() {
- // Setting the extension to be empty instead of allowing the default extension seems
- // odd, but was done to ensure existing behavior isn't modified. It seems like this
- // could be refactored to either use the default extension or set the fully expected
- // extension when compression is included (e.g. ".csv.gz")
- true => "",
- false => &get_extension(cmd.location.as_str()),
+ let file_extension = match cmd.locations.len() {
+ 1 => {
+ let table_path = ListingTableUrl::parse(&cmd.locations[0])?;
+ match table_path.is_collection() {
+ // Setting the extension to be empty instead of allowing the default extension seems
+ // odd, but was done to ensure existing behavior isn't modified. It seems like this
+ // could be refactored to either use the default extension or set the fully expected
+ // extension when compression is included (e.g. ".csv.gz").
+ // We do the same if there are multiple locations provided for the table.
+ true => "",
+ false => &get_extension(cmd.locations[0].as_str()),
+ }
+ }
+ _ => "",
};
+
let mut options = ListingOptions::new(file_format)
.with_session_config_options(session_state.config())
.with_file_extension(file_extension);
- let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() {
- let infer_parts = session_state
- .config_options()
- .execution
- .listing_table_factory_infer_partitions;
- let part_cols = if cmd.table_partition_cols.is_empty() && infer_parts {
- options
- .infer_partitions(session_state, &table_path)
- .await?
- .into_iter()
- } else {
- cmd.table_partition_cols.clone().into_iter()
- };
-
- (
- None,
- part_cols
- .map(|p| {
- (
- p,
- DataType::Dictionary(
- Box::new(DataType::UInt16),
- Box::new(DataType::Utf8),
- ),
- )
- })
- .collect::<Vec<_>>(),
+ let table_paths: Vec<ListingTableUrl> = cmd
+ .locations
+ .iter()
+ .map(|loc| ListingTableUrl::parse(loc))
+ .collect::<Result<Vec<_>>>()?;
+
+ // We use the first location to infer the partition columns,
+ // primarily for performance and simplicity reasons.
+ let partition_columns = infer_partition_columns(
+ &options,
+ session_state,
+ &table_paths[0],
+ &cmd.table_partition_cols,
+ )
+ .await?;
+
+ let infer_schemas = table_paths.into_iter().map(|listing_url| {
+ infer_schema(
+ &options,
+ &session_state,
+ listing_url,
+ &partition_columns,
+ &cmd.order_exprs,
+ &cmd.schema,
+ &cmd.file_type,
)
- } else {
- let schema = Arc::clone(cmd.schema.inner());
- let table_partition_cols = cmd
- .table_partition_cols
- .iter()
- .map(|col| {
- schema
- .field_with_name(col)
- .map_err(|e| arrow_datafusion_err!(e))
- })
- .collect::<Result<Vec<_>>>()?
- .into_iter()
- .map(|f| (f.name().to_owned(), f.data_type().to_owned()))
- .collect();
- // exclude partition columns to support creating partitioned external table
- // with a specified column definition like
- // `create external table a(c0 int, c1 int) stored as csv partitioned by (c1)...`
- let mut project_idx = Vec::new();
- for i in 0..schema.fields().len() {
- if !cmd.table_partition_cols.contains(schema.field(i).name()) {
- project_idx.push(i);
- }
- }
- let schema = Arc::new(schema.project(&project_idx)?);
- (Some(schema), table_partition_cols)
- };
-
- options = options.with_table_partition_cols(table_partition_cols);
-
- options
- .validate_partitions(session_state, &table_path)
- .await?;
-
- let resolved_schema = match provided_schema {
- // We will need to check the table columns against the schema
- // this is done so that we can do an ORDER BY for external table creation
- // specifically for parquet file format.
- // See: https://github.com/apache/datafusion/issues/7317
- None => {
- // if the folder then rewrite a file path as 'path/*.parquet'
- // to only read the files the reader can understand
- if table_path.is_folder() && table_path.get_glob().is_none() {
- // Since there are no files yet to infer an actual extension,
- // derive the pattern based on compression type.
- // So for gzipped CSV the pattern is `*.csv.gz`
- let glob = match options.format.compression_type() {
- Some(compression) => {
- match options.format.get_ext_with_compression(&compression) {
- // Use glob based on `FileFormat` extension
- Ok(ext) => format!("*.{ext}"),
- // Fallback to `file_type`, if not supported by `FileFormat`
- Err(_) => format!("*.{}", cmd.file_type.to_lowercase()),
- }
- }
- None => format!("*.{}", cmd.file_type.to_lowercase()),
- };
- table_path = table_path.with_glob(glob.as_ref())?;
- }
- let schema = options.infer_schema(session_state, &table_path).await?;
- let df_schema = Arc::clone(&schema).to_dfschema()?;
- let column_refs: HashSet<_> = cmd
- .order_exprs
- .iter()
- .flat_map(|sort| sort.iter())
- .flat_map(|s| s.expr.column_refs())
- .collect();
-
- for column in &column_refs {
- if !df_schema.has_column(column) {
- return plan_err!("Column {column} is not in schema");
- }
- }
-
- schema
- }
- Some(s) => s,
- };
- let config = ListingTableConfig::new(table_path)
+ });
+ let results = join_all(infer_schemas).await;
+
+ let mut merged_schema = DFSchema::empty();
+ let mut listing_urls = Vec::new();
+ for result in results {
+ let (resolved_table_path, resolved_schema) = result?;
+ listing_urls.push(resolved_table_path);
+ merged_schema.merge(&resolved_schema.to_dfschema()?);
+ }
+
+ options = options.with_table_partition_cols(partition_columns);
+ let config = ListingTableConfig::new_with_multi_paths(listing_urls)
.with_listing_options(options.with_file_sort_order(cmd.order_exprs.clone()))
- .with_schema(resolved_schema);
+ .with_schema(merged_schema.inner().to_owned());
+
let provider = ListingTable::try_new(config)?
.with_cache(state.runtime_env().cache_manager.get_file_statistic_cache());
+
let table = provider
.with_definition(cmd.definition.clone())
.with_constraints(cmd.constraints.clone())
.with_column_defaults(cmd.column_defaults.clone());
+
Ok(Arc::new(table))
}
}
+async fn infer_schema(
+ options: &ListingOptions,
+ session_state: &SessionState,
+ mut table_path: ListingTableUrl,
+ partition_cols: &Vec<(String, DataType)>,
+ order_exprs: &Vec<Vec<Sort>>,
+ schema: &DFSchema,
+ file_type: &String,
+) -> Result<(ListingTableUrl, SchemaRef), DataFusionError> {
+ let provided_schema = if schema.fields().len() == 0 {
+ None
+ } else {
+ let schema = Arc::clone(schema.inner());
+ let partitions_cols_set: HashSet<&String> =
+ HashSet::from_iter(partition_cols.iter().map(|(k, _)| k));
+ // exclude partition columns to support creating a partitioned external table
+ // with a specified column definition like
+ // `create external table a(c0 int, c1 int) stored as csv partitioned by (c1)...`
+ let mut project_idx = Vec::new();
+ for i in 0..schema.fields().len() {
+ if !partitions_cols_set.contains(schema.field(i).name()) {
+ project_idx.push(i);
+ }
+ }
+ let schema = Arc::new(schema.project(&project_idx)?);
+ Some(schema)
+ };
+
+ options
+ .validate_partitions(session_state, &table_path)
+ .await?;
+
+ let resolved_schema = match provided_schema {
+ // We will need to check the table columns against the schema
+ // this is done so that we can do an ORDER BY for external table creation
+ // specifically for parquet file format.
+ // See: https://github.com/apache/datafusion/issues/7317
+ None => {
+ // if the folder then rewrite a file path as 'path/*.parquet'
+ // to only read the files the reader can understand
+ if table_path.is_folder() && table_path.get_glob().is_none() {
+ // Since there are no files yet to infer an actual extension,
+ // derive the pattern based on compression type.
+ // So for gzipped CSV the pattern is `*.csv.gz`
+ let glob = match options.format.compression_type() {
+ Some(compression) => {
+ match options.format.get_ext_with_compression(&compression) {
+ // Use glob based on `FileFormat` extension
+ Ok(ext) => format!("*.{ext}"),
+ // Fallback to `file_type`, if not supported by `FileFormat`
+ Err(_) => format!("*.{}", file_type.to_lowercase()),
+ }
+ }
+ None => format!("*.{}", file_type.to_lowercase()),
+ };
+ table_path = table_path.with_glob(&glob)?;
+ }
+ let schema = options.infer_schema(session_state, &table_path).await?;
+ let df_schema = Arc::clone(&schema).to_dfschema()?;
+ let column_refs: HashSet<_> = order_exprs
+ .iter()
+ .flat_map(|sort| sort.iter())
+ .flat_map(|s| s.expr.column_refs())
+ .collect();
+
+ for column in &column_refs {
+ if !df_schema.has_column(column) {
+ return plan_err!("Column {column} is not in schema");
+ }
+ }
+
+ schema
+ }
+ Some(s) => s,
+ };
+ Ok((table_path, resolved_schema))
+}
+
+async fn infer_partition_columns(
+ options: &ListingOptions,
+ session_state: &SessionState,
+ table_path: &ListingTableUrl,
+ provided_cols: &Vec<String>,
+) -> Result<Vec<(String, DataType)>, DataFusionError> {
+ let infer_parts = session_state
+ .config_options()
+ .execution
+ .listing_table_factory_infer_partitions;
+ let part_cols = if provided_cols.is_empty() && infer_parts {
+ options
+ .infer_partitions(session_state, &table_path)
+ .await?
+ .into_iter()
+ } else {
+ provided_cols.clone().into_iter()
+ };
+
+ Ok(part_cols
+ .map(|p| {
+ (
+ p,
+ DataType::Dictionary(
+ Box::new(DataType::UInt16),
+ Box::new(DataType::Utf8),
+ ),
+ )
+ })
+ .collect::<Vec<_>>())
Review Comment:
The old code checked the partition columns against the provided schema and used the
schema's declared types. `infer_partition_columns` now always returns
`Dictionary(UInt16, Utf8)` regardless of what the user specified.
It also means typos in `PARTITIONED BY` silently succeed now instead of erroring.
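One possible shape (just a sketch; it assumes the user-provided Arrow schema gets passed
into `infer_partition_columns`, and it reuses the lookup from the old code) would be to
resolve the names against that schema whenever it is non-empty:
```rust
// Sketch: when the user declared a schema, look each PARTITIONED BY column up
// in it, so typos error out and the declared type is used instead of the
// Dictionary(UInt16, Utf8) default.
let table_partition_cols: Vec<(String, DataType)> = provided_cols
    .iter()
    .map(|col| {
        schema
            .field_with_name(col)
            .map_err(|e| arrow_datafusion_err!(e))
            .map(|f| (f.name().to_owned(), f.data_type().to_owned()))
    })
    .collect::<Result<Vec<_>>>()?;
```
The inferred-partition path could keep the current `Dictionary(UInt16, Utf8)` fallback when
no schema was supplied.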
##########
datafusion/core/src/datasource/listing_table_factory.rs:
##########
@@ -63,137 +65,190 @@ impl TableProviderFactory for ListingTableFactory {
))?
.create(session_state, &cmd.options)?;
- let mut table_path = ListingTableUrl::parse(&cmd.location)?;
- let file_extension = match table_path.is_collection() {
- // Setting the extension to be empty instead of allowing the default extension seems
- // odd, but was done to ensure existing behavior isn't modified. It seems like this
- // could be refactored to either use the default extension or set the fully expected
- // extension when compression is included (e.g. ".csv.gz")
- true => "",
- false => &get_extension(cmd.location.as_str()),
+ let file_extension = match cmd.locations.len() {
+ 1 => {
+ let table_path = ListingTableUrl::parse(&cmd.locations[0])?;
+ match table_path.is_collection() {
+ // Setting the extension to be empty instead of allowing the default extension seems
+ // odd, but was done to ensure existing behavior isn't modified. It seems like this
+ // could be refactored to either use the default extension or set the fully expected
+ // extension when compression is included (e.g. ".csv.gz").
+ // We do the same if there are multiple locations provided for the table.
+ true => "",
+ false => &get_extension(cmd.locations[0].as_str()),
+ }
+ }
+ _ => "",
};
+
let mut options = ListingOptions::new(file_format)
.with_session_config_options(session_state.config())
.with_file_extension(file_extension);
- let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() {
- let infer_parts = session_state
- .config_options()
- .execution
- .listing_table_factory_infer_partitions;
- let part_cols = if cmd.table_partition_cols.is_empty() && infer_parts {
- options
- .infer_partitions(session_state, &table_path)
- .await?
- .into_iter()
- } else {
- cmd.table_partition_cols.clone().into_iter()
- };
-
- (
- None,
- part_cols
- .map(|p| {
- (
- p,
- DataType::Dictionary(
- Box::new(DataType::UInt16),
- Box::new(DataType::Utf8),
- ),
- )
- })
- .collect::<Vec<_>>(),
+ let table_paths: Vec<ListingTableUrl> = cmd
+ .locations
+ .iter()
+ .map(|loc| ListingTableUrl::parse(loc))
+ .collect::<Result<Vec<_>>>()?;
+
+ // We use the first location to infer the partition columns,
+ // primarily for performance and simplicity reasons.
+ let partition_columns = infer_partition_columns(
+ &options,
+ session_state,
+ &table_paths[0],
+ &cmd.table_partition_cols,
+ )
+ .await?;
+
+ let infer_schemas = table_paths.into_iter().map(|listing_url| {
+ infer_schema(
+ &options,
+ &session_state,
+ listing_url,
+ &partition_columns,
+ &cmd.order_exprs,
+ &cmd.schema,
+ &cmd.file_type,
)
- } else {
- let schema = Arc::clone(cmd.schema.inner());
- let table_partition_cols = cmd
- .table_partition_cols
- .iter()
- .map(|col| {
- schema
- .field_with_name(col)
- .map_err(|e| arrow_datafusion_err!(e))
- })
- .collect::<Result<Vec<_>>>()?
- .into_iter()
- .map(|f| (f.name().to_owned(), f.data_type().to_owned()))
- .collect();
- // exclude partition columns to support creating partitioned external table
- // with a specified column definition like
- // `create external table a(c0 int, c1 int) stored as csv partitioned by (c1)...`
- let mut project_idx = Vec::new();
- for i in 0..schema.fields().len() {
- if !cmd.table_partition_cols.contains(schema.field(i).name()) {
- project_idx.push(i);
- }
- }
- let schema = Arc::new(schema.project(&project_idx)?);
- (Some(schema), table_partition_cols)
- };
-
- options = options.with_table_partition_cols(table_partition_cols);
-
- options
- .validate_partitions(session_state, &table_path)
- .await?;
-
- let resolved_schema = match provided_schema {
- // We will need to check the table columns against the schema
- // this is done so that we can do an ORDER BY for external table creation
- // specifically for parquet file format.
- // See: https://github.com/apache/datafusion/issues/7317
- None => {
- // if the folder then rewrite a file path as 'path/*.parquet'
- // to only read the files the reader can understand
- if table_path.is_folder() && table_path.get_glob().is_none() {
- // Since there are no files yet to infer an actual extension,
- // derive the pattern based on compression type.
- // So for gzipped CSV the pattern is `*.csv.gz`
- let glob = match options.format.compression_type() {
- Some(compression) => {
- match options.format.get_ext_with_compression(&compression) {
- // Use glob based on `FileFormat` extension
- Ok(ext) => format!("*.{ext}"),
- // Fallback to `file_type`, if not supported by `FileFormat`
- Err(_) => format!("*.{}", cmd.file_type.to_lowercase()),
- }
- }
- None => format!("*.{}", cmd.file_type.to_lowercase()),
- };
- table_path = table_path.with_glob(glob.as_ref())?;
- }
- let schema = options.infer_schema(session_state, &table_path).await?;
- let df_schema = Arc::clone(&schema).to_dfschema()?;
- let column_refs: HashSet<_> = cmd
- .order_exprs
- .iter()
- .flat_map(|sort| sort.iter())
- .flat_map(|s| s.expr.column_refs())
- .collect();
-
- for column in &column_refs {
- if !df_schema.has_column(column) {
- return plan_err!("Column {column} is not in schema");
- }
- }
-
- schema
- }
- Some(s) => s,
- };
- let config = ListingTableConfig::new(table_path)
+ });
+ let results = join_all(infer_schemas).await;
+
+ let mut merged_schema = DFSchema::empty();
+ let mut listing_urls = Vec::new();
+ for result in results {
+ let (resolved_table_path, resolved_schema) = result?;
+ listing_urls.push(resolved_table_path);
+ merged_schema.merge(&resolved_schema.to_dfschema()?);
+ }
+
+ options = options.with_table_partition_cols(partition_columns);
+ let config = ListingTableConfig::new_with_multi_paths(listing_urls)
.with_listing_options(options.with_file_sort_order(cmd.order_exprs.clone()))
- .with_schema(resolved_schema);
+ .with_schema(merged_schema.inner().to_owned());
+
let provider = ListingTable::try_new(config)?
.with_cache(state.runtime_env().cache_manager.get_file_statistic_cache());
+
let table = provider
.with_definition(cmd.definition.clone())
.with_constraints(cmd.constraints.clone())
.with_column_defaults(cmd.column_defaults.clone());
+
Ok(Arc::new(table))
}
}
+async fn infer_schema(
+ options: &ListingOptions,
+ session_state: &SessionState,
+ mut table_path: ListingTableUrl,
+ partition_cols: &Vec<(String, DataType)>,
+ order_exprs: &Vec<Vec<Sort>>,
+ schema: &DFSchema,
+ file_type: &String,
+) -> Result<(ListingTableUrl, SchemaRef), DataFusionError> {
+ let provided_schema = if schema.fields().len() == 0 {
+ None
+ } else {
+ let schema = Arc::clone(schema.inner());
+ let partitions_cols_set: HashSet<&String> =
+ HashSet::from_iter(partition_cols.iter().map(|(k, _)| k));
+ // exclude partition columns to support creating a partitioned external table
+ // with a specified column definition like
+ // `create external table a(c0 int, c1 int) stored as csv partitioned by (c1)...`
+ let mut project_idx = Vec::new();
+ for i in 0..schema.fields().len() {
+ if !partitions_cols_set.contains(schema.field(i).name()) {
+ project_idx.push(i);
+ }
+ }
+ let schema = Arc::new(schema.project(&project_idx)?);
+ Some(schema)
+ };
+
+ options
+ .validate_partitions(session_state, &table_path)
+ .await?;
Review Comment:
This runs before the partition cols are set on `options` (that happens at line 125,
after all the `infer_schema` calls complete), so `validate_partitions` always sees an
empty partition list and never catches mismatches.
The old code set the partition cols first, then validated.
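A minimal sketch of the ordering fix (names as in this PR): apply the partition columns
to `options` before building the per-path `infer_schema` futures, so each
`validate_partitions` call sees the real list; the later `with_table_partition_cols`
call can then be dropped.
```rust
// Sketch: set partition columns up front so validate_partitions inside
// infer_schema compares against the actual PARTITIONED BY columns.
options = options.with_table_partition_cols(partition_columns.clone());

let infer_schemas = table_paths.into_iter().map(|listing_url| {
    infer_schema(
        &options,
        &session_state,
        listing_url,
        &partition_columns,
        &cmd.order_exprs,
        &cmd.schema,
        &cmd.file_type,
    )
});
```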