This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new db71611 feat(table): Add commit pipeline with SnapshotCommit
abstraction (#233)
db71611 is described below
commit db716113c50103430fbad85e33bd273dea886a89
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 10 17:10:29 2026 +0800
feat(table): Add commit pipeline with SnapshotCommit abstraction (#233)
Implement the table write and commit infrastructure, including:
- SnapshotCommit trait with RenamingSnapshotCommit (filesystem) and
RESTSnapshotCommit (REST catalog) implementations
- TableCommit with retry logic, append/overwrite/truncate support,
partition statistics generation, and row tracking
- WriteBuilder as the entry point for creating TableCommit instances,
with overwrite mode configured at construction time
- RESTEnv to hold REST catalog context (API client, identifier, uuid)
- CommitMessage, PartitionStatistics, and ManifestList types
- SnapshotManager extensions for atomic snapshot commit and latest hint
- BinaryRow write_datum and datums_to_binary_row utilities
- CoreOptions accessors for bucket, commit retry, and row tracking
Reference: pypaimon commit pipeline
---
.../datafusion/src/physical_plan/scan.rs | 3 +
crates/paimon/src/api/resource_paths.rs | 12 +
crates/paimon/src/api/rest_api.rs | 30 +-
crates/paimon/src/catalog/filesystem.rs | 1 +
crates/paimon/src/catalog/rest/rest_catalog.rs | 21 +-
crates/paimon/src/lib.rs | 5 +-
crates/paimon/src/spec/binary_row.rs | 149 +++
crates/paimon/src/spec/core_options.rs | 85 ++
crates/paimon/src/spec/manifest.rs | 21 +-
crates/paimon/src/spec/manifest_entry.rs | 77 +-
crates/paimon/src/spec/manifest_file_meta.rs | 24 +
crates/paimon/src/spec/manifest_list.rs | 120 +++
crates/paimon/src/spec/mod.rs | 5 +
crates/paimon/src/spec/objects_file.rs | 104 ++-
.../src/{lib.rs => spec/partition_statistics.rs} | 45 +-
crates/paimon/src/spec/schema.rs | 7 +
crates/paimon/src/spec/snapshot.rs | 16 +
.../paimon/src/{lib.rs => table/commit_message.rs} | 50 +-
crates/paimon/src/table/mod.rs | 21 +
crates/paimon/src/table/read_builder.rs | 6 +
crates/paimon/src/table/rest_env.rs | 61 ++
crates/paimon/src/table/snapshot_commit.rs | 98 ++
crates/paimon/src/table/snapshot_manager.rs | 155 +++-
crates/paimon/src/table/table_commit.rs | 995 +++++++++++++++++++++
crates/paimon/src/table/table_scan.rs | 48 +-
crates/paimon/src/table/write_builder.rs | 63 ++
crates/paimon/src/tantivy/directory.rs | 11 +-
27 files changed, 2121 insertions(+), 112 deletions(-)
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs
b/crates/integrations/datafusion/src/physical_plan/scan.rs
index 57ecfa4..a1c35a8 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -248,6 +248,7 @@ mod tests {
use paimon::spec::{
BinaryRow, DataType, Datum, IntType, PredicateBuilder, Schema as
PaimonSchema, TableSchema,
};
+ use paimon::table::Table;
use std::fs;
use tempfile::tempdir;
use test_utils::{local_file_path, test_data_file, write_int_parquet_file};
@@ -298,6 +299,7 @@ mod tests {
Identifier::new("test_db", "test_table"),
"/tmp/test-table".to_string(),
table_schema,
+ None,
)
}
@@ -329,6 +331,7 @@ mod tests {
Identifier::new("default", "t"),
table_path,
table_schema,
+ None,
);
let split = paimon::DataSplitBuilder::new()
diff --git a/crates/paimon/src/api/resource_paths.rs
b/crates/paimon/src/api/resource_paths.rs
index 1cbc88f..9345310 100644
--- a/crates/paimon/src/api/resource_paths.rs
+++ b/crates/paimon/src/api/resource_paths.rs
@@ -131,6 +131,18 @@ impl ResourcePaths {
pub fn rename_table(&self) -> String {
format!("{}/{}/rename", self.base_path, Self::TABLES)
}
+
+ /// Get the commit table endpoint path.
+ pub fn commit_table(&self, database_name: &str, table_name: &str) ->
String {
+ format!(
+ "{}/{}/{}/{}/{}/commit",
+ self.base_path,
+ Self::DATABASES,
+ RESTUtil::encode_string(database_name),
+ Self::TABLES,
+ RESTUtil::encode_string(table_name)
+ )
+ }
}
#[cfg(test)]
diff --git a/crates/paimon/src/api/rest_api.rs
b/crates/paimon/src/api/rest_api.rs
index 00a32b8..8a3416e 100644
--- a/crates/paimon/src/api/rest_api.rs
+++ b/crates/paimon/src/api/rest_api.rs
@@ -25,7 +25,7 @@ use std::collections::HashMap;
use crate::api::rest_client::HttpClient;
use crate::catalog::Identifier;
use crate::common::{CatalogOptions, Options};
-use crate::spec::Schema;
+use crate::spec::{PartitionStatistics, Schema, Snapshot};
use crate::Result;
use super::api_request::{
@@ -391,4 +391,32 @@ impl RESTApi {
let path = self.resource_paths.table_token(database, table);
self.client.get(&path, None::<&[(&str, &str)]>).await
}
+
+ // ==================== Commit Operations ====================
+
+ /// Commit a snapshot for a table.
+ ///
+ /// Corresponds to Python `RESTApi.commit_snapshot`.
+ pub async fn commit_snapshot(
+ &self,
+ identifier: &Identifier,
+ table_uuid: &str,
+ snapshot: &Snapshot,
+ statistics: &[PartitionStatistics],
+ ) -> Result<bool> {
+ let database = identifier.database();
+ let table = identifier.object();
+ validate_non_empty_multi(&[(database, "database name"), (table, "table
name")])?;
+ let path = self.resource_paths.commit_table(database, table);
+ let request = serde_json::json!({
+ "tableUuid": table_uuid,
+ "snapshot": snapshot,
+ "statistics": statistics,
+ });
+ let resp: serde_json::Value = self.client.post(&path, &request).await?;
+ Ok(resp
+ .get("success")
+ .and_then(|v| v.as_bool())
+ .unwrap_or(false))
+ }
}
diff --git a/crates/paimon/src/catalog/filesystem.rs
b/crates/paimon/src/catalog/filesystem.rs
index 313b6ef..a061e8a 100644
--- a/crates/paimon/src/catalog/filesystem.rs
+++ b/crates/paimon/src/catalog/filesystem.rs
@@ -327,6 +327,7 @@ impl Catalog for FileSystemCatalog {
identifier.clone(),
table_path,
schema,
+ None,
))
}
diff --git a/crates/paimon/src/catalog/rest/rest_catalog.rs
b/crates/paimon/src/catalog/rest/rest_catalog.rs
index e5d023f..1da2aa1 100644
--- a/crates/paimon/src/catalog/rest/rest_catalog.rs
+++ b/crates/paimon/src/catalog/rest/rest_catalog.rs
@@ -21,6 +21,7 @@
//! a Paimon REST catalog server for database and table CRUD operations.
use std::collections::HashMap;
+use std::sync::Arc;
use async_trait::async_trait;
@@ -32,7 +33,7 @@ use crate::common::{CatalogOptions, Options};
use crate::error::Error;
use crate::io::FileIO;
use crate::spec::{Schema, SchemaChange, TableSchema};
-use crate::table::Table;
+use crate::table::{RESTEnv, Table};
use crate::Result;
use super::rest_token_file_io::RESTTokenFileIO;
@@ -44,8 +45,8 @@ use super::rest_token_file_io::RESTTokenFileIO;
///
/// Corresponds to Python `RESTCatalog` in
`pypaimon/catalog/rest/rest_catalog.py`.
pub struct RESTCatalog {
- /// The REST API client.
- api: RESTApi,
+ /// The REST API client (shared with RESTEnv).
+ api: Arc<RESTApi>,
/// Catalog configuration options.
options: Options,
/// Warehouse path.
@@ -71,7 +72,7 @@ impl RESTCatalog {
message: format!("Missing required option: {}",
CatalogOptions::WAREHOUSE),
})?;
- let api = RESTApi::new(options.clone(), config_required).await?;
+ let api = Arc::new(RESTApi::new(options.clone(),
config_required).await?);
let data_token_enabled = api
.options()
@@ -232,6 +233,15 @@ impl Catalog for RESTCatalog {
source: None,
})?;
+ // Extract table uuid for RESTEnv
+ let uuid = response.id.ok_or_else(|| Error::DataInvalid {
+ message: format!(
+ "Table {} response missing id (uuid)",
+ identifier.full_name()
+ ),
+ source: None,
+ })?;
+
// Build FileIO based on data_token_enabled and is_external
// TODO Support token cache and direct oss access
let file_io = if self.data_token_enabled && !is_external {
@@ -244,11 +254,14 @@ impl Catalog for RESTCatalog {
FileIO::from_path(&table_path)?.build()?
};
+ let rest_env = RESTEnv::new(identifier.clone(), uuid,
self.api.clone());
+
Ok(Table::new(
file_io,
identifier.clone(),
table_path,
table_schema,
+ Some(rest_env),
))
}
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs
index a68ba6b..3867d69 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/lib.rs
@@ -42,6 +42,7 @@ pub use catalog::CatalogFactory;
pub use catalog::FileSystemCatalog;
pub use table::{
- DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan,
ReadBuilder, RowRange,
- SnapshotManager, Table, TableRead, TableScan, TagManager,
+ CommitMessage, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket,
Plan, RESTEnv,
+ RESTSnapshotCommit, ReadBuilder, RenamingSnapshotCommit, RowRange,
SnapshotCommit,
+ SnapshotManager, Table, TableCommit, TableRead, TableScan, TagManager,
WriteBuilder,
};
diff --git a/crates/paimon/src/spec/binary_row.rs
b/crates/paimon/src/spec/binary_row.rs
index 1621476..0ba6b78 100644
--- a/crates/paimon/src/spec/binary_row.rs
+++ b/crates/paimon/src/spec/binary_row.rs
@@ -19,6 +19,7 @@
//! and BinaryRowBuilder for constructing BinaryRow instances.
use crate::spec::murmur_hash::hash_by_words;
+use crate::spec::{DataType, Datum};
use serde::{Deserialize, Serialize};
pub const EMPTY_BINARY_ROW: BinaryRow = BinaryRow::new(0);
@@ -523,6 +524,87 @@ impl BinaryRowBuilder {
serialized.extend_from_slice(&self.data);
serialized
}
+
+ /// Write a Datum value at the given position, dispatching by type.
+ pub fn write_datum(&mut self, pos: usize, datum: &Datum, data_type:
&DataType) {
+ match datum {
+ Datum::Bool(v) => self.write_boolean(pos, *v),
+ Datum::TinyInt(v) => self.write_byte(pos, *v),
+ Datum::SmallInt(v) => self.write_short(pos, *v),
+ Datum::Int(v) | Datum::Date(v) | Datum::Time(v) =>
self.write_int(pos, *v),
+ Datum::Long(v) => self.write_long(pos, *v),
+ Datum::Float(v) => self.write_float(pos, *v),
+ Datum::Double(v) => self.write_double(pos, *v),
+ Datum::Timestamp { millis, nanos } => {
+ let precision = match data_type {
+ DataType::Timestamp(ts) => ts.precision(),
+ _ => 3,
+ };
+ if precision <= 3 {
+ self.write_timestamp_compact(pos, *millis);
+ } else {
+ self.write_timestamp_non_compact(pos, *millis, *nanos);
+ }
+ }
+ Datum::LocalZonedTimestamp { millis, nanos } => {
+ let precision = match data_type {
+ DataType::LocalZonedTimestamp(ts) => ts.precision(),
+ _ => 3,
+ };
+ if precision <= 3 {
+ self.write_timestamp_compact(pos, *millis);
+ } else {
+ self.write_timestamp_non_compact(pos, *millis, *nanos);
+ }
+ }
+ Datum::Decimal {
+ unscaled,
+ precision,
+ ..
+ } => {
+ if *precision <= 18 {
+ self.write_decimal_compact(pos, *unscaled as i64);
+ } else {
+ self.write_decimal_var_len(pos, *unscaled);
+ }
+ }
+ Datum::String(s) => {
+ if s.len() <= 7 {
+ self.write_string_inline(pos, s);
+ } else {
+ self.write_string(pos, s);
+ }
+ }
+ Datum::Bytes(b) => {
+ if b.len() <= 7 {
+ self.write_binary_inline(pos, b);
+ } else {
+ self.write_binary(pos, b);
+ }
+ }
+ }
+ }
+}
+
+/// Build a serialized BinaryRow from optional Datum values.
+/// Returns empty vec if all values are None.
+pub fn datums_to_binary_row(datums: &[(&Option<Datum>, &DataType)]) -> Vec<u8>
{
+ if datums.iter().all(|(d, _)| d.is_none()) {
+ return vec![];
+ }
+ let arity = datums.len() as i32;
+ let mut builder = BinaryRowBuilder::new(arity);
+ for (pos, (datum_opt, data_type)) in datums.iter().enumerate() {
+ match datum_opt {
+ Some(datum) => {
+ builder.write_datum(pos, datum, data_type);
+ }
+ None => {
+ builder.set_null_at(pos);
+ }
+ }
+ }
+ builder.build_serialized()
}
#[cfg(test)]
@@ -756,6 +838,73 @@ mod tests {
assert_eq!(nano, 0);
}
+ #[test]
+ fn test_write_datum_int_and_string() {
+ let mut builder = BinaryRowBuilder::new(2);
+ builder.write_datum(
+ 0,
+ &Datum::Int(42),
+ &DataType::Int(crate::spec::IntType::new()),
+ );
+ builder.write_datum(
+ 1,
+ &Datum::String("hello".to_string()),
+ &DataType::VarChar(crate::spec::VarCharType::string_type()),
+ );
+ let row = builder.build();
+ assert_eq!(row.get_int(0).unwrap(), 42);
+ assert_eq!(row.get_string(1).unwrap(), "hello");
+ }
+
+ #[test]
+ fn test_write_datum_long_string() {
+ let mut builder = BinaryRowBuilder::new(1);
+ builder.write_datum(
+ 0,
+ &Datum::String("long_string_value".to_string()),
+ &DataType::VarChar(crate::spec::VarCharType::string_type()),
+ );
+ let row = builder.build();
+ assert_eq!(row.get_string(0).unwrap(), "long_string_value");
+ }
+
+ #[test]
+ fn test_datums_to_binary_row_roundtrip() {
+ let d1 = Some(Datum::Int(100));
+ let d2 = Some(Datum::String("abc".to_string()));
+ let dt1 = DataType::Int(crate::spec::IntType::new());
+ let dt2 = DataType::VarChar(crate::spec::VarCharType::string_type());
+ let datums = vec![(&d1, &dt1), (&d2, &dt2)];
+ let bytes = datums_to_binary_row(&datums);
+ assert!(!bytes.is_empty());
+ let row = BinaryRow::from_serialized_bytes(&bytes).unwrap();
+ assert_eq!(row.get_int(0).unwrap(), 100);
+ assert_eq!(row.get_string(1).unwrap(), "abc");
+ }
+
+ #[test]
+ fn test_datums_to_binary_row_all_none() {
+ let d1: Option<Datum> = None;
+ let dt1 = DataType::Int(crate::spec::IntType::new());
+ let datums = vec![(&d1, &dt1)];
+ let bytes = datums_to_binary_row(&datums);
+ assert!(bytes.is_empty());
+ }
+
+ #[test]
+ fn test_datums_to_binary_row_mixed_null() {
+ let d1 = Some(Datum::Int(7));
+ let d2: Option<Datum> = None;
+ let dt1 = DataType::Int(crate::spec::IntType::new());
+ let dt2 = DataType::Int(crate::spec::IntType::new());
+ let datums = vec![(&d1, &dt1), (&d2, &dt2)];
+ let bytes = datums_to_binary_row(&datums);
+ assert!(!bytes.is_empty());
+ let row = BinaryRow::from_serialized_bytes(&bytes).unwrap();
+ assert_eq!(row.get_int(0).unwrap(), 7);
+ assert!(row.is_null_at(1));
+ }
+
#[test]
fn test_get_timestamp_non_compact() {
let epoch_millis: i64 = 1_704_067_200_123;
diff --git a/crates/paimon/src/spec/core_options.rs
b/crates/paimon/src/spec/core_options.rs
index 93c3fec..17993d9 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -26,6 +26,17 @@ const PARTITION_DEFAULT_NAME_OPTION: &str =
"partition.default-name";
const PARTITION_LEGACY_NAME_OPTION: &str = "partition.legacy-name";
const BUCKET_KEY_OPTION: &str = "bucket-key";
const BUCKET_FUNCTION_TYPE_OPTION: &str = "bucket-function.type";
+const BUCKET_OPTION: &str = "bucket";
+const DEFAULT_BUCKET: i32 = 1;
+const COMMIT_MAX_RETRIES_OPTION: &str = "commit.max-retries";
+const COMMIT_TIMEOUT_OPTION: &str = "commit.timeout";
+const COMMIT_MIN_RETRY_WAIT_OPTION: &str = "commit.min-retry-wait";
+const COMMIT_MAX_RETRY_WAIT_OPTION: &str = "commit.max-retry-wait";
+const ROW_TRACKING_ENABLED_OPTION: &str = "row-tracking.enabled";
+const DEFAULT_COMMIT_MAX_RETRIES: u32 = 10;
+const DEFAULT_COMMIT_TIMEOUT_MS: u64 = 120_000;
+const DEFAULT_COMMIT_MIN_RETRY_WAIT_MS: u64 = 1_000;
+const DEFAULT_COMMIT_MAX_RETRY_WAIT_MS: u64 = 10_000;
pub const SCAN_SNAPSHOT_ID_OPTION: &str = "scan.snapshot-id";
pub const SCAN_TIMESTAMP_MILLIS_OPTION: &str = "scan.timestamp-millis";
pub const SCAN_TAG_NAME_OPTION: &str = "scan.tag-name";
@@ -200,6 +211,49 @@ impl<'a> CoreOptions<'a> {
.map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
}
+ pub fn commit_max_retries(&self) -> u32 {
+ self.options
+ .get(COMMIT_MAX_RETRIES_OPTION)
+ .and_then(|v| v.parse().ok())
+ .unwrap_or(DEFAULT_COMMIT_MAX_RETRIES)
+ }
+
+ pub fn commit_timeout_ms(&self) -> u64 {
+ self.options
+ .get(COMMIT_TIMEOUT_OPTION)
+ .and_then(|v| v.parse().ok())
+ .unwrap_or(DEFAULT_COMMIT_TIMEOUT_MS)
+ }
+
+ pub fn commit_min_retry_wait_ms(&self) -> u64 {
+ self.options
+ .get(COMMIT_MIN_RETRY_WAIT_OPTION)
+ .and_then(|v| v.parse().ok())
+ .unwrap_or(DEFAULT_COMMIT_MIN_RETRY_WAIT_MS)
+ }
+
+ pub fn commit_max_retry_wait_ms(&self) -> u64 {
+ self.options
+ .get(COMMIT_MAX_RETRY_WAIT_OPTION)
+ .and_then(|v| v.parse().ok())
+ .unwrap_or(DEFAULT_COMMIT_MAX_RETRY_WAIT_MS)
+ }
+
+ pub fn row_tracking_enabled(&self) -> bool {
+ self.options
+ .get(ROW_TRACKING_ENABLED_OPTION)
+ .map(|v| v.eq_ignore_ascii_case("true"))
+ .unwrap_or(false)
+ }
+
+ /// Number of buckets for the table. Default is 1.
+ pub fn bucket(&self) -> i32 {
+ self.options
+ .get(BUCKET_OPTION)
+ .and_then(|v| v.parse().ok())
+ .unwrap_or(DEFAULT_BUCKET)
+ }
+
/// Whether the bucket function type is the default hash-based function.
///
/// Only the default function (`Math.abs(hash % numBuckets)`) is supported
@@ -363,6 +417,37 @@ mod tests {
}
}
+ #[test]
+ fn test_commit_options_defaults() {
+ let options = HashMap::new();
+ let core = CoreOptions::new(&options);
+ assert_eq!(core.bucket(), 1);
+ assert_eq!(core.commit_max_retries(), 10);
+ assert_eq!(core.commit_timeout_ms(), 120_000);
+ assert_eq!(core.commit_min_retry_wait_ms(), 1_000);
+ assert_eq!(core.commit_max_retry_wait_ms(), 10_000);
+ assert!(!core.row_tracking_enabled());
+ }
+
+ #[test]
+ fn test_commit_options_custom() {
+ let options = HashMap::from([
+ (BUCKET_OPTION.to_string(), "4".to_string()),
+ (COMMIT_MAX_RETRIES_OPTION.to_string(), "20".to_string()),
+ (COMMIT_TIMEOUT_OPTION.to_string(), "60000".to_string()),
+ (COMMIT_MIN_RETRY_WAIT_OPTION.to_string(), "500".to_string()),
+ (COMMIT_MAX_RETRY_WAIT_OPTION.to_string(), "5000".to_string()),
+ (ROW_TRACKING_ENABLED_OPTION.to_string(), "true".to_string()),
+ ]);
+ let core = CoreOptions::new(&options);
+ assert_eq!(core.bucket(), 4);
+ assert_eq!(core.commit_max_retries(), 20);
+ assert_eq!(core.commit_timeout_ms(), 60_000);
+ assert_eq!(core.commit_min_retry_wait_ms(), 500);
+ assert_eq!(core.commit_max_retry_wait_ms(), 5_000);
+ assert!(core.row_tracking_enabled());
+ }
+
#[test]
fn test_try_time_travel_selector_normalizes_valid_selector() {
let tag_options = HashMap::from([(SCAN_TAG_NAME_OPTION.to_string(),
"tag1".to_string())]);
diff --git a/crates/paimon/src/spec/manifest.rs
b/crates/paimon/src/spec/manifest.rs
index 1893a7d..84f3629 100644
--- a/crates/paimon/src/spec/manifest.rs
+++ b/crates/paimon/src/spec/manifest.rs
@@ -17,6 +17,7 @@
use crate::io::FileIO;
use crate::spec::manifest_entry::ManifestEntry;
+use crate::spec::manifest_entry::MANIFEST_ENTRY_SCHEMA;
use serde_avro_fast::object_container_file_encoding::Reader;
use snafu::ResultExt;
@@ -32,13 +33,6 @@ pub struct Manifest;
impl Manifest {
/// Read manifest entries from a file.
- ///
- /// # Arguments
- /// * `file_io` - FileIO instance for reading files
- /// * `path` - Path to the manifest file
- ///
- /// # Returns
- /// A vector of ManifestEntry records
pub async fn read(file_io: &FileIO, path: &str) ->
Result<Vec<ManifestEntry>> {
let input_file = file_io.new_input(path)?;
@@ -51,12 +45,6 @@ impl Manifest {
}
/// Read manifest entries from bytes.
- ///
- /// # Arguments
- /// * `bytes` - Avro-encoded manifest file content
- ///
- /// # Returns
- /// A vector of ManifestEntry records
fn read_from_bytes(bytes: &[u8]) -> Result<Vec<ManifestEntry>> {
let mut reader =
Reader::from_slice(bytes).whatever_context::<_,
crate::Error>("read manifest avro")?;
@@ -65,6 +53,13 @@ impl Manifest {
.collect::<std::result::Result<Vec<_>, _>>()
.whatever_context::<_, crate::Error>("deserialize manifest entry")
}
+
+ /// Write manifest entries to a file.
+ pub async fn write(file_io: &FileIO, path: &str, entries:
&[ManifestEntry]) -> Result<()> {
+ let bytes = crate::spec::to_avro_bytes(MANIFEST_ENTRY_SCHEMA,
entries)?;
+ let output = file_io.new_output(path)?;
+ output.write(bytes::Bytes::from(bytes)).await
+ }
}
#[cfg(test)]
diff --git a/crates/paimon/src/spec/manifest_entry.rs
b/crates/paimon/src/spec/manifest_entry.rs
index d7aa32c..cbe295b 100644
--- a/crates/paimon/src/spec/manifest_entry.rs
+++ b/crates/paimon/src/spec/manifest_entry.rs
@@ -35,7 +35,7 @@ pub struct Identifier {
/// Entry of a manifest file, representing an addition / deletion of a data
file.
/// Impl Reference:
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java>
-#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ManifestEntry {
#[serde(rename = "_KIND")]
kind: FileKind,
@@ -124,4 +124,79 @@ impl ManifestEntry {
version,
}
}
+
+ /// Return a copy with a different kind.
+ pub fn with_kind(mut self, kind: FileKind) -> Self {
+ self.kind = kind;
+ self
+ }
+
+ /// Return a copy with sequence numbers set on the file.
+ pub fn with_sequence_number(mut self, min_seq: i64, max_seq: i64) -> Self {
+ self.file.min_sequence_number = min_seq;
+ self.file.max_sequence_number = max_seq;
+ self
+ }
+
+ /// Return a copy with first_row_id set on the file.
+ pub fn with_first_row_id(mut self, first_row_id: i64) -> Self {
+ self.file.first_row_id = Some(first_row_id);
+ self
+ }
}
+
+/// Avro schema for ManifestEntry (used in manifest files).
+pub const MANIFEST_ENTRY_SCHEMA: &str = r#"["null", {
+ "type": "record",
+ "name": "record",
+ "namespace": "org.apache.paimon.avro.generated",
+ "fields": [
+ {"name": "_KIND", "type": "int"},
+ {"name": "_PARTITION", "type": "bytes"},
+ {"name": "_BUCKET", "type": "int"},
+ {"name": "_TOTAL_BUCKETS", "type": "int"},
+ {"name": "_FILE", "type": ["null", {
+ "type": "record",
+ "name": "record__FILE",
+ "fields": [
+ {"name": "_FILE_NAME", "type": "string"},
+ {"name": "_FILE_SIZE", "type": "long"},
+ {"name": "_ROW_COUNT", "type": "long"},
+ {"name": "_MIN_KEY", "type": "bytes"},
+ {"name": "_MAX_KEY", "type": "bytes"},
+ {"name": "_KEY_STATS", "type": ["null", {
+ "type": "record",
+ "name": "record__FILE__KEY_STATS",
+ "fields": [
+ {"name": "_MIN_VALUES", "type": "bytes"},
+ {"name": "_MAX_VALUES", "type": "bytes"},
+ {"name": "_NULL_COUNTS", "type": ["null", {"type":
"array", "items": ["null", "long"]}], "default": null}
+ ]
+ }], "default": null},
+ {"name": "_VALUE_STATS", "type": ["null", {
+ "type": "record",
+ "name": "record__FILE__VALUE_STATS",
+ "fields": [
+ {"name": "_MIN_VALUES", "type": "bytes"},
+ {"name": "_MAX_VALUES", "type": "bytes"},
+ {"name": "_NULL_COUNTS", "type": ["null", {"type":
"array", "items": ["null", "long"]}], "default": null}
+ ]
+ }], "default": null},
+ {"name": "_MIN_SEQUENCE_NUMBER", "type": "long"},
+ {"name": "_MAX_SEQUENCE_NUMBER", "type": "long"},
+ {"name": "_SCHEMA_ID", "type": "long"},
+ {"name": "_LEVEL", "type": "int"},
+ {"name": "_EXTRA_FILES", "type": {"type": "array", "items":
"string"}},
+ {"name": "_CREATION_TIME", "type": ["null", {"type": "long",
"logicalType": "timestamp-millis"}], "default": null},
+ {"name": "_DELETE_ROW_COUNT", "type": ["null", "long"],
"default": null},
+ {"name": "_EMBEDDED_FILE_INDEX", "type": ["null", "bytes"],
"default": null},
+ {"name": "_FILE_SOURCE", "type": ["null", "int"], "default":
null},
+ {"name": "_VALUE_STATS_COLS", "type": ["null", {"type":
"array", "items": "string"}], "default": null},
+ {"name": "_EXTERNAL_PATH", "type": ["null", "string"],
"default": null},
+ {"name": "_FIRST_ROW_ID", "type": ["null", "long"], "default":
null},
+ {"name": "_WRITE_COLS", "type": ["null", {"type": "array",
"items": "string"}], "default": null}
+ ]
+ }], "default": null},
+ {"name": "_VERSION", "type": "int"}
+ ]
+}]"#;
diff --git a/crates/paimon/src/spec/manifest_file_meta.rs
b/crates/paimon/src/spec/manifest_file_meta.rs
index 36f92b9..b74389e 100644
--- a/crates/paimon/src/spec/manifest_file_meta.rs
+++ b/crates/paimon/src/spec/manifest_file_meta.rs
@@ -115,6 +115,30 @@ impl ManifestFileMeta {
}
}
+/// Avro schema for ManifestFileMeta (used in manifest-list files).
+pub const MANIFEST_FILE_META_SCHEMA: &str = r#"["null", {
+ "type": "record",
+ "name": "record",
+ "namespace": "org.apache.paimon.avro.generated",
+ "fields": [
+ {"name": "_VERSION", "type": "int"},
+ {"name": "_FILE_NAME", "type": "string"},
+ {"name": "_FILE_SIZE", "type": "long"},
+ {"name": "_NUM_ADDED_FILES", "type": "long"},
+ {"name": "_NUM_DELETED_FILES", "type": "long"},
+ {"name": "_PARTITION_STATS", "type": ["null", {
+ "type": "record",
+ "name": "record__PARTITION_STATS",
+ "fields": [
+ {"name": "_MIN_VALUES", "type": "bytes"},
+ {"name": "_MAX_VALUES", "type": "bytes"},
+ {"name": "_NULL_COUNTS", "type": ["null", {"type": "array",
"items": ["null", "long"]}], "default": null}
+ ]
+ }], "default": null},
+ {"name": "_SCHEMA_ID", "type": "long"}
+ ]
+}]"#;
+
impl Display for ManifestFileMeta {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
diff --git a/crates/paimon/src/spec/manifest_list.rs
b/crates/paimon/src/spec/manifest_list.rs
new file mode 100644
index 0000000..050edf4
--- /dev/null
+++ b/crates/paimon/src/spec/manifest_list.rs
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::io::FileIO;
+use crate::spec::manifest_file_meta::MANIFEST_FILE_META_SCHEMA;
+use crate::spec::ManifestFileMeta;
+use crate::Result;
+
+/// Manifest list file reader and writer.
+///
+/// A manifest list file contains a list of ManifestFileMeta records in Avro
format.
+/// Each record describes a manifest file.
+///
+/// Impl Reference:
<https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java>
+pub struct ManifestList;
+
+impl ManifestList {
+ /// Read manifest file metas from a manifest list file.
+ pub async fn read(file_io: &FileIO, path: &str) ->
Result<Vec<ManifestFileMeta>> {
+ let input = file_io.new_input(path)?;
+ if !input.exists().await? {
+ return Ok(Vec::new());
+ }
+ let content = input.read().await?;
+ crate::spec::from_avro_bytes(&content)
+ }
+
+ /// Write manifest file metas to a manifest list file.
+ pub async fn write(file_io: &FileIO, path: &str, metas:
&[ManifestFileMeta]) -> Result<()> {
+ let bytes = crate::spec::to_avro_bytes(MANIFEST_FILE_META_SCHEMA,
metas)?;
+ let output = file_io.new_output(path)?;
+ output.write(bytes::Bytes::from(bytes)).await
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::io::FileIOBuilder;
+ use crate::spec::stats::BinaryTableStats;
+
+ fn test_file_io() -> FileIO {
+ FileIOBuilder::new("memory").build().unwrap()
+ }
+
+ #[tokio::test]
+ async fn test_manifest_list_roundtrip() {
+ let file_io = test_file_io();
+ let path = "memory:/test_manifest_list_roundtrip/manifest-list-0";
+ file_io
+ .mkdirs("memory:/test_manifest_list_roundtrip/")
+ .await
+ .unwrap();
+
+ let value_bytes = vec![
+ 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0,
0, 0, 0, 0, 0, 129,
+ ];
+ let original = vec![
+ ManifestFileMeta::new(
+ "manifest-a".to_string(),
+ 1024,
+ 5,
+ 2,
+ BinaryTableStats::new(value_bytes.clone(),
value_bytes.clone(), vec![Some(1)]),
+ 0,
+ ),
+ ManifestFileMeta::new(
+ "manifest-b".to_string(),
+ 2048,
+ 10,
+ 0,
+ BinaryTableStats::new(value_bytes.clone(),
value_bytes.clone(), vec![Some(3)]),
+ 1,
+ ),
+ ];
+
+ ManifestList::write(&file_io, path, &original)
+ .await
+ .unwrap();
+ let decoded = ManifestList::read(&file_io, path).await.unwrap();
+ assert_eq!(original, decoded);
+ }
+
+ #[tokio::test]
+ async fn test_manifest_list_read_nonexistent() {
+ let file_io = test_file_io();
+ let result = ManifestList::read(&file_io,
"memory:/nonexistent/manifest-list")
+ .await
+ .unwrap();
+ assert!(result.is_empty());
+ }
+
+ #[tokio::test]
+ async fn test_manifest_list_write_empty() {
+ let file_io = test_file_io();
+ let path = "memory:/test_manifest_list_empty/manifest-list-0";
+ file_io
+ .mkdirs("memory:/test_manifest_list_empty/")
+ .await
+ .unwrap();
+
+ ManifestList::write(&file_io, path, &[]).await.unwrap();
+ let decoded = ManifestList::read(&file_io, path).await.unwrap();
+ assert!(decoded.is_empty());
+ }
+}
diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs
index 50402a3..f30df9f 100644
--- a/crates/paimon/src/spec/mod.rs
+++ b/crates/paimon/src/spec/mod.rs
@@ -53,8 +53,11 @@ pub use manifest_common::FileKind;
mod manifest_entry;
pub use manifest_entry::Identifier;
pub use manifest_entry::ManifestEntry;
+mod manifest_list;
+pub use manifest_list::ManifestList;
mod objects_file;
pub use objects_file::from_avro_bytes;
+pub use objects_file::to_avro_bytes;
pub(crate) mod stats;
mod types;
pub use types::*;
@@ -67,3 +70,5 @@ pub use predicate::{
field_idx_to_partition_idx, Datum, Predicate, PredicateBuilder,
PredicateOperator,
};
pub(crate) mod murmur_hash;
+mod partition_statistics;
+pub use partition_statistics::PartitionStatistics;
diff --git a/crates/paimon/src/spec/objects_file.rs
b/crates/paimon/src/spec/objects_file.rs
index 171de1d..864ea00 100644
--- a/crates/paimon/src/spec/objects_file.rs
+++ b/crates/paimon/src/spec/objects_file.rs
@@ -16,7 +16,8 @@
// under the License.
use serde::de::DeserializeOwned;
-use serde_avro_fast::object_container_file_encoding::Reader;
+use serde::Serialize;
+use serde_avro_fast::object_container_file_encoding::{Compression, Reader};
use snafu::ResultExt;
pub fn from_avro_bytes<T: DeserializeOwned>(bytes: &[u8]) ->
crate::Result<Vec<T>> {
@@ -28,15 +29,112 @@ pub fn from_avro_bytes<T: DeserializeOwned>(bytes: &[u8])
-> crate::Result<Vec<T
.whatever_context::<_, crate::Error>("deserialize avro records")
}
+/// Serialize records into Avro Object Container File bytes.
+///
+/// The `schema_json` must be a valid Avro schema JSON string that matches
+/// the serde serialization layout of `T`.
+pub fn to_avro_bytes<T: Serialize>(schema_json: &str, records: &[T]) ->
crate::Result<Vec<u8>> {
+ let schema: serde_avro_fast::Schema =
+ schema_json
+ .parse()
+ .map_err(
+ |e: serde_avro_fast::schema::SchemaError|
crate::Error::DataInvalid {
+ message: format!("invalid avro schema: {e}"),
+ source: Some(Box::new(e)),
+ },
+ )?;
+ serde_avro_fast::object_container_file_encoding::write_all(
+ &schema,
+ Compression::Null,
+ Vec::new(),
+ records.iter(),
+ )
+ .map_err(|e| crate::Error::DataInvalid {
+ message: format!("avro serialization failed: {e}"),
+ source: Some(Box::new(e)),
+ })
+}
+
#[cfg(test)]
mod tests {
+ use super::*;
use crate::spec::manifest_common::FileKind;
- use crate::spec::manifest_entry::ManifestEntry;
- use crate::spec::objects_file::from_avro_bytes;
+ use crate::spec::manifest_entry::{ManifestEntry, MANIFEST_ENTRY_SCHEMA};
+ use crate::spec::manifest_file_meta::MANIFEST_FILE_META_SCHEMA;
use crate::spec::stats::BinaryTableStats;
use crate::spec::{DataFileMeta, ManifestFileMeta};
use chrono::{DateTime, Utc};
+ #[test]
+ fn test_roundtrip_manifest_file_meta() {
+ let value_bytes = vec![
+            0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129,
+ ];
+ let original = vec![ManifestFileMeta::new(
+ "manifest-test-0".to_string(),
+ 1024,
+ 5,
+ 2,
+ BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(),
vec![Some(1)]),
+ 0,
+ )];
+        let bytes = to_avro_bytes(MANIFEST_FILE_META_SCHEMA, &original).unwrap();
+ let decoded = from_avro_bytes::<ManifestFileMeta>(&bytes).unwrap();
+ assert_eq!(original, decoded);
+ }
+
+ #[test]
+ fn test_roundtrip_manifest_entry() {
+ let value_bytes = vec![
+            0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129, 1, 0, 0, 0, 0, 0, 0, 0,
+ ];
+        let single_value = vec![0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0];
+ let original = vec![ManifestEntry::new(
+ FileKind::Add,
+ single_value.clone(),
+ 1,
+ 10,
+ DataFileMeta {
+ file_name: "test.parquet".to_string(),
+ file_size: 100,
+ row_count: 50,
+ min_key: single_value.clone(),
+ max_key: single_value.clone(),
+ key_stats: BinaryTableStats::new(
+ value_bytes.clone(),
+ value_bytes.clone(),
+ vec![Some(1), Some(2)],
+ ),
+ value_stats: BinaryTableStats::new(
+ value_bytes.clone(),
+ value_bytes.clone(),
+ vec![Some(1), Some(2)],
+ ),
+ min_sequence_number: 1,
+ max_sequence_number: 50,
+ schema_id: 0,
+ level: 0,
+ extra_files: vec![],
+ creation_time: Some(
+ "2024-09-06T07:45:55.039+00:00"
+ .parse::<DateTime<Utc>>()
+ .unwrap(),
+ ),
+ delete_row_count: Some(0),
+ embedded_index: None,
+ first_row_id: None,
+ write_cols: None,
+ external_path: None,
+ file_source: None,
+ value_stats_cols: None,
+ },
+ 2,
+ )];
+ let bytes = to_avro_bytes(MANIFEST_ENTRY_SCHEMA, &original).unwrap();
+ let decoded = from_avro_bytes::<ManifestEntry>(&bytes).unwrap();
+ assert_eq!(original, decoded);
+ }
+
#[tokio::test]
async fn test_read_manifest_list() {
let workdir =
diff --git a/crates/paimon/src/lib.rs
b/crates/paimon/src/spec/partition_statistics.rs
similarity index 53%
copy from crates/paimon/src/lib.rs
copy to crates/paimon/src/spec/partition_statistics.rs
index a68ba6b..30f0579 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/spec/partition_statistics.rs
@@ -15,33 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-mod error;
-pub use error::Error;
-pub use error::Result;
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
-pub mod common;
-pub use common::{CatalogOptions, Options};
-
-pub mod api;
-pub use api::rest_api::RESTApi;
-
-pub mod arrow;
-pub mod btree;
-pub mod catalog;
-mod deletion_vector;
-pub mod file_index;
-pub mod io;
-mod predicate_stats;
-pub mod spec;
-pub mod table;
-#[cfg(feature = "fulltext")]
-pub mod tantivy;
-
-pub use catalog::Catalog;
-pub use catalog::CatalogFactory;
-pub use catalog::FileSystemCatalog;
-
-pub use table::{
- DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan,
ReadBuilder, RowRange,
- SnapshotManager, Table, TableRead, TableScan, TagManager,
-};
+/// Partition-level statistics for snapshot commits.
+///
+/// Reference:
[org.apache.paimon.partition.PartitionStatistics](https://github.com/apache/paimon)
+/// and [pypaimon snapshot_commit.py
PartitionStatistics](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/snapshot/snapshot_commit.py)
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct PartitionStatistics {
+ pub spec: HashMap<String, String>,
+ pub record_count: i64,
+ pub file_size_in_bytes: i64,
+ pub file_count: i64,
+ pub last_file_creation_time: u64,
+ pub total_buckets: i32,
+}
diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs
index 39d6aee..62ab3fe 100644
--- a/crates/paimon/src/spec/schema.rs
+++ b/crates/paimon/src/spec/schema.rs
@@ -86,6 +86,13 @@ impl TableSchema {
&self.partition_keys
}
+ pub fn partition_fields(&self) -> Vec<DataField> {
+ self.partition_keys
+ .iter()
+ .filter_map(|key| self.fields.iter().find(|f| f.name() ==
key).cloned())
+ .collect()
+ }
+
pub fn primary_keys(&self) -> &[String] {
&self.primary_keys
}
diff --git a/crates/paimon/src/spec/snapshot.rs
b/crates/paimon/src/spec/snapshot.rs
index 28dc92d..013211b 100644
--- a/crates/paimon/src/spec/snapshot.rs
+++ b/crates/paimon/src/spec/snapshot.rs
@@ -92,6 +92,10 @@ pub struct Snapshot {
#[builder(default = None)]
#[serde(skip_serializing_if = "Option::is_none")]
statistics: Option<String>,
+ /// next row id for row tracking
+ #[builder(default = None)]
+ #[serde(skip_serializing_if = "Option::is_none")]
+ next_row_id: Option<i64>,
}
impl Snapshot {
@@ -190,6 +194,18 @@ impl Snapshot {
pub fn statistics(&self) -> Option<&str> {
self.statistics.as_deref()
}
+
+ /// Get the next row id of this snapshot.
+ #[inline]
+ pub fn next_row_id(&self) -> Option<i64> {
+ self.next_row_id
+ }
+
+ /// Get the commit kind of this snapshot.
+ #[inline]
+ pub fn commit_kind(&self) -> &CommitKind {
+ &self.commit_kind
+ }
}
#[cfg(test)]
diff --git a/crates/paimon/src/lib.rs
b/crates/paimon/src/table/commit_message.rs
similarity index 51%
copy from crates/paimon/src/lib.rs
copy to crates/paimon/src/table/commit_message.rs
index a68ba6b..4e9c4ed 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/table/commit_message.rs
@@ -15,33 +15,27 @@
// specific language governing permissions and limitations
// under the License.
-mod error;
-pub use error::Error;
-pub use error::Result;
+use crate::spec::DataFileMeta;
-pub mod common;
-pub use common::{CatalogOptions, Options};
+/// A commit message representing new files to be committed for a specific
partition and bucket.
+///
+/// Reference:
[org.apache.paimon.table.sink.CommitMessage](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java)
+#[derive(Debug, Clone)]
+pub struct CommitMessage {
+ /// Binary row bytes for the partition.
+ pub partition: Vec<u8>,
+ /// Bucket id.
+ pub bucket: i32,
+ /// New data files to be added.
+ pub new_files: Vec<DataFileMeta>,
+}
-pub mod api;
-pub use api::rest_api::RESTApi;
-
-pub mod arrow;
-pub mod btree;
-pub mod catalog;
-mod deletion_vector;
-pub mod file_index;
-pub mod io;
-mod predicate_stats;
-pub mod spec;
-pub mod table;
-#[cfg(feature = "fulltext")]
-pub mod tantivy;
-
-pub use catalog::Catalog;
-pub use catalog::CatalogFactory;
-pub use catalog::FileSystemCatalog;
-
-pub use table::{
- DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan,
ReadBuilder, RowRange,
- SnapshotManager, Table, TableRead, TableScan, TagManager,
-};
+impl CommitMessage {
+    pub fn new(partition: Vec<u8>, bucket: i32, new_files: Vec<DataFileMeta>) -> Self {
+ Self {
+ partition,
+ bucket,
+ new_files,
+ }
+ }
+}
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index 1b34324..c17ebbc 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -19,31 +19,41 @@
pub(crate) mod bin_pack;
mod bucket_filter;
+mod commit_message;
#[cfg(feature = "fulltext")]
mod full_text_search_builder;
pub(crate) mod global_index_scanner;
mod read_builder;
+pub(crate) mod rest_env;
pub(crate) mod row_id_predicate;
pub(crate) mod schema_manager;
+pub(crate) mod snapshot_commit;
mod snapshot_manager;
mod source;
mod stats_filter;
+pub(crate) mod table_commit;
mod table_scan;
mod tag_manager;
+mod write_builder;
use crate::Result;
use arrow_array::RecordBatch;
+pub use commit_message::CommitMessage;
#[cfg(feature = "fulltext")]
pub use full_text_search_builder::FullTextSearchBuilder;
use futures::stream::BoxStream;
pub use read_builder::{ReadBuilder, TableRead};
+pub use rest_env::RESTEnv;
pub use schema_manager::SchemaManager;
+pub use snapshot_commit::{RESTSnapshotCommit, RenamingSnapshotCommit, SnapshotCommit};
pub use snapshot_manager::SnapshotManager;
pub use source::{
merge_row_ranges, DataSplit, DataSplitBuilder, DeletionFile,
PartitionBucket, Plan, RowRange,
};
+pub use table_commit::TableCommit;
pub use table_scan::TableScan;
pub use tag_manager::TagManager;
+pub use write_builder::WriteBuilder;
use crate::catalog::Identifier;
use crate::io::FileIO;
@@ -58,6 +68,7 @@ pub struct Table {
location: String,
schema: TableSchema,
schema_manager: SchemaManager,
+ rest_env: Option<RESTEnv>,
}
impl Table {
@@ -67,6 +78,7 @@ impl Table {
identifier: Identifier,
location: String,
schema: TableSchema,
+ rest_env: Option<RESTEnv>,
) -> Self {
let schema_manager = SchemaManager::new(file_io.clone(),
location.clone());
Self {
@@ -75,6 +87,7 @@ impl Table {
location,
schema,
schema_manager,
+ rest_env,
}
}
@@ -118,6 +131,13 @@ impl Table {
FullTextSearchBuilder::new(self)
}
+ /// Create a write builder for write/commit.
+ ///
+ /// Reference: [pypaimon
FileStoreTable.new_write_builder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/table/file_store_table.py).
+ pub fn new_write_builder(&self) -> WriteBuilder<'_> {
+ WriteBuilder::new(self)
+ }
+
/// Create a copy of this table with extra options merged into the schema.
pub fn copy_with_options(&self, extra: HashMap<String, String>) -> Self {
Self {
@@ -126,6 +146,7 @@ impl Table {
location: self.location.clone(),
schema: self.schema.copy_with_options(extra),
schema_manager: self.schema_manager.clone(),
+ rest_env: self.rest_env.clone(),
}
}
}
diff --git a/crates/paimon/src/table/read_builder.rs
b/crates/paimon/src/table/read_builder.rs
index 6712b0e..63d9917 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -410,6 +410,7 @@ mod tests {
Identifier::new("default", "t"),
table_path,
table_schema,
+ None,
);
let split = DataSplitBuilder::new()
@@ -469,6 +470,7 @@ mod tests {
Identifier::new("default", "t"),
table_path,
table_schema,
+ None,
);
let split = DataSplitBuilder::new()
@@ -526,6 +528,7 @@ mod tests {
Identifier::new("default", "t"),
table_path,
table_schema,
+ None,
);
let split = DataSplitBuilder::new()
@@ -586,6 +589,7 @@ mod tests {
Identifier::new("default", "t"),
table_path,
table_schema,
+ None,
);
let split = DataSplitBuilder::new()
@@ -637,6 +641,7 @@ mod tests {
Identifier::new("default", "t"),
"/tmp/test".to_string(),
table_schema,
+ None,
);
let mut builder = table.new_read_builder();
@@ -692,6 +697,7 @@ mod tests {
Identifier::new("default", "t"),
"/tmp/test".to_string(),
table_schema,
+ None,
);
let mut builder = table.new_read_builder();
diff --git a/crates/paimon/src/table/rest_env.rs
b/crates/paimon/src/table/rest_env.rs
new file mode 100644
index 0000000..bd1b42a
--- /dev/null
+++ b/crates/paimon/src/table/rest_env.rs
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! REST environment for creating RESTSnapshotCommit instances.
+
+use crate::api::rest_api::RESTApi;
+use crate::catalog::Identifier;
+use crate::table::snapshot_commit::{RESTSnapshotCommit, SnapshotCommit};
+use std::sync::Arc;
+
+/// REST environment that holds the REST API client, identifier, and uuid
+/// needed to create a `RESTSnapshotCommit`.
+#[derive(Clone)]
+pub struct RESTEnv {
+ identifier: Identifier,
+ uuid: String,
+ api: Arc<RESTApi>,
+}
+
+impl std::fmt::Debug for RESTEnv {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("RESTEnv")
+ .field("identifier", &self.identifier)
+ .field("uuid", &self.uuid)
+ .finish()
+ }
+}
+
+impl RESTEnv {
+ /// Create a new RESTEnv.
+    pub fn new(identifier: Identifier, uuid: String, api: Arc<RESTApi>) -> Self {
+ Self {
+ identifier,
+ uuid,
+ api,
+ }
+ }
+
+ /// Create a `RESTSnapshotCommit` from this environment.
+ pub fn snapshot_commit(&self) -> Arc<dyn SnapshotCommit> {
+ Arc::new(RESTSnapshotCommit::new(
+ self.api.clone(),
+ self.identifier.clone(),
+ self.uuid.clone(),
+ ))
+ }
+}
diff --git a/crates/paimon/src/table/snapshot_commit.rs
b/crates/paimon/src/table/snapshot_commit.rs
new file mode 100644
index 0000000..9d141f7
--- /dev/null
+++ b/crates/paimon/src/table/snapshot_commit.rs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! SnapshotCommit abstraction for atomic snapshot commits.
+//!
+//! Reference: [pypaimon
snapshot_commit.py](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/snapshot/snapshot_commit.py)
+
+use crate::api::rest_api::RESTApi;
+use crate::catalog::Identifier;
+use crate::spec::{PartitionStatistics, Snapshot};
+use crate::table::SnapshotManager;
+use crate::Result;
+use async_trait::async_trait;
+use std::sync::Arc;
+
+/// Interface to commit a snapshot atomically.
+///
+/// Two implementations:
+/// - `RenamingSnapshotCommit` — file system atomic rename
+/// - `RESTSnapshotCommit` — via Catalog API (e.g. REST)
+#[async_trait]
+pub trait SnapshotCommit: Send + Sync {
+ /// Commit the given snapshot. Returns true if successful, false if
+ /// another writer won the race.
+    async fn commit(&self, snapshot: &Snapshot, statistics: &[PartitionStatistics])
+        -> Result<bool>;
+}
+
+/// A SnapshotCommit using file renaming to commit.
+///
+/// Reference: [pypaimon
RenamingSnapshotCommit](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/snapshot/renaming_snapshot_commit.py)
+pub struct RenamingSnapshotCommit {
+ snapshot_manager: SnapshotManager,
+}
+
+impl RenamingSnapshotCommit {
+ pub fn new(snapshot_manager: SnapshotManager) -> Self {
+ Self { snapshot_manager }
+ }
+}
+
+#[async_trait]
+impl SnapshotCommit for RenamingSnapshotCommit {
+ async fn commit(
+ &self,
+ snapshot: &Snapshot,
+ _statistics: &[PartitionStatistics],
+ ) -> Result<bool> {
+ // statistics are not used in file system mode (same as Python)
+ self.snapshot_manager.commit_snapshot(snapshot).await
+ }
+}
+
+/// A SnapshotCommit using REST API to commit (e.g. REST Catalog).
+///
+/// Reference: [pypaimon
RESTSnapshotCommit](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/snapshot/catalog_snapshot_commit.py)
+pub struct RESTSnapshotCommit {
+ api: Arc<RESTApi>,
+ identifier: Identifier,
+ uuid: String,
+}
+
+impl RESTSnapshotCommit {
+    pub fn new(api: Arc<RESTApi>, identifier: Identifier, uuid: String) -> Self {
+ Self {
+ api,
+ identifier,
+ uuid,
+ }
+ }
+}
+
+#[async_trait]
+impl SnapshotCommit for RESTSnapshotCommit {
+ async fn commit(
+ &self,
+ snapshot: &Snapshot,
+ statistics: &[PartitionStatistics],
+ ) -> Result<bool> {
+ self.api
+            .commit_snapshot(&self.identifier, &self.uuid, snapshot, statistics)
+ .await
+ }
+}
diff --git a/crates/paimon/src/table/snapshot_manager.rs
b/crates/paimon/src/table/snapshot_manager.rs
index 6d8eab2..abff248 100644
--- a/crates/paimon/src/table/snapshot_manager.rs
+++ b/crates/paimon/src/table/snapshot_manager.rs
@@ -45,6 +45,10 @@ impl SnapshotManager {
}
}
+ pub fn file_io(&self) -> &FileIO {
+ &self.file_io
+ }
+
/// Path to the snapshot directory (e.g. `table_path/snapshot`).
pub fn snapshot_dir(&self) -> String {
format!("{}/{}", self.table_path, SNAPSHOT_DIR)
@@ -65,14 +69,22 @@ impl SnapshotManager {
format!("{}/snapshot-{}", self.snapshot_dir(), snapshot_id)
}
+ /// Path to the manifest directory.
+ pub fn manifest_dir(&self) -> String {
+ format!("{}/manifest", self.table_path)
+ }
+
+ /// Path to a manifest file.
+ pub fn manifest_path(&self, manifest_name: &str) -> String {
+ format!("{}/{}", self.manifest_dir(), manifest_name)
+ }
+
/// Read a hint file and return the id, or None if the file does not exist,
/// is being deleted, or contains invalid content.
///
/// Reference:
[HintFileUtils.readHint](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java)
async fn read_hint(&self, path: &str) -> Option<i64> {
let input = self.file_io.new_input(path).ok()?;
- // Try to read directly without exists() check to avoid TOCTOU race.
- // The file may be deleted or overwritten concurrently.
let content = input.read().await.ok()?;
let id_str = str::from_utf8(&content).ok()?;
id_str.trim().parse().ok()
@@ -138,7 +150,7 @@ impl SnapshotManager {
self.find_by_list_files(i64::min).await
}
-    /// Get a snapshot by id. Returns an error if the snapshot file does not exist.
+ /// Get a snapshot by id.
pub async fn get_snapshot(&self, snapshot_id: i64) ->
crate::Result<Snapshot> {
let snapshot_path = self.snapshot_path(snapshot_id);
let snap_input = self.file_io.new_input(&snapshot_path)?;
@@ -176,6 +188,68 @@ impl SnapshotManager {
Ok(Some(snapshot))
}
+ /// Atomically commit a snapshot.
+ ///
+ /// Writes the snapshot JSON to the target path. Returns `false` if the
+ /// target already exists (another writer won the race).
+ ///
+ /// On file systems that support atomic rename, we write to a temp file
+ /// first then rename. On backends where rename is not supported (e.g.
+ /// memory, object stores), we fall back to a direct write after an
+ /// existence check.
+    pub async fn commit_snapshot(&self, snapshot: &Snapshot) -> crate::Result<bool> {
+ let target_path = self.snapshot_path(snapshot.id());
+
+        let json = serde_json::to_string(snapshot).map_err(|e| crate::Error::DataInvalid {
+ message: format!("failed to serialize snapshot: {e}"),
+ source: Some(Box::new(e)),
+ })?;
+
+ // Try rename-based atomic commit first, fall back to check-and-write.
+ //
+ // TODO: opendal's rename uses POSIX semantics which silently
overwrites the target.
+ // The exists() check below narrows the race window but does not
eliminate it.
+ // Java Paimon uses `lock.runWithLock(() -> !fileIO.exists(newPath)
&& callable.call())`
+ // for full mutual exclusion. We need an external lock mechanism
(like Java's Lock
+ // interface) for backends without atomic rename-no-replace support.
+ let tmp_path = format!("{}.tmp-{}", target_path, uuid::Uuid::new_v4());
+ let output = self.file_io.new_output(&tmp_path)?;
+ output.write(bytes::Bytes::from(json.clone())).await?;
+
+ // Check before rename to avoid silent overwrite (opendal uses POSIX
rename semantics)
+ if self.file_io.exists(&target_path).await? {
+ let _ = self.file_io.delete_file(&tmp_path).await;
+ return Ok(false);
+ }
+
+ match self.file_io.rename(&tmp_path, &target_path).await {
+ Ok(()) => {}
+ Err(_) => {
+ // Rename not supported (e.g. memory/object store).
+ // Clean up temp file, then check-and-write.
+ let _ = self.file_io.delete_file(&tmp_path).await;
+ if self.file_io.exists(&target_path).await? {
+ return Ok(false);
+ }
+ let output = self.file_io.new_output(&target_path)?;
+ output.write(bytes::Bytes::from(json)).await?;
+ }
+ }
+
+ // Update LATEST hint (best-effort)
+ let _ = self.write_latest_hint(snapshot.id()).await;
+ Ok(true)
+ }
+
+ /// Update the LATEST hint file.
+    pub async fn write_latest_hint(&self, snapshot_id: i64) -> crate::Result<()> {
+ let hint_path = self.latest_hint_path();
+ let output = self.file_io.new_output(&hint_path)?;
+ output
+ .write(bytes::Bytes::from(snapshot_id.to_string()))
+ .await
+ }
+
/// Returns the snapshot whose commit time is earlier than or equal to the
given
/// `timestamp_millis`. If no such snapshot exists, returns None.
///
@@ -196,7 +270,6 @@ impl SnapshotManager {
None => return Ok(None),
};
- // If the earliest snapshot is already after the timestamp, no match.
if (earliest_snapshot.time_millis() as i64) > timestamp_millis {
return Ok(None);
}
@@ -220,3 +293,77 @@ impl SnapshotManager {
Ok(result)
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::io::FileIOBuilder;
+ use crate::spec::CommitKind;
+
+ fn test_file_io() -> FileIO {
+ FileIOBuilder::new("memory").build().unwrap()
+ }
+
+ async fn setup(table_path: &str) -> (FileIO, SnapshotManager) {
+ let file_io = test_file_io();
+ file_io
+ .mkdirs(&format!("{table_path}/snapshot/"))
+ .await
+ .unwrap();
+ let sm = SnapshotManager::new(file_io.clone(), table_path.to_string());
+ (file_io, sm)
+ }
+
+ fn test_snapshot(id: i64) -> Snapshot {
+ Snapshot::builder()
+ .version(3)
+ .id(id)
+ .schema_id(0)
+ .base_manifest_list("base-list".to_string())
+ .delta_manifest_list("delta-list".to_string())
+ .commit_user("test-user".to_string())
+ .commit_identifier(0)
+ .commit_kind(CommitKind::APPEND)
+ .time_millis(1000 * id as u64)
+ .build()
+ }
+
+ #[tokio::test]
+ async fn test_commit_snapshot_first() {
+ let (_, sm) = setup("memory:/test_commit_first").await;
+ let snap = test_snapshot(1);
+ let result = sm.commit_snapshot(&snap).await.unwrap();
+ assert!(result);
+
+ let loaded = sm.get_snapshot(1).await.unwrap();
+ assert_eq!(loaded.id(), 1);
+ }
+
+ #[tokio::test]
+ async fn test_commit_snapshot_already_exists() {
+ let (_, sm) = setup("memory:/test_commit_exists").await;
+ let snap = test_snapshot(1);
+ assert!(sm.commit_snapshot(&snap).await.unwrap());
+ // Second commit to same id should return false
+ let result = sm.commit_snapshot(&snap).await.unwrap();
+ assert!(!result);
+ }
+
+ #[tokio::test]
+ async fn test_commit_updates_latest_hint() {
+ let (_, sm) = setup("memory:/test_commit_hint").await;
+ let snap = test_snapshot(1);
+ sm.commit_snapshot(&snap).await.unwrap();
+
+ let latest_id = sm.get_latest_snapshot_id().await.unwrap();
+ assert_eq!(latest_id, Some(1));
+ }
+
+ #[tokio::test]
+ async fn test_write_latest_hint() {
+ let (_, sm) = setup("memory:/test_write_hint").await;
+ sm.write_latest_hint(42).await.unwrap();
+ let hint = sm.read_hint(&sm.latest_hint_path()).await;
+ assert_eq!(hint, Some(42));
+ }
+}
diff --git a/crates/paimon/src/table/table_commit.rs
b/crates/paimon/src/table/table_commit.rs
new file mode 100644
index 0000000..bc7f5c1
--- /dev/null
+++ b/crates/paimon/src/table/table_commit.rs
@@ -0,0 +1,995 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Table commit logic for Paimon write operations.
+//!
+//! Reference:
[org.apache.paimon.operation.FileStoreCommitImpl](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java)
+//! and [pypaimon table_commit.py /
file_store_commit.py](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/)
+
+use crate::io::FileIO;
+use crate::spec::stats::BinaryTableStats;
+use crate::spec::FileKind;
+use crate::spec::{
+ datums_to_binary_row, extract_datum, BinaryRow, CommitKind, CoreOptions,
Datum, Manifest,
+ ManifestEntry, ManifestFileMeta, ManifestList, PartitionStatistics,
Predicate,
+ PredicateBuilder, Snapshot,
+};
+use crate::table::commit_message::CommitMessage;
+use crate::table::snapshot_commit::SnapshotCommit;
+use crate::table::{SnapshotManager, Table, TableScan};
+use crate::Result;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};
+
+/// Batch commit identifier (i64::MAX), same as Python's
BATCH_COMMIT_IDENTIFIER.
+const BATCH_COMMIT_IDENTIFIER: i64 = i64::MAX;
+
+/// Table commit logic for Paimon write operations.
+///
+/// Provides atomic commit functionality including append, overwrite and
truncate
+pub struct TableCommit {
+ table: Table,
+ snapshot_manager: SnapshotManager,
+ snapshot_commit: Arc<dyn SnapshotCommit>,
+ commit_user: String,
+ total_buckets: i32,
+ overwrite_partition: Option<HashMap<String, Datum>>,
+ // commit config
+ commit_max_retries: u32,
+ commit_timeout_ms: u64,
+ commit_min_retry_wait_ms: u64,
+ commit_max_retry_wait_ms: u64,
+ row_tracking_enabled: bool,
+}
+
+impl TableCommit {
+ pub fn new(
+ table: Table,
+ commit_user: String,
+ overwrite_partition: Option<HashMap<String, Datum>>,
+ ) -> Self {
+        let snapshot_manager = SnapshotManager::new(table.file_io.clone(), table.location.clone());
+ let snapshot_commit = if let Some(env) = &table.rest_env {
+ env.snapshot_commit()
+ } else {
+
Arc::new(crate::table::snapshot_commit::RenamingSnapshotCommit::new(
+ snapshot_manager.clone(),
+ ))
+ };
+ let core_options = CoreOptions::new(table.schema().options());
+ let total_buckets = core_options.bucket();
+ let commit_max_retries = core_options.commit_max_retries();
+ let commit_timeout_ms = core_options.commit_timeout_ms();
+ let commit_min_retry_wait_ms = core_options.commit_min_retry_wait_ms();
+ let commit_max_retry_wait_ms = core_options.commit_max_retry_wait_ms();
+ let row_tracking_enabled = core_options.row_tracking_enabled();
+ Self {
+ table,
+ snapshot_manager,
+ snapshot_commit,
+ commit_user,
+ total_buckets,
+ overwrite_partition,
+ commit_max_retries,
+ commit_timeout_ms,
+ commit_min_retry_wait_ms,
+ commit_max_retry_wait_ms,
+ row_tracking_enabled,
+ }
+ }
+
+ /// Commit new files. Uses OVERWRITE mode if overwrite_partition was set
+ /// in the constructor, otherwise uses APPEND mode.
+    pub async fn commit(&self, commit_messages: Vec<CommitMessage>) -> Result<()> {
+ if commit_messages.is_empty() {
+ return Ok(());
+ }
+
+ let commit_entries = self.messages_to_entries(&commit_messages);
+
+ if let Some(overwrite_partition) = &self.overwrite_partition {
+ let partition_predicate = if overwrite_partition.is_empty() {
+ None
+ } else {
+ Some(self.build_partition_predicate(overwrite_partition)?)
+ };
+ self.try_commit(
+ CommitKind::OVERWRITE,
+ CommitEntriesPlan::Overwrite {
+ partition_predicate,
+ new_entries: commit_entries,
+ },
+ )
+ .await
+ } else {
+ self.try_commit(
+ CommitKind::APPEND,
+ CommitEntriesPlan::Static(commit_entries),
+ )
+ .await
+ }
+ }
+
+ /// Build a partition predicate from key-value pairs.
+    fn build_partition_predicate(&self, partition: &HashMap<String, Datum>) -> Result<Predicate> {
+        let pb = PredicateBuilder::new(&self.table.schema().partition_fields());
+ let predicates: Vec<Predicate> = partition
+ .iter()
+ .map(|(key, value)| pb.equal(key, value.clone()))
+ .collect::<Result<Vec<_>>>()?;
+ Ok(Predicate::and(predicates))
+ }
+
+ /// Drop specific partitions (OVERWRITE with only deletes).
+    pub async fn truncate_partitions(&self, partitions: Vec<HashMap<String, Datum>>) -> Result<()> {
+ if partitions.is_empty() {
+ return Ok(());
+ }
+
+ let predicates: Vec<Predicate> = partitions
+ .iter()
+ .map(|p| self.build_partition_predicate(p))
+ .collect::<Result<Vec<_>>>()?;
+
+ self.try_commit(
+ CommitKind::OVERWRITE,
+ CommitEntriesPlan::Overwrite {
+ partition_predicate: Some(Predicate::or(predicates)),
+ new_entries: vec![],
+ },
+ )
+ .await
+ }
+
+ /// Truncate the entire table (OVERWRITE with no filter, only deletes).
+ pub async fn truncate_table(&self) -> Result<()> {
+ self.try_commit(
+ CommitKind::OVERWRITE,
+ CommitEntriesPlan::Overwrite {
+ partition_predicate: None,
+ new_entries: vec![],
+ },
+ )
+ .await
+ }
+
+ /// Try to commit with retries.
+    async fn try_commit(&self, commit_kind: CommitKind, plan: CommitEntriesPlan) -> Result<()> {
+ let mut retry_count = 0u32;
+ let mut last_snapshot_for_dup_check: Option<Snapshot> = None;
+ let start_time_ms = current_time_millis();
+
+ loop {
+ let latest_snapshot =
self.snapshot_manager.get_latest_snapshot().await?;
+ let commit_entries = self.resolve_commit_entries(&plan,
&latest_snapshot).await?;
+
+ if commit_entries.is_empty() {
+ break;
+ }
+
+ // Check for duplicate commit (idempotency on retry)
+ if self
+ .is_duplicate_commit(&last_snapshot_for_dup_check,
&latest_snapshot, &commit_kind)
+ .await
+ {
+ break;
+ }
+
+ let result = self
+ .try_commit_once(&commit_kind, commit_entries,
&latest_snapshot)
+ .await?;
+
+ match result {
+ true => break,
+ false => {
+ last_snapshot_for_dup_check = latest_snapshot;
+ }
+ }
+
+ let elapsed_ms = current_time_millis() - start_time_ms;
+ if elapsed_ms > self.commit_timeout_ms || retry_count >=
self.commit_max_retries {
+ let snap_id = last_snapshot_for_dup_check
+ .as_ref()
+ .map(|s| s.id() + 1)
+ .unwrap_or(1);
+ return Err(crate::Error::DataInvalid {
+ message: format!(
+ "Commit failed for snapshot {} after {} millis with {}
retries, \
+ there may exist commit conflicts between multiple
jobs.",
+ snap_id, elapsed_ms, retry_count
+ ),
+ source: None,
+ });
+ }
+
+ self.commit_retry_wait(retry_count).await;
+ retry_count += 1;
+ }
+
+ Ok(())
+ }
+
+ /// Single commit attempt.
+ async fn try_commit_once(
+ &self,
+ commit_kind: &CommitKind,
+ mut commit_entries: Vec<ManifestEntry>,
+ latest_snapshot: &Option<Snapshot>,
+ ) -> Result<bool> {
+        let new_snapshot_id = latest_snapshot.as_ref().map(|s| s.id() + 1).unwrap_or(1);
+
+ // Row tracking
+ let mut next_row_id: Option<i64> = None;
+ if self.row_tracking_enabled {
+ commit_entries = self.assign_snapshot_id(new_snapshot_id,
commit_entries);
+ let first_row_id_start = latest_snapshot
+ .as_ref()
+ .and_then(|s| s.next_row_id())
+ .unwrap_or(0);
+ let (assigned, nrid) =
+ self.assign_row_tracking_meta(first_row_id_start,
commit_entries);
+ commit_entries = assigned;
+ next_row_id = Some(nrid);
+ }
+
+ let file_io = self.snapshot_manager.file_io();
+ let manifest_dir = self.snapshot_manager.manifest_dir();
+
+ let unique_id = uuid::Uuid::new_v4();
+ let base_manifest_list_name = format!("manifest-list-{unique_id}-0");
+ let delta_manifest_list_name = format!("manifest-list-{unique_id}-1");
+ let new_manifest_name = format!("manifest-{}-0", uuid::Uuid::new_v4());
+
+ let base_manifest_list_path =
format!("{manifest_dir}/{base_manifest_list_name}");
+ let delta_manifest_list_path =
format!("{manifest_dir}/{delta_manifest_list_name}");
+ let new_manifest_path = format!("{manifest_dir}/{new_manifest_name}");
+
+ // Write manifest file
+ let new_manifest_file_meta = self
+ .write_manifest_file(
+ file_io,
+ &new_manifest_path,
+ &new_manifest_name,
+ &commit_entries,
+ )
+ .await?;
+
+ // Write delta manifest list
+ ManifestList::write(
+ file_io,
+ &delta_manifest_list_path,
+ &[new_manifest_file_meta],
+ )
+ .await?;
+
+ // Read existing manifests (base + delta from previous snapshot) and
write base manifest list
+ let mut total_record_count: i64 = 0;
+ let existing_manifest_files = if let Some(snap) = latest_snapshot {
+ let base_path = format!("{manifest_dir}/{}",
snap.base_manifest_list());
+ let delta_path = format!("{manifest_dir}/{}",
snap.delta_manifest_list());
+ let base_files = ManifestList::read(file_io, &base_path).await?;
+ let delta_files = ManifestList::read(file_io, &delta_path).await?;
+ if let Some(prev) = snap.total_record_count() {
+ total_record_count += prev;
+ }
+ let mut all = base_files;
+ all.extend(delta_files);
+ all
+ } else {
+ vec![]
+ };
+
+ ManifestList::write(file_io, &base_manifest_list_path,
&existing_manifest_files).await?;
+
+ // Calculate delta record count
+ let mut delta_record_count: i64 = 0;
+ for entry in &commit_entries {
+ match entry.kind() {
+ FileKind::Add => delta_record_count += entry.file().row_count,
+ FileKind::Delete => delta_record_count -=
entry.file().row_count,
+ }
+ }
+ total_record_count += delta_record_count;
+
+ let snapshot = Snapshot::builder()
+ .version(3)
+ .id(new_snapshot_id)
+ .schema_id(self.table.schema().id())
+ .base_manifest_list(base_manifest_list_name)
+ .delta_manifest_list(delta_manifest_list_name)
+ .commit_user(self.commit_user.clone())
+ .commit_identifier(BATCH_COMMIT_IDENTIFIER)
+ .commit_kind(commit_kind.clone())
+ .time_millis(current_time_millis())
+ .total_record_count(Some(total_record_count))
+ .delta_record_count(Some(delta_record_count))
+ .next_row_id(next_row_id)
+ .build();
+
+ let statistics = self.generate_partition_statistics(&commit_entries)?;
+
+ self.snapshot_commit.commit(&snapshot, &statistics).await
+ }
+
+ /// Write a manifest file and return its metadata.
+ async fn write_manifest_file(
+ &self,
+ file_io: &FileIO,
+ path: &str,
+ file_name: &str,
+ entries: &[ManifestEntry],
+ ) -> Result<ManifestFileMeta> {
+ Manifest::write(file_io, path, entries).await?;
+
+ let mut added_file_count: i64 = 0;
+ let mut deleted_file_count: i64 = 0;
+ for entry in entries {
+ match entry.kind() {
+ FileKind::Add => added_file_count += 1,
+ FileKind::Delete => deleted_file_count += 1,
+ }
+ }
+
+ // Get file size
+ let status = file_io.get_status(path).await?;
+
+ let partition_stats = self.compute_partition_stats(entries)?;
+
+ Ok(ManifestFileMeta::new(
+ file_name.to_string(),
+ status.size as i64,
+ added_file_count,
+ deleted_file_count,
+ partition_stats,
+ self.table.schema().id(),
+ ))
+ }
+
    /// Check if this commit was already completed (idempotency).
    ///
    /// Walks every snapshot created after `last_snapshot_for_dup_check` up to
    /// and including `latest_snapshot`, and returns `true` if any of them was
    /// committed by this committer's user with the same commit kind. Returns
    /// `false` when either bound is `None` (nothing to compare against).
    ///
    /// NOTE(review): matching on (commit_user, commit_kind) alone is a
    /// heuristic — it does not compare commit identifiers, so two distinct
    /// commits of the same kind by the same user inside the window would be
    /// flagged as duplicates. Snapshots that fail to load are silently
    /// skipped. Confirm both behaviors against the reference implementation.
    async fn is_duplicate_commit(
        &self,
        last_snapshot_for_dup_check: &Option<Snapshot>,
        latest_snapshot: &Option<Snapshot>,
        commit_kind: &CommitKind,
    ) -> bool {
        if let (Some(prev_snap), Some(latest)) = (last_snapshot_for_dup_check, latest_snapshot) {
            let start_id = prev_snap.id() + 1;
            for snapshot_id in start_id..=latest.id() {
                // Snapshots are fetched one at a time; errors mean "not a dup".
                if let Ok(snap) = self.snapshot_manager.get_snapshot(snapshot_id).await {
                    if snap.commit_user() == self.commit_user && snap.commit_kind() == commit_kind {
                        return true;
                    }
                }
            }
        }
        false
    }
+
+ /// Resolve commit entries based on the plan type.
+ async fn resolve_commit_entries(
+ &self,
+ plan: &CommitEntriesPlan,
+ latest_snapshot: &Option<Snapshot>,
+ ) -> Result<Vec<ManifestEntry>> {
+ match plan {
+ CommitEntriesPlan::Static(entries) => Ok(entries.clone()),
+ CommitEntriesPlan::Overwrite {
+ partition_predicate,
+ new_entries,
+ } => {
+ self.generate_overwrite_entries(
+ latest_snapshot,
+ partition_predicate.as_ref(),
+ new_entries,
+ )
+ .await
+ }
+ }
+ }
+
+ /// Generate overwrite entries: DELETE existing + ADD new.
+ async fn generate_overwrite_entries(
+ &self,
+ latest_snapshot: &Option<Snapshot>,
+ partition_predicate: Option<&Predicate>,
+ new_entries: &[ManifestEntry],
+ ) -> Result<Vec<ManifestEntry>> {
+ let mut entries = Vec::new();
+
+ if let Some(snap) = latest_snapshot {
+ let scan = TableScan::new(
+ &self.table,
+ partition_predicate.cloned(),
+ vec![],
+ None,
+ None,
+ None,
+ );
+ let current_entries = scan.plan_manifest_entries(snap).await?;
+ for entry in current_entries {
+ entries.push(entry.with_kind(FileKind::Delete));
+ }
+ }
+
+ entries.extend(new_entries.iter().cloned());
+ Ok(entries)
+ }
+
+ /// Assign snapshot ID as sequence number to entries.
+ fn assign_snapshot_id(
+ &self,
+ snapshot_id: i64,
+ entries: Vec<ManifestEntry>,
+ ) -> Vec<ManifestEntry> {
+ entries
+ .into_iter()
+ .map(|e| e.with_sequence_number(snapshot_id, snapshot_id))
+ .collect()
+ }
+
+ /// Assign row tracking metadata to new files.
+ fn assign_row_tracking_meta(
+ &self,
+ first_row_id_start: i64,
+ entries: Vec<ManifestEntry>,
+ ) -> (Vec<ManifestEntry>, i64) {
+ let mut result = Vec::with_capacity(entries.len());
+ let mut start = first_row_id_start;
+
+ for entry in entries {
+ if *entry.kind() == FileKind::Add
+ && entry.file().file_source == Some(0) // APPEND
+ && entry.file().first_row_id.is_none()
+ {
+ let row_count = entry.file().row_count;
+ result.push(entry.with_first_row_id(start));
+ start += row_count;
+ } else {
+ result.push(entry);
+ }
+ }
+
+ (result, start)
+ }
+
+ /// Exponential backoff with jitter.
+ async fn commit_retry_wait(&self, retry_count: u32) {
+ let base_wait = self
+ .commit_min_retry_wait_ms
+ .saturating_mul(2u64.saturating_pow(retry_count));
+ let wait = base_wait.min(self.commit_max_retry_wait_ms);
+ // Simple jitter: add up to 20% of wait time
+ let jitter = (wait as f64 * 0.2 * rand_f64()) as u64;
+ let total_wait = wait + jitter;
+ tokio::time::sleep(std::time::Duration::from_millis(total_wait)).await;
+ }
+
    /// Compute partition stats (min/max/null_counts) across all entries.
    ///
    /// Folds every entry's partition row into per-field min/max datums and a
    /// per-field null counter, then serializes the min/max rows back to
    /// BinaryRow bytes. Returns empty stats when the table has no partition
    /// fields or there are no entries.
    ///
    /// NOTE(review): entries with empty partition bytes are skipped entirely —
    /// they contribute neither to min/max nor to null_counts; confirm that is
    /// intended rather than counting each field as null.
    fn compute_partition_stats(&self, entries: &[ManifestEntry]) -> Result<BinaryTableStats> {
        let partition_fields = self.table.schema().partition_fields();
        let num_fields = partition_fields.len();

        if num_fields == 0 || entries.is_empty() {
            return Ok(BinaryTableStats::new(vec![], vec![], vec![]));
        }

        let data_types: Vec<_> = partition_fields
            .iter()
            .map(|f| f.data_type().clone())
            .collect();
        // Running aggregates, one slot per partition field.
        let mut mins: Vec<Option<Datum>> = vec![None; num_fields];
        let mut maxs: Vec<Option<Datum>> = vec![None; num_fields];
        let mut null_counts: Vec<i64> = vec![0; num_fields];

        for entry in entries {
            let partition_bytes = entry.partition();
            if partition_bytes.is_empty() {
                continue;
            }
            let row = BinaryRow::from_serialized_bytes(partition_bytes)?;
            for i in 0..num_fields {
                match extract_datum(&row, i, &data_types[i])? {
                    Some(datum) => {
                        // take() moves the current min out so we can either
                        // keep it or replace it with (a clone of) the new
                        // datum; the clone keeps `datum` available for the
                        // max update below, which consumes it by move.
                        mins[i] = Some(match mins[i].take() {
                            Some(cur) if cur <= datum => cur,
                            Some(_) => datum.clone(),
                            None => datum.clone(),
                        });
                        maxs[i] = Some(match maxs[i].take() {
                            Some(cur) if cur >= datum => cur,
                            Some(_) => datum,
                            None => datum,
                        });
                    }
                    None => {
                        null_counts[i] += 1;
                    }
                }
            }
        }

        // Serialize the aggregated min/max datums back into BinaryRow bytes.
        let min_datums: Vec<_> = mins.iter().zip(data_types.iter()).collect();
        let max_datums: Vec<_> = maxs.iter().zip(data_types.iter()).collect();

        let min_bytes = datums_to_binary_row(&min_datums);
        let max_bytes = datums_to_binary_row(&max_datums);
        let null_counts = null_counts.into_iter().map(Some).collect();

        Ok(BinaryTableStats::new(min_bytes, max_bytes, null_counts))
    }
+
    /// Generate per-partition statistics from commit entries.
    ///
    /// Groups entries by their serialized partition bytes and accumulates,
    /// per partition: signed record count, signed file size, signed file
    /// count (ADD = +1, DELETE = -1), and the latest file creation time.
    /// The partition spec (key -> string value) is parsed once per partition
    /// when its accumulator is first created.
    ///
    /// Reference: [pypaimon FileStoreCommit._generate_partition_statistics](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/file_store_commit.py)
    fn generate_partition_statistics(
        &self,
        entries: &[ManifestEntry],
    ) -> Result<Vec<PartitionStatistics>> {
        let partition_fields = self.table.schema().partition_fields();
        let data_types: Vec<_> = partition_fields
            .iter()
            .map(|f| f.data_type().clone())
            .collect();
        let partition_keys: Vec<_> = self
            .table
            .schema()
            .partition_keys()
            .iter()
            .map(|s| s.to_string())
            .collect();

        // Keyed by the raw partition BinaryRow bytes — identical partitions
        // serialize identically, so the bytes are a stable grouping key.
        let mut stats_map: HashMap<Vec<u8>, PartitionStatistics> = HashMap::new();

        for entry in entries {
            let partition_bytes = entry.partition().to_vec();
            let is_add = *entry.kind() == FileKind::Add;
            let sign: i64 = if is_add { 1 } else { -1 };

            let file = entry.file();
            // Fall back to "now" when the file carries no creation time.
            let file_creation_time = file
                .creation_time
                .map(|t| t.timestamp_millis() as u64)
                .unwrap_or_else(current_time_millis);

            let stats = stats_map.entry(partition_bytes.clone()).or_insert_with(|| {
                // Parse partition spec from BinaryRow; parse failures fall
                // back to an empty spec rather than aborting the commit.
                let spec = self
                    .parse_partition_spec(&partition_bytes, &partition_keys, &data_types)
                    .unwrap_or_default();
                PartitionStatistics {
                    spec,
                    record_count: 0,
                    file_size_in_bytes: 0,
                    file_count: 0,
                    last_file_creation_time: 0,
                    total_buckets: entry.total_buckets(),
                }
            });

            // DELETE entries subtract what ADD entries added.
            stats.record_count += sign * file.row_count;
            stats.file_size_in_bytes += sign * file.file_size;
            stats.file_count += sign;
            stats.last_file_creation_time = stats.last_file_creation_time.max(file_creation_time);
        }

        Ok(stats_map.into_values().collect())
    }
+
+ /// Parse partition BinaryRow bytes into a HashMap<String, String>.
+ fn parse_partition_spec(
+ &self,
+ partition_bytes: &[u8],
+ partition_keys: &[String],
+ data_types: &[crate::spec::DataType],
+ ) -> Result<HashMap<String, String>> {
+ let mut spec = HashMap::new();
+ if partition_bytes.is_empty() || partition_keys.is_empty() {
+ return Ok(spec);
+ }
+ let row = BinaryRow::from_serialized_bytes(partition_bytes)?;
+ for (i, key) in partition_keys.iter().enumerate() {
+ if let Some(datum) = extract_datum(&row, i, &data_types[i])? {
+ spec.insert(key.clone(), datum.to_string());
+ }
+ }
+ Ok(spec)
+ }
+
+ /// Convert commit messages to manifest entries (ADD kind).
+ fn messages_to_entries(&self, messages: &[CommitMessage]) ->
Vec<ManifestEntry> {
+ messages
+ .iter()
+ .flat_map(|msg| {
+ msg.new_files.iter().map(move |file| {
+ ManifestEntry::new(
+ FileKind::Add,
+ msg.partition.clone(),
+ msg.bucket,
+ self.total_buckets,
+ file.clone(),
+ 2,
+ )
+ })
+ })
+ .collect()
+ }
+}
+
/// Plan for resolving commit entries.
enum CommitEntriesPlan {
    /// Static entries (for APPEND). Used verbatim, no snapshot scan needed.
    Static(Vec<ManifestEntry>),
    /// Overwrite with optional partition predicate.
    Overwrite {
        /// Restricts which existing files are deleted; `None` matches all
        /// files (whole-table overwrite).
        partition_predicate: Option<Predicate>,
        /// The new files to ADD after the deletes.
        new_entries: Vec<ManifestEntry>,
    },
}
+
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 (instead of panicking) if the system clock reads before the
/// epoch.
fn current_time_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
+
/// Random f64 in [0, 1) using RandomState for per-process entropy.
///
/// Not cryptographically secure — only used to jitter commit retry waits.
fn rand_f64() -> f64 {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u64(
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos() as u64,
    );
    // Use the top 53 bits divided by 2^53: every quotient is exactly
    // representable and strictly below 1.0. Dividing the full u64 by
    // `u64::MAX as f64` (which rounds to 2^64) can round to exactly 1.0 for
    // hash values near u64::MAX, violating the documented half-open range.
    ((hasher.finish() >> 11) as f64) / ((1u64 << 53) as f64)
}
+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::Identifier;
    use crate::io::FileIOBuilder;
    use crate::spec::stats::BinaryTableStats;
    use crate::spec::{BinaryRowBuilder, DataFileMeta, ManifestList, TableSchema};
    use chrono::{DateTime, Utc};

    // In-memory FileIO so tests touch no real filesystem.
    fn test_file_io() -> FileIO {
        FileIOBuilder::new("memory").build().unwrap()
    }

    // Unpartitioned two-column schema (id INT, name STRING).
    fn test_schema() -> TableSchema {
        use crate::spec::{DataType, IntType, Schema, VarCharType};
        let schema = Schema::builder()
            .column("id", DataType::Int(IntType::new()))
            .column("name", DataType::VarChar(VarCharType::string_type()))
            .build()
            .unwrap();
        TableSchema::new(0, &schema)
    }

    // Schema partitioned on a single STRING column `pt`.
    fn test_partitioned_schema() -> TableSchema {
        use crate::spec::{DataType, IntType, Schema, VarCharType};
        let schema = Schema::builder()
            .column("pt", DataType::VarChar(VarCharType::string_type()))
            .column("id", DataType::Int(IntType::new()))
            .partition_keys(["pt"])
            .build()
            .unwrap();
        TableSchema::new(0, &schema)
    }

    // Table handle over the unpartitioned schema at `table_path`.
    fn test_table(file_io: &FileIO, table_path: &str) -> Table {
        Table::new(
            file_io.clone(),
            Identifier::new("default", "test_table"),
            table_path.to_string(),
            test_schema(),
            None,
        )
    }

    // Table handle over the partitioned schema at `table_path`.
    fn test_partitioned_table(file_io: &FileIO, table_path: &str) -> Table {
        Table::new(
            file_io.clone(),
            Identifier::new("default", "test_table"),
            table_path.to_string(),
            test_partitioned_schema(),
            None,
        )
    }

    // Minimal DataFileMeta: only name and row_count vary; everything else is
    // a fixed placeholder (1 KiB size, empty stats, fixed creation time).
    fn test_data_file(name: &str, row_count: i64) -> DataFileMeta {
        DataFileMeta {
            file_name: name.to_string(),
            file_size: 1024,
            row_count,
            min_key: vec![],
            max_key: vec![],
            key_stats: BinaryTableStats::new(vec![], vec![], vec![]),
            value_stats: BinaryTableStats::new(vec![], vec![], vec![]),
            min_sequence_number: 0,
            max_sequence_number: 0,
            schema_id: 0,
            level: 0,
            extra_files: vec![],
            creation_time: Some(
                "2024-09-06T07:45:55.039+00:00"
                    .parse::<DateTime<Utc>>()
                    .unwrap(),
            ),
            delete_row_count: Some(0),
            embedded_index: None,
            first_row_id: None,
            write_cols: None,
            external_path: None,
            file_source: None,
            value_stats_cols: None,
        }
    }

    // Append-mode committer (overwrite_partition = None).
    fn setup_commit(file_io: &FileIO, table_path: &str) -> TableCommit {
        let table = test_table(file_io, table_path);
        TableCommit::new(table, "test-user".to_string(), None)
    }

    // Append-mode committer over the partitioned table.
    fn setup_partitioned_commit(file_io: &FileIO, table_path: &str) -> TableCommit {
        let table = test_partitioned_table(file_io, table_path);
        TableCommit::new(table, "test-user".to_string(), None)
    }

    // Serialize a single-string partition value the way the writer would.
    // Strings of <= 7 bytes fit inline in the BinaryRow fixed part; longer
    // ones go to the variable-length section.
    fn partition_bytes(pt: &str) -> Vec<u8> {
        use crate::spec::{DataType, VarCharType};
        let datum = Datum::String(pt.to_string());
        let dt = DataType::VarChar(VarCharType::string_type());
        let datums = vec![(&datum, &dt)];
        // NOTE(review): the result of this call is discarded — it only
        // asserts (via unwrap) that from_datums succeeds. Possibly leftover;
        // confirm whether its output was meant to be returned instead of the
        // hand-built row below.
        BinaryRow::from_datums(&datums).unwrap();
        let mut builder = BinaryRowBuilder::new(1);
        if pt.len() <= 7 {
            builder.write_string_inline(0, pt);
        } else {
            builder.write_string(0, pt);
        }
        builder.build_serialized()
    }

    // Create the snapshot/ and manifest/ directories a commit expects.
    async fn setup_dirs(file_io: &FileIO, table_path: &str) {
        file_io
            .mkdirs(&format!("{table_path}/snapshot/"))
            .await
            .unwrap();
        file_io
            .mkdirs(&format!("{table_path}/manifest/"))
            .await
            .unwrap();
    }

    // Single APPEND commit: snapshot 1 with correct counts, one manifest
    // list entry, and one ADD manifest entry for the written file.
    #[tokio::test]
    async fn test_append_commit() {
        let file_io = test_file_io();
        let table_path = "memory:/test_append_commit";
        setup_dirs(&file_io, table_path).await;

        let commit = setup_commit(&file_io, table_path);

        let messages = vec![CommitMessage::new(
            vec![],
            0,
            vec![test_data_file("data-0.parquet", 100)],
        )];

        commit.commit(messages).await.unwrap();

        // Verify snapshot was created
        let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
        let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap();
        assert_eq!(snapshot.id(), 1);
        assert_eq!(snapshot.commit_identifier(), BATCH_COMMIT_IDENTIFIER);
        assert_eq!(snapshot.total_record_count(), Some(100));
        assert_eq!(snapshot.delta_record_count(), Some(100));

        // Verify manifest list was written
        let manifest_dir = format!("{table_path}/manifest");
        let delta_path = format!("{manifest_dir}/{}", snapshot.delta_manifest_list());
        let delta_metas = ManifestList::read(&file_io, &delta_path).await.unwrap();
        assert_eq!(delta_metas.len(), 1);
        assert_eq!(delta_metas[0].num_added_files(), 1);

        // Verify manifest entries
        let manifest_path = format!("{manifest_dir}/{}", delta_metas[0].file_name());
        let entries = Manifest::read(&file_io, &manifest_path).await.unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(*entries[0].kind(), FileKind::Add);
        assert_eq!(entries[0].file().file_name, "data-0.parquet");
    }

    // Two consecutive APPENDs: snapshot id increments and totals accumulate.
    #[tokio::test]
    async fn test_multiple_appends() {
        let file_io = test_file_io();
        let table_path = "memory:/test_multiple_appends";
        setup_dirs(&file_io, table_path).await;

        let commit = setup_commit(&file_io, table_path);

        // First commit
        commit
            .commit(vec![CommitMessage::new(
                vec![],
                0,
                vec![test_data_file("data-0.parquet", 100)],
            )])
            .await
            .unwrap();

        // Second commit
        commit
            .commit(vec![CommitMessage::new(
                vec![],
                0,
                vec![test_data_file("data-1.parquet", 200)],
            )])
            .await
            .unwrap();

        let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
        let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap();
        assert_eq!(snapshot.id(), 2);
        assert_eq!(snapshot.total_record_count(), Some(300));
        assert_eq!(snapshot.delta_record_count(), Some(200));
    }

    // Committing zero messages must not create a snapshot.
    #[tokio::test]
    async fn test_empty_commit_is_noop() {
        let file_io = test_file_io();
        let table_path = "memory:/test_empty_commit";
        setup_dirs(&file_io, table_path).await;

        let commit = setup_commit(&file_io, table_path);
        commit.commit(vec![]).await.unwrap();

        let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
        let snapshot = snap_manager.get_latest_snapshot().await.unwrap();
        assert!(snapshot.is_none());
    }

    // Truncate after an append: OVERWRITE snapshot that deletes all rows.
    #[tokio::test]
    async fn test_truncate_table() {
        let file_io = test_file_io();
        let table_path = "memory:/test_truncate";
        setup_dirs(&file_io, table_path).await;

        let commit = setup_commit(&file_io, table_path);

        // Append some data first
        commit
            .commit(vec![CommitMessage::new(
                vec![],
                0,
                vec![test_data_file("data-0.parquet", 100)],
            )])
            .await
            .unwrap();

        // Truncate
        commit.truncate_table().await.unwrap();

        let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
        let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap();
        assert_eq!(snapshot.id(), 2);
        assert_eq!(snapshot.commit_kind(), &CommitKind::OVERWRITE);
        assert_eq!(snapshot.total_record_count(), Some(0));
        assert_eq!(snapshot.delta_record_count(), Some(-100));
    }

    // Partition-scoped overwrite: only partition "a" is replaced, "b" stays.
    #[tokio::test]
    async fn test_overwrite_partition() {
        let file_io = test_file_io();
        let table_path = "memory:/test_overwrite_partition";
        setup_dirs(&file_io, table_path).await;

        let commit = setup_partitioned_commit(&file_io, table_path);

        // Append data for partition "a" and "b"
        commit
            .commit(vec![
                CommitMessage::new(
                    partition_bytes("a"),
                    0,
                    vec![test_data_file("data-a.parquet", 100)],
                ),
                CommitMessage::new(
                    partition_bytes("b"),
                    0,
                    vec![test_data_file("data-b.parquet", 200)],
                ),
            ])
            .await
            .unwrap();

        // Overwrite partition "a" with new data
        let mut overwrite_partition = HashMap::new();
        overwrite_partition.insert("pt".to_string(), Datum::String("a".to_string()));

        let table = test_partitioned_table(&file_io, table_path);
        let overwrite_commit =
            TableCommit::new(table, "test-user".to_string(), Some(overwrite_partition));

        overwrite_commit
            .commit(vec![CommitMessage::new(
                partition_bytes("a"),
                0,
                vec![test_data_file("data-a2.parquet", 50)],
            )])
            .await
            .unwrap();

        let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
        let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap();
        assert_eq!(snapshot.id(), 2);
        assert_eq!(snapshot.commit_kind(), &CommitKind::OVERWRITE);
        // 300 - 100 (delete a) + 50 (add a2) = 250
        assert_eq!(snapshot.total_record_count(), Some(250));
    }

    // Dropping two of three partitions removes exactly their rows.
    #[tokio::test]
    async fn test_drop_partitions() {
        let file_io = test_file_io();
        let table_path = "memory:/test_drop_partitions";
        setup_dirs(&file_io, table_path).await;

        let commit = setup_partitioned_commit(&file_io, table_path);

        // Append data for partitions "a", "b", "c"
        commit
            .commit(vec![
                CommitMessage::new(
                    partition_bytes("a"),
                    0,
                    vec![test_data_file("data-a.parquet", 100)],
                ),
                CommitMessage::new(
                    partition_bytes("b"),
                    0,
                    vec![test_data_file("data-b.parquet", 200)],
                ),
                CommitMessage::new(
                    partition_bytes("c"),
                    0,
                    vec![test_data_file("data-c.parquet", 300)],
                ),
            ])
            .await
            .unwrap();

        // Drop partitions "a" and "c"
        let partitions = vec![
            HashMap::from([("pt".to_string(), Datum::String("a".to_string()))]),
            HashMap::from([("pt".to_string(), Datum::String("c".to_string()))]),
        ];
        commit.truncate_partitions(partitions).await.unwrap();

        let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
        let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap();
        assert_eq!(snapshot.id(), 2);
        assert_eq!(snapshot.commit_kind(), &CommitKind::OVERWRITE);
        // 600 - 100 (a) - 300 (c) = 200
        assert_eq!(snapshot.total_record_count(), Some(200));
    }
}
diff --git a/crates/paimon/src/table/table_scan.rs
b/crates/paimon/src/table/table_scan.rs
index e21f00b..06fe87a 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -440,33 +440,23 @@ impl<'a> TableScan<'a> {
limited_splits
}
- async fn plan_snapshot(&self, snapshot: Snapshot) -> crate::Result<Plan> {
+ /// Read all manifest entries from a snapshot, applying filters and
merging.
+ ///
+ /// This is the shared entry point used by both `plan_snapshot` (scan) and
+ /// `TableCommit` (overwrite). Filters include partition predicate, data
+ /// predicates, and bucket predicate.
+ pub(crate) async fn plan_manifest_entries(
+ &self,
+ snapshot: &Snapshot,
+ ) -> crate::Result<Vec<ManifestEntry>> {
let file_io = self.table.file_io();
let table_path = self.table.location();
let core_options = CoreOptions::new(self.table.schema().options());
let deletion_vectors_enabled = core_options.deletion_vectors_enabled();
let data_evolution_enabled = core_options.data_evolution_enabled();
- let target_split_size = core_options.source_split_target_size();
- let open_file_cost = core_options.source_split_open_file_cost();
- // Resolve partition fields for manifest-file-level stats pruning.
- let partition_keys = self.table.schema().partition_keys();
- let partition_fields: Vec<DataField> = self
- .table
- .schema()
- .partition_keys()
- .iter()
- .filter_map(|key| {
- self.table
- .schema()
- .fields()
- .iter()
- .find(|f| f.name() == key)
- .cloned()
- })
- .collect();
+ let partition_fields = self.table.schema().partition_fields();
- // Data-evolution tables must not prune data files independently.
let pushdown_data_predicates = if data_evolution_enabled {
&[][..]
} else {
@@ -475,8 +465,6 @@ impl<'a> TableScan<'a> {
let has_primary_keys = !self.table.schema().primary_keys().is_empty();
- // Compute bucket predicate and key fields for per-entry bucket
pruning.
- // Only supported for the default bucket function (MurmurHash3-based).
let bucket_key_fields: Vec<DataField> =
if self.bucket_predicate.is_none() ||
!core_options.is_default_bucket_function() {
Vec::new()
@@ -509,7 +497,7 @@ impl<'a> TableScan<'a> {
let entries = read_all_manifest_entries(
file_io,
table_path,
- &snapshot,
+ snapshot,
deletion_vectors_enabled,
has_primary_keys,
self.partition_predicate.as_ref(),
@@ -521,7 +509,19 @@ impl<'a> TableScan<'a> {
&bucket_key_fields,
)
.await?;
- let entries = merge_manifest_entries(entries);
+ Ok(merge_manifest_entries(entries))
+ }
+
+ async fn plan_snapshot(&self, snapshot: Snapshot) -> crate::Result<Plan> {
+ let file_io = self.table.file_io();
+ let table_path = self.table.location();
+ let core_options = CoreOptions::new(self.table.schema().options());
+ let data_evolution_enabled = core_options.data_evolution_enabled();
+ let target_split_size = core_options.source_split_target_size();
+ let open_file_cost = core_options.source_split_open_file_cost();
+ let partition_keys = self.table.schema().partition_keys();
+
+ let entries = self.plan_manifest_entries(&snapshot).await?;
if entries.is_empty() {
return Ok(Plan::new(Vec::new()));
}
diff --git a/crates/paimon/src/table/write_builder.rs
b/crates/paimon/src/table/write_builder.rs
new file mode 100644
index 0000000..d6458cf
--- /dev/null
+++ b/crates/paimon/src/table/write_builder.rs
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! WriteBuilder for table write API.
+//!
+//! Reference: [pypaimon
WriteBuilder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/write_builder.py)
+
+use crate::spec::Datum;
+use crate::table::{Table, TableCommit};
+use std::collections::HashMap;
+use uuid::Uuid;
+
/// Builder for creating table writers and committers.
///
/// Provides `new_write` (TODO) and `new_commit` methods, with optional
/// `overwrite` support for partition-level overwrites.
pub struct WriteBuilder<'a> {
    /// Target table; borrowed so one builder can create several commits.
    table: &'a Table,
    /// Commit user recorded in snapshots; a fresh UUID per builder.
    commit_user: String,
    /// `None` = append mode. `Some(map)` = overwrite mode, where an empty
    /// map means the whole table is overwritten.
    overwrite_partition: Option<HashMap<String, Datum>>,
}
+
+impl<'a> WriteBuilder<'a> {
+ pub fn new(table: &'a Table) -> Self {
+ Self {
+ table,
+ commit_user: Uuid::new_v4().to_string(),
+ overwrite_partition: None,
+ }
+ }
+
+ /// Set overwrite mode. If `partition` is None, overwrites the entire
table.
+ /// If `partition` is Some, overwrites only the specified partition.
+ pub fn overwrite(&mut self, partition: Option<HashMap<String, Datum>>) ->
&mut Self {
+ self.overwrite_partition = Some(partition.unwrap_or_default());
+ self
+ }
+
+ /// Create a new TableCommit for committing write results.
+ pub fn new_commit(&self) -> TableCommit {
+ TableCommit::new(
+ self.table.clone(),
+ self.commit_user.clone(),
+ self.overwrite_partition.clone(),
+ )
+ }
+
+ // TODO: pub fn new_write(&self) -> TableWrite { ... }
+}
diff --git a/crates/paimon/src/tantivy/directory.rs
b/crates/paimon/src/tantivy/directory.rs
index 32c3058..22440a6 100644
--- a/crates/paimon/src/tantivy/directory.rs
+++ b/crates/paimon/src/tantivy/directory.rs
@@ -368,9 +368,14 @@ mod tests {
#[tokio::test]
async fn test_read_file_from_archive() {
let dir = make_test_dir().await;
- // Read any file from the archive and verify it's non-empty.
- let first_path = dir.files.keys().next().unwrap().clone();
- let handle = dir.get_file_handle(&first_path).unwrap();
+ // Find a non-empty file (some tantivy index files can be 0 bytes).
+ let non_empty_path = dir
+ .files
+ .iter()
+ .find(|(_, meta)| meta.length > 0)
+ .map(|(p, _)| p.clone())
+ .expect("archive should contain at least one non-empty file");
+ let handle = dir.get_file_handle(&non_empty_path).unwrap();
assert!(handle.len() > 0);
let data = handle.read_bytes(0..handle.len()).unwrap();
assert_eq!(data.len(), handle.len());