This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 38830ef  feat(scan): support deletion vector cardinality (#200)
38830ef is described below

commit 38830ef279500f403ade34411de1316c7d0893d9
Author: yuxia Luo <[email protected]>
AuthorDate: Fri Apr 3 19:43:26 2026 +0800

    feat(scan): support deletion vector cardinality (#200)
---
 crates/paimon/src/spec/index_file_meta.rs          |  39 ++++++++---
 crates/paimon/src/spec/index_manifest.rs           |  72 +++++++++------------
 crates/paimon/src/table/source.rs                  |  12 ----
 crates/paimon/src/table/table_scan.rs              |  53 +++++++++++++--
 ...manifest-7e816ed9-9f3b-4786-9985-8937d4e07b6e-0 | Bin 0 -> 965 bytes
 ...manifest-85cc6729-f5af-431a-a1c3-ef45319328fb-0 | Bin 899 -> 0 bytes
 6 files changed, 108 insertions(+), 68 deletions(-)

diff --git a/crates/paimon/src/spec/index_file_meta.rs b/crates/paimon/src/spec/index_file_meta.rs
index 16407f2..5c78c4a 100644
--- a/crates/paimon/src/spec/index_file_meta.rs
+++ b/crates/paimon/src/spec/index_file_meta.rs
@@ -20,6 +20,13 @@ use std::fmt::{Display, Formatter};
 
 use indexmap::IndexMap;
 
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct DeletionVectorMeta {
+    pub offset: i32,
+    pub length: i32,
+    pub cardinality: Option<i64>,
+}
+
 /// Metadata of index file.
 ///
 /// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java>
@@ -44,7 +51,7 @@ pub struct IndexFileMeta {
         rename = "_DELETIONS_VECTORS_RANGES",
         alias = "_DELETION_VECTORS_RANGES"
     )]
-    pub deletion_vectors_ranges: Option<IndexMap<String, (i32, i32)>>,
+    pub deletion_vectors_ranges: Option<IndexMap<String, DeletionVectorMeta>>,
 }
 
 impl Display for IndexFileMeta {
@@ -65,26 +72,31 @@ mod map_serde {
     use indexmap::IndexMap;
     use serde::{Deserialize, Deserializer, Serialize, Serializer};
 
+    use super::DeletionVectorMeta;
+
     #[derive(Deserialize, Serialize)]
     struct Temp {
         f0: String,
         f1: i32,
         f2: i32,
+        #[serde(default, rename = "_CARDINALITY")]
+        cardinality: Option<i64>,
     }
 
     pub fn serialize<S>(
-        date: &Option<IndexMap<String, (i32, i32)>>,
+        data: &Option<IndexMap<String, DeletionVectorMeta>>,
         s: S,
     ) -> Result<S::Ok, S::Error>
     where
         S: Serializer,
     {
-        match *date {
+        match data {
             None => s.serialize_none(),
-            Some(ref d) => s.collect_seq(d.iter().map(|(s, (i1, i2))| Temp {
-                f0: s.into(),
-                f1: *i1,
-                f2: *i2,
+            Some(d) => s.collect_seq(d.iter().map(|(path, meta)| Temp {
+                f0: path.clone(),
+                f1: meta.offset,
+                f2: meta.length,
+                cardinality: meta.cardinality,
             })),
         }
     }
@@ -92,7 +104,7 @@ mod map_serde {
     #[allow(clippy::type_complexity)]
     pub fn deserialize<'de, D>(
         deserializer: D,
-    ) -> Result<Option<IndexMap<String, (i32, i32)>>, D::Error>
+    ) -> Result<Option<IndexMap<String, DeletionVectorMeta>>, D::Error>
     where
         D: Deserializer<'de>,
     {
@@ -100,7 +112,16 @@ mod map_serde {
             None => Ok(None),
             Some::<Vec<Temp>>(s) => Ok(Some(
                 s.into_iter()
-                    .map(|t| (t.f0, (t.f1, t.f2)))
+                    .map(|t| {
+                        (
+                            t.f0,
+                            DeletionVectorMeta {
+                                offset: t.f1,
+                                length: t.f2,
+                                cardinality: t.cardinality,
+                            },
+                        )
+                    })
                     .collect::<IndexMap<_, _>>(),
             )),
         }
diff --git a/crates/paimon/src/spec/index_manifest.rs b/crates/paimon/src/spec/index_manifest.rs
index 6215030..3131160 100644
--- a/crates/paimon/src/spec/index_manifest.rs
+++ b/crates/paimon/src/spec/index_manifest.rs
@@ -90,54 +90,38 @@ mod tests {
     use indexmap::IndexMap;
 
     use super::*;
+    use crate::spec::DeletionVectorMeta;
 
     #[test]
     fn test_read_index_manifest_file() {
         let workdir =
             std::env::current_dir().unwrap_or_else(|err| panic!("current_dir must exist: {err}"));
         let path = workdir
-            .join("tests/fixtures/manifest/index-manifest-85cc6729-f5af-431a-a1c3-ef45319328fb-0");
+            .join("tests/fixtures/manifest/index-manifest-7e816ed9-9f3b-4786-9985-8937d4e07b6e-0");
         let source = std::fs::read(path.to_str().unwrap()).unwrap();
-        let mut reader =
-            serde_avro_fast::object_container_file_encoding::Reader::from_slice(source.as_slice())
-                .unwrap();
-        let res: Vec<_> = reader
-            .deserialize::<IndexManifestEntry>()
-            .collect::<Result<_, _>>()
-            .unwrap();
+        let res = IndexManifest::read_from_bytes(&source).unwrap();
         assert_eq!(
             res,
-            vec![
-                IndexManifestEntry {
-                    version: 1,
-                    kind: FileKind::Add,
-                    partition: vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
-                    bucket: 0,
-                    index_file: IndexFileMeta {
-                        index_type: "HASH".into(),
-                        file_name: "index-a984b43a-c3fb-40b4-ad29-536343c239a6-0".into(),
-                        file_size: 16,
-                        row_count: 4,
-                        deletion_vectors_ranges: None,
-                    }
-                },
-                IndexManifestEntry {
-                    version: 1,
-                    kind: FileKind::Add,
-                    partition: vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
-                    bucket: 0,
-                    index_file: IndexFileMeta {
-                        index_type: "DELETION_VECTORS".into(),
-                        file_name: "index-3f0986c5-4398-449b-be82-95f019d7a748-0".into(),
-                        file_size: 33,
-                        row_count: 1,
-                        deletion_vectors_ranges: Some(IndexMap::from([(
-                            "data-9b76122c-6bb5-4952-a946-b5bce29694a1-0.orc".into(),
-                            (1, 24)
-                        )])),
-                    }
+            vec![IndexManifestEntry {
+                version: 1,
+                kind: FileKind::Add,
+                partition: vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
+                bucket: 0,
+                index_file: IndexFileMeta {
+                    index_type: "DELETION_VECTORS".into(),
+                    file_name: "index-4326356b-aad7-4fd8-9d88-2bb6993c8ce9-0".into(),
+                    file_size: 35,
+                    row_count: 1,
+                    deletion_vectors_ranges: Some(IndexMap::from([(
+                        "data-a989fc44-a361-42c2-801f-e50baba95a92-0.parquet".into(),
+                        DeletionVectorMeta {
+                            offset: 1,
+                            length: 26,
+                            cardinality: Some(3),
+                        }
+                    )])),
                 }
-            ]
+            }]
         );
     }
 
@@ -153,7 +137,14 @@ mod tests {
                 file_name: "test1".into(),
                 file_size: 33,
                 row_count: 1,
-                deletion_vectors_ranges: Some(IndexMap::from([("test1".into(), (1, 24))])),
+                deletion_vectors_ranges: Some(IndexMap::from([(
+                    "test1".into(),
+                    DeletionVectorMeta {
+                        offset: 1,
+                        length: 24,
+                        cardinality: Some(7),
+                    },
+                )])),
             },
         };
 
@@ -180,7 +171,8 @@ mod tests {
                             "fields": [
                                 {"name": "f0", "type": "string"}, 
                                 {"name": "f1", "type": "int"}, 
-                                {"name": "f2", "type": "int"}
+                                {"name": "f2", "type": "int"},
+                                {"name": "_CARDINALITY", "type": ["null", "long"], "default": null}
                             ]
                         }]
                     }]
diff --git a/crates/paimon/src/table/source.rs b/crates/paimon/src/table/source.rs
index 9e96b5b..689793c 100644
--- a/crates/paimon/src/table/source.rs
+++ b/crates/paimon/src/table/source.rs
@@ -168,18 +168,6 @@ impl DataSplit {
         format!("{}/{}", base, file.file_name)
     }
 
-    /// Iterate over each data file in this split, yielding `(path, &DataFileMeta)`.
-    /// Use this to read each data file one by one (e.g. in ArrowReader).
-    pub fn data_file_entries(&self) -> impl Iterator<Item = (String, &DataFileMeta)> + '_ {
-        let base = self.bucket_path.trim_end_matches('/');
-        // todo: consider partition table
-        // todo: consider external path
-        self.data_files.iter().map(move |file| {
-            let path = format!("{}/{}", base, file.file_name);
-            (path, file)
-        })
-    }
-
     /// Total row count of all data files in this split.
     pub fn row_count(&self) -> i64 {
         self.data_files.iter().map(|f| f.row_count).sum()
diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs
index 027dd75..1c1285d 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -113,15 +113,14 @@ fn build_deletion_files_map(
         let key = PartitionBucket::new(entry.partition.clone(), entry.bucket);
         let dv_path = format!("{}/{}", index_path_prefix, entry.index_file.file_name);
         let per_bucket = map.entry(key).or_default();
-        for (data_file_name, (offset, length)) in ranges {
+        for (data_file_name, meta) in ranges {
             per_bucket.insert(
                 data_file_name.clone(),
                 DeletionFile::new(
                     dv_path.clone(),
-                    *offset as i64,
-                    *length as i64,
-                    // todo: consider cardinality
-                    None,
+                    meta.offset as i64,
+                    meta.length as i64,
+                    meta.cardinality,
                 ),
             );
         }
@@ -513,9 +512,11 @@ impl<'a> TableScan<'a> {
 mod tests {
     use super::{group_by_overlapping_row_id, partition_matches_predicate};
     use crate::spec::{
-        stats::BinaryTableStats, ArrayType, DataField, DataFileMeta, DataType, Datum, IntType,
-        Predicate, PredicateBuilder, PredicateOperator, VarCharType,
+        stats::BinaryTableStats, ArrayType, DataField, DataFileMeta, DataType, Datum,
+        DeletionVectorMeta, FileKind, IndexFileMeta, IndexManifestEntry, IntType, Predicate,
+        PredicateBuilder, PredicateOperator, VarCharType,
     };
+    use crate::table::source::DeletionFile;
     use crate::Error;
     use chrono::{DateTime, Utc};
 
@@ -718,4 +719,42 @@ mod tests {
         // Sorted by descending max_sequence_number: b(3), c(2), a(1)
         assert_eq!(file_names(&groups), vec![vec!["b", "c", "a"]]);
     }
+
+    #[test]
+    fn test_build_deletion_files_map_preserves_cardinality() {
+        let entries = vec![IndexManifestEntry {
+            version: 1,
+            kind: FileKind::Add,
+            partition: vec![1, 2, 3],
+            bucket: 7,
+            index_file: IndexFileMeta {
+                index_type: "DELETION_VECTORS".into(),
+                file_name: "index-file".into(),
+                file_size: 128,
+                row_count: 1,
+                deletion_vectors_ranges: Some(indexmap::IndexMap::from([(
+                    "data-file.parquet".into(),
+                    DeletionVectorMeta {
+                        offset: 11,
+                        length: 22,
+                        cardinality: Some(33),
+                    },
+                )])),
+            },
+        }];
+
+        let map = super::build_deletion_files_map(&entries, "file:/tmp/table");
+
+        let by_bucket = map
+            .get(&super::PartitionBucket::new(vec![1, 2, 3], 7))
+            .expect("partition bucket should exist");
+        let deletion_file = by_bucket
+            .get("data-file.parquet")
+            .expect("deletion file should exist");
+
+        assert_eq!(
+            deletion_file,
+            &DeletionFile::new("file:/tmp/table/index/index-file".into(), 11, 22, Some(33))
+        );
+    }
 }
diff --git a/crates/paimon/tests/fixtures/manifest/index-manifest-7e816ed9-9f3b-4786-9985-8937d4e07b6e-0 b/crates/paimon/tests/fixtures/manifest/index-manifest-7e816ed9-9f3b-4786-9985-8937d4e07b6e-0
new file mode 100644
index 0000000..e4256c4
Binary files /dev/null and b/crates/paimon/tests/fixtures/manifest/index-manifest-7e816ed9-9f3b-4786-9985-8937d4e07b6e-0 differ
diff --git a/crates/paimon/tests/fixtures/manifest/index-manifest-85cc6729-f5af-431a-a1c3-ef45319328fb-0 b/crates/paimon/tests/fixtures/manifest/index-manifest-85cc6729-f5af-431a-a1c3-ef45319328fb-0
deleted file mode 100644
index a24063c..0000000
Binary files a/crates/paimon/tests/fixtures/manifest/index-manifest-85cc6729-f5af-431a-a1c3-ef45319328fb-0 and /dev/null differ

Reply via email to