benbellick commented on code in PR #19729:
URL: https://github.com/apache/datafusion/pull/19729#discussion_r2854236105
##########
datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs:
##########
@@ -176,45 +176,97 @@ pub async fn from_read_rel(
}))
}
Some(ReadType::LocalFiles(lf)) => {
- fn extract_filename(name: &str) -> Option<String> {
- let corrected_url =
- if name.starts_with("file://") &&
!name.starts_with("file:///") {
- name.replacen("file://", "file:///", 1)
- } else {
- name.to_string()
- };
-
- Url::parse(&corrected_url).ok().and_then(|url| {
- let path = url.path();
- std::path::Path::new(path)
- .file_name()
- .map(|filename| filename.to_string_lossy().to_string())
+ /// Parses the URI string from a PathType variant.
+ /// Returns an error if the URI is malformed.
+ fn parse_uri(
+ path_type: Option<&PathType>,
+ ) -> datafusion::common::Result<Option<Url>> {
+ let path_str = match path_type {
+ Some(PathType::UriPath(p)) => p,
+ Some(PathType::UriPathGlob(p)) => p,
+ Some(PathType::UriFile(p)) => p,
+ Some(PathType::UriFolder(p)) => p,
+ None => return Ok(None),
+ };
+
+ Url::parse(path_str).map(Some).map_err(|e| {
+ datafusion::error::DataFusionError::Substrait(format!(
+ "Failed to parse URI '{path_str}': {e}"
+ ))
})
}
- // we could use the file name to check the original table provider
- // TODO: currently does not support multiple local files
- let filename: Option<String> =
- lf.items.first().and_then(|x| match x.path_type.as_ref() {
- Some(UriFile(name)) => extract_filename(name),
- _ => None,
- });
+ // Collect all file URIs from LocalFiles items
+ let uris: Vec<Url> = lf
+ .items
+ .iter()
+ .map(|item| parse_uri(item.path_type.as_ref()))
+ .collect::<datafusion::common::Result<Vec<_>>>()?
+ .into_iter()
+ .flatten()
+ .collect();
- if lf.items.len() > 1 || filename.is_none() {
- return not_impl_err!("Only single file reads are supported");
+ if uris.is_empty() {
+ return plan_err!("No valid file URIs found in LocalFiles");
}
- let name = filename.unwrap();
- // directly use unwrap here since we could determine it is a valid
one
- let table_reference = TableReference::Bare { table: name.into() };
- read_with_schema(
- consumer,
+ // Generate a table name from the first URI's path component
+ let table_name = std::path::Path::new(uris[0].path())
+ .file_name()
+ .map(|n| n.to_string_lossy().to_string())
+ .unwrap_or_else(|| "local_files".to_string());
+
+ let table_reference = TableReference::Bare {
+ table: table_name.clone().into(),
+ };
+
+ // Try to resolve files using the new trait method.
+ // If not implemented, fall back to the legacy single-file behavior
+ // for backward compatibility.
+ let provider = match consumer
Review Comment:
This is also a question for the people with more datafusion experience, but
is this the right approach? I would think that if the consumer fails to handle
multiple files, it is an execution failure. Otherwise, doesn't it risk
executing the plan incorrectly?
##########
datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs:
##########
@@ -176,45 +176,97 @@ pub async fn from_read_rel(
}))
}
Some(ReadType::LocalFiles(lf)) => {
- fn extract_filename(name: &str) -> Option<String> {
- let corrected_url =
- if name.starts_with("file://") &&
!name.starts_with("file:///") {
- name.replacen("file://", "file:///", 1)
- } else {
- name.to_string()
- };
-
- Url::parse(&corrected_url).ok().and_then(|url| {
- let path = url.path();
- std::path::Path::new(path)
- .file_name()
- .map(|filename| filename.to_string_lossy().to_string())
+ /// Parses the URI string from a PathType variant.
+ /// Returns an error if the URI is malformed.
+ fn parse_uri(
+ path_type: Option<&PathType>,
+ ) -> datafusion::common::Result<Option<Url>> {
+ let path_str = match path_type {
+ Some(PathType::UriPath(p)) => p,
+ Some(PathType::UriPathGlob(p)) => p,
+ Some(PathType::UriFile(p)) => p,
+ Some(PathType::UriFolder(p)) => p,
+ None => return Ok(None),
+ };
+
+ Url::parse(path_str).map(Some).map_err(|e| {
+ datafusion::error::DataFusionError::Substrait(format!(
+ "Failed to parse URI '{path_str}': {e}"
+ ))
})
}
- // we could use the file name to check the original table provider
- // TODO: currently does not support multiple local files
- let filename: Option<String> =
- lf.items.first().and_then(|x| match x.path_type.as_ref() {
- Some(UriFile(name)) => extract_filename(name),
- _ => None,
- });
+ // Collect all file URIs from LocalFiles items
+ let uris: Vec<Url> = lf
+ .items
+ .iter()
+ .map(|item| parse_uri(item.path_type.as_ref()))
+ .collect::<datafusion::common::Result<Vec<_>>>()?
+ .into_iter()
+ .flatten()
+ .collect();
- if lf.items.len() > 1 || filename.is_none() {
- return not_impl_err!("Only single file reads are supported");
+ if uris.is_empty() {
Review Comment:
One other small thing. [I don't think the spec technically enforces that an
empty `LocalFiles` is
invalid](https://github.com/substrait-io/substrait/blob/2705258137b7272830cf00680e6f3a36b8d3ed4b/proto/substrait/algebra.proto#L164-L166).
Since the schema will have to be part of the plan, it is also reasonable to
just consider it an empty table 🤷
I don't know what is consistent with the normal datafusion approach here.
##########
datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs:
##########
@@ -176,45 +176,97 @@ pub async fn from_read_rel(
}))
}
Some(ReadType::LocalFiles(lf)) => {
- fn extract_filename(name: &str) -> Option<String> {
- let corrected_url =
- if name.starts_with("file://") &&
!name.starts_with("file:///") {
- name.replacen("file://", "file:///", 1)
- } else {
- name.to_string()
- };
-
- Url::parse(&corrected_url).ok().and_then(|url| {
- let path = url.path();
- std::path::Path::new(path)
- .file_name()
- .map(|filename| filename.to_string_lossy().to_string())
+ /// Parses the URI string from a PathType variant.
+ /// Returns an error if the URI is malformed.
+ fn parse_uri(
Review Comment:
Just a note in case someone comes to this in the future and as the same
question.
The `Url` crate is quite permissive, so while the type is called `Url`, it
handles general URIs without a problem.
##########
datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs:
##########
@@ -176,45 +176,97 @@ pub async fn from_read_rel(
}))
}
Some(ReadType::LocalFiles(lf)) => {
- fn extract_filename(name: &str) -> Option<String> {
- let corrected_url =
- if name.starts_with("file://") &&
!name.starts_with("file:///") {
- name.replacen("file://", "file:///", 1)
- } else {
- name.to_string()
- };
-
- Url::parse(&corrected_url).ok().and_then(|url| {
- let path = url.path();
- std::path::Path::new(path)
- .file_name()
- .map(|filename| filename.to_string_lossy().to_string())
+ /// Parses the URI string from a PathType variant.
+ /// Returns an error if the URI is malformed.
+ fn parse_uri(
+ path_type: Option<&PathType>,
+ ) -> datafusion::common::Result<Option<Url>> {
+ let path_str = match path_type {
+ Some(PathType::UriPath(p)) => p,
+ Some(PathType::UriPathGlob(p)) => p,
+ Some(PathType::UriFile(p)) => p,
+ Some(PathType::UriFolder(p)) => p,
+ None => return Ok(None),
+ };
+
+ Url::parse(path_str).map(Some).map_err(|e| {
+ datafusion::error::DataFusionError::Substrait(format!(
+ "Failed to parse URI '{path_str}': {e}"
+ ))
})
}
- // we could use the file name to check the original table provider
- // TODO: currently does not support multiple local files
- let filename: Option<String> =
- lf.items.first().and_then(|x| match x.path_type.as_ref() {
- Some(UriFile(name)) => extract_filename(name),
- _ => None,
- });
+ // Collect all file URIs from LocalFiles items
+ let uris: Vec<Url> = lf
+ .items
+ .iter()
+ .map(|item| parse_uri(item.path_type.as_ref()))
+ .collect::<datafusion::common::Result<Vec<_>>>()?
+ .into_iter()
+ .flatten()
+ .collect();
- if lf.items.len() > 1 || filename.is_none() {
- return not_impl_err!("Only single file reads are supported");
+ if uris.is_empty() {
Review Comment:
In that way, I see no reason to not just let the implementer of
`resolve_local_files` handle this case. The only complexity then is that if you
don't have any `uri`s, then you can't make a table name.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]