This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 12c60ac  feat: define Hudi error types across hudi-core (#124)
12c60ac is described below

commit 12c60ac0bc2481781aba0ba00f7389823f416d19
Author: JinYang <[email protected]>
AuthorDate: Thu Dec 5 09:20:03 2024 +0800

    feat: define Hudi error types across hudi-core (#124)
    
    Use `thiserror` instead of `anyhow` to define the error types for the hudi-core crate.
    
    ---------
    
    Signed-off-by: GoHalo <[email protected]>
    Co-authored-by: Shiyan Xu <[email protected]>
---
 Cargo.toml                           |   2 +-
 crates/core/Cargo.toml               |   2 +-
 crates/core/src/config/internal.rs   |  16 ++++--
 crates/core/src/config/mod.rs        |   3 +-
 crates/core/src/config/read.rs       |  25 ++++++---
 crates/core/src/config/table.rs      |  67 +++++++++++++++++------
 crates/core/src/config/utils.rs      |   9 ++--
 crates/core/src/file_group/mod.rs    |  15 +++---
 crates/core/src/file_group/reader.rs |   2 +-
 crates/core/src/lib.rs               |  46 ++++++++++++++++
 crates/core/src/storage/mod.rs       |  53 +++++++++---------
 crates/core/src/storage/utils.rs     |  36 +++++++++----
 crates/core/src/table/builder.rs     |  23 ++++----
 crates/core/src/table/fs_view.rs     |   5 +-
 crates/core/src/table/mod.rs         |  15 +++---
 crates/core/src/table/partition.rs   |  39 +++++++-------
 crates/core/src/table/timeline.rs    |  30 ++++++-----
 python/Cargo.toml                    |   4 +-
 python/src/internal.rs               | 102 ++++++++++++++++++++++++-----------
 19 files changed, 333 insertions(+), 161 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index b8dd722..971e9eb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -61,7 +61,7 @@ serde = { version = "1.0.203", features = ["derive"] }
 serde_json = { version = "1" }
 
 # "stdlib"
-anyhow = { version = "1.0.86" }
+thiserror = { version = "2.0.3" }
 bytes = { version = "1" }
 paste = { version = "1.0.15" }
 once_cell = { version = "1.19.0" }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 90fae7d..d7c9d06 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -48,7 +48,7 @@ serde = { workspace = true }
 serde_json = { workspace = true }
 
 # "stdlib"
-anyhow = { workspace = true }
+thiserror = { workspace = true }
 bytes = { workspace = true }
 paste = { workspace = true }
 strum = { workspace = true }
diff --git a/crates/core/src/config/internal.rs 
b/crates/core/src/config/internal.rs
index 89fd314..4fdc276 100644
--- a/crates/core/src/config/internal.rs
+++ b/crates/core/src/config/internal.rs
@@ -21,10 +21,13 @@
 use std::collections::HashMap;
 use std::str::FromStr;
 
-use anyhow::{anyhow, Result};
 use strum_macros::EnumIter;
 
-use crate::config::{ConfigParser, HudiConfigValue};
+use crate::{
+    config::{ConfigParser, HudiConfigValue},
+    CoreError::{ConfigNotFound, InvalidConfig},
+    Result,
+};
 
 /// Configurations for internal use.
 ///
@@ -64,11 +67,16 @@ impl ConfigParser for HudiInternalConfig {
         let get_result = configs
             .get(self.as_ref())
             .map(|v| v.as_str())
-            .ok_or(anyhow!("Config '{}' not found", self.as_ref()));
+            .ok_or(ConfigNotFound(self.as_ref().to_string()));
 
         match self {
             Self::SkipConfigValidation => get_result
-                .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+                .and_then(|v| {
+                    bool::from_str(v).map_err(|e| InvalidConfig {
+                        item: Self::SkipConfigValidation.as_ref(),
+                        source: Box::new(e),
+                    })
+                })
                 .map(HudiConfigValue::Boolean),
         }
     }
diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs
index 7b557b4..f586a89 100644
--- a/crates/core/src/config/mod.rs
+++ b/crates/core/src/config/mod.rs
@@ -21,8 +21,7 @@ use std::any::type_name;
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use crate::storage::utils::parse_uri;
-use anyhow::Result;
+use crate::{storage::utils::parse_uri, Result};
 use serde::{Deserialize, Serialize};
 use url::Url;
 
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index e67617d..a938953 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -21,10 +21,14 @@
 use std::collections::HashMap;
 use std::str::FromStr;
 
-use crate::config::{ConfigParser, HudiConfigValue};
-use anyhow::{anyhow, Result};
 use strum_macros::EnumIter;
 
+use crate::{
+    config::{ConfigParser, HudiConfigValue},
+    CoreError::{ConfigNotFound, InvalidConfig},
+    Result,
+};
+
 /// Configurations for reading Hudi tables.
 ///
 /// **Example**
@@ -37,6 +41,7 @@ use strum_macros::EnumIter;
 /// HudiTable::new_with_options("/tmp/hudi_data", options)
 /// ```
 ///
+
 #[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
 pub enum HudiReadConfig {
     /// Define input splits
@@ -74,11 +79,16 @@ impl ConfigParser for HudiReadConfig {
         let get_result = configs
             .get(self.as_ref())
             .map(|v| v.as_str())
-            .ok_or(anyhow!("Config '{}' not found", self.as_ref()));
+            .ok_or(ConfigNotFound(self.as_ref().to_string()));
 
         match self {
             Self::InputPartitions => get_result
-                .and_then(|v| usize::from_str(v).map_err(|e| anyhow!(e)))
+                .and_then(|v| {
+                    usize::from_str(v).map_err(|e| InvalidConfig {
+                        item: Self::InputPartitions.as_ref(),
+                        source: Box::new(e),
+                    })
+                })
                 .map(HudiConfigValue::UInteger),
             Self::AsOfTimestamp => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
         }
@@ -89,8 +99,8 @@ impl ConfigParser for HudiReadConfig {
 mod tests {
     use crate::config::read::HudiReadConfig::InputPartitions;
     use crate::config::ConfigParser;
+    use crate::CoreError::InvalidConfig;
     use std::collections::HashMap;
-    use std::num::ParseIntError;
 
     #[test]
     fn parse_valid_config_value() {
@@ -103,7 +113,10 @@ mod tests {
     fn parse_invalid_config_value() {
         let options = HashMap::from([(InputPartitions.as_ref().to_string(), 
"foo".to_string())]);
         let value = InputPartitions.parse_value(&options);
-        assert!(value.err().unwrap().is::<ParseIntError>());
+        assert!(matches!(
+            value.unwrap_err(),
+            InvalidConfig { item: _, source: _ }
+        ));
         assert_eq!(
             InputPartitions
                 .parse_value_or_default(&options)
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index 98cca7d..bb19066 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -21,11 +21,13 @@
 use std::collections::HashMap;
 use std::str::FromStr;
 
-use anyhow::anyhow;
-use anyhow::Result;
 use strum_macros::{AsRefStr, EnumIter};
 
-use crate::config::{ConfigParser, HudiConfigValue};
+use crate::{
+    config::{ConfigParser, HudiConfigValue},
+    CoreError::{self, ConfigNotFound, InvalidConfig, Unsupported},
+    Result,
+};
 
 /// Configurations for Hudi tables, most of them are persisted in 
`hoodie.properties`.
 ///
@@ -146,7 +148,7 @@ impl ConfigParser for HudiTableConfig {
         let get_result = configs
             .get(self.as_ref())
             .map(|v| v.as_str())
-            .ok_or(anyhow!("Config '{}' not found", self.as_ref()));
+            .ok_or(ConfigNotFound(self.as_ref().to_string()));
 
         match self {
             Self::BaseFileFormat => get_result
@@ -154,24 +156,49 @@ impl ConfigParser for HudiTableConfig {
                 .map(|v| HudiConfigValue::String(v.as_ref().to_string())),
             Self::BasePath => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
             Self::Checksum => get_result
-                .and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
+                .and_then(|v| {
+                    isize::from_str(v).map_err(|e| InvalidConfig {
+                        item: Self::Checksum.as_ref(),
+                        source: Box::new(e),
+                    })
+                })
                 .map(HudiConfigValue::Integer),
             Self::DatabaseName => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
             Self::DropsPartitionFields => get_result
-                .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+                .and_then(|v| {
+                    bool::from_str(v).map_err(|e| InvalidConfig {
+                        item: Self::DropsPartitionFields.as_ref(),
+                        source: Box::new(e),
+                    })
+                })
                 .map(HudiConfigValue::Boolean),
             Self::IsHiveStylePartitioning => get_result
-                .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+                .and_then(|v| {
+                    bool::from_str(v).map_err(|e| InvalidConfig {
+                        item: Self::IsHiveStylePartitioning.as_ref(),
+                        source: Box::new(e),
+                    })
+                })
                 .map(HudiConfigValue::Boolean),
             Self::IsPartitionPathUrlencoded => get_result
-                .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+                .and_then(|v| {
+                    bool::from_str(v).map_err(|e| InvalidConfig {
+                        item: Self::IsPartitionPathUrlencoded.as_ref(),
+                        source: Box::new(e),
+                    })
+                })
                 .map(HudiConfigValue::Boolean),
             Self::KeyGeneratorClass => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
             Self::PartitionFields => get_result
                 .map(|v| 
HudiConfigValue::List(v.split(',').map(str::to_string).collect())),
             Self::PrecombineField => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
             Self::PopulatesMetaFields => get_result
-                .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+                .and_then(|v| {
+                    bool::from_str(v).map_err(|e| InvalidConfig {
+                        item: Self::PopulatesMetaFields.as_ref(),
+                        source: Box::new(e),
+                    })
+                })
                 .map(HudiConfigValue::Boolean),
             Self::RecordKeyFields => get_result
                 .map(|v| 
HudiConfigValue::List(v.split(',').map(str::to_string).collect())),
@@ -180,10 +207,20 @@ impl ConfigParser for HudiTableConfig {
                 .and_then(TableTypeValue::from_str)
                 .map(|v| HudiConfigValue::String(v.as_ref().to_string())),
             Self::TableVersion => get_result
-                .and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
+                .and_then(|v| {
+                    isize::from_str(v).map_err(|e| InvalidConfig {
+                        item: Self::TableVersion.as_ref(),
+                        source: Box::new(e),
+                    })
+                })
                 .map(HudiConfigValue::Integer),
             Self::TimelineLayoutVersion => get_result
-                .and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
+                .and_then(|v| {
+                    isize::from_str(v).map_err(|e| InvalidConfig {
+                        item: Self::TimelineLayoutVersion.as_ref(),
+                        source: Box::new(e),
+                    })
+                })
                 .map(HudiConfigValue::Integer),
         }
     }
@@ -199,13 +236,13 @@ pub enum TableTypeValue {
 }
 
 impl FromStr for TableTypeValue {
-    type Err = anyhow::Error;
+    type Err = CoreError;
 
     fn from_str(s: &str) -> Result<Self> {
         match s.to_ascii_lowercase().as_str() {
             "copy_on_write" | "copy-on-write" | "cow" => Ok(Self::CopyOnWrite),
             "merge_on_read" | "merge-on-read" | "mor" => Ok(Self::MergeOnRead),
-            _ => Err(anyhow!("Unsupported table type: {}", s)),
+            v => Err(Unsupported(format!("Unsupported table type {}", v))),
         }
     }
 }
@@ -218,12 +255,12 @@ pub enum BaseFileFormatValue {
 }
 
 impl FromStr for BaseFileFormatValue {
-    type Err = anyhow::Error;
+    type Err = CoreError;
 
     fn from_str(s: &str) -> Result<Self> {
         match s.to_ascii_lowercase().as_str() {
             "parquet" => Ok(Self::Parquet),
-            _ => Err(anyhow!("Unsupported base file format: {}", s)),
+            v => Err(Unsupported(format!("Unsupported base file format {}", 
v))),
         }
     }
 }
diff --git a/crates/core/src/config/utils.rs b/crates/core/src/config/utils.rs
index 800a81d..85784ec 100644
--- a/crates/core/src/config/utils.rs
+++ b/crates/core/src/config/utils.rs
@@ -18,11 +18,12 @@
  */
 //! Config utilities.
 
-use anyhow::{Context, Result};
 use bytes::Bytes;
 use std::collections::HashMap;
 use std::io::{BufRead, BufReader, Cursor};
 
+use crate::{CoreError, Result};
+
 /// Returns an empty iterator to represent an empty set of options.
 pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a str)> {
     std::iter::empty::<(&str, &str)>()
@@ -57,7 +58,7 @@ pub fn parse_data_for_options(data: &Bytes, split_chars: 
&str) -> Result<HashMap
     let mut options = HashMap::new();
 
     for line in lines {
-        let line = line.context("Failed to read line")?;
+        let line = line.map_err(|e| CoreError::Internal(format!("Failed to 
read line {e}")))?;
         let trimmed_line = line.trim();
         if trimmed_line.is_empty() || trimmed_line.starts_with('#') {
             continue;
@@ -65,7 +66,9 @@ pub fn parse_data_for_options(data: &Bytes, split_chars: 
&str) -> Result<HashMap
         let mut parts = trimmed_line.splitn(2, |c| split_chars.contains(c));
         let key = parts
             .next()
-            .context("Missing key in config line")?
+            .ok_or(CoreError::Internal(
+                "Missing key in config line".to_string(),
+            ))?
             .trim()
             .to_owned();
         let value = parts.next().unwrap_or("").trim().to_owned();
diff --git a/crates/core/src/file_group/mod.rs 
b/crates/core/src/file_group/mod.rs
index db96b24..2fde1bf 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -28,11 +28,10 @@ use std::fmt::Formatter;
 use std::hash::{Hash, Hasher};
 use std::path::PathBuf;
 
-use anyhow::{anyhow, Result};
-
 use crate::storage::file_info::FileInfo;
 use crate::storage::file_stats::FileStats;
 use crate::storage::Storage;
+use crate::{CoreError::Internal, Result};
 
 /// Represents common metadata about a Hudi Base File.
 #[derive(Clone, Debug)]
@@ -51,10 +50,12 @@ impl BaseFile {
     /// Parse file name and extract file_group_id and commit_time.
     fn parse_file_name(file_name: &str) -> Result<(String, String)> {
         let err_msg = format!("Failed to parse file name '{}' for base file.", 
file_name);
-        let (name, _) = 
file_name.rsplit_once('.').ok_or(anyhow!(err_msg.clone()))?;
+        let (name, _) = file_name
+            .rsplit_once('.')
+            .ok_or(Internal(err_msg.clone()))?;
         let parts: Vec<&str> = name.split('_').collect();
-        let file_group_id = 
parts.first().ok_or(anyhow!(err_msg.clone()))?.to_string();
-        let commit_time = 
parts.get(2).ok_or(anyhow!(err_msg.clone()))?.to_string();
+        let file_group_id = 
parts.first().ok_or(Internal(err_msg.clone()))?.to_string();
+        let commit_time = 
parts.get(2).ok_or(Internal(err_msg.clone()))?.to_string();
         Ok((file_group_id, commit_time))
     }
 
@@ -189,11 +190,11 @@ impl FileGroup {
     pub fn add_base_file(&mut self, base_file: BaseFile) -> Result<&Self> {
         let commit_time = base_file.commit_time.as_str();
         if self.file_slices.contains_key(commit_time) {
-            Err(anyhow!(
+            Err(Internal(format!(
                 "Commit time {0} is already present in File Group {1}",
                 commit_time.to_owned(),
                 self.id,
-            ))
+            )))
         } else {
             self.file_slices.insert(
                 commit_time.to_owned(),
diff --git a/crates/core/src/file_group/reader.rs 
b/crates/core/src/file_group/reader.rs
index 1c7d748..eeb283a 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -21,7 +21,7 @@ use crate::config::utils::split_hudi_options_from_others;
 use crate::config::HudiConfigs;
 use crate::file_group::FileSlice;
 use crate::storage::Storage;
-use anyhow::Result;
+use crate::Result;
 use arrow_array::RecordBatch;
 use std::sync::Arc;
 
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 3190dec..6485c1d 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -48,3 +48,49 @@ pub mod file_group;
 pub mod storage;
 pub mod table;
 pub mod util;
+
+use thiserror::Error;
+
+#[derive(Error, Debug)]
+pub enum CoreError {
+    #[error("Config '{0}' not found")]
+    ConfigNotFound(String),
+
+    #[error("Invalid config item '{item}', {source:?}")]
+    InvalidConfig {
+        item: &'static str,
+        source: Box<dyn std::error::Error + Sync + Send + 'static>,
+    },
+
+    #[error("Parse url '{url}' failed, {source}")]
+    UrlParse {
+        url: String,
+        source: url::ParseError,
+    },
+
+    #[error("Invalid file path '{name}', {detail}")]
+    InvalidPath { name: String, detail: String },
+
+    #[error("{0}")]
+    Unsupported(String),
+
+    #[error("{0}")]
+    Internal(String),
+
+    #[error(transparent)]
+    Utf8Error(#[from] std::str::Utf8Error),
+
+    #[error(transparent)]
+    ObjectStore(#[from] object_store::Error),
+
+    #[error(transparent)]
+    ObjectStorePath(#[from] object_store::path::Error),
+
+    #[error(transparent)]
+    Parquet(#[from] parquet::errors::ParquetError),
+
+    #[error(transparent)]
+    Arrow(#[from] arrow::error::ArrowError),
+}
+
+type Result<T, E = CoreError> = std::result::Result<T, E>;
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 9a4aae1..0a9f0e1 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -22,11 +22,6 @@ use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::Arc;
 
-use crate::config::table::HudiTableConfig;
-use crate::config::HudiConfigs;
-use crate::storage::file_info::FileInfo;
-use crate::storage::utils::join_url_segments;
-use anyhow::{anyhow, Context, Result};
 use arrow::compute::concat_batches;
 use arrow::record_batch::RecordBatch;
 use async_recursion::async_recursion;
@@ -39,6 +34,12 @@ use parquet::arrow::ParquetRecordBatchStreamBuilder;
 use parquet::file::metadata::ParquetMetaData;
 use url::Url;
 
+use crate::config::table::HudiTableConfig;
+use crate::config::HudiConfigs;
+use crate::storage::file_info::FileInfo;
+use crate::storage::utils::join_url_segments;
+use crate::{CoreError, Result};
+
 pub mod file_info;
 pub mod file_stats;
 pub mod utils;
@@ -60,10 +61,10 @@ impl Storage {
         hudi_configs: Arc<HudiConfigs>,
     ) -> Result<Arc<Storage>> {
         if !hudi_configs.contains(HudiTableConfig::BasePath) {
-            return Err(anyhow!(
+            return Err(CoreError::Internal(format!(
                 "Failed to create storage: {} is required.",
                 HudiTableConfig::BasePath.as_ref()
-            ));
+            )));
         }
 
         let base_url = hudi_configs.get(HudiTableConfig::BasePath)?.to_url()?;
@@ -75,7 +76,7 @@ impl Storage {
                 options,
                 hudi_configs,
             })),
-            Err(e) => Err(anyhow!("Failed to create storage: {}", e)),
+            Err(e) => Err(CoreError::ObjectStore(e)),
         }
     }
 
@@ -108,7 +109,10 @@ impl Storage {
         let uri = obj_url.to_string();
         let name = obj_path
             .filename()
-            .ok_or(anyhow!("Failed to get file name for {}", obj_path))?
+            .ok_or(CoreError::InvalidPath {
+                name: obj_path.to_string(),
+                detail: "failed to get file name".to_string(),
+            })?
             .to_string();
         Ok(FileInfo {
             uri,
@@ -156,16 +160,14 @@ impl Storage {
         let mut batches = Vec::new();
 
         while let Some(r) = stream.next().await {
-            let batch = r.context("Failed to read record batch.")?;
-            batches.push(batch)
+            batches.push(r?)
         }
 
         if batches.is_empty() {
             return Ok(RecordBatch::new_empty(schema.clone()));
         }
 
-        concat_batches(&schema, &batches)
-            .map_err(|e| anyhow!("Failed to concat record batches: {}", e))
+        Ok(concat_batches(&schema, &batches)?)
     }
 
     pub async fn list_dirs(&self, subdir: Option<&str>) -> Result<Vec<String>> 
{
@@ -174,7 +176,10 @@ impl Storage {
         for dir in dir_paths {
             dirs.push(
                 dir.filename()
-                    .ok_or(anyhow!("Failed to get file name for {}", dir))?
+                    .ok_or(CoreError::InvalidPath {
+                        name: dir.to_string(),
+                        detail: "failed to get file name".to_string(),
+                    })?
                     .to_string(),
             )
         }
@@ -203,10 +208,10 @@ impl Storage {
             let name = obj_meta
                 .location
                 .filename()
-                .ok_or(anyhow!(
-                    "Failed to get file name for {:?}",
-                    obj_meta.location
-                ))?
+                .ok_or(CoreError::InvalidPath {
+                    name: obj_meta.location.to_string(),
+                    detail: "failed to get file name".to_string(),
+                })?
                 .to_string();
             let uri = join_url_segments(&prefix_url, &[&name])?.to_string();
             file_info.push(FileInfo {
@@ -241,9 +246,10 @@ pub async fn get_leaf_dirs(storage: &Storage, subdir: 
Option<&str>) -> Result<Ve
                 next_subdir.push(curr);
             }
             next_subdir.push(child_dir);
-            let next_subdir = next_subdir
-                .to_str()
-                .ok_or(anyhow!("Failed to convert path: {:?}", next_subdir))?;
+            let next_subdir = 
next_subdir.to_str().ok_or(CoreError::InvalidPath {
+                name: format!("{:?}", next_subdir),
+                detail: "failed to convert path".to_string(),
+            })?;
             let curr_leaf_dir = get_leaf_dirs(storage, 
Some(next_subdir)).await?;
             leaf_dirs.extend(curr_leaf_dir);
         }
@@ -293,10 +299,7 @@ mod tests {
             result.is_err(),
             "Should return error when no base path is invalid."
         );
-        assert!(result
-            .unwrap_err()
-            .to_string()
-            .contains("Failed to create storage"));
+        assert!(matches!(result.unwrap_err(), CoreError::ObjectStore(_)));
     }
 
     #[tokio::test]
diff --git a/crates/core/src/storage/utils.rs b/crates/core/src/storage/utils.rs
index 2413613..10198a1 100644
--- a/crates/core/src/storage/utils.rs
+++ b/crates/core/src/storage/utils.rs
@@ -16,13 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-//! Utility functions for storage.
-use std::path::{Path, PathBuf};
-use std::str::FromStr;
 
-use anyhow::{anyhow, Result};
+//! Utility functions for storage.
+use std::path::Path;
 use url::{ParseError, Url};
 
+use crate::{
+    CoreError::{InvalidPath, UrlParse},
+    Result,
+};
+
 /// Splits a filename into a stem and an extension.
 pub fn split_filename(filename: &str) -> Result<(String, String)> {
     let path = Path::new(filename);
@@ -30,7 +33,10 @@ pub fn split_filename(filename: &str) -> Result<(String, 
String)> {
     let stem = path
         .file_stem()
         .and_then(|s| s.to_str())
-        .ok_or_else(|| anyhow!("No file stem found"))?
+        .ok_or_else(|| InvalidPath {
+            name: filename.to_string(),
+            detail: "no file stem found".to_string(),
+        })?
         .to_string();
 
     let extension = path
@@ -44,13 +50,20 @@ pub fn split_filename(filename: &str) -> Result<(String, 
String)> {
 
 /// Parses a URI string into a URL.
 pub fn parse_uri(uri: &str) -> Result<Url> {
-    let mut url = Url::parse(uri)
-        .or(Url::from_file_path(PathBuf::from_str(uri)?))
-        .map_err(|_| anyhow!("Failed to parse uri: {}", uri))?;
+    let mut url = match Url::parse(uri) {
+        Ok(url) => url,
+        Err(source) => Url::from_directory_path(uri).map_err(|_| UrlParse {
+            url: uri.to_string(),
+            source,
+        })?,
+    };
 
     if url.path().ends_with('/') {
         url.path_segments_mut()
-            .map_err(|_| anyhow!("Failed to parse uri: {}", uri))?
+            .map_err(|_| InvalidPath {
+                name: uri.to_string(),
+                detail: "parse uri failed".to_string(),
+            })?
             .pop();
     }
 
@@ -73,7 +86,10 @@ pub fn join_url_segments(base_url: &Url, segments: &[&str]) 
-> Result<Url> {
     for &seg in segments {
         let segs: Vec<_> = seg.split('/').filter(|&s| !s.is_empty()).collect();
         url.path_segments_mut()
-            .map_err(|_| ParseError::RelativeUrlWithoutBase)?
+            .map_err(|_| UrlParse {
+                url: base_url.to_string(),
+                source: ParseError::RelativeUrlWithoutBase,
+            })?
             .extend(segs);
     }
 
diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs
index c037a0c..eca7d2b 100644
--- a/crates/core/src/table/builder.rs
+++ b/crates/core/src/table/builder.rs
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-use anyhow::{anyhow, Context, Result};
 use paste::paste;
 use std::collections::HashMap;
 use std::env;
@@ -38,6 +37,7 @@ use crate::storage::Storage;
 use crate::table::fs_view::FileSystemView;
 use crate::table::timeline::Timeline;
 use crate::table::Table;
+use crate::{CoreError, Result};
 
 /// Builder for creating a [Table] instance.
 #[derive(Debug, Clone)]
@@ -112,13 +112,10 @@ impl TableBuilder {
         let hudi_configs = Arc::from(hudi_configs);
         let storage_options = Arc::from(self.storage_options.clone());
 
-        let timeline = Timeline::new(hudi_configs.clone(), 
storage_options.clone())
-            .await
-            .context("Failed to load timeline")?;
+        let timeline = Timeline::new(hudi_configs.clone(), 
storage_options.clone()).await?;
 
-        let file_system_view = FileSystemView::new(hudi_configs.clone(), 
storage_options.clone())
-            .await
-            .context("Failed to load file system view")?;
+        let file_system_view =
+            FileSystemView::new(hudi_configs.clone(), 
storage_options.clone()).await?;
 
         Ok(Table {
             hudi_configs,
@@ -253,22 +250,26 @@ impl TableBuilder {
         // additional validation
         let table_type = hudi_configs.get(TableType)?.to::<String>();
         if TableTypeValue::from_str(&table_type)? != CopyOnWrite {
-            return Err(anyhow!("Only support copy-on-write table."));
+            return Err(CoreError::Unsupported(
+                "Only support copy-on-write table.".to_string(),
+            ));
         }
 
         let table_version = hudi_configs.get(TableVersion)?.to::<isize>();
         if !(5..=6).contains(&table_version) {
-            return Err(anyhow!("Only support table version 5 and 6."));
+            return Err(CoreError::Unsupported(
+                "Only support table version 5 and 6.".to_string(),
+            ));
         }
 
         let drops_partition_cols = hudi_configs
             .get_or_default(DropsPartitionFields)
             .to::<bool>();
         if drops_partition_cols {
-            return Err(anyhow!(
+            return Err(CoreError::Unsupported(format!(
                 "Only support when `{}` is disabled",
                 DropsPartitionFields.as_ref()
-            ));
+            )));
         }
 
         Ok(())
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index f2309e3..ee68eec 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -24,8 +24,9 @@ use crate::config::HudiConfigs;
 use crate::file_group::{BaseFile, FileGroup, FileSlice};
 use crate::storage::file_info::FileInfo;
 use crate::storage::{get_leaf_dirs, Storage};
+
 use crate::table::partition::PartitionPruner;
-use anyhow::Result;
+use crate::{CoreError, Result};
 use dashmap::DashMap;
 use futures::stream::{self, StreamExt, TryStreamExt};
 
@@ -134,7 +135,7 @@ impl FileSystemView {
             .map(|path| async move {
                 let file_groups =
                     Self::load_file_groups_for_partition(&self.storage, 
&path).await?;
-                Ok::<_, anyhow::Error>((path, file_groups))
+                Ok::<_, CoreError>((path, file_groups))
             })
             // TODO parameterize the parallelism for partition loading
             .buffer_unordered(10)
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 4d69bbf..4836d1c 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -87,7 +87,6 @@
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
-use anyhow::{anyhow, Context, Result};
 use arrow::record_batch::RecordBatch;
 use arrow_schema::{Field, Schema};
 use url::Url;
@@ -102,6 +101,7 @@ use crate::table::builder::TableBuilder;
 use crate::table::fs_view::FileSystemView;
 use crate::table::partition::PartitionPruner;
 use crate::table::timeline::Timeline;
+use crate::{CoreError, Result};
 
 pub mod builder;
 mod fs_view;
@@ -161,7 +161,6 @@ impl Table {
             .register_object_store(runtime_env.clone());
     }
 
-    /// Get the latest [Schema] of the table.
     pub async fn get_schema(&self) -> Result<Schema> {
         self.timeline.get_latest_schema().await
     }
@@ -259,16 +258,18 @@ impl Table {
         timestamp: &str,
         filters: &[(&str, &str, &str)],
     ) -> Result<Vec<RecordBatch>> {
-        let file_slices = self
-            .get_file_slices_as_of(timestamp, filters)
-            .await
-            .context(format!("Failed to get file slices as of {}", 
timestamp))?;
+        let file_slices = self.get_file_slices_as_of(timestamp, 
filters).await?;
         let mut batches = Vec::new();
         let fg_reader = self.create_file_group_reader();
         for f in file_slices {
             match fg_reader.read_file_slice(&f).await {
                 Ok(batch) => batches.push(batch),
-                Err(e) => return Err(anyhow!("Failed to read file slice {:?} - 
{}", f, e)),
+                Err(e) => {
+                    return Err(CoreError::Internal(format!(
+                        "Failed to read file slice {:?} - {}",
+                        f, e
+                    )))
+                }
             }
         }
         Ok(batches)
diff --git a/crates/core/src/table/partition.rs 
b/crates/core/src/table/partition.rs
index 17927eb..8f03521 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -18,8 +18,7 @@
  */
 use crate::config::table::HudiTableConfig;
 use crate::config::HudiConfigs;
-use anyhow::Result;
-use anyhow::{anyhow, Context};
+use crate::{CoreError, Result};
 use arrow_array::{ArrayRef, Scalar, StringArray};
 use arrow_cast::{cast_with_options, CastOptions};
 use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
@@ -120,11 +119,11 @@ impl PartitionPruner {
         let parts: Vec<&str> = partition_path.split('/').collect();
 
         if parts.len() != self.schema.fields().len() {
-            return Err(anyhow!(
+            return Err(CoreError::Internal(format!(
                 "Partition path should have {} part(s) but got {}",
                 self.schema.fields().len(),
                 parts.len()
-            ));
+            )));
         }
 
         self.schema
@@ -134,14 +133,17 @@ impl PartitionPruner {
             .map(|(field, part)| {
                 let value = if self.is_hive_style {
                     let (name, value) = part.split_once('=').ok_or_else(|| {
-                        anyhow!("Partition path should be hive-style but got {}", part)
+                        CoreError::Internal(format!(
+                            "Partition path should be hive-style but got {}",
+                            part
+                        ))
                     })?;
                     if name != field.name() {
-                        return Err(anyhow!(
+                        return Err(CoreError::Internal(format!(
                             "Partition path should contain {} but got {}",
                             field.name(),
                             name
-                        ));
+                        )));
                     }
                     value
                 } else {
@@ -177,13 +179,13 @@ impl Operator {
 }
 
 impl FromStr for Operator {
-    type Err = anyhow::Error;
+    type Err = crate::CoreError;
 
     fn from_str(s: &str) -> Result<Self> {
         Operator::TOKEN_OP_PAIRS
             .iter()
             .find_map(|&(token, op)| if token == s { Some(op) } else { None })
-            .ok_or_else(|| anyhow!("Unsupported operator: {}", s))
+            .ok_or_else(|| CoreError::Internal(format!("Unsupported operator: {}", s)))
     }
 }
 
@@ -196,21 +198,17 @@ pub struct PartitionFilter {
 }
 
 impl TryFrom<((&str, &str, &str), &Schema)> for PartitionFilter {
-    type Error = anyhow::Error;
+    type Error = crate::CoreError;
 
     fn try_from((filter, partition_schema): ((&str, &str, &str), &Schema)) -> Result<Self> {
         let (field_name, operator_str, value_str) = filter;
 
-        let field: &Field = partition_schema
-            .field_with_name(field_name)
-            .with_context(|| format!("Field '{}' not found in partition schema", field_name))?;
+        let field: &Field = partition_schema.field_with_name(field_name)?;
 
-        let operator = Operator::from_str(operator_str)
-            .with_context(|| format!("Unsupported operator: {}", operator_str))?;
+        let operator = Operator::from_str(operator_str)?;
 
         let value = &[value_str];
-        let value = Self::cast_value(value, field.data_type())
-            .with_context(|| format!("Unable to cast {:?} as {:?}", value, 
field.data_type()))?;
+        let value = Self::cast_value(value, field.data_type())?;
 
         let field = field.clone();
         Ok(PartitionFilter {
@@ -290,7 +288,7 @@ mod tests {
         assert!(filter
             .unwrap_err()
             .to_string()
-            .contains("not found in partition schema"));
+            .contains("Unable to get field named"));
     }
 
     #[test]
@@ -311,7 +309,10 @@ mod tests {
         let filter_tuple = ("count", "=", "not_a_number");
         let filter = PartitionFilter::try_from((filter_tuple, &schema));
         assert!(filter.is_err());
-        assert!(filter.unwrap_err().to_string().contains("Unable to cast"));
+        assert!(filter
+            .unwrap_err()
+            .to_string()
+            .contains("Cannot cast string"));
     }
 
     #[test]
diff --git a/crates/core/src/table/timeline.rs b/crates/core/src/table/timeline.rs
index 2993c95..d147abf 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -23,7 +23,6 @@ use std::fmt::Debug;
 use std::path::PathBuf;
 use std::sync::Arc;
 
-use anyhow::{anyhow, Context, Result};
 use arrow_schema::Schema;
 use parquet::arrow::parquet_to_arrow_schema;
 use serde_json::{Map, Value};
@@ -32,6 +31,7 @@ use crate::config::HudiConfigs;
 use crate::file_group::FileGroup;
 use crate::storage::utils::split_filename;
 use crate::storage::Storage;
+use crate::{CoreError, Result};
 
 /// The [State] of an [Instant] represents the status of the action performed on the table.
 #[allow(dead_code)]
@@ -80,7 +80,10 @@ impl Instant {
         commit_file_path.push(self.file_name());
         commit_file_path
             .to_str()
-            .ok_or(anyhow!("Failed to get file path for {:?}", self))
+            .ok_or(CoreError::Internal(format!(
+                "Failed to get file path for {:?}",
+                self
+            )))
             .map(|s| s.to_string())
     }
 
@@ -140,10 +143,11 @@ impl Timeline {
             .storage
             .get_file_data(instant.relative_path()?.as_str())
             .await?;
-        let json: Value = serde_json::from_slice(&bytes)?;
+        let json: Value = serde_json::from_slice(&bytes)
+            .map_err(|e| CoreError::Internal(format!("Invalid instant {:?}", e)))?;
         let commit_metadata = json
             .as_object()
-            .ok_or_else(|| anyhow!("Expected JSON object"))?
+            .ok_or_else(|| CoreError::Internal("Expected JSON object".to_string()))?
             .clone();
         Ok(commit_metadata)
     }
@@ -167,17 +171,15 @@ impl Timeline {
             .and_then(|first_value| first_value["path"].as_str());
 
         if let Some(path) = parquet_path {
-            let parquet_meta = self
-                .storage
-                .get_parquet_file_metadata(path)
-                .await
-                .context("Failed to get parquet file metadata")?;
-
-            parquet_to_arrow_schema(parquet_meta.file_metadata().schema_descr(), None)
-                .context("Failed to resolve the latest schema")
+            let parquet_meta = self.storage.get_parquet_file_metadata(path).await?;
+
+            Ok(parquet_to_arrow_schema(
+                parquet_meta.file_metadata().schema_descr(),
+                None,
+            )?)
         } else {
-            Err(anyhow!(
-                "Failed to resolve the latest schema: no file path found"
+            Err(CoreError::Internal(
+                "Failed to resolve the latest schema: no file path found".to_string(),
             ))
         }
     }
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 9033667..1861c65 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -39,7 +39,7 @@ arrow = { workspace = true }
 arrow-schema = { workspace = true }
 
 # "stdlib"
-anyhow = { workspace = true }
+thiserror = { workspace = true }
 
 # runtime / async
 futures = { workspace = true }
@@ -47,4 +47,4 @@ tokio = { workspace = true }
 
 [dependencies.pyo3]
 version = "0.22.2"
-features = ["extension-module", "abi3", "abi3-py39", "anyhow"]
+features = ["extension-module", "abi3", "abi3-py39"]
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 37201cd..549a503 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -18,10 +18,10 @@
  */
 
 use std::collections::HashMap;
+use std::convert::From;
 use std::path::PathBuf;
 use std::sync::OnceLock;
 
-use anyhow::Context;
 use arrow::pyarrow::ToPyArrow;
 use pyo3::{pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python};
 use tokio::runtime::Runtime;
@@ -32,6 +32,30 @@ use hudi::table::builder::TableBuilder;
 use hudi::table::Table;
 use hudi::util::convert_vec_to_slice;
 use hudi::util::vec_to_slice;
+use hudi::CoreError;
+use pyo3::create_exception;
+use pyo3::exceptions::PyException;
+
+create_exception!(_internal, HudiCoreError, PyException);
+
+fn convert_to_py_err(err: CoreError) -> PyErr {
+    // TODO(xushiyan): match and map all sub types
+    HudiCoreError::new_err(err.to_string())
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum PythonError {
+    #[error("Error in Hudi core")]
+    HudiCore(#[from] CoreError),
+}
+
+impl From<PythonError> for PyErr {
+    fn from(err: PythonError) -> PyErr {
+        match err {
+            PythonError::HudiCore(err) => convert_to_py_err(err),
+        }
+    }
+}
 
 #[cfg(not(tarpaulin))]
 #[derive(Clone, Debug)]
@@ -46,7 +70,8 @@ impl HudiFileGroupReader {
     #[new]
     #[pyo3(signature = (base_uri, options=None))]
     fn new(base_uri: &str, options: Option<HashMap<String, String>>) -> PyResult<Self> {
-        let inner = FileGroupReader::new_with_options(base_uri, options.unwrap_or_default())?;
+        let inner = FileGroupReader::new_with_options(base_uri, options.unwrap_or_default())
+            .map_err(PythonError::from)?;
         Ok(HudiFileGroupReader { inner })
     }
 
@@ -55,7 +80,8 @@ impl HudiFileGroupReader {
         relative_path: &str,
         py: Python,
     ) -> PyResult<PyObject> {
-        rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))?
+        rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))
+            .map_err(PythonError::from)?
             .to_pyarrow(py)
     }
 }
@@ -84,15 +110,17 @@ pub struct HudiFileSlice {
 #[pymethods]
 impl HudiFileSlice {
     fn base_file_relative_path(&self) -> PyResult<String> {
-        PathBuf::from(&self.partition_path)
+        Ok(PathBuf::from(&self.partition_path)
             .join(&self.base_file_name)
             .to_str()
             .map(String::from)
-            .context(format!(
-                "Failed to get base file relative path for file slice: {:?}",
-                self
-            ))
-            .map_err(PyErr::from)
+            .ok_or_else(|| {
+                CoreError::Internal(format!(
+                    "Failed to get base file relative path for file slice: {:?}",
+                    self
+                ))
+            })
+            .map_err(PythonError::from)?)
     }
 }
 
@@ -132,10 +160,12 @@ impl HudiTable {
         base_uri: &str,
         options: Option<HashMap<String, String>>,
     ) -> PyResult<Self> {
-        let inner: Table = rt().block_on(Table::new_with_options(
-            base_uri,
-            options.unwrap_or_default(),
-        ))?;
+        let inner: Table = rt()
+            .block_on(Table::new_with_options(
+                base_uri,
+                options.unwrap_or_default(),
+            ))
+            .map_err(PythonError::from)?;
         Ok(HudiTable { inner })
     }
 
@@ -148,11 +178,14 @@ impl HudiTable {
     }
 
     fn get_schema(&self, py: Python) -> PyResult<PyObject> {
-        rt().block_on(self.inner.get_schema())?.to_pyarrow(py)
+        rt().block_on(self.inner.get_schema())
+            .map_err(PythonError::from)?
+            .to_pyarrow(py)
     }
 
     fn get_partition_schema(&self, py: Python) -> PyResult<PyObject> {
-        rt().block_on(self.inner.get_partition_schema())?
+        rt().block_on(self.inner.get_partition_schema())
+            .map_err(PythonError::from)?
             .to_pyarrow(py)
     }
 
@@ -164,10 +197,12 @@ impl HudiTable {
         py: Python,
     ) -> PyResult<Vec<Vec<HudiFileSlice>>> {
         py.allow_threads(|| {
-            let file_slices = rt().block_on(
-                self.inner
-                    .get_file_slices_splits(n, 
vec_to_slice!(filters.unwrap_or_default())),
-            )?;
+            let file_slices = rt()
+                .block_on(
+                    self.inner
+                        .get_file_slices_splits(n, vec_to_slice!(filters.unwrap_or_default())),
+                )
+                .map_err(PythonError::from)?;
             Ok(file_slices
                 .iter()
                 .map(|inner_vec| inner_vec.iter().map(convert_file_slice).collect())
@@ -182,10 +217,12 @@ impl HudiTable {
         py: Python,
     ) -> PyResult<Vec<HudiFileSlice>> {
         py.allow_threads(|| {
-            let file_slices = rt().block_on(
-                self.inner
-                    .get_file_slices(vec_to_slice!(filters.unwrap_or_default())),
-            )?;
+            let file_slices = rt()
+                .block_on(
+                    self.inner
+                        .get_file_slices(vec_to_slice!(filters.unwrap_or_default())),
+                )
+                .map_err(PythonError::from)?;
             Ok(file_slices.iter().map(convert_file_slice).collect())
         })
     }
@@ -204,7 +241,8 @@ impl HudiTable {
         rt().block_on(
             self.inner
                 .read_snapshot(vec_to_slice!(filters.unwrap_or_default())),
-        )?
+        )
+        .map_err(PythonError::from)?
         .to_pyarrow(py)
     }
 }
@@ -218,13 +256,15 @@ pub fn build_hudi_table(
     storage_options: Option<HashMap<String, String>>,
     options: Option<HashMap<String, String>>,
 ) -> PyResult<HudiTable> {
-    let inner = rt().block_on(
-        TableBuilder::from_base_uri(&base_uri)
-            .with_hudi_options(hudi_options.unwrap_or_default())
-            .with_storage_options(storage_options.unwrap_or_default())
-            .with_options(options.unwrap_or_default())
-            .build(),
-    )?;
+    let inner = rt()
+        .block_on(
+            TableBuilder::from_base_uri(&base_uri)
+                .with_hudi_options(hudi_options.unwrap_or_default())
+                .with_storage_options(storage_options.unwrap_or_default())
+                .with_options(options.unwrap_or_default())
+                .build(),
+        )
+        .map_err(PythonError::from)?;
     Ok(HudiTable { inner })
 }
 


Reply via email to