This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 12c60ac feat: define Hudi error types across hudi-core (#124)
12c60ac is described below
commit 12c60ac0bc2481781aba0ba00f7389823f416d19
Author: JinYang <[email protected]>
AuthorDate: Thu Dec 5 09:20:03 2024 +0800
feat: define Hudi error types across hudi-core (#124)
Use `thiserror` instead of `anyhow` to define the error types for the hudi-core crate.
---------
Signed-off-by: GoHalo <[email protected]>
Co-authored-by: Shiyan Xu <[email protected]>
---
Cargo.toml | 2 +-
crates/core/Cargo.toml | 2 +-
crates/core/src/config/internal.rs | 16 ++++--
crates/core/src/config/mod.rs | 3 +-
crates/core/src/config/read.rs | 25 ++++++---
crates/core/src/config/table.rs | 67 +++++++++++++++++------
crates/core/src/config/utils.rs | 9 ++--
crates/core/src/file_group/mod.rs | 15 +++---
crates/core/src/file_group/reader.rs | 2 +-
crates/core/src/lib.rs | 46 ++++++++++++++++
crates/core/src/storage/mod.rs | 53 +++++++++---------
crates/core/src/storage/utils.rs | 36 +++++++++----
crates/core/src/table/builder.rs | 23 ++++----
crates/core/src/table/fs_view.rs | 5 +-
crates/core/src/table/mod.rs | 15 +++---
crates/core/src/table/partition.rs | 39 +++++++-------
crates/core/src/table/timeline.rs | 30 ++++++-----
python/Cargo.toml | 4 +-
python/src/internal.rs | 102 ++++++++++++++++++++++++-----------
19 files changed, 333 insertions(+), 161 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index b8dd722..971e9eb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -61,7 +61,7 @@ serde = { version = "1.0.203", features = ["derive"] }
serde_json = { version = "1" }
# "stdlib"
-anyhow = { version = "1.0.86" }
+thiserror = { version = "2.0.3" }
bytes = { version = "1" }
paste = { version = "1.0.15" }
once_cell = { version = "1.19.0" }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 90fae7d..d7c9d06 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -48,7 +48,7 @@ serde = { workspace = true }
serde_json = { workspace = true }
# "stdlib"
-anyhow = { workspace = true }
+thiserror = { workspace = true }
bytes = { workspace = true }
paste = { workspace = true }
strum = { workspace = true }
diff --git a/crates/core/src/config/internal.rs
b/crates/core/src/config/internal.rs
index 89fd314..4fdc276 100644
--- a/crates/core/src/config/internal.rs
+++ b/crates/core/src/config/internal.rs
@@ -21,10 +21,13 @@
use std::collections::HashMap;
use std::str::FromStr;
-use anyhow::{anyhow, Result};
use strum_macros::EnumIter;
-use crate::config::{ConfigParser, HudiConfigValue};
+use crate::{
+ config::{ConfigParser, HudiConfigValue},
+ CoreError::{ConfigNotFound, InvalidConfig},
+ Result,
+};
/// Configurations for internal use.
///
@@ -64,11 +67,16 @@ impl ConfigParser for HudiInternalConfig {
let get_result = configs
.get(self.as_ref())
.map(|v| v.as_str())
- .ok_or(anyhow!("Config '{}' not found", self.as_ref()));
+ .ok_or(ConfigNotFound(self.as_ref().to_string()));
match self {
Self::SkipConfigValidation => get_result
- .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+ .and_then(|v| {
+ bool::from_str(v).map_err(|e| InvalidConfig {
+ item: Self::SkipConfigValidation.as_ref(),
+ source: Box::new(e),
+ })
+ })
.map(HudiConfigValue::Boolean),
}
}
diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs
index 7b557b4..f586a89 100644
--- a/crates/core/src/config/mod.rs
+++ b/crates/core/src/config/mod.rs
@@ -21,8 +21,7 @@ use std::any::type_name;
use std::collections::HashMap;
use std::sync::Arc;
-use crate::storage::utils::parse_uri;
-use anyhow::Result;
+use crate::{storage::utils::parse_uri, Result};
use serde::{Deserialize, Serialize};
use url::Url;
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index e67617d..a938953 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -21,10 +21,14 @@
use std::collections::HashMap;
use std::str::FromStr;
-use crate::config::{ConfigParser, HudiConfigValue};
-use anyhow::{anyhow, Result};
use strum_macros::EnumIter;
+use crate::{
+ config::{ConfigParser, HudiConfigValue},
+ CoreError::{ConfigNotFound, InvalidConfig},
+ Result,
+};
+
/// Configurations for reading Hudi tables.
///
/// **Example**
@@ -37,6 +41,7 @@ use strum_macros::EnumIter;
/// HudiTable::new_with_options("/tmp/hudi_data", options)
/// ```
///
+
#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
pub enum HudiReadConfig {
/// Define input splits
@@ -74,11 +79,16 @@ impl ConfigParser for HudiReadConfig {
let get_result = configs
.get(self.as_ref())
.map(|v| v.as_str())
- .ok_or(anyhow!("Config '{}' not found", self.as_ref()));
+ .ok_or(ConfigNotFound(self.as_ref().to_string()));
match self {
Self::InputPartitions => get_result
- .and_then(|v| usize::from_str(v).map_err(|e| anyhow!(e)))
+ .and_then(|v| {
+ usize::from_str(v).map_err(|e| InvalidConfig {
+ item: Self::InputPartitions.as_ref(),
+ source: Box::new(e),
+ })
+ })
.map(HudiConfigValue::UInteger),
Self::AsOfTimestamp => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
}
@@ -89,8 +99,8 @@ impl ConfigParser for HudiReadConfig {
mod tests {
use crate::config::read::HudiReadConfig::InputPartitions;
use crate::config::ConfigParser;
+ use crate::CoreError::InvalidConfig;
use std::collections::HashMap;
- use std::num::ParseIntError;
#[test]
fn parse_valid_config_value() {
@@ -103,7 +113,10 @@ mod tests {
fn parse_invalid_config_value() {
let options = HashMap::from([(InputPartitions.as_ref().to_string(),
"foo".to_string())]);
let value = InputPartitions.parse_value(&options);
- assert!(value.err().unwrap().is::<ParseIntError>());
+ assert!(matches!(
+ value.unwrap_err(),
+ InvalidConfig { item: _, source: _ }
+ ));
assert_eq!(
InputPartitions
.parse_value_or_default(&options)
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index 98cca7d..bb19066 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -21,11 +21,13 @@
use std::collections::HashMap;
use std::str::FromStr;
-use anyhow::anyhow;
-use anyhow::Result;
use strum_macros::{AsRefStr, EnumIter};
-use crate::config::{ConfigParser, HudiConfigValue};
+use crate::{
+ config::{ConfigParser, HudiConfigValue},
+ CoreError::{self, ConfigNotFound, InvalidConfig, Unsupported},
+ Result,
+};
/// Configurations for Hudi tables, most of them are persisted in
`hoodie.properties`.
///
@@ -146,7 +148,7 @@ impl ConfigParser for HudiTableConfig {
let get_result = configs
.get(self.as_ref())
.map(|v| v.as_str())
- .ok_or(anyhow!("Config '{}' not found", self.as_ref()));
+ .ok_or(ConfigNotFound(self.as_ref().to_string()));
match self {
Self::BaseFileFormat => get_result
@@ -154,24 +156,49 @@ impl ConfigParser for HudiTableConfig {
.map(|v| HudiConfigValue::String(v.as_ref().to_string())),
Self::BasePath => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
Self::Checksum => get_result
- .and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
+ .and_then(|v| {
+ isize::from_str(v).map_err(|e| InvalidConfig {
+ item: Self::Checksum.as_ref(),
+ source: Box::new(e),
+ })
+ })
.map(HudiConfigValue::Integer),
Self::DatabaseName => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
Self::DropsPartitionFields => get_result
- .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+ .and_then(|v| {
+ bool::from_str(v).map_err(|e| InvalidConfig {
+ item: Self::DropsPartitionFields.as_ref(),
+ source: Box::new(e),
+ })
+ })
.map(HudiConfigValue::Boolean),
Self::IsHiveStylePartitioning => get_result
- .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+ .and_then(|v| {
+ bool::from_str(v).map_err(|e| InvalidConfig {
+ item: Self::IsHiveStylePartitioning.as_ref(),
+ source: Box::new(e),
+ })
+ })
.map(HudiConfigValue::Boolean),
Self::IsPartitionPathUrlencoded => get_result
- .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+ .and_then(|v| {
+ bool::from_str(v).map_err(|e| InvalidConfig {
+ item: Self::IsPartitionPathUrlencoded.as_ref(),
+ source: Box::new(e),
+ })
+ })
.map(HudiConfigValue::Boolean),
Self::KeyGeneratorClass => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
Self::PartitionFields => get_result
.map(|v|
HudiConfigValue::List(v.split(',').map(str::to_string).collect())),
Self::PrecombineField => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
Self::PopulatesMetaFields => get_result
- .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+ .and_then(|v| {
+ bool::from_str(v).map_err(|e| InvalidConfig {
+ item: Self::PopulatesMetaFields.as_ref(),
+ source: Box::new(e),
+ })
+ })
.map(HudiConfigValue::Boolean),
Self::RecordKeyFields => get_result
.map(|v|
HudiConfigValue::List(v.split(',').map(str::to_string).collect())),
@@ -180,10 +207,20 @@ impl ConfigParser for HudiTableConfig {
.and_then(TableTypeValue::from_str)
.map(|v| HudiConfigValue::String(v.as_ref().to_string())),
Self::TableVersion => get_result
- .and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
+ .and_then(|v| {
+ isize::from_str(v).map_err(|e| InvalidConfig {
+ item: Self::TableVersion.as_ref(),
+ source: Box::new(e),
+ })
+ })
.map(HudiConfigValue::Integer),
Self::TimelineLayoutVersion => get_result
- .and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
+ .and_then(|v| {
+ isize::from_str(v).map_err(|e| InvalidConfig {
+ item: Self::TimelineLayoutVersion.as_ref(),
+ source: Box::new(e),
+ })
+ })
.map(HudiConfigValue::Integer),
}
}
@@ -199,13 +236,13 @@ pub enum TableTypeValue {
}
impl FromStr for TableTypeValue {
- type Err = anyhow::Error;
+ type Err = CoreError;
fn from_str(s: &str) -> Result<Self> {
match s.to_ascii_lowercase().as_str() {
"copy_on_write" | "copy-on-write" | "cow" => Ok(Self::CopyOnWrite),
"merge_on_read" | "merge-on-read" | "mor" => Ok(Self::MergeOnRead),
- _ => Err(anyhow!("Unsupported table type: {}", s)),
+ v => Err(Unsupported(format!("Unsupported table type {}", v))),
}
}
}
@@ -218,12 +255,12 @@ pub enum BaseFileFormatValue {
}
impl FromStr for BaseFileFormatValue {
- type Err = anyhow::Error;
+ type Err = CoreError;
fn from_str(s: &str) -> Result<Self> {
match s.to_ascii_lowercase().as_str() {
"parquet" => Ok(Self::Parquet),
- _ => Err(anyhow!("Unsupported base file format: {}", s)),
+ v => Err(Unsupported(format!("Unsupported base file format {}",
v))),
}
}
}
diff --git a/crates/core/src/config/utils.rs b/crates/core/src/config/utils.rs
index 800a81d..85784ec 100644
--- a/crates/core/src/config/utils.rs
+++ b/crates/core/src/config/utils.rs
@@ -18,11 +18,12 @@
*/
//! Config utilities.
-use anyhow::{Context, Result};
use bytes::Bytes;
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Cursor};
+use crate::{CoreError, Result};
+
/// Returns an empty iterator to represent an empty set of options.
pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a str)> {
std::iter::empty::<(&str, &str)>()
@@ -57,7 +58,7 @@ pub fn parse_data_for_options(data: &Bytes, split_chars:
&str) -> Result<HashMap
let mut options = HashMap::new();
for line in lines {
- let line = line.context("Failed to read line")?;
+ let line = line.map_err(|e| CoreError::Internal(format!("Failed to
read line {e}")))?;
let trimmed_line = line.trim();
if trimmed_line.is_empty() || trimmed_line.starts_with('#') {
continue;
@@ -65,7 +66,9 @@ pub fn parse_data_for_options(data: &Bytes, split_chars:
&str) -> Result<HashMap
let mut parts = trimmed_line.splitn(2, |c| split_chars.contains(c));
let key = parts
.next()
- .context("Missing key in config line")?
+ .ok_or(CoreError::Internal(
+ "Missing key in config line".to_string(),
+ ))?
.trim()
.to_owned();
let value = parts.next().unwrap_or("").trim().to_owned();
diff --git a/crates/core/src/file_group/mod.rs
b/crates/core/src/file_group/mod.rs
index db96b24..2fde1bf 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -28,11 +28,10 @@ use std::fmt::Formatter;
use std::hash::{Hash, Hasher};
use std::path::PathBuf;
-use anyhow::{anyhow, Result};
-
use crate::storage::file_info::FileInfo;
use crate::storage::file_stats::FileStats;
use crate::storage::Storage;
+use crate::{CoreError::Internal, Result};
/// Represents common metadata about a Hudi Base File.
#[derive(Clone, Debug)]
@@ -51,10 +50,12 @@ impl BaseFile {
/// Parse file name and extract file_group_id and commit_time.
fn parse_file_name(file_name: &str) -> Result<(String, String)> {
let err_msg = format!("Failed to parse file name '{}' for base file.",
file_name);
- let (name, _) =
file_name.rsplit_once('.').ok_or(anyhow!(err_msg.clone()))?;
+ let (name, _) = file_name
+ .rsplit_once('.')
+ .ok_or(Internal(err_msg.clone()))?;
let parts: Vec<&str> = name.split('_').collect();
- let file_group_id =
parts.first().ok_or(anyhow!(err_msg.clone()))?.to_string();
- let commit_time =
parts.get(2).ok_or(anyhow!(err_msg.clone()))?.to_string();
+ let file_group_id =
parts.first().ok_or(Internal(err_msg.clone()))?.to_string();
+ let commit_time =
parts.get(2).ok_or(Internal(err_msg.clone()))?.to_string();
Ok((file_group_id, commit_time))
}
@@ -189,11 +190,11 @@ impl FileGroup {
pub fn add_base_file(&mut self, base_file: BaseFile) -> Result<&Self> {
let commit_time = base_file.commit_time.as_str();
if self.file_slices.contains_key(commit_time) {
- Err(anyhow!(
+ Err(Internal(format!(
"Commit time {0} is already present in File Group {1}",
commit_time.to_owned(),
self.id,
- ))
+ )))
} else {
self.file_slices.insert(
commit_time.to_owned(),
diff --git a/crates/core/src/file_group/reader.rs
b/crates/core/src/file_group/reader.rs
index 1c7d748..eeb283a 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -21,7 +21,7 @@ use crate::config::utils::split_hudi_options_from_others;
use crate::config::HudiConfigs;
use crate::file_group::FileSlice;
use crate::storage::Storage;
-use anyhow::Result;
+use crate::Result;
use arrow_array::RecordBatch;
use std::sync::Arc;
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 3190dec..6485c1d 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -48,3 +48,49 @@ pub mod file_group;
pub mod storage;
pub mod table;
pub mod util;
+
+use thiserror::Error;
+
+#[derive(Error, Debug)]
+pub enum CoreError {
+ #[error("Config '{0}' not found")]
+ ConfigNotFound(String),
+
+ #[error("Invalid config item '{item}', {source:?}")]
+ InvalidConfig {
+ item: &'static str,
+ source: Box<dyn std::error::Error + Sync + Send + 'static>,
+ },
+
+ #[error("Parse url '{url}' failed, {source}")]
+ UrlParse {
+ url: String,
+ source: url::ParseError,
+ },
+
+ #[error("Invalid file path '{name}', {detail}")]
+ InvalidPath { name: String, detail: String },
+
+ #[error("{0}")]
+ Unsupported(String),
+
+ #[error("{0}")]
+ Internal(String),
+
+ #[error(transparent)]
+ Utf8Error(#[from] std::str::Utf8Error),
+
+ #[error(transparent)]
+ ObjectStore(#[from] object_store::Error),
+
+ #[error(transparent)]
+ ObjectStorePath(#[from] object_store::path::Error),
+
+ #[error(transparent)]
+ Parquet(#[from] parquet::errors::ParquetError),
+
+ #[error(transparent)]
+ Arrow(#[from] arrow::error::ArrowError),
+}
+
+type Result<T, E = CoreError> = std::result::Result<T, E>;
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 9a4aae1..0a9f0e1 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -22,11 +22,6 @@ use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
-use crate::config::table::HudiTableConfig;
-use crate::config::HudiConfigs;
-use crate::storage::file_info::FileInfo;
-use crate::storage::utils::join_url_segments;
-use anyhow::{anyhow, Context, Result};
use arrow::compute::concat_batches;
use arrow::record_batch::RecordBatch;
use async_recursion::async_recursion;
@@ -39,6 +34,12 @@ use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::file::metadata::ParquetMetaData;
use url::Url;
+use crate::config::table::HudiTableConfig;
+use crate::config::HudiConfigs;
+use crate::storage::file_info::FileInfo;
+use crate::storage::utils::join_url_segments;
+use crate::{CoreError, Result};
+
pub mod file_info;
pub mod file_stats;
pub mod utils;
@@ -60,10 +61,10 @@ impl Storage {
hudi_configs: Arc<HudiConfigs>,
) -> Result<Arc<Storage>> {
if !hudi_configs.contains(HudiTableConfig::BasePath) {
- return Err(anyhow!(
+ return Err(CoreError::Internal(format!(
"Failed to create storage: {} is required.",
HudiTableConfig::BasePath.as_ref()
- ));
+ )));
}
let base_url = hudi_configs.get(HudiTableConfig::BasePath)?.to_url()?;
@@ -75,7 +76,7 @@ impl Storage {
options,
hudi_configs,
})),
- Err(e) => Err(anyhow!("Failed to create storage: {}", e)),
+ Err(e) => Err(CoreError::ObjectStore(e)),
}
}
@@ -108,7 +109,10 @@ impl Storage {
let uri = obj_url.to_string();
let name = obj_path
.filename()
- .ok_or(anyhow!("Failed to get file name for {}", obj_path))?
+ .ok_or(CoreError::InvalidPath {
+ name: obj_path.to_string(),
+ detail: "failed to get file name".to_string(),
+ })?
.to_string();
Ok(FileInfo {
uri,
@@ -156,16 +160,14 @@ impl Storage {
let mut batches = Vec::new();
while let Some(r) = stream.next().await {
- let batch = r.context("Failed to read record batch.")?;
- batches.push(batch)
+ batches.push(r?)
}
if batches.is_empty() {
return Ok(RecordBatch::new_empty(schema.clone()));
}
- concat_batches(&schema, &batches)
- .map_err(|e| anyhow!("Failed to concat record batches: {}", e))
+ Ok(concat_batches(&schema, &batches)?)
}
pub async fn list_dirs(&self, subdir: Option<&str>) -> Result<Vec<String>>
{
@@ -174,7 +176,10 @@ impl Storage {
for dir in dir_paths {
dirs.push(
dir.filename()
- .ok_or(anyhow!("Failed to get file name for {}", dir))?
+ .ok_or(CoreError::InvalidPath {
+ name: dir.to_string(),
+ detail: "failed to get file name".to_string(),
+ })?
.to_string(),
)
}
@@ -203,10 +208,10 @@ impl Storage {
let name = obj_meta
.location
.filename()
- .ok_or(anyhow!(
- "Failed to get file name for {:?}",
- obj_meta.location
- ))?
+ .ok_or(CoreError::InvalidPath {
+ name: obj_meta.location.to_string(),
+ detail: "failed to get file name".to_string(),
+ })?
.to_string();
let uri = join_url_segments(&prefix_url, &[&name])?.to_string();
file_info.push(FileInfo {
@@ -241,9 +246,10 @@ pub async fn get_leaf_dirs(storage: &Storage, subdir:
Option<&str>) -> Result<Ve
next_subdir.push(curr);
}
next_subdir.push(child_dir);
- let next_subdir = next_subdir
- .to_str()
- .ok_or(anyhow!("Failed to convert path: {:?}", next_subdir))?;
+ let next_subdir =
next_subdir.to_str().ok_or(CoreError::InvalidPath {
+ name: format!("{:?}", next_subdir),
+ detail: "failed to convert path".to_string(),
+ })?;
let curr_leaf_dir = get_leaf_dirs(storage,
Some(next_subdir)).await?;
leaf_dirs.extend(curr_leaf_dir);
}
@@ -293,10 +299,7 @@ mod tests {
result.is_err(),
"Should return error when no base path is invalid."
);
- assert!(result
- .unwrap_err()
- .to_string()
- .contains("Failed to create storage"));
+ assert!(matches!(result.unwrap_err(), CoreError::ObjectStore(_)));
}
#[tokio::test]
diff --git a/crates/core/src/storage/utils.rs b/crates/core/src/storage/utils.rs
index 2413613..10198a1 100644
--- a/crates/core/src/storage/utils.rs
+++ b/crates/core/src/storage/utils.rs
@@ -16,13 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-//! Utility functions for storage.
-use std::path::{Path, PathBuf};
-use std::str::FromStr;
-use anyhow::{anyhow, Result};
+//! Utility functions for storage.
+use std::path::Path;
use url::{ParseError, Url};
+use crate::{
+ CoreError::{InvalidPath, UrlParse},
+ Result,
+};
+
/// Splits a filename into a stem and an extension.
pub fn split_filename(filename: &str) -> Result<(String, String)> {
let path = Path::new(filename);
@@ -30,7 +33,10 @@ pub fn split_filename(filename: &str) -> Result<(String,
String)> {
let stem = path
.file_stem()
.and_then(|s| s.to_str())
- .ok_or_else(|| anyhow!("No file stem found"))?
+ .ok_or_else(|| InvalidPath {
+ name: filename.to_string(),
+ detail: "no file stem found".to_string(),
+ })?
.to_string();
let extension = path
@@ -44,13 +50,20 @@ pub fn split_filename(filename: &str) -> Result<(String,
String)> {
/// Parses a URI string into a URL.
pub fn parse_uri(uri: &str) -> Result<Url> {
- let mut url = Url::parse(uri)
- .or(Url::from_file_path(PathBuf::from_str(uri)?))
- .map_err(|_| anyhow!("Failed to parse uri: {}", uri))?;
+ let mut url = match Url::parse(uri) {
+ Ok(url) => url,
+ Err(source) => Url::from_directory_path(uri).map_err(|_| UrlParse {
+ url: uri.to_string(),
+ source,
+ })?,
+ };
if url.path().ends_with('/') {
url.path_segments_mut()
- .map_err(|_| anyhow!("Failed to parse uri: {}", uri))?
+ .map_err(|_| InvalidPath {
+ name: uri.to_string(),
+ detail: "parse uri failed".to_string(),
+ })?
.pop();
}
@@ -73,7 +86,10 @@ pub fn join_url_segments(base_url: &Url, segments: &[&str])
-> Result<Url> {
for &seg in segments {
let segs: Vec<_> = seg.split('/').filter(|&s| !s.is_empty()).collect();
url.path_segments_mut()
- .map_err(|_| ParseError::RelativeUrlWithoutBase)?
+ .map_err(|_| UrlParse {
+ url: base_url.to_string(),
+ source: ParseError::RelativeUrlWithoutBase,
+ })?
.extend(segs);
}
diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs
index c037a0c..eca7d2b 100644
--- a/crates/core/src/table/builder.rs
+++ b/crates/core/src/table/builder.rs
@@ -17,7 +17,6 @@
* under the License.
*/
-use anyhow::{anyhow, Context, Result};
use paste::paste;
use std::collections::HashMap;
use std::env;
@@ -38,6 +37,7 @@ use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
use crate::table::timeline::Timeline;
use crate::table::Table;
+use crate::{CoreError, Result};
/// Builder for creating a [Table] instance.
#[derive(Debug, Clone)]
@@ -112,13 +112,10 @@ impl TableBuilder {
let hudi_configs = Arc::from(hudi_configs);
let storage_options = Arc::from(self.storage_options.clone());
- let timeline = Timeline::new(hudi_configs.clone(),
storage_options.clone())
- .await
- .context("Failed to load timeline")?;
+ let timeline = Timeline::new(hudi_configs.clone(),
storage_options.clone()).await?;
- let file_system_view = FileSystemView::new(hudi_configs.clone(),
storage_options.clone())
- .await
- .context("Failed to load file system view")?;
+ let file_system_view =
+ FileSystemView::new(hudi_configs.clone(),
storage_options.clone()).await?;
Ok(Table {
hudi_configs,
@@ -253,22 +250,26 @@ impl TableBuilder {
// additional validation
let table_type = hudi_configs.get(TableType)?.to::<String>();
if TableTypeValue::from_str(&table_type)? != CopyOnWrite {
- return Err(anyhow!("Only support copy-on-write table."));
+ return Err(CoreError::Unsupported(
+ "Only support copy-on-write table.".to_string(),
+ ));
}
let table_version = hudi_configs.get(TableVersion)?.to::<isize>();
if !(5..=6).contains(&table_version) {
- return Err(anyhow!("Only support table version 5 and 6."));
+ return Err(CoreError::Unsupported(
+ "Only support table version 5 and 6.".to_string(),
+ ));
}
let drops_partition_cols = hudi_configs
.get_or_default(DropsPartitionFields)
.to::<bool>();
if drops_partition_cols {
- return Err(anyhow!(
+ return Err(CoreError::Unsupported(format!(
"Only support when `{}` is disabled",
DropsPartitionFields.as_ref()
- ));
+ )));
}
Ok(())
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index f2309e3..ee68eec 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -24,8 +24,9 @@ use crate::config::HudiConfigs;
use crate::file_group::{BaseFile, FileGroup, FileSlice};
use crate::storage::file_info::FileInfo;
use crate::storage::{get_leaf_dirs, Storage};
+
use crate::table::partition::PartitionPruner;
-use anyhow::Result;
+use crate::{CoreError, Result};
use dashmap::DashMap;
use futures::stream::{self, StreamExt, TryStreamExt};
@@ -134,7 +135,7 @@ impl FileSystemView {
.map(|path| async move {
let file_groups =
Self::load_file_groups_for_partition(&self.storage,
&path).await?;
- Ok::<_, anyhow::Error>((path, file_groups))
+ Ok::<_, CoreError>((path, file_groups))
})
// TODO parameterize the parallelism for partition loading
.buffer_unordered(10)
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 4d69bbf..4836d1c 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -87,7 +87,6 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
-use anyhow::{anyhow, Context, Result};
use arrow::record_batch::RecordBatch;
use arrow_schema::{Field, Schema};
use url::Url;
@@ -102,6 +101,7 @@ use crate::table::builder::TableBuilder;
use crate::table::fs_view::FileSystemView;
use crate::table::partition::PartitionPruner;
use crate::table::timeline::Timeline;
+use crate::{CoreError, Result};
pub mod builder;
mod fs_view;
@@ -161,7 +161,6 @@ impl Table {
.register_object_store(runtime_env.clone());
}
- /// Get the latest [Schema] of the table.
pub async fn get_schema(&self) -> Result<Schema> {
self.timeline.get_latest_schema().await
}
@@ -259,16 +258,18 @@ impl Table {
timestamp: &str,
filters: &[(&str, &str, &str)],
) -> Result<Vec<RecordBatch>> {
- let file_slices = self
- .get_file_slices_as_of(timestamp, filters)
- .await
- .context(format!("Failed to get file slices as of {}",
timestamp))?;
+ let file_slices = self.get_file_slices_as_of(timestamp,
filters).await?;
let mut batches = Vec::new();
let fg_reader = self.create_file_group_reader();
for f in file_slices {
match fg_reader.read_file_slice(&f).await {
Ok(batch) => batches.push(batch),
- Err(e) => return Err(anyhow!("Failed to read file slice {:?} -
{}", f, e)),
+ Err(e) => {
+ return Err(CoreError::Internal(format!(
+ "Failed to read file slice {:?} - {}",
+ f, e
+ )))
+ }
}
}
Ok(batches)
diff --git a/crates/core/src/table/partition.rs
b/crates/core/src/table/partition.rs
index 17927eb..8f03521 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -18,8 +18,7 @@
*/
use crate::config::table::HudiTableConfig;
use crate::config::HudiConfigs;
-use anyhow::Result;
-use anyhow::{anyhow, Context};
+use crate::{CoreError, Result};
use arrow_array::{ArrayRef, Scalar, StringArray};
use arrow_cast::{cast_with_options, CastOptions};
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
@@ -120,11 +119,11 @@ impl PartitionPruner {
let parts: Vec<&str> = partition_path.split('/').collect();
if parts.len() != self.schema.fields().len() {
- return Err(anyhow!(
+ return Err(CoreError::Internal(format!(
"Partition path should have {} part(s) but got {}",
self.schema.fields().len(),
parts.len()
- ));
+ )));
}
self.schema
@@ -134,14 +133,17 @@ impl PartitionPruner {
.map(|(field, part)| {
let value = if self.is_hive_style {
let (name, value) = part.split_once('=').ok_or_else(|| {
- anyhow!("Partition path should be hive-style but got
{}", part)
+ CoreError::Internal(format!(
+ "Partition path should be hive-style but got {}",
+ part
+ ))
})?;
if name != field.name() {
- return Err(anyhow!(
+ return Err(CoreError::Internal(format!(
"Partition path should contain {} but got {}",
field.name(),
name
- ));
+ )));
}
value
} else {
@@ -177,13 +179,13 @@ impl Operator {
}
impl FromStr for Operator {
- type Err = anyhow::Error;
+ type Err = crate::CoreError;
fn from_str(s: &str) -> Result<Self> {
Operator::TOKEN_OP_PAIRS
.iter()
.find_map(|&(token, op)| if token == s { Some(op) } else { None })
- .ok_or_else(|| anyhow!("Unsupported operator: {}", s))
+ .ok_or_else(|| CoreError::Internal(format!("Unsupported operator:
{}", s)))
}
}
@@ -196,21 +198,17 @@ pub struct PartitionFilter {
}
impl TryFrom<((&str, &str, &str), &Schema)> for PartitionFilter {
- type Error = anyhow::Error;
+ type Error = crate::CoreError;
fn try_from((filter, partition_schema): ((&str, &str, &str), &Schema)) ->
Result<Self> {
let (field_name, operator_str, value_str) = filter;
- let field: &Field = partition_schema
- .field_with_name(field_name)
- .with_context(|| format!("Field '{}' not found in partition
schema", field_name))?;
+ let field: &Field = partition_schema.field_with_name(field_name)?;
- let operator = Operator::from_str(operator_str)
- .with_context(|| format!("Unsupported operator: {}",
operator_str))?;
+ let operator = Operator::from_str(operator_str)?;
let value = &[value_str];
- let value = Self::cast_value(value, field.data_type())
- .with_context(|| format!("Unable to cast {:?} as {:?}", value,
field.data_type()))?;
+ let value = Self::cast_value(value, field.data_type())?;
let field = field.clone();
Ok(PartitionFilter {
@@ -290,7 +288,7 @@ mod tests {
assert!(filter
.unwrap_err()
.to_string()
- .contains("not found in partition schema"));
+ .contains("Unable to get field named"));
}
#[test]
@@ -311,7 +309,10 @@ mod tests {
let filter_tuple = ("count", "=", "not_a_number");
let filter = PartitionFilter::try_from((filter_tuple, &schema));
assert!(filter.is_err());
- assert!(filter.unwrap_err().to_string().contains("Unable to cast"));
+ assert!(filter
+ .unwrap_err()
+ .to_string()
+ .contains("Cannot cast string"));
}
#[test]
diff --git a/crates/core/src/table/timeline.rs
b/crates/core/src/table/timeline.rs
index 2993c95..d147abf 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -23,7 +23,6 @@ use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::Arc;
-use anyhow::{anyhow, Context, Result};
use arrow_schema::Schema;
use parquet::arrow::parquet_to_arrow_schema;
use serde_json::{Map, Value};
@@ -32,6 +31,7 @@ use crate::config::HudiConfigs;
use crate::file_group::FileGroup;
use crate::storage::utils::split_filename;
use crate::storage::Storage;
+use crate::{CoreError, Result};
/// The [State] of an [Instant] represents the status of the action performed
on the table.
#[allow(dead_code)]
@@ -80,7 +80,10 @@ impl Instant {
commit_file_path.push(self.file_name());
commit_file_path
.to_str()
- .ok_or(anyhow!("Failed to get file path for {:?}", self))
+ .ok_or(CoreError::Internal(format!(
+ "Failed to get file path for {:?}",
+ self
+ )))
.map(|s| s.to_string())
}
@@ -140,10 +143,11 @@ impl Timeline {
.storage
.get_file_data(instant.relative_path()?.as_str())
.await?;
- let json: Value = serde_json::from_slice(&bytes)?;
+ let json: Value = serde_json::from_slice(&bytes)
+ .map_err(|e| CoreError::Internal(format!("Invalid instant {:?}",
e)))?;
let commit_metadata = json
.as_object()
- .ok_or_else(|| anyhow!("Expected JSON object"))?
+ .ok_or_else(|| CoreError::Internal("Expected JSON
object".to_string()))?
.clone();
Ok(commit_metadata)
}
@@ -167,17 +171,15 @@ impl Timeline {
.and_then(|first_value| first_value["path"].as_str());
if let Some(path) = parquet_path {
- let parquet_meta = self
- .storage
- .get_parquet_file_metadata(path)
- .await
- .context("Failed to get parquet file metadata")?;
-
-
parquet_to_arrow_schema(parquet_meta.file_metadata().schema_descr(), None)
- .context("Failed to resolve the latest schema")
+ let parquet_meta =
self.storage.get_parquet_file_metadata(path).await?;
+
+ Ok(parquet_to_arrow_schema(
+ parquet_meta.file_metadata().schema_descr(),
+ None,
+ )?)
} else {
- Err(anyhow!(
- "Failed to resolve the latest schema: no file path found"
+ Err(CoreError::Internal(
+ "Failed to resolve the latest schema: no file path
found".to_string(),
))
}
}
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 9033667..1861c65 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -39,7 +39,7 @@ arrow = { workspace = true }
arrow-schema = { workspace = true }
# "stdlib"
-anyhow = { workspace = true }
+thiserror = { workspace = true }
# runtime / async
futures = { workspace = true }
@@ -47,4 +47,4 @@ tokio = { workspace = true }
[dependencies.pyo3]
version = "0.22.2"
-features = ["extension-module", "abi3", "abi3-py39", "anyhow"]
+features = ["extension-module", "abi3", "abi3-py39"]
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 37201cd..549a503 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -18,10 +18,10 @@
*/
use std::collections::HashMap;
+use std::convert::From;
use std::path::PathBuf;
use std::sync::OnceLock;
-use anyhow::Context;
use arrow::pyarrow::ToPyArrow;
use pyo3::{pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python};
use tokio::runtime::Runtime;
@@ -32,6 +32,30 @@ use hudi::table::builder::TableBuilder;
use hudi::table::Table;
use hudi::util::convert_vec_to_slice;
use hudi::util::vec_to_slice;
+use hudi::CoreError;
+use pyo3::create_exception;
+use pyo3::exceptions::PyException;
+
+create_exception!(_internal, HudiCoreError, PyException);
+
+fn convert_to_py_err(err: CoreError) -> PyErr {
+ // TODO(xushiyan): match and map all sub types
+ HudiCoreError::new_err(err.to_string())
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum PythonError {
+ #[error("Error in Hudi core")]
+ HudiCore(#[from] CoreError),
+}
+
+impl From<PythonError> for PyErr {
+ fn from(err: PythonError) -> PyErr {
+ match err {
+ PythonError::HudiCore(err) => convert_to_py_err(err),
+ }
+ }
+}
#[cfg(not(tarpaulin))]
#[derive(Clone, Debug)]
@@ -46,7 +70,8 @@ impl HudiFileGroupReader {
#[new]
#[pyo3(signature = (base_uri, options=None))]
fn new(base_uri: &str, options: Option<HashMap<String, String>>) -> PyResult<Self> {
- let inner = FileGroupReader::new_with_options(base_uri, options.unwrap_or_default())?;
+ let inner = FileGroupReader::new_with_options(base_uri, options.unwrap_or_default())
+ .map_err(PythonError::from)?;
Ok(HudiFileGroupReader { inner })
}
@@ -55,7 +80,8 @@ impl HudiFileGroupReader {
relative_path: &str,
py: Python,
) -> PyResult<PyObject> {
- rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))?
+ rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))
+ .map_err(PythonError::from)?
.to_pyarrow(py)
}
}
@@ -84,15 +110,17 @@ pub struct HudiFileSlice {
#[pymethods]
impl HudiFileSlice {
fn base_file_relative_path(&self) -> PyResult<String> {
- PathBuf::from(&self.partition_path)
+ Ok(PathBuf::from(&self.partition_path)
.join(&self.base_file_name)
.to_str()
.map(String::from)
- .context(format!(
- "Failed to get base file relative path for file slice: {:?}",
- self
- ))
- .map_err(PyErr::from)
+ .ok_or_else(|| {
+ CoreError::Internal(format!(
+ "Failed to get base file relative path for file slice: {:?}",
+ self
+ ))
+ })
+ .map_err(PythonError::from)?)
}
}
@@ -132,10 +160,12 @@ impl HudiTable {
base_uri: &str,
options: Option<HashMap<String, String>>,
) -> PyResult<Self> {
- let inner: Table = rt().block_on(Table::new_with_options(
- base_uri,
- options.unwrap_or_default(),
- ))?;
+ let inner: Table = rt()
+ .block_on(Table::new_with_options(
+ base_uri,
+ options.unwrap_or_default(),
+ ))
+ .map_err(PythonError::from)?;
Ok(HudiTable { inner })
}
@@ -148,11 +178,14 @@ impl HudiTable {
}
fn get_schema(&self, py: Python) -> PyResult<PyObject> {
- rt().block_on(self.inner.get_schema())?.to_pyarrow(py)
+ rt().block_on(self.inner.get_schema())
+ .map_err(PythonError::from)?
+ .to_pyarrow(py)
}
fn get_partition_schema(&self, py: Python) -> PyResult<PyObject> {
- rt().block_on(self.inner.get_partition_schema())?
+ rt().block_on(self.inner.get_partition_schema())
+ .map_err(PythonError::from)?
.to_pyarrow(py)
}
@@ -164,10 +197,12 @@ impl HudiTable {
py: Python,
) -> PyResult<Vec<Vec<HudiFileSlice>>> {
py.allow_threads(|| {
- let file_slices = rt().block_on(
- self.inner
- .get_file_slices_splits(n, vec_to_slice!(filters.unwrap_or_default())),
- )?;
+ let file_slices = rt()
+ .block_on(
+ self.inner
+ .get_file_slices_splits(n, vec_to_slice!(filters.unwrap_or_default())),
+ )
+ .map_err(PythonError::from)?;
Ok(file_slices
.iter()
.map(|inner_vec|
inner_vec.iter().map(convert_file_slice).collect())
@@ -182,10 +217,12 @@ impl HudiTable {
py: Python,
) -> PyResult<Vec<HudiFileSlice>> {
py.allow_threads(|| {
- let file_slices = rt().block_on(
- self.inner
- .get_file_slices(vec_to_slice!(filters.unwrap_or_default())),
- )?;
+ let file_slices = rt()
+ .block_on(
+ self.inner
+ .get_file_slices(vec_to_slice!(filters.unwrap_or_default())),
+ )
+ .map_err(PythonError::from)?;
Ok(file_slices.iter().map(convert_file_slice).collect())
})
}
@@ -204,7 +241,8 @@ impl HudiTable {
rt().block_on(
self.inner
.read_snapshot(vec_to_slice!(filters.unwrap_or_default())),
- )?
+ )
+ .map_err(PythonError::from)?
.to_pyarrow(py)
}
}
@@ -218,13 +256,15 @@ pub fn build_hudi_table(
storage_options: Option<HashMap<String, String>>,
options: Option<HashMap<String, String>>,
) -> PyResult<HudiTable> {
- let inner = rt().block_on(
- TableBuilder::from_base_uri(&base_uri)
- .with_hudi_options(hudi_options.unwrap_or_default())
- .with_storage_options(storage_options.unwrap_or_default())
- .with_options(options.unwrap_or_default())
- .build(),
- )?;
+ let inner = rt()
+ .block_on(
+ TableBuilder::from_base_uri(&base_uri)
+ .with_hudi_options(hudi_options.unwrap_or_default())
+ .with_storage_options(storage_options.unwrap_or_default())
+ .with_options(options.unwrap_or_default())
+ .build(),
+ )
+ .map_err(PythonError::from)?;
Ok(HudiTable { inner })
}