This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 38830ef feat(scan): support deletion vector cardinality (#200)
38830ef is described below
commit 38830ef279500f403ade34411de1316c7d0893d9
Author: yuxia Luo <[email protected]>
AuthorDate: Fri Apr 3 19:43:26 2026 +0800
feat(scan): support deletion vector cardinality (#200)
---
crates/paimon/src/spec/index_file_meta.rs | 39 ++++++++---
crates/paimon/src/spec/index_manifest.rs | 72 +++++++++------------
crates/paimon/src/table/source.rs | 12 ----
crates/paimon/src/table/table_scan.rs | 53 +++++++++++++--
...manifest-7e816ed9-9f3b-4786-9985-8937d4e07b6e-0 | Bin 0 -> 965 bytes
...manifest-85cc6729-f5af-431a-a1c3-ef45319328fb-0 | Bin 899 -> 0 bytes
6 files changed, 108 insertions(+), 68 deletions(-)
diff --git a/crates/paimon/src/spec/index_file_meta.rs b/crates/paimon/src/spec/index_file_meta.rs
index 16407f2..5c78c4a 100644
--- a/crates/paimon/src/spec/index_file_meta.rs
+++ b/crates/paimon/src/spec/index_file_meta.rs
@@ -20,6 +20,13 @@ use std::fmt::{Display, Formatter};
use indexmap::IndexMap;
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct DeletionVectorMeta {
+ pub offset: i32,
+ pub length: i32,
+ pub cardinality: Option<i64>,
+}
+
/// Metadata of index file.
///
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java>
@@ -44,7 +51,7 @@ pub struct IndexFileMeta {
rename = "_DELETIONS_VECTORS_RANGES",
alias = "_DELETION_VECTORS_RANGES"
)]
- pub deletion_vectors_ranges: Option<IndexMap<String, (i32, i32)>>,
+ pub deletion_vectors_ranges: Option<IndexMap<String, DeletionVectorMeta>>,
}
impl Display for IndexFileMeta {
@@ -65,26 +72,31 @@ mod map_serde {
use indexmap::IndexMap;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
+ use super::DeletionVectorMeta;
+
#[derive(Deserialize, Serialize)]
struct Temp {
f0: String,
f1: i32,
f2: i32,
+ #[serde(default, rename = "_CARDINALITY")]
+ cardinality: Option<i64>,
}
pub fn serialize<S>(
- date: &Option<IndexMap<String, (i32, i32)>>,
+ data: &Option<IndexMap<String, DeletionVectorMeta>>,
s: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
- match *date {
+ match data {
None => s.serialize_none(),
- Some(ref d) => s.collect_seq(d.iter().map(|(s, (i1, i2))| Temp {
- f0: s.into(),
- f1: *i1,
- f2: *i2,
+ Some(d) => s.collect_seq(d.iter().map(|(path, meta)| Temp {
+ f0: path.clone(),
+ f1: meta.offset,
+ f2: meta.length,
+ cardinality: meta.cardinality,
})),
}
}
@@ -92,7 +104,7 @@ mod map_serde {
#[allow(clippy::type_complexity)]
pub fn deserialize<'de, D>(
deserializer: D,
- ) -> Result<Option<IndexMap<String, (i32, i32)>>, D::Error>
+ ) -> Result<Option<IndexMap<String, DeletionVectorMeta>>, D::Error>
where
D: Deserializer<'de>,
{
@@ -100,7 +112,16 @@ mod map_serde {
None => Ok(None),
Some::<Vec<Temp>>(s) => Ok(Some(
s.into_iter()
- .map(|t| (t.f0, (t.f1, t.f2)))
+ .map(|t| {
+ (
+ t.f0,
+ DeletionVectorMeta {
+ offset: t.f1,
+ length: t.f2,
+ cardinality: t.cardinality,
+ },
+ )
+ })
.collect::<IndexMap<_, _>>(),
)),
}
diff --git a/crates/paimon/src/spec/index_manifest.rs b/crates/paimon/src/spec/index_manifest.rs
index 6215030..3131160 100644
--- a/crates/paimon/src/spec/index_manifest.rs
+++ b/crates/paimon/src/spec/index_manifest.rs
@@ -90,54 +90,38 @@ mod tests {
use indexmap::IndexMap;
use super::*;
+ use crate::spec::DeletionVectorMeta;
#[test]
fn test_read_index_manifest_file() {
let workdir =
std::env::current_dir().unwrap_or_else(|err| panic!("current_dir must exist: {err}"));
let path = workdir
- .join("tests/fixtures/manifest/index-manifest-85cc6729-f5af-431a-a1c3-ef45319328fb-0");
+ .join("tests/fixtures/manifest/index-manifest-7e816ed9-9f3b-4786-9985-8937d4e07b6e-0");
let source = std::fs::read(path.to_str().unwrap()).unwrap();
- let mut reader =
-
serde_avro_fast::object_container_file_encoding::Reader::from_slice(source.as_slice())
- .unwrap();
- let res: Vec<_> = reader
- .deserialize::<IndexManifestEntry>()
- .collect::<Result<_, _>>()
- .unwrap();
+ let res = IndexManifest::read_from_bytes(&source).unwrap();
assert_eq!(
res,
- vec![
- IndexManifestEntry {
- version: 1,
- kind: FileKind::Add,
- partition: vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
- bucket: 0,
- index_file: IndexFileMeta {
- index_type: "HASH".into(),
- file_name:
"index-a984b43a-c3fb-40b4-ad29-536343c239a6-0".into(),
- file_size: 16,
- row_count: 4,
- deletion_vectors_ranges: None,
- }
- },
- IndexManifestEntry {
- version: 1,
- kind: FileKind::Add,
- partition: vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
- bucket: 0,
- index_file: IndexFileMeta {
- index_type: "DELETION_VECTORS".into(),
- file_name:
"index-3f0986c5-4398-449b-be82-95f019d7a748-0".into(),
- file_size: 33,
- row_count: 1,
- deletion_vectors_ranges: Some(IndexMap::from([(
-
"data-9b76122c-6bb5-4952-a946-b5bce29694a1-0.orc".into(),
- (1, 24)
- )])),
- }
+ vec![IndexManifestEntry {
+ version: 1,
+ kind: FileKind::Add,
+ partition: vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
+ bucket: 0,
+ index_file: IndexFileMeta {
+ index_type: "DELETION_VECTORS".into(),
+ file_name:
"index-4326356b-aad7-4fd8-9d88-2bb6993c8ce9-0".into(),
+ file_size: 35,
+ row_count: 1,
+ deletion_vectors_ranges: Some(IndexMap::from([(
+
"data-a989fc44-a361-42c2-801f-e50baba95a92-0.parquet".into(),
+ DeletionVectorMeta {
+ offset: 1,
+ length: 26,
+ cardinality: Some(3),
+ }
+ )])),
}
- ]
+ }]
);
}
@@ -153,7 +137,14 @@ mod tests {
file_name: "test1".into(),
file_size: 33,
row_count: 1,
- deletion_vectors_ranges: Some(IndexMap::from([("test1".into(),
(1, 24))])),
+ deletion_vectors_ranges: Some(IndexMap::from([(
+ "test1".into(),
+ DeletionVectorMeta {
+ offset: 1,
+ length: 24,
+ cardinality: Some(7),
+ },
+ )])),
},
};
@@ -180,7 +171,8 @@ mod tests {
"fields": [
{"name": "f0", "type": "string"},
{"name": "f1", "type": "int"},
- {"name": "f2", "type": "int"}
+ {"name": "f2", "type": "int"},
+ {"name": "_CARDINALITY", "type": ["null",
"long"], "default": null}
]
}]
}]
diff --git a/crates/paimon/src/table/source.rs b/crates/paimon/src/table/source.rs
index 9e96b5b..689793c 100644
--- a/crates/paimon/src/table/source.rs
+++ b/crates/paimon/src/table/source.rs
@@ -168,18 +168,6 @@ impl DataSplit {
format!("{}/{}", base, file.file_name)
}
- /// Iterate over each data file in this split, yielding `(path, &DataFileMeta)`.
- /// Use this to read each data file one by one (e.g. in ArrowReader).
- pub fn data_file_entries(&self) -> impl Iterator<Item = (String, &DataFileMeta)> + '_ {
- let base = self.bucket_path.trim_end_matches('/');
- // todo: consider partition table
- // todo: consider external path
- self.data_files.iter().map(move |file| {
- let path = format!("{}/{}", base, file.file_name);
- (path, file)
- })
- }
-
/// Total row count of all data files in this split.
pub fn row_count(&self) -> i64 {
self.data_files.iter().map(|f| f.row_count).sum()
diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs
index 027dd75..1c1285d 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -113,15 +113,14 @@ fn build_deletion_files_map(
let key = PartitionBucket::new(entry.partition.clone(), entry.bucket);
let dv_path = format!("{}/{}", index_path_prefix,
entry.index_file.file_name);
let per_bucket = map.entry(key).or_default();
- for (data_file_name, (offset, length)) in ranges {
+ for (data_file_name, meta) in ranges {
per_bucket.insert(
data_file_name.clone(),
DeletionFile::new(
dv_path.clone(),
- *offset as i64,
- *length as i64,
- // todo: consider cardinality
- None,
+ meta.offset as i64,
+ meta.length as i64,
+ meta.cardinality,
),
);
}
@@ -513,9 +512,11 @@ impl<'a> TableScan<'a> {
mod tests {
use super::{group_by_overlapping_row_id, partition_matches_predicate};
use crate::spec::{
- stats::BinaryTableStats, ArrayType, DataField, DataFileMeta, DataType,
Datum, IntType,
- Predicate, PredicateBuilder, PredicateOperator, VarCharType,
+ stats::BinaryTableStats, ArrayType, DataField, DataFileMeta, DataType,
Datum,
+ DeletionVectorMeta, FileKind, IndexFileMeta, IndexManifestEntry,
IntType, Predicate,
+ PredicateBuilder, PredicateOperator, VarCharType,
};
+ use crate::table::source::DeletionFile;
use crate::Error;
use chrono::{DateTime, Utc};
@@ -718,4 +719,42 @@ mod tests {
// Sorted by descending max_sequence_number: b(3), c(2), a(1)
assert_eq!(file_names(&groups), vec![vec!["b", "c", "a"]]);
}
+
+ #[test]
+ fn test_build_deletion_files_map_preserves_cardinality() {
+ let entries = vec![IndexManifestEntry {
+ version: 1,
+ kind: FileKind::Add,
+ partition: vec![1, 2, 3],
+ bucket: 7,
+ index_file: IndexFileMeta {
+ index_type: "DELETION_VECTORS".into(),
+ file_name: "index-file".into(),
+ file_size: 128,
+ row_count: 1,
+ deletion_vectors_ranges: Some(indexmap::IndexMap::from([(
+ "data-file.parquet".into(),
+ DeletionVectorMeta {
+ offset: 11,
+ length: 22,
+ cardinality: Some(33),
+ },
+ )])),
+ },
+ }];
+
+ let map = super::build_deletion_files_map(&entries, "file:/tmp/table");
+
+ let by_bucket = map
+ .get(&super::PartitionBucket::new(vec![1, 2, 3], 7))
+ .expect("partition bucket should exist");
+ let deletion_file = by_bucket
+ .get("data-file.parquet")
+ .expect("deletion file should exist");
+
+ assert_eq!(
+ deletion_file,
+ &DeletionFile::new("file:/tmp/table/index/index-file".into(), 11,
22, Some(33))
+ );
+ }
}
diff --git a/crates/paimon/tests/fixtures/manifest/index-manifest-7e816ed9-9f3b-4786-9985-8937d4e07b6e-0 b/crates/paimon/tests/fixtures/manifest/index-manifest-7e816ed9-9f3b-4786-9985-8937d4e07b6e-0
new file mode 100644
index 0000000..e4256c4
Binary files /dev/null and b/crates/paimon/tests/fixtures/manifest/index-manifest-7e816ed9-9f3b-4786-9985-8937d4e07b6e-0 differ
diff --git a/crates/paimon/tests/fixtures/manifest/index-manifest-85cc6729-f5af-431a-a1c3-ef45319328fb-0 b/crates/paimon/tests/fixtures/manifest/index-manifest-85cc6729-f5af-431a-a1c3-ef45319328fb-0
deleted file mode 100644
index a24063c..0000000
Binary files a/crates/paimon/tests/fixtures/manifest/index-manifest-85cc6729-f5af-431a-a1c3-ef45319328fb-0 and /dev/null differ