This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 587389b feat: replace apache-avro with serde_avro_fast and
parallelize manifest reads (#203)
587389b is described below
commit 587389b99d6f52225e4690d662b0e09fda8084f2
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Apr 4 17:13:24 2026 +0800
feat: replace apache-avro with serde_avro_fast and parallelize manifest
reads (#203)
Switch Avro deserialization from apache-avro (Value intermediate repr) to
serde_avro_fast (direct bytes→struct), eliminating redundant allocations
for ~10-20x deserialization speedup. Read manifest files concurrently
with buffered(64) instead of sequentially.
---
Cargo.toml | 1 -
crates/paimon/Cargo.toml | 5 +-
crates/paimon/src/error.rs | 17 ------
crates/paimon/src/spec/data_file.rs | 8 +++
crates/paimon/src/spec/index_manifest.rs | 17 +++---
crates/paimon/src/spec/manifest.rs | 17 +++---
crates/paimon/src/spec/manifest_entry.rs | 6 ++
crates/paimon/src/spec/mod.rs | 1 +
crates/paimon/src/spec/objects_file.rs | 19 ++++---
crates/paimon/src/table/bin_pack.rs | 1 +
crates/paimon/src/table/table_scan.rs | 94 +++++++++++++++++---------------
11 files changed, 95 insertions(+), 91 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index f1f7295..2fea052 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -31,6 +31,5 @@ rust-version = "1.86.0"
arrow-array = { version = "57.0", features = ["ffi"] }
arrow-schema = "57.0"
arrow-cast = "57.0"
-arrow-select = "57.0"
parquet = "57.0"
tokio = "1.39.2"
\ No newline at end of file
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index b1677cf..6c6dee8 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -51,13 +51,12 @@ snafu = "0.9.0"
typed-builder = "^0.19"
opendal = { version = "0.55", features = ["services-fs"] }
pretty_assertions = "1"
-apache-avro = { version = "0.17", features = ["snappy", "zstandard"] }
+serde_avro_fast = { version = "2.0.2", features = ["snappy", "zstandard"] }
indexmap = "2.5.0"
roaring = "0.11"
arrow-array = { workspace = true }
arrow-cast = { workspace = true }
arrow-schema = { workspace = true }
-arrow-select = { workspace = true }
futures = "0.3"
parquet = { workspace = true, features = ["async", "zstd"] }
async-stream = "0.3.6"
@@ -76,5 +75,5 @@ urlencoding = "2.1"
[dev-dependencies]
axum = { version = "0.7", features = ["macros", "tokio", "http1", "http2"] }
rand = "0.8.5"
-serde_avro_fast = { version = "2.0.2", features = ["snappy"] }
+
tempfile = "3"
diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs
index 163e0dc..65402eb 100644
--- a/crates/paimon/src/error.rs
+++ b/crates/paimon/src/error.rs
@@ -68,14 +68,6 @@ pub enum Error {
display("Paimon hitting invalid config: {}", message)
)]
ConfigInvalid { message: String },
- #[snafu(
- visibility(pub(crate)),
- display("Paimon hitting unexpected avro error {}: {:?}", message,
source)
- )]
- DataUnexpected {
- message: String,
- source: Box<apache_avro::Error>,
- },
#[snafu(
visibility(pub(crate)),
display("Paimon hitting invalid file index format: {}", message)
@@ -127,15 +119,6 @@ impl From<opendal::Error> for Error {
}
}
-impl From<apache_avro::Error> for Error {
- fn from(source: apache_avro::Error) -> Self {
- Error::DataUnexpected {
- message: "".to_string(),
- source: Box::new(source),
- }
- }
-}
-
impl From<parquet::errors::ParquetError> for Error {
fn from(source: parquet::errors::ParquetError) -> Self {
Error::ParquetDataUnexpected {
diff --git a/crates/paimon/src/spec/data_file.rs
b/crates/paimon/src/spec/data_file.rs
index 48bfe78..1d20ca2 100644
--- a/crates/paimon/src/spec/data_file.rs
+++ b/crates/paimon/src/spec/data_file.rs
@@ -412,6 +412,14 @@ pub struct DataFileMeta {
skip_serializing_if = "Option::is_none"
)]
pub write_cols: Option<Vec<String>>,
+
+ /// External path for the data file (e.g. when data is stored outside the
table directory).
+ #[serde(
+ rename = "_EXTERNAL_PATH",
+ default,
+ skip_serializing_if = "Option::is_none"
+ )]
+ pub external_path: Option<String>,
}
impl Display for DataFileMeta {
diff --git a/crates/paimon/src/spec/index_manifest.rs
b/crates/paimon/src/spec/index_manifest.rs
index 3131160..b4a3c0b 100644
--- a/crates/paimon/src/spec/index_manifest.rs
+++ b/crates/paimon/src/spec/index_manifest.rs
@@ -18,12 +18,11 @@
use crate::io::FileIO;
use crate::spec::manifest_common::FileKind;
use crate::spec::IndexFileMeta;
-use apache_avro::types::Value;
-use apache_avro::{from_value, Reader};
use serde::{Deserialize, Serialize};
+use serde_avro_fast::object_container_file_encoding::Reader;
+use snafu::ResultExt;
use std::fmt::{Display, Formatter};
-use crate::Error;
use crate::Result;
/// Manifest entry for index file.
@@ -76,12 +75,12 @@ impl IndexManifest {
/// Read index manifest entries from Avro-encoded bytes.
pub fn read_from_bytes(bytes: &[u8]) -> Result<Vec<IndexManifestEntry>> {
- let reader = Reader::new(bytes).map_err(Error::from)?;
- let records = reader
- .collect::<std::result::Result<Vec<Value>, _>>()
- .map_err(Error::from)?;
- let values = Value::Array(records);
- from_value::<Vec<IndexManifestEntry>>(&values).map_err(Error::from)
+ let mut reader = Reader::from_slice(bytes)
+ .whatever_context::<_, crate::Error>("read index manifest avro")?;
+ reader
+ .deserialize::<IndexManifestEntry>()
+ .collect::<std::result::Result<Vec<_>, _>>()
+ .whatever_context::<_, crate::Error>("deserialize index manifest
entry")
}
}
diff --git a/crates/paimon/src/spec/manifest.rs
b/crates/paimon/src/spec/manifest.rs
index ff88510..93eb045 100644
--- a/crates/paimon/src/spec/manifest.rs
+++ b/crates/paimon/src/spec/manifest.rs
@@ -17,10 +17,9 @@
use crate::io::FileIO;
use crate::spec::manifest_entry::ManifestEntry;
-use apache_avro::types::Value;
-use apache_avro::{from_value, Reader};
+use serde_avro_fast::object_container_file_encoding::Reader;
+use snafu::ResultExt;
-use crate::Error;
use crate::Result;
/// Manifest file reader and writer.
@@ -60,12 +59,12 @@ impl Manifest {
/// # Returns
/// A vector of ManifestEntry records
fn read_from_bytes(bytes: &[u8]) -> Result<Vec<ManifestEntry>> {
- let reader = Reader::new(bytes).map_err(Error::from)?;
- let records = reader
- .collect::<std::result::Result<Vec<Value>, _>>()
- .map_err(Error::from)?;
- let values = Value::Array(records);
- from_value::<Vec<ManifestEntry>>(&values).map_err(Error::from)
+ let mut reader =
+ Reader::from_slice(bytes).whatever_context::<_,
crate::Error>("read manifest avro")?;
+ reader
+ .deserialize::<ManifestEntry>()
+ .collect::<std::result::Result<Vec<_>, _>>()
+ .whatever_context::<_, crate::Error>("deserialize manifest entry")
}
}
diff --git a/crates/paimon/src/spec/manifest_entry.rs
b/crates/paimon/src/spec/manifest_entry.rs
index 397c7f4..d7aa32c 100644
--- a/crates/paimon/src/spec/manifest_entry.rs
+++ b/crates/paimon/src/spec/manifest_entry.rs
@@ -28,6 +28,9 @@ pub struct Identifier {
pub bucket: i32,
pub level: i32,
pub file_name: String,
+ pub extra_files: Vec<String>,
+ pub embedded_index: Option<Vec<u8>>,
+ pub external_path: Option<String>,
}
/// Entry of a manifest file, representing an addition / deletion of a data
file.
@@ -90,6 +93,9 @@ impl ManifestEntry {
bucket: self.bucket,
level: self.file.level,
file_name: self.file.file_name.clone(),
+ extra_files: self.file.extra_files.clone(),
+ embedded_index: self.file.embedded_index.clone(),
+ external_path: self.file.external_path.clone(),
}
}
diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs
index bcfe5ad..8891c43 100644
--- a/crates/paimon/src/spec/mod.rs
+++ b/crates/paimon/src/spec/mod.rs
@@ -47,6 +47,7 @@ pub use manifest::Manifest;
mod manifest_common;
pub use manifest_common::FileKind;
mod manifest_entry;
+pub use manifest_entry::Identifier;
pub use manifest_entry::ManifestEntry;
mod objects_file;
pub use objects_file::from_avro_bytes;
diff --git a/crates/paimon/src/spec/objects_file.rs
b/crates/paimon/src/spec/objects_file.rs
index 9685f23..bff209e 100644
--- a/crates/paimon/src/spec/objects_file.rs
+++ b/crates/paimon/src/spec/objects_file.rs
@@ -15,19 +15,18 @@
// specific language governing permissions and limitations
// under the License.
-use crate::Error;
-use apache_avro::types::Value;
-use apache_avro::{from_value, Reader};
use serde::de::DeserializeOwned;
+use serde_avro_fast::object_container_file_encoding::Reader;
+use snafu::ResultExt;
#[allow(dead_code)]
pub fn from_avro_bytes<T: DeserializeOwned>(bytes: &[u8]) ->
crate::Result<Vec<T>> {
- let reader = Reader::new(bytes).map_err(Error::from)?;
- let records = reader
- .collect::<Result<Vec<Value>, _>>()
- .map_err(Error::from)?;
- let values = Value::Array(records);
- from_value::<Vec<T>>(&values).map_err(Error::from)
+ let mut reader = Reader::from_slice(bytes)
+ .whatever_context::<_, crate::Error>("read avro object container")?;
+ reader
+ .deserialize::<T>()
+ .collect::<std::result::Result<Vec<_>, _>>()
+ .whatever_context::<_, crate::Error>("deserialize avro records")
}
#[cfg(test)]
@@ -122,6 +121,7 @@ mod tests {
embedded_index: None,
first_row_id: None,
write_cols: None,
+ external_path: None,
},
2
),
@@ -158,6 +158,7 @@ mod tests {
embedded_index: None,
first_row_id: None,
write_cols: None,
+ external_path: None,
},
2
),
diff --git a/crates/paimon/src/table/bin_pack.rs
b/crates/paimon/src/table/bin_pack.rs
index 6a567cd..841154e 100644
--- a/crates/paimon/src/table/bin_pack.rs
+++ b/crates/paimon/src/table/bin_pack.rs
@@ -98,6 +98,7 @@ mod tests {
embedded_index: None,
first_row_id: None,
write_cols: None,
+ external_path: None,
}
}
diff --git a/crates/paimon/src/table/table_scan.rs
b/crates/paimon/src/table/table_scan.rs
index 2f230ce..ba3e071 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -33,6 +33,7 @@ use crate::table::source::{DataSplit, DataSplitBuilder,
DeletionFile, PartitionB
use crate::table::SnapshotManager;
use crate::table::TagManager;
use crate::Error;
+use futures::{StreamExt, TryStreamExt};
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
@@ -74,13 +75,17 @@ async fn read_all_manifest_entries(
manifest_files.extend(delta);
let manifest_path_prefix = format!("{}/{}",
table_path.trim_end_matches('/'), MANIFEST_DIR);
- let mut all_entries = Vec::new();
- // todo: consider use multiple-threads read manifest
- for meta in manifest_files {
- let path = format!("{}/{}", manifest_path_prefix, meta.file_name());
- let entries = crate::spec::Manifest::read(file_io, &path).await?;
- all_entries.extend(entries);
- }
+ let all_entries: Vec<ManifestEntry> = futures::stream::iter(manifest_files)
+ .map(|meta| {
+ let path = format!("{}/{}", manifest_path_prefix,
meta.file_name());
+ async move { crate::spec::Manifest::read(file_io, &path).await }
+ })
+ .buffered(64)
+ .try_collect::<Vec<_>>()
+ .await?
+ .into_iter()
+ .flatten()
+ .collect();
Ok(all_entries)
}
@@ -953,7 +958,44 @@ mod tests {
};
use crate::table::source::DeletionFile;
use crate::Error;
- use chrono::Utc;
+ use chrono::{DateTime, Utc};
+
+ /// Helper to build a DataFileMeta with data evolution fields.
+ fn make_evo_file(
+ name: &str,
+ file_size: i64,
+ row_count: i64,
+ max_seq: i64,
+ first_row_id: Option<i64>,
+ ) -> DataFileMeta {
+ DataFileMeta {
+ file_name: name.to_string(),
+ file_size,
+ row_count,
+ min_key: Vec::new(),
+ max_key: Vec::new(),
+ key_stats: BinaryTableStats::new(Vec::new(), Vec::new(),
Vec::new()),
+ value_stats: BinaryTableStats::new(Vec::new(), Vec::new(),
Vec::new()),
+ min_sequence_number: 0,
+ max_sequence_number: max_seq,
+ schema_id: 0,
+ level: 0,
+ extra_files: Vec::new(),
+ creation_time: DateTime::<Utc>::from_timestamp(0, 0).unwrap(),
+ delete_row_count: None,
+ embedded_index: None,
+ first_row_id,
+ write_cols: None,
+ external_path: None,
+ }
+ }
+
+ fn file_names(groups: &[Vec<DataFileMeta>]) -> Vec<Vec<&str>> {
+ groups
+ .iter()
+ .map(|g| g.iter().map(|f| f.file_name.as_str()).collect())
+ .collect()
+ }
struct SerializedBinaryRowBuilder {
arity: i32,
@@ -1106,44 +1148,10 @@ mod tests {
embedded_index: None,
first_row_id: None,
write_cols: None,
+ external_path: None,
}
}
- fn make_evo_file(
- name: &str,
- file_size: i64,
- row_count: i64,
- max_seq: i64,
- first_row_id: Option<i64>,
- ) -> DataFileMeta {
- DataFileMeta {
- file_name: name.to_string(),
- file_size,
- row_count,
- min_key: Vec::new(),
- max_key: Vec::new(),
- key_stats: BinaryTableStats::new(Vec::new(), Vec::new(),
Vec::new()),
- value_stats: BinaryTableStats::new(Vec::new(), Vec::new(),
Vec::new()),
- min_sequence_number: max_seq,
- max_sequence_number: max_seq,
- schema_id: 0,
- level: 0,
- extra_files: Vec::new(),
- creation_time: Utc::now(),
- delete_row_count: None,
- embedded_index: None,
- first_row_id,
- write_cols: None,
- }
- }
-
- fn file_names(groups: &[Vec<DataFileMeta>]) -> Vec<Vec<&str>> {
- groups
- .iter()
- .map(|group| group.iter().map(|file|
file.file_name.as_str()).collect())
- .collect()
- }
-
#[test]
fn test_partition_matches_predicate_decode_failure_fails_open() {
let predicate = PredicateBuilder::new(&partition_string_field())