This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 199a25d feat: add APIs for time-travel read (#33)
199a25d is described below
commit 199a25d82ba09c0bedeb12430bf22299603209b2
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Jul 2 15:48:05 2024 -0500
feat: add APIs for time-travel read (#33)
---
crates/core/src/file_group/mod.rs | 27 ++++++++---
crates/core/src/storage/mod.rs | 12 ++---
crates/core/src/table/fs_view.rs | 38 +++++++++------
crates/core/src/table/mod.rs | 98 +++++++++++++++++++++++++++++++++++----
crates/core/src/table/timeline.rs | 70 ++++++++++++++++++++--------
crates/datafusion/src/lib.rs | 14 ++----
crates/tests/src/lib.rs | 7 +++
python/hudi/_internal.pyi | 12 +++--
python/hudi/table.py | 22 +++++----
python/src/lib.rs | 31 ++++++++-----
python/tests/test_table_read.py | 44 ++++++++++++++----
11 files changed, 279 insertions(+), 96 deletions(-)
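For a quick sense of the new time-travel read surface, here is a minimal usage sketch against the core Rust API (the crate import path and the local table path below are assumptions for illustration; the method names and signatures follow the crates/core/src/table/mod.rs changes in this commit):

    use std::collections::HashMap;

    use hudi::table::Table; // assumed import path; the type is defined in crates/core/src/table/mod.rs

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // Placeholder path to a Hudi table on local storage.
        let table = Table::new("/tmp/trips", HashMap::new()).await?;

        // Latest snapshot: resolves the newest commit timestamp on the timeline,
        // loads file slice stats as of that instant, and reads the base files.
        let latest = table.read_snapshot().await?;

        // Time-travel: only file slices committed at or before the given
        // timestamp are read; an empty table yields an empty result.
        let as_of = table.read_snapshot_as_of("20240402123035233").await?;

        println!("latest: {} batches, as-of: {} batches", latest.len(), as_of.len());
        Ok(())
    }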
diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs
index 6b9b22c..c0af0b3 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -100,7 +100,7 @@ impl FileSlice {
if self.base_file.stats.is_none() {
let parquet_meta = storage
.get_parquet_file_metadata(&self.base_file_relative_path())
- .await;
+ .await?;
let num_records = parquet_meta.file_metadata().num_rows();
let stats = FileStats { num_records };
self.base_file.stats = Some(stats);
@@ -163,12 +163,22 @@ impl FileGroup {
}
}
- pub fn get_latest_file_slice(&self) -> Option<&FileSlice> {
- return self.file_slices.values().next_back();
+ pub fn get_file_slice_as_of(&self, timestamp: &str) -> Option<&FileSlice> {
+ let as_of = timestamp.to_string();
+ return if let Some((_, file_slice)) = self.file_slices.range(..=as_of).next_back() {
+ Some(file_slice)
+ } else {
+ None
+ };
}
- pub fn get_latest_file_slice_mut(&mut self) -> Option<&mut FileSlice> {
- return self.file_slices.values_mut().next_back();
+ pub fn get_file_slice_mut_as_of(&mut self, timestamp: &str) -> Option<&mut FileSlice> {
+ let as_of = timestamp.to_string();
+ return if let Some((_, file_slice)) = self.file_slices.range_mut(..=as_of).next_back() {
+ Some(file_slice)
+ } else {
+ None
+ };
}
}
@@ -203,8 +213,11 @@ mod tests {
let commit_times: Vec<&str> = fg.file_slices.keys().map(|k| k.as_str()).collect();
assert_eq!(commit_times, vec!["20240402123035233", "20240402144910683"]);
assert_eq!(
- fg.get_latest_file_slice().unwrap().base_file.commit_time,
- "20240402144910683"
+ fg.get_file_slice_as_of("20240402123035233")
+ .unwrap()
+ .base_file
+ .commit_time,
+ "20240402123035233"
)
}
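The as-of lookup above relies on FileGroup::file_slices being an ordered map (a BTreeMap) keyed by commit timestamp, so a range query up to and including the requested timestamp followed by next_back() returns the newest file slice at or before that instant. A small self-contained sketch of that lookup, with the map values simplified to strings for illustration:

    use std::collections::BTreeMap;

    // Simplified stand-in for FileGroup::file_slices: commit timestamp -> file slice.
    fn file_slice_as_of<'a>(slices: &'a BTreeMap<String, String>, timestamp: &str) -> Option<&'a str> {
        slices
            .range(..=timestamp.to_string()) // slices committed at or before `timestamp`
            .next_back()                     // the newest of those
            .map(|(_, slice)| slice.as_str())
    }

    fn main() {
        let mut slices = BTreeMap::new();
        slices.insert("20240402123035233".to_string(), "slice-1".to_string());
        slices.insert("20240402144910683".to_string(), "slice-2".to_string());

        assert_eq!(file_slice_as_of(&slices, "20240402123035233"), Some("slice-1"));
        assert_eq!(file_slice_as_of(&slices, "20240402999999999"), Some("slice-2"));
        assert_eq!(file_slice_as_of(&slices, "0"), None); // older than any commit
    }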
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 76e085f..0f09c05 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -72,14 +72,14 @@ impl Storage {
}
}
- pub async fn get_parquet_file_metadata(&self, relative_path: &str) -> ParquetMetaData {
- let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap();
- let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
+ pub async fn get_parquet_file_metadata(&self, relative_path: &str) -> Result<ParquetMetaData> {
+ let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
+ let obj_path = ObjPath::from_url_path(obj_url.path())?;
let obj_store = self.object_store.clone();
- let meta = obj_store.head(&obj_path).await.unwrap();
+ let meta = obj_store.head(&obj_path).await?;
let reader = ParquetObjectReader::new(obj_store, meta);
- let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
- builder.metadata().as_ref().to_owned()
+ let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
+ Ok(builder.metadata().as_ref().to_owned())
}
pub async fn get_file_data(&self, relative_path: &str) -> Bytes {
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 8f278dd..b7cd77c 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -44,7 +44,7 @@ impl FileSystemView {
props: Arc<HashMap<String, String>>,
) -> Result<Self> {
let storage = Storage::new(base_url, storage_options)?;
- let partition_paths = Self::get_partition_paths(&storage).await?;
+ let partition_paths = Self::load_partition_paths(&storage).await?;
let partition_to_file_groups =
Self::load_file_groups_for_partitions(&storage, partition_paths).await?;
let partition_to_file_groups = Arc::new(DashMap::from_iter(partition_to_file_groups));
@@ -55,7 +55,7 @@ impl FileSystemView {
})
}
- async fn get_partition_paths(storage: &Storage) -> Result<Vec<String>> {
+ async fn load_partition_paths(storage: &Storage) -> Result<Vec<String>> {
let top_level_dirs: Vec<String> = storage
.list_dirs(None)
.await
@@ -120,24 +120,25 @@ impl FileSystemView {
Ok(file_groups)
}
- pub fn get_latest_file_slices(&self) -> Result<Vec<FileSlice>> {
+ pub fn get_file_slices_as_of(&self, timestamp: &str) -> Result<Vec<FileSlice>> {
let mut file_slices = Vec::new();
for fgs in self.partition_to_file_groups.iter() {
let fgs_ref = fgs.value();
for fg in fgs_ref {
- if let Some(fsl) = fg.get_latest_file_slice() {
- file_slices.push(fsl.clone())
+ if let Some(fsl) = fg.get_file_slice_as_of(timestamp) {
+ // TODO: pass ref instead of copying
+ file_slices.push(fsl.clone());
}
}
}
Ok(file_slices)
}
- pub async fn load_latest_file_slices_stats(&self) -> Result<()> {
+ pub async fn load_file_slices_stats_as_of(&self, timestamp: &str) -> Result<()> {
for mut fgs in self.partition_to_file_groups.iter_mut() {
let fgs_ref = fgs.value_mut();
for fg in fgs_ref {
- if let Some(file_slice) = fg.get_latest_file_slice_mut() {
+ if let Some(file_slice) = fg.get_file_slice_mut_as_of(timestamp) {
file_slice
.load_stats(&self.storage)
.await
@@ -148,11 +149,17 @@ impl FileSystemView {
Ok(())
}
- pub async fn read_file_slice_by_path(&self, relative_path: &str) -> Result<Vec<RecordBatch>> {
+ pub async fn read_file_slice_by_path_unchecked(
+ &self,
+ relative_path: &str,
+ ) -> Result<Vec<RecordBatch>> {
Ok(self.storage.get_parquet_file_data(relative_path).await)
}
- pub async fn read_file_slice(&self, file_slice: &FileSlice) -> Result<Vec<RecordBatch>> {
- self.read_file_slice_by_path(&file_slice.base_file_relative_path())
+ pub async fn read_file_slice_unchecked(
+ &self,
+ file_slice: &FileSlice,
+ ) -> Result<Vec<RecordBatch>> {
+ self.read_file_slice_by_path_unchecked(&file_slice.base_file_relative_path())
.await
}
}
@@ -171,7 +178,9 @@ mod tests {
async fn get_partition_paths_for_nonpartitioned_table() {
let base_url = TestTable::V6Nonpartitioned.url();
let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
- let partition_paths = FileSystemView::get_partition_paths(&storage).await.unwrap();
+ let partition_paths = FileSystemView::load_partition_paths(&storage)
+ .await
+ .unwrap();
let partition_path_set: HashSet<&str> =
HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
assert_eq!(partition_path_set, HashSet::from([""]))
@@ -181,7 +190,9 @@ mod tests {
async fn get_partition_paths_for_complexkeygen_table() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
- let partition_paths = FileSystemView::get_partition_paths(&storage).await.unwrap();
+ let partition_paths = FileSystemView::load_partition_paths(&storage)
+ .await
+ .unwrap();
let partition_path_set: HashSet<&str> =
HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
assert_eq!(
@@ -204,7 +215,8 @@ mod tests {
)
.await
.unwrap();
- let file_slices = fs_view.get_latest_file_slices().unwrap();
+
+ let file_slices = fs_view.get_file_slices_as_of("20240418173551906").unwrap();
assert_eq!(file_slices.len(), 1);
let fg_ids = file_slices
.iter()
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 825b2d4..52daf12 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -115,11 +115,42 @@ impl Table {
}
pub async fn get_file_slices(&self) -> Result<Vec<FileSlice>> {
+ if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+ self.get_file_slices_as_of(timestamp).await
+ } else {
+ Ok(Vec::new())
+ }
+ }
+
+ pub async fn get_file_slices_as_of(&self, timestamp: &str) -> Result<Vec<FileSlice>> {
self.file_system_view
- .load_latest_file_slices_stats()
+ .load_file_slices_stats_as_of(timestamp)
.await
- .expect("Successful loading of file slice stats.");
- self.file_system_view.get_latest_file_slices()
+ .context("Fail to load file slice stats.")?;
+ self.file_system_view.get_file_slices_as_of(timestamp)
+ }
+
+ pub async fn read_snapshot(&self) -> Result<Vec<RecordBatch>> {
+ if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+ self.read_snapshot_as_of(timestamp).await
+ } else {
+ Ok(Vec::new())
+ }
+ }
+
+ pub async fn read_snapshot_as_of(&self, timestamp: &str) -> Result<Vec<RecordBatch>> {
+ let file_slices = self
+ .get_file_slices_as_of(timestamp)
+ .await
+ .context(format!("Failed to get file slices as of {}",
timestamp))?;
+ let mut batches = Vec::new();
+ for f in file_slices {
+ match self.file_system_view.read_file_slice_unchecked(&f).await {
+ Ok(batch) => batches.extend(batch),
+ Err(e) => return Err(anyhow!("Failed to read file slice {:?} - {}", f, e)),
+ }
+ }
+ Ok(batches)
}
#[cfg(test)]
@@ -133,13 +164,9 @@ impl Table {
pub async fn read_file_slice_by_path(&self, relative_path: &str) -> Result<Vec<RecordBatch>> {
self.file_system_view
- .read_file_slice_by_path(relative_path)
+ .read_file_slice_by_path_unchecked(relative_path)
.await
}
-
- pub async fn read_file_slice(&self, file_slice: &FileSlice) -> Result<Vec<RecordBatch>> {
- self.file_system_view.read_file_slice(file_slice).await
- }
}
impl ProvidesTableMetadata for Table {
@@ -240,7 +267,7 @@ mod tests {
use crate::table::Table;
#[tokio::test]
- async fn hudi_table_get_latest_schema() {
+ async fn hudi_table_get_schema() {
let base_url = TestTable::V6Nonpartitioned.url();
let hudi_table = Table::new(base_url.path(), HashMap::new()).await.unwrap();
let fields: Vec<String> = hudi_table
@@ -308,7 +335,7 @@ mod tests {
}
#[tokio::test]
- async fn hudi_table_get_latest_file_paths() {
+ async fn hudi_table_get_file_paths() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let hudi_table = Table::new(base_url.path(), HashMap::new()).await.unwrap();
assert_eq!(hudi_table.timeline.instants.len(), 2);
@@ -324,6 +351,57 @@ mod tests {
assert_eq!(actual, expected);
}
+ #[tokio::test]
+ async fn hudi_table_get_file_slices_as_of_timestamps() {
+ let base_url = TestTable::V6Nonpartitioned.url();
+ let hudi_table = Table::new(base_url.path(), HashMap::new()).await.unwrap();
+
+ let file_slices = hudi_table.get_file_slices().await.unwrap();
+ assert_eq!(
+ file_slices
+ .iter()
+ .map(|f| f.base_file_relative_path())
+ .collect::<Vec<_>>(),
+ vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
+ );
+
+ // as of the latest timestamp
+ let file_slices = hudi_table
+ .get_file_slices_as_of("20240418173551906")
+ .await
+ .unwrap();
+ assert_eq!(
+ file_slices
+ .iter()
+ .map(|f| f.base_file_relative_path())
+ .collect::<Vec<_>>(),
+ vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
+ );
+
+ // as of just smaller than the latest timestamp
+ let file_slices = hudi_table
+ .get_file_slices_as_of("20240418173551905")
+ .await
+ .unwrap();
+ assert_eq!(
+ file_slices
+ .iter()
+ .map(|f| f.base_file_relative_path())
+ .collect::<Vec<_>>(),
+ vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-182-253_20240418173550988.parquet",]
+ );
+
+ // as of non-exist old timestamp
+ let file_slices = hudi_table.get_file_slices_as_of("0").await.unwrap();
+ assert_eq!(
+ file_slices
+ .iter()
+ .map(|f| f.base_file_relative_path())
+ .collect::<Vec<_>>(),
+ Vec::<String>::new()
+ );
+ }
+
#[tokio::test]
async fn hudi_table_get_table_metadata() {
let base_path =
diff --git a/crates/core/src/table/timeline.rs b/crates/core/src/table/timeline.rs
index 9dcf6e2..70fc6ee 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -23,7 +23,7 @@ use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::Arc;
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, Context, Result};
use arrow_schema::Schema;
use parquet::arrow::parquet_to_arrow_schema;
use serde_json::{Map, Value};
@@ -112,6 +112,13 @@ impl Timeline {
Ok(completed_commits)
}
+ pub fn get_latest_commit_timestamp(&self) -> Option<&str> {
+ self.instants
+ .iter()
+ .next_back()
+ .map(|instant| instant.timestamp.as_str())
+ }
+
async fn get_latest_commit_metadata(&self) -> Result<Map<String, Value>> {
match self.instants.iter().next_back() {
Some(instant) => {
@@ -120,8 +127,8 @@ impl Timeline {
let relative_path = commit_file_path.to_str().ok_or(anyhow!(
"Failed to get commit file path for instant: {:?}",
instant
- ));
- let bytes = self.storage.get_file_data(relative_path?).await;
+ ))?;
+ let bytes = self.storage.get_file_data(relative_path).await;
let json: Value = serde_json::from_slice(&bytes)?;
let commit_metadata = json
.as_object()
@@ -135,22 +142,29 @@ impl Timeline {
pub async fn get_latest_schema(&self) -> Result<Schema> {
let commit_metadata = self.get_latest_commit_metadata().await?;
- if let Some(partition_to_write_stats) = commit_metadata["partitionToWriteStats"].as_object()
- {
- if let Some((_, value)) = partition_to_write_stats.iter().next() {
- if let Some(first_value) = value.as_array().and_then(|arr| arr.first()) {
- if let Some(path) = first_value["path"].as_str() {
- let parquet_meta = self.storage.get_parquet_file_metadata(path).await;
- let arrow_schema = parquet_to_arrow_schema(
- parquet_meta.file_metadata().schema_descr(),
- None,
- )?;
- return Ok(arrow_schema);
- }
- }
- }
+
+ let parquet_path = commit_metadata
+ .get("partitionToWriteStats")
+ .and_then(|v| v.as_object())
+ .and_then(|obj| obj.values().next())
+ .and_then(|value| value.as_array())
+ .and_then(|arr| arr.first())
+ .and_then(|first_value| first_value["path"].as_str());
+
+ if let Some(path) = parquet_path {
+ let parquet_meta = self
+ .storage
+ .get_parquet_file_metadata(path)
+ .await
+ .context("Failed to get parquet file metadata")?;
+
+ parquet_to_arrow_schema(parquet_meta.file_metadata().schema_descr(), None)
+ .context("Failed to resolve the latest schema")
+ } else {
+ Err(anyhow!(
+ "Failed to resolve the latest schema: no file path found"
+ ))
}
- Err(anyhow!("Failed to resolve schema."))
}
}
@@ -168,7 +182,7 @@ mod tests {
use crate::table::timeline::{Instant, State, Timeline};
#[tokio::test]
- async fn read_latest_schema() {
+ async fn timeline_read_latest_schema() {
let base_url = TestTable::V6Nonpartitioned.url();
let timeline = Timeline::new(
Arc::new(base_url),
@@ -181,6 +195,24 @@ mod tests {
assert_eq!(table_schema.fields.len(), 21)
}
+ #[tokio::test]
+ async fn timeline_read_latest_schema_from_empty_table() {
+ let base_url = TestTable::V6Empty.url();
+ let timeline = Timeline::new(
+ Arc::new(base_url),
+ Arc::new(HashMap::new()),
+ Arc::new(HashMap::new()),
+ )
+ .await
+ .unwrap();
+ let table_schema = timeline.get_latest_schema().await;
+ assert!(table_schema.is_err());
+ assert_eq!(
+ table_schema.err().unwrap().to_string(),
+ "Failed to resolve the latest schema: no file path found"
+ )
+ }
+
#[tokio::test]
async fn init_commits_timeline() {
let base_url =
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index f4a1bba..f677247 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -65,17 +65,11 @@ impl HudiDataSource {
}
async fn get_record_batches(&mut self) -> datafusion_common::Result<Vec<RecordBatch>> {
- let file_slices = self.table.get_file_slices().await.map_err(|e| {
- DataFusionError::Execution(format!("Failed to load file slices from table: {}", e))
- })?;
-
- let mut record_batches = Vec::new();
- for fsl in file_slices {
- let batches = self.table.read_file_slice(&fsl).await.map_err(|e| {
- DataFusionError::Execution(format!("Failed to read records from table: {}", e))
+ let record_batches =
+ self.table.read_snapshot().await.map_err(|e| {
+ DataFusionError::Execution(format!("Failed to read snapshot: {}", e))
})?;
- record_batches.extend(batches)
- }
+
Ok(record_batches)
}
}
diff --git a/crates/tests/src/lib.rs b/crates/tests/src/lib.rs
index e467818..f94d5fd 100644
--- a/crates/tests/src/lib.rs
+++ b/crates/tests/src/lib.rs
@@ -35,6 +35,7 @@ pub fn extract_test_table(zip_path: &Path) -> PathBuf {
pub enum TestTable {
V6ComplexkeygenHivestyle,
+ V6Empty,
V6Nonpartitioned,
}
@@ -46,6 +47,7 @@ impl TestTable {
Self::V6ComplexkeygenHivestyle => data_path
.join("v6_complexkeygen_hivestyle.zip")
.into_boxed_path(),
+ Self::V6Empty => data_path.join("v6_empty.zip").into_boxed_path(),
Self::V6Nonpartitioned => data_path.join("v6_nonpartitioned.zip").into_boxed_path(),
}
}
@@ -58,6 +60,11 @@ impl TestTable {
.to_str()
.unwrap()
.to_string(),
+ Self::V6Empty => extract_test_table(&zip_path)
+ .join("v6_empty")
+ .to_str()
+ .unwrap()
+ .to_string(),
Self::V6Nonpartitioned => extract_test_table(&zip_path)
.join("v6_nonpartitioned")
.to_str()
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index 83ed929..67dd0cc 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -33,8 +33,10 @@ class HudiFileSlice:
base_file_size: int
num_records: int
+ @property
def base_file_relative_path(self) -> str: ...
+
class BindingHudiTable:
def __init__(
@@ -43,8 +45,12 @@ class BindingHudiTable:
storage_options: Optional[Dict[str, str]] = None,
): ...
- def schema(self) -> "pyarrow.Schema": ...
+ def get_schema(self) -> "pyarrow.Schema": ...
+
+ def get_file_slices(self) -> List[HudiFileSlice]: ...
+
+ def read_file_slice(self, base_file_relative_path) -> List["pyarrow.RecordBatch"]: ...
- def get_latest_file_slices(self) -> List[HudiFileSlice]: ...
+ def read_snapshot(self) -> List["pyarrow.RecordBatch"]: ...
- def read_file_slice(self, relative_path) -> List["pyarrow.RecordBatch"]: ...
+ def read_snapshot_as_of(self, timestamp: str) -> List["pyarrow.RecordBatch"]: ...
diff --git a/python/hudi/table.py b/python/hudi/table.py
index c9cab1b..943f423 100644
--- a/python/hudi/table.py
+++ b/python/hudi/table.py
@@ -36,16 +36,22 @@ class HudiTable:
):
self._table = BindingHudiTable(str(table_uri), storage_options)
- def schema(self) -> "pyarrow.Schema":
- return self._table.schema()
+ def get_schema(self) -> "pyarrow.Schema":
+ return self._table.get_schema()
- def split_latest_file_slices(self, n) -> Iterator[List[HudiFileSlice]]:
- file_slices = self.get_latest_file_slices()
+ def split_file_slices(self, n: int) -> Iterator[List[HudiFileSlice]]:
+ file_slices = self.get_file_slices()
for split in split_list(file_slices, n):
yield split
- def get_latest_file_slices(self) -> List[HudiFileSlice]:
- return self._table.get_latest_file_slices()
+ def get_file_slices(self) -> List[HudiFileSlice]:
+ return self._table.get_file_slices()
- def read_file_slice(self, relative_path) -> List["pyarrow.RecordBatch"]:
- return self._table.read_file_slice(relative_path)
+ def read_file_slice(self, base_file_relative_path: str) -> List["pyarrow.RecordBatch"]:
+ return self._table.read_file_slice(base_file_relative_path)
+
+ def read_snapshot(self) -> List["pyarrow.RecordBatch"]:
+ return self._table.read_snapshot()
+
+ def read_snapshot_as_of(self, timestamp: str) -> List["pyarrow.RecordBatch"]:
+ return self._table.read_snapshot_as_of(timestamp)
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 2436ee5..10f39ec 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -21,7 +21,7 @@ use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::OnceLock;
-use anyhow::anyhow;
+use anyhow::Context;
use arrow::pyarrow::ToPyArrow;
use pyo3::prelude::*;
use tokio::runtime::Runtime;
@@ -51,15 +51,15 @@ struct HudiFileSlice {
#[pymethods]
impl HudiFileSlice {
pub fn base_file_relative_path(&self) -> PyResult<String> {
- let mut p = PathBuf::from(&self.partition_path);
- p.push(&self.base_file_name);
- match p.to_str() {
- Some(s) => Ok(s.to_string()),
- None => Err(PyErr::from(anyhow!(
+ PathBuf::from(&self.partition_path)
+ .join(&self.base_file_name)
+ .to_str()
+ .map(String::from)
+ .context(format!(
"Failed to get base file relative path for file slice: {:?}",
self
- ))),
- }
+ ))
+ .map_err(PyErr::from)
}
}
@@ -100,21 +100,30 @@ impl BindingHudiTable {
Ok(BindingHudiTable { _table })
}
- pub fn schema(&self, py: Python) -> PyResult<PyObject> {
+ pub fn get_schema(&self, py: Python) -> PyResult<PyObject> {
rt().block_on(self._table.get_schema())?.to_pyarrow(py)
}
- pub fn get_latest_file_slices(&mut self, py: Python) -> PyResult<Vec<HudiFileSlice>> {
+ pub fn get_file_slices(&self, py: Python) -> PyResult<Vec<HudiFileSlice>> {
py.allow_threads(|| {
let file_slices = rt().block_on(self._table.get_file_slices())?;
Ok(file_slices.iter().map(convert_file_slice).collect())
})
}
- pub fn read_file_slice(&mut self, relative_path: &str, py: Python) -> PyResult<PyObject> {
+ pub fn read_file_slice(&self, relative_path: &str, py: Python) -> PyResult<PyObject> {
rt().block_on(self._table.read_file_slice_by_path(relative_path))?
.to_pyarrow(py)
}
+
+ pub fn read_snapshot(&self, py: Python) -> PyResult<PyObject> {
+ rt().block_on(self._table.read_snapshot())?.to_pyarrow(py)
+ }
+
+ pub fn read_snapshot_as_of(&self, timestamp: &str, py: Python) -> PyResult<PyObject> {
+ rt().block_on(self._table.read_snapshot_as_of(timestamp))?
+ .to_pyarrow(py)
+ }
}
#[cfg(not(tarpaulin))]
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index f0266f0..aec6d70 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -28,26 +28,52 @@ def test_sample_table(get_sample_table):
table_path = get_sample_table
table = HudiTable(table_path, {})
- assert table.schema().names == ['_hoodie_commit_time', '_hoodie_commit_seqno', '_hoodie_record_key',
- '_hoodie_partition_path', '_hoodie_file_name', 'ts', 'uuid', 'rider', 'driver',
- 'fare', 'city']
+ assert table.get_schema().names == ['_hoodie_commit_time', '_hoodie_commit_seqno', '_hoodie_record_key',
+ '_hoodie_partition_path', '_hoodie_file_name', 'ts', 'uuid', 'rider', 'driver',
+ 'fare', 'city']
- file_slices = table.get_latest_file_slices()
+ file_slices = table.get_file_slices()
assert len(file_slices) == 5
assert set(f.commit_time for f in file_slices) == {'20240402123035233', '20240402144910683'}
assert all(f.num_records == 1 for f in file_slices)
file_slice_paths = [f.base_file_relative_path() for f in file_slices]
assert set(file_slice_paths) == {'chennai/68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0_4-12-0_20240402123035233.parquet',
- 'san_francisco/d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0_1-9-0_20240402123035233.parquet',
- 'san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet',
- 'san_francisco/5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet',
- 'sao_paulo/ee915c68-d7f8-44f6-9759-e691add290d8-0_3-11-0_20240402123035233.parquet'}
+ 'san_francisco/d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0_1-9-0_20240402123035233.parquet',
+ 'san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet',
+ 'san_francisco/5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet',
+ 'sao_paulo/ee915c68-d7f8-44f6-9759-e691add290d8-0_3-11-0_20240402123035233.parquet'}
batches = table.read_file_slice(file_slice_paths[0])
t = pa.Table.from_batches(batches)
assert t.num_rows == 1
assert t.num_columns == 11
- file_slices_gen = table.split_latest_file_slices(2)
+ file_slices_gen = table.split_file_slices(2)
assert len(next(file_slices_gen)) == 3
assert len(next(file_slices_gen)) == 2
+
+ batches = table.read_snapshot()
+ t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts")
+ assert t.to_pylist() == [{'_hoodie_commit_time': '20240402144910683', 'ts': 1695046462179,
+ 'uuid': '9909a8b1-2d15-4d3d-8ec9-efc48c536a00', 'fare': 339.0},
+ {'_hoodie_commit_time': '20240402123035233', 'ts': 1695091554788,
+ 'uuid': 'e96c4396-3fad-413a-a942-4cb36106d721', 'fare': 27.7},
+ {'_hoodie_commit_time': '20240402123035233', 'ts': 1695115999911,
+ 'uuid': 'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa', 'fare': 17.85},
+ {'_hoodie_commit_time': '20240402123035233', 'ts': 1695159649087,
+ 'uuid': '334e26e9-8355-45cc-97c6-c31daf0df330', 'fare': 19.1},
+ {'_hoodie_commit_time': '20240402123035233', 'ts': 1695516137016,
+ 'uuid': 'e3cf430c-889d-4015-bc98-59bdce1e530c', 'fare': 34.15}]
+
+ batches = table.read_snapshot_as_of("20240402123035233")
+ t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts")
+ assert t.to_pylist() == [{'_hoodie_commit_time': '20240402123035233', 'ts': 1695046462179,
+ 'uuid': '9909a8b1-2d15-4d3d-8ec9-efc48c536a00', 'fare': 33.9},
+ {'_hoodie_commit_time': '20240402123035233', 'ts': 1695091554788,
+ 'uuid': 'e96c4396-3fad-413a-a942-4cb36106d721', 'fare': 27.7},
+ {'_hoodie_commit_time': '20240402123035233', 'ts': 1695115999911,
+ 'uuid': 'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa', 'fare': 17.85},
+ {'_hoodie_commit_time': '20240402123035233', 'ts': 1695159649087,
+ 'uuid': '334e26e9-8355-45cc-97c6-c31daf0df330', 'fare': 19.1},
+ {'_hoodie_commit_time': '20240402123035233', 'ts': 1695516137016,
+ 'uuid': 'e3cf430c-889d-4015-bc98-59bdce1e530c', 'fare': 34.15}]