This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 587389b  feat: replace apache-avro with serde_avro_fast and parallelize manifest reads (#203)
587389b is described below

commit 587389b99d6f52225e4690d662b0e09fda8084f2
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Apr 4 17:13:24 2026 +0800

    feat: replace apache-avro with serde_avro_fast and parallelize manifest reads (#203)
    
    Switch Avro deserialization from apache-avro (which goes through an
    intermediate Value representation) to serde_avro_fast (direct
    bytes→struct), eliminating redundant allocations for a ~10-20x
    deserialization speedup. Read manifest files concurrently with
    buffered(64) (up to 64 reads in flight) instead of sequentially.
---
 Cargo.toml                               |  1 -
 crates/paimon/Cargo.toml                 |  5 +-
 crates/paimon/src/error.rs               | 17 ------
 crates/paimon/src/spec/data_file.rs      |  8 +++
 crates/paimon/src/spec/index_manifest.rs | 17 +++---
 crates/paimon/src/spec/manifest.rs       | 17 +++---
 crates/paimon/src/spec/manifest_entry.rs |  6 ++
 crates/paimon/src/spec/mod.rs            |  1 +
 crates/paimon/src/spec/objects_file.rs   | 19 ++++---
 crates/paimon/src/table/bin_pack.rs      |  1 +
 crates/paimon/src/table/table_scan.rs    | 94 +++++++++++++++++---------------
 11 files changed, 95 insertions(+), 91 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index f1f7295..2fea052 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -31,6 +31,5 @@ rust-version = "1.86.0"
 arrow-array = { version = "57.0", features = ["ffi"] }
 arrow-schema = "57.0"
 arrow-cast = "57.0"
-arrow-select = "57.0"
 parquet = "57.0"
 tokio = "1.39.2"
\ No newline at end of file
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index b1677cf..6c6dee8 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -51,13 +51,12 @@ snafu = "0.9.0"
 typed-builder = "^0.19"
 opendal = { version = "0.55", features = ["services-fs"] }
 pretty_assertions = "1"
-apache-avro = { version = "0.17", features = ["snappy", "zstandard"] }
+serde_avro_fast = { version = "2.0.2", features = ["snappy", "zstandard"] }
 indexmap = "2.5.0"
 roaring = "0.11"
 arrow-array = { workspace = true }
 arrow-cast = { workspace = true }
 arrow-schema = { workspace = true }
-arrow-select = { workspace = true }
 futures = "0.3"
 parquet = { workspace = true, features = ["async", "zstd"] }
 async-stream = "0.3.6"
@@ -76,5 +75,5 @@ urlencoding = "2.1"
 [dev-dependencies]
 axum = { version = "0.7", features = ["macros", "tokio", "http1", "http2"] }
 rand = "0.8.5"
-serde_avro_fast = { version = "2.0.2", features = ["snappy"] }
+
 tempfile = "3"
diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs
index 163e0dc..65402eb 100644
--- a/crates/paimon/src/error.rs
+++ b/crates/paimon/src/error.rs
@@ -68,14 +68,6 @@ pub enum Error {
         display("Paimon hitting invalid config: {}", message)
     )]
     ConfigInvalid { message: String },
-    #[snafu(
-        visibility(pub(crate)),
-        display("Paimon hitting unexpected avro error {}: {:?}", message, 
source)
-    )]
-    DataUnexpected {
-        message: String,
-        source: Box<apache_avro::Error>,
-    },
     #[snafu(
         visibility(pub(crate)),
         display("Paimon hitting invalid file index format: {}", message)
@@ -127,15 +119,6 @@ impl From<opendal::Error> for Error {
     }
 }
 
-impl From<apache_avro::Error> for Error {
-    fn from(source: apache_avro::Error) -> Self {
-        Error::DataUnexpected {
-            message: "".to_string(),
-            source: Box::new(source),
-        }
-    }
-}
-
 impl From<parquet::errors::ParquetError> for Error {
     fn from(source: parquet::errors::ParquetError) -> Self {
         Error::ParquetDataUnexpected {
diff --git a/crates/paimon/src/spec/data_file.rs 
b/crates/paimon/src/spec/data_file.rs
index 48bfe78..1d20ca2 100644
--- a/crates/paimon/src/spec/data_file.rs
+++ b/crates/paimon/src/spec/data_file.rs
@@ -412,6 +412,14 @@ pub struct DataFileMeta {
         skip_serializing_if = "Option::is_none"
     )]
     pub write_cols: Option<Vec<String>>,
+
+    /// External path for the data file (e.g. when data is stored outside the 
table directory).
+    #[serde(
+        rename = "_EXTERNAL_PATH",
+        default,
+        skip_serializing_if = "Option::is_none"
+    )]
+    pub external_path: Option<String>,
 }
 
 impl Display for DataFileMeta {
diff --git a/crates/paimon/src/spec/index_manifest.rs 
b/crates/paimon/src/spec/index_manifest.rs
index 3131160..b4a3c0b 100644
--- a/crates/paimon/src/spec/index_manifest.rs
+++ b/crates/paimon/src/spec/index_manifest.rs
@@ -18,12 +18,11 @@
 use crate::io::FileIO;
 use crate::spec::manifest_common::FileKind;
 use crate::spec::IndexFileMeta;
-use apache_avro::types::Value;
-use apache_avro::{from_value, Reader};
 use serde::{Deserialize, Serialize};
+use serde_avro_fast::object_container_file_encoding::Reader;
+use snafu::ResultExt;
 use std::fmt::{Display, Formatter};
 
-use crate::Error;
 use crate::Result;
 
 /// Manifest entry for index file.
@@ -76,12 +75,12 @@ impl IndexManifest {
 
     /// Read index manifest entries from Avro-encoded bytes.
     pub fn read_from_bytes(bytes: &[u8]) -> Result<Vec<IndexManifestEntry>> {
-        let reader = Reader::new(bytes).map_err(Error::from)?;
-        let records = reader
-            .collect::<std::result::Result<Vec<Value>, _>>()
-            .map_err(Error::from)?;
-        let values = Value::Array(records);
-        from_value::<Vec<IndexManifestEntry>>(&values).map_err(Error::from)
+        let mut reader = Reader::from_slice(bytes)
+            .whatever_context::<_, crate::Error>("read index manifest avro")?;
+        reader
+            .deserialize::<IndexManifestEntry>()
+            .collect::<std::result::Result<Vec<_>, _>>()
+            .whatever_context::<_, crate::Error>("deserialize index manifest 
entry")
     }
 }
 
diff --git a/crates/paimon/src/spec/manifest.rs 
b/crates/paimon/src/spec/manifest.rs
index ff88510..93eb045 100644
--- a/crates/paimon/src/spec/manifest.rs
+++ b/crates/paimon/src/spec/manifest.rs
@@ -17,10 +17,9 @@
 
 use crate::io::FileIO;
 use crate::spec::manifest_entry::ManifestEntry;
-use apache_avro::types::Value;
-use apache_avro::{from_value, Reader};
+use serde_avro_fast::object_container_file_encoding::Reader;
+use snafu::ResultExt;
 
-use crate::Error;
 use crate::Result;
 
 /// Manifest file reader and writer.
@@ -60,12 +59,12 @@ impl Manifest {
     /// # Returns
     /// A vector of ManifestEntry records
     fn read_from_bytes(bytes: &[u8]) -> Result<Vec<ManifestEntry>> {
-        let reader = Reader::new(bytes).map_err(Error::from)?;
-        let records = reader
-            .collect::<std::result::Result<Vec<Value>, _>>()
-            .map_err(Error::from)?;
-        let values = Value::Array(records);
-        from_value::<Vec<ManifestEntry>>(&values).map_err(Error::from)
+        let mut reader =
+            Reader::from_slice(bytes).whatever_context::<_, 
crate::Error>("read manifest avro")?;
+        reader
+            .deserialize::<ManifestEntry>()
+            .collect::<std::result::Result<Vec<_>, _>>()
+            .whatever_context::<_, crate::Error>("deserialize manifest entry")
     }
 }
 
diff --git a/crates/paimon/src/spec/manifest_entry.rs 
b/crates/paimon/src/spec/manifest_entry.rs
index 397c7f4..d7aa32c 100644
--- a/crates/paimon/src/spec/manifest_entry.rs
+++ b/crates/paimon/src/spec/manifest_entry.rs
@@ -28,6 +28,9 @@ pub struct Identifier {
     pub bucket: i32,
     pub level: i32,
     pub file_name: String,
+    pub extra_files: Vec<String>,
+    pub embedded_index: Option<Vec<u8>>,
+    pub external_path: Option<String>,
 }
 
 /// Entry of a manifest file, representing an addition / deletion of a data 
file.
@@ -90,6 +93,9 @@ impl ManifestEntry {
             bucket: self.bucket,
             level: self.file.level,
             file_name: self.file.file_name.clone(),
+            extra_files: self.file.extra_files.clone(),
+            embedded_index: self.file.embedded_index.clone(),
+            external_path: self.file.external_path.clone(),
         }
     }
 
diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs
index bcfe5ad..8891c43 100644
--- a/crates/paimon/src/spec/mod.rs
+++ b/crates/paimon/src/spec/mod.rs
@@ -47,6 +47,7 @@ pub use manifest::Manifest;
 mod manifest_common;
 pub use manifest_common::FileKind;
 mod manifest_entry;
+pub use manifest_entry::Identifier;
 pub use manifest_entry::ManifestEntry;
 mod objects_file;
 pub use objects_file::from_avro_bytes;
diff --git a/crates/paimon/src/spec/objects_file.rs 
b/crates/paimon/src/spec/objects_file.rs
index 9685f23..bff209e 100644
--- a/crates/paimon/src/spec/objects_file.rs
+++ b/crates/paimon/src/spec/objects_file.rs
@@ -15,19 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::Error;
-use apache_avro::types::Value;
-use apache_avro::{from_value, Reader};
 use serde::de::DeserializeOwned;
+use serde_avro_fast::object_container_file_encoding::Reader;
+use snafu::ResultExt;
 
 #[allow(dead_code)]
 pub fn from_avro_bytes<T: DeserializeOwned>(bytes: &[u8]) -> 
crate::Result<Vec<T>> {
-    let reader = Reader::new(bytes).map_err(Error::from)?;
-    let records = reader
-        .collect::<Result<Vec<Value>, _>>()
-        .map_err(Error::from)?;
-    let values = Value::Array(records);
-    from_value::<Vec<T>>(&values).map_err(Error::from)
+    let mut reader = Reader::from_slice(bytes)
+        .whatever_context::<_, crate::Error>("read avro object container")?;
+    reader
+        .deserialize::<T>()
+        .collect::<std::result::Result<Vec<_>, _>>()
+        .whatever_context::<_, crate::Error>("deserialize avro records")
 }
 
 #[cfg(test)]
@@ -122,6 +121,7 @@ mod tests {
                         embedded_index: None,
                         first_row_id: None,
                         write_cols: None,
+                        external_path: None,
                     },
                     2
                 ),
@@ -158,6 +158,7 @@ mod tests {
                         embedded_index: None,
                         first_row_id: None,
                         write_cols: None,
+                        external_path: None,
                     },
                     2
                 ),
diff --git a/crates/paimon/src/table/bin_pack.rs 
b/crates/paimon/src/table/bin_pack.rs
index 6a567cd..841154e 100644
--- a/crates/paimon/src/table/bin_pack.rs
+++ b/crates/paimon/src/table/bin_pack.rs
@@ -98,6 +98,7 @@ mod tests {
             embedded_index: None,
             first_row_id: None,
             write_cols: None,
+            external_path: None,
         }
     }
 
diff --git a/crates/paimon/src/table/table_scan.rs 
b/crates/paimon/src/table/table_scan.rs
index 2f230ce..ba3e071 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -33,6 +33,7 @@ use crate::table::source::{DataSplit, DataSplitBuilder, 
DeletionFile, PartitionB
 use crate::table::SnapshotManager;
 use crate::table::TagManager;
 use crate::Error;
+use futures::{StreamExt, TryStreamExt};
 use std::cmp::Ordering;
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
@@ -74,13 +75,17 @@ async fn read_all_manifest_entries(
     manifest_files.extend(delta);
 
     let manifest_path_prefix = format!("{}/{}", 
table_path.trim_end_matches('/'), MANIFEST_DIR);
-    let mut all_entries = Vec::new();
-    // todo: consider use multiple-threads read manifest
-    for meta in manifest_files {
-        let path = format!("{}/{}", manifest_path_prefix, meta.file_name());
-        let entries = crate::spec::Manifest::read(file_io, &path).await?;
-        all_entries.extend(entries);
-    }
+    let all_entries: Vec<ManifestEntry> = futures::stream::iter(manifest_files)
+        .map(|meta| {
+            let path = format!("{}/{}", manifest_path_prefix, 
meta.file_name());
+            async move { crate::spec::Manifest::read(file_io, &path).await }
+        })
+        .buffered(64)
+        .try_collect::<Vec<_>>()
+        .await?
+        .into_iter()
+        .flatten()
+        .collect();
     Ok(all_entries)
 }
 
@@ -953,7 +958,44 @@ mod tests {
     };
     use crate::table::source::DeletionFile;
     use crate::Error;
-    use chrono::Utc;
+    use chrono::{DateTime, Utc};
+
+    /// Helper to build a DataFileMeta with data evolution fields.
+    fn make_evo_file(
+        name: &str,
+        file_size: i64,
+        row_count: i64,
+        max_seq: i64,
+        first_row_id: Option<i64>,
+    ) -> DataFileMeta {
+        DataFileMeta {
+            file_name: name.to_string(),
+            file_size,
+            row_count,
+            min_key: Vec::new(),
+            max_key: Vec::new(),
+            key_stats: BinaryTableStats::new(Vec::new(), Vec::new(), 
Vec::new()),
+            value_stats: BinaryTableStats::new(Vec::new(), Vec::new(), 
Vec::new()),
+            min_sequence_number: 0,
+            max_sequence_number: max_seq,
+            schema_id: 0,
+            level: 0,
+            extra_files: Vec::new(),
+            creation_time: DateTime::<Utc>::from_timestamp(0, 0).unwrap(),
+            delete_row_count: None,
+            embedded_index: None,
+            first_row_id,
+            write_cols: None,
+            external_path: None,
+        }
+    }
+
+    fn file_names(groups: &[Vec<DataFileMeta>]) -> Vec<Vec<&str>> {
+        groups
+            .iter()
+            .map(|g| g.iter().map(|f| f.file_name.as_str()).collect())
+            .collect()
+    }
 
     struct SerializedBinaryRowBuilder {
         arity: i32,
@@ -1106,44 +1148,10 @@ mod tests {
             embedded_index: None,
             first_row_id: None,
             write_cols: None,
+            external_path: None,
         }
     }
 
-    fn make_evo_file(
-        name: &str,
-        file_size: i64,
-        row_count: i64,
-        max_seq: i64,
-        first_row_id: Option<i64>,
-    ) -> DataFileMeta {
-        DataFileMeta {
-            file_name: name.to_string(),
-            file_size,
-            row_count,
-            min_key: Vec::new(),
-            max_key: Vec::new(),
-            key_stats: BinaryTableStats::new(Vec::new(), Vec::new(), 
Vec::new()),
-            value_stats: BinaryTableStats::new(Vec::new(), Vec::new(), 
Vec::new()),
-            min_sequence_number: max_seq,
-            max_sequence_number: max_seq,
-            schema_id: 0,
-            level: 0,
-            extra_files: Vec::new(),
-            creation_time: Utc::now(),
-            delete_row_count: None,
-            embedded_index: None,
-            first_row_id,
-            write_cols: None,
-        }
-    }
-
-    fn file_names(groups: &[Vec<DataFileMeta>]) -> Vec<Vec<&str>> {
-        groups
-            .iter()
-            .map(|group| group.iter().map(|file| 
file.file_name.as_str()).collect())
-            .collect()
-    }
-
     #[test]
     fn test_partition_matches_predicate_decode_failure_fails_open() {
         let predicate = PredicateBuilder::new(&partition_string_field())

Reply via email to