This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new db71611  feat(table): Add commit pipeline with SnapshotCommit abstraction (#233)
db71611 is described below

commit db716113c50103430fbad85e33bd273dea886a89
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 10 17:10:29 2026 +0800

    feat(table): Add commit pipeline with SnapshotCommit abstraction (#233)
    
    Implement the table write and commit infrastructure, including:
    - SnapshotCommit trait with RenamingSnapshotCommit (filesystem) and
      RESTSnapshotCommit (REST catalog) implementations
    - TableCommit with retry logic, append/overwrite/truncate support,
      partition statistics generation, and row tracking
    - WriteBuilder as the entry point for creating TableCommit instances,
      with overwrite mode configured at construction time
    - RESTEnv to hold REST catalog context (API client, identifier, uuid)
    - CommitMessage, PartitionStatistics, and ManifestList types
    - SnapshotManager extensions for atomic snapshot commit and latest hint
    - BinaryRow write_datum and datums_to_binary_row utilities
    - CoreOptions accessors for bucket, commit retry, and row tracking
    
    Reference: pypaimon commit pipeline
---
 .../datafusion/src/physical_plan/scan.rs           |   3 +
 crates/paimon/src/api/resource_paths.rs            |  12 +
 crates/paimon/src/api/rest_api.rs                  |  30 +-
 crates/paimon/src/catalog/filesystem.rs            |   1 +
 crates/paimon/src/catalog/rest/rest_catalog.rs     |  21 +-
 crates/paimon/src/lib.rs                           |   5 +-
 crates/paimon/src/spec/binary_row.rs               | 149 +++
 crates/paimon/src/spec/core_options.rs             |  85 ++
 crates/paimon/src/spec/manifest.rs                 |  21 +-
 crates/paimon/src/spec/manifest_entry.rs           |  77 +-
 crates/paimon/src/spec/manifest_file_meta.rs       |  24 +
 crates/paimon/src/spec/manifest_list.rs            | 120 +++
 crates/paimon/src/spec/mod.rs                      |   5 +
 crates/paimon/src/spec/objects_file.rs             | 104 ++-
 .../src/{lib.rs => spec/partition_statistics.rs}   |  45 +-
 crates/paimon/src/spec/schema.rs                   |   7 +
 crates/paimon/src/spec/snapshot.rs                 |  16 +
 .../paimon/src/{lib.rs => table/commit_message.rs} |  50 +-
 crates/paimon/src/table/mod.rs                     |  21 +
 crates/paimon/src/table/read_builder.rs            |   6 +
 crates/paimon/src/table/rest_env.rs                |  61 ++
 crates/paimon/src/table/snapshot_commit.rs         |  98 ++
 crates/paimon/src/table/snapshot_manager.rs        | 155 +++-
 crates/paimon/src/table/table_commit.rs            | 995 +++++++++++++++++++++
 crates/paimon/src/table/table_scan.rs              |  48 +-
 crates/paimon/src/table/write_builder.rs           |  63 ++
 crates/paimon/src/tantivy/directory.rs             |  11 +-
 27 files changed, 2121 insertions(+), 112 deletions(-)

diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs 
b/crates/integrations/datafusion/src/physical_plan/scan.rs
index 57ecfa4..a1c35a8 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -248,6 +248,7 @@ mod tests {
     use paimon::spec::{
         BinaryRow, DataType, Datum, IntType, PredicateBuilder, Schema as 
PaimonSchema, TableSchema,
     };
+    use paimon::table::Table;
     use std::fs;
     use tempfile::tempdir;
     use test_utils::{local_file_path, test_data_file, write_int_parquet_file};
@@ -298,6 +299,7 @@ mod tests {
             Identifier::new("test_db", "test_table"),
             "/tmp/test-table".to_string(),
             table_schema,
+            None,
         )
     }
 
@@ -329,6 +331,7 @@ mod tests {
             Identifier::new("default", "t"),
             table_path,
             table_schema,
+            None,
         );
 
         let split = paimon::DataSplitBuilder::new()
diff --git a/crates/paimon/src/api/resource_paths.rs 
b/crates/paimon/src/api/resource_paths.rs
index 1cbc88f..9345310 100644
--- a/crates/paimon/src/api/resource_paths.rs
+++ b/crates/paimon/src/api/resource_paths.rs
@@ -131,6 +131,18 @@ impl ResourcePaths {
     pub fn rename_table(&self) -> String {
         format!("{}/{}/rename", self.base_path, Self::TABLES)
     }
+
+    /// Get the commit table endpoint path.
+    pub fn commit_table(&self, database_name: &str, table_name: &str) -> 
String {
+        format!(
+            "{}/{}/{}/{}/{}/commit",
+            self.base_path,
+            Self::DATABASES,
+            RESTUtil::encode_string(database_name),
+            Self::TABLES,
+            RESTUtil::encode_string(table_name)
+        )
+    }
 }
 
 #[cfg(test)]
diff --git a/crates/paimon/src/api/rest_api.rs 
b/crates/paimon/src/api/rest_api.rs
index 00a32b8..8a3416e 100644
--- a/crates/paimon/src/api/rest_api.rs
+++ b/crates/paimon/src/api/rest_api.rs
@@ -25,7 +25,7 @@ use std::collections::HashMap;
 use crate::api::rest_client::HttpClient;
 use crate::catalog::Identifier;
 use crate::common::{CatalogOptions, Options};
-use crate::spec::Schema;
+use crate::spec::{PartitionStatistics, Schema, Snapshot};
 use crate::Result;
 
 use super::api_request::{
@@ -391,4 +391,32 @@ impl RESTApi {
         let path = self.resource_paths.table_token(database, table);
         self.client.get(&path, None::<&[(&str, &str)]>).await
     }
+
+    // ==================== Commit Operations ====================
+
+    /// Commit a snapshot for a table.
+    ///
+    /// Corresponds to Python `RESTApi.commit_snapshot`.
+    pub async fn commit_snapshot(
+        &self,
+        identifier: &Identifier,
+        table_uuid: &str,
+        snapshot: &Snapshot,
+        statistics: &[PartitionStatistics],
+    ) -> Result<bool> {
+        let database = identifier.database();
+        let table = identifier.object();
+        validate_non_empty_multi(&[(database, "database name"), (table, "table 
name")])?;
+        let path = self.resource_paths.commit_table(database, table);
+        let request = serde_json::json!({
+            "tableUuid": table_uuid,
+            "snapshot": snapshot,
+            "statistics": statistics,
+        });
+        let resp: serde_json::Value = self.client.post(&path, &request).await?;
+        Ok(resp
+            .get("success")
+            .and_then(|v| v.as_bool())
+            .unwrap_or(false))
+    }
 }
diff --git a/crates/paimon/src/catalog/filesystem.rs 
b/crates/paimon/src/catalog/filesystem.rs
index 313b6ef..a061e8a 100644
--- a/crates/paimon/src/catalog/filesystem.rs
+++ b/crates/paimon/src/catalog/filesystem.rs
@@ -327,6 +327,7 @@ impl Catalog for FileSystemCatalog {
             identifier.clone(),
             table_path,
             schema,
+            None,
         ))
     }
 
diff --git a/crates/paimon/src/catalog/rest/rest_catalog.rs 
b/crates/paimon/src/catalog/rest/rest_catalog.rs
index e5d023f..1da2aa1 100644
--- a/crates/paimon/src/catalog/rest/rest_catalog.rs
+++ b/crates/paimon/src/catalog/rest/rest_catalog.rs
@@ -21,6 +21,7 @@
 //! a Paimon REST catalog server for database and table CRUD operations.
 
 use std::collections::HashMap;
+use std::sync::Arc;
 
 use async_trait::async_trait;
 
@@ -32,7 +33,7 @@ use crate::common::{CatalogOptions, Options};
 use crate::error::Error;
 use crate::io::FileIO;
 use crate::spec::{Schema, SchemaChange, TableSchema};
-use crate::table::Table;
+use crate::table::{RESTEnv, Table};
 use crate::Result;
 
 use super::rest_token_file_io::RESTTokenFileIO;
@@ -44,8 +45,8 @@ use super::rest_token_file_io::RESTTokenFileIO;
 ///
 /// Corresponds to Python `RESTCatalog` in 
`pypaimon/catalog/rest/rest_catalog.py`.
 pub struct RESTCatalog {
-    /// The REST API client.
-    api: RESTApi,
+    /// The REST API client (shared with RESTEnv).
+    api: Arc<RESTApi>,
     /// Catalog configuration options.
     options: Options,
     /// Warehouse path.
@@ -71,7 +72,7 @@ impl RESTCatalog {
                 message: format!("Missing required option: {}", 
CatalogOptions::WAREHOUSE),
             })?;
 
-        let api = RESTApi::new(options.clone(), config_required).await?;
+        let api = Arc::new(RESTApi::new(options.clone(), 
config_required).await?);
 
         let data_token_enabled = api
             .options()
@@ -232,6 +233,15 @@ impl Catalog for RESTCatalog {
             source: None,
         })?;
 
+        // Extract table uuid for RESTEnv
+        let uuid = response.id.ok_or_else(|| Error::DataInvalid {
+            message: format!(
+                "Table {} response missing id (uuid)",
+                identifier.full_name()
+            ),
+            source: None,
+        })?;
+
         // Build FileIO based on data_token_enabled and is_external
         // TODO Support token cache and direct oss access
         let file_io = if self.data_token_enabled && !is_external {
@@ -244,11 +254,14 @@ impl Catalog for RESTCatalog {
             FileIO::from_path(&table_path)?.build()?
         };
 
+        let rest_env = RESTEnv::new(identifier.clone(), uuid, 
self.api.clone());
+
         Ok(Table::new(
             file_io,
             identifier.clone(),
             table_path,
             table_schema,
+            Some(rest_env),
         ))
     }
 
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs
index a68ba6b..3867d69 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/lib.rs
@@ -42,6 +42,7 @@ pub use catalog::CatalogFactory;
 pub use catalog::FileSystemCatalog;
 
 pub use table::{
-    DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, 
ReadBuilder, RowRange,
-    SnapshotManager, Table, TableRead, TableScan, TagManager,
+    CommitMessage, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, 
Plan, RESTEnv,
+    RESTSnapshotCommit, ReadBuilder, RenamingSnapshotCommit, RowRange, 
SnapshotCommit,
+    SnapshotManager, Table, TableCommit, TableRead, TableScan, TagManager, 
WriteBuilder,
 };
diff --git a/crates/paimon/src/spec/binary_row.rs 
b/crates/paimon/src/spec/binary_row.rs
index 1621476..0ba6b78 100644
--- a/crates/paimon/src/spec/binary_row.rs
+++ b/crates/paimon/src/spec/binary_row.rs
@@ -19,6 +19,7 @@
 //! and BinaryRowBuilder for constructing BinaryRow instances.
 
 use crate::spec::murmur_hash::hash_by_words;
+use crate::spec::{DataType, Datum};
 use serde::{Deserialize, Serialize};
 
 pub const EMPTY_BINARY_ROW: BinaryRow = BinaryRow::new(0);
@@ -523,6 +524,87 @@ impl BinaryRowBuilder {
         serialized.extend_from_slice(&self.data);
         serialized
     }
+
+    /// Write a Datum value at the given position, dispatching by type.
+    pub fn write_datum(&mut self, pos: usize, datum: &Datum, data_type: 
&DataType) {
+        match datum {
+            Datum::Bool(v) => self.write_boolean(pos, *v),
+            Datum::TinyInt(v) => self.write_byte(pos, *v),
+            Datum::SmallInt(v) => self.write_short(pos, *v),
+            Datum::Int(v) | Datum::Date(v) | Datum::Time(v) => 
self.write_int(pos, *v),
+            Datum::Long(v) => self.write_long(pos, *v),
+            Datum::Float(v) => self.write_float(pos, *v),
+            Datum::Double(v) => self.write_double(pos, *v),
+            Datum::Timestamp { millis, nanos } => {
+                let precision = match data_type {
+                    DataType::Timestamp(ts) => ts.precision(),
+                    _ => 3,
+                };
+                if precision <= 3 {
+                    self.write_timestamp_compact(pos, *millis);
+                } else {
+                    self.write_timestamp_non_compact(pos, *millis, *nanos);
+                }
+            }
+            Datum::LocalZonedTimestamp { millis, nanos } => {
+                let precision = match data_type {
+                    DataType::LocalZonedTimestamp(ts) => ts.precision(),
+                    _ => 3,
+                };
+                if precision <= 3 {
+                    self.write_timestamp_compact(pos, *millis);
+                } else {
+                    self.write_timestamp_non_compact(pos, *millis, *nanos);
+                }
+            }
+            Datum::Decimal {
+                unscaled,
+                precision,
+                ..
+            } => {
+                if *precision <= 18 {
+                    self.write_decimal_compact(pos, *unscaled as i64);
+                } else {
+                    self.write_decimal_var_len(pos, *unscaled);
+                }
+            }
+            Datum::String(s) => {
+                if s.len() <= 7 {
+                    self.write_string_inline(pos, s);
+                } else {
+                    self.write_string(pos, s);
+                }
+            }
+            Datum::Bytes(b) => {
+                if b.len() <= 7 {
+                    self.write_binary_inline(pos, b);
+                } else {
+                    self.write_binary(pos, b);
+                }
+            }
+        }
+    }
+}
+
+/// Build a serialized BinaryRow from optional Datum values.
+/// Returns empty vec if all values are None.
+pub fn datums_to_binary_row(datums: &[(&Option<Datum>, &DataType)]) -> Vec<u8> 
{
+    if datums.iter().all(|(d, _)| d.is_none()) {
+        return vec![];
+    }
+    let arity = datums.len() as i32;
+    let mut builder = BinaryRowBuilder::new(arity);
+    for (pos, (datum_opt, data_type)) in datums.iter().enumerate() {
+        match datum_opt {
+            Some(datum) => {
+                builder.write_datum(pos, datum, data_type);
+            }
+            None => {
+                builder.set_null_at(pos);
+            }
+        }
+    }
+    builder.build_serialized()
 }
 
 #[cfg(test)]
@@ -756,6 +838,73 @@ mod tests {
         assert_eq!(nano, 0);
     }
 
+    #[test]
+    fn test_write_datum_int_and_string() {
+        let mut builder = BinaryRowBuilder::new(2);
+        builder.write_datum(
+            0,
+            &Datum::Int(42),
+            &DataType::Int(crate::spec::IntType::new()),
+        );
+        builder.write_datum(
+            1,
+            &Datum::String("hello".to_string()),
+            &DataType::VarChar(crate::spec::VarCharType::string_type()),
+        );
+        let row = builder.build();
+        assert_eq!(row.get_int(0).unwrap(), 42);
+        assert_eq!(row.get_string(1).unwrap(), "hello");
+    }
+
+    #[test]
+    fn test_write_datum_long_string() {
+        let mut builder = BinaryRowBuilder::new(1);
+        builder.write_datum(
+            0,
+            &Datum::String("long_string_value".to_string()),
+            &DataType::VarChar(crate::spec::VarCharType::string_type()),
+        );
+        let row = builder.build();
+        assert_eq!(row.get_string(0).unwrap(), "long_string_value");
+    }
+
+    #[test]
+    fn test_datums_to_binary_row_roundtrip() {
+        let d1 = Some(Datum::Int(100));
+        let d2 = Some(Datum::String("abc".to_string()));
+        let dt1 = DataType::Int(crate::spec::IntType::new());
+        let dt2 = DataType::VarChar(crate::spec::VarCharType::string_type());
+        let datums = vec![(&d1, &dt1), (&d2, &dt2)];
+        let bytes = datums_to_binary_row(&datums);
+        assert!(!bytes.is_empty());
+        let row = BinaryRow::from_serialized_bytes(&bytes).unwrap();
+        assert_eq!(row.get_int(0).unwrap(), 100);
+        assert_eq!(row.get_string(1).unwrap(), "abc");
+    }
+
+    #[test]
+    fn test_datums_to_binary_row_all_none() {
+        let d1: Option<Datum> = None;
+        let dt1 = DataType::Int(crate::spec::IntType::new());
+        let datums = vec![(&d1, &dt1)];
+        let bytes = datums_to_binary_row(&datums);
+        assert!(bytes.is_empty());
+    }
+
+    #[test]
+    fn test_datums_to_binary_row_mixed_null() {
+        let d1 = Some(Datum::Int(7));
+        let d2: Option<Datum> = None;
+        let dt1 = DataType::Int(crate::spec::IntType::new());
+        let dt2 = DataType::Int(crate::spec::IntType::new());
+        let datums = vec![(&d1, &dt1), (&d2, &dt2)];
+        let bytes = datums_to_binary_row(&datums);
+        assert!(!bytes.is_empty());
+        let row = BinaryRow::from_serialized_bytes(&bytes).unwrap();
+        assert_eq!(row.get_int(0).unwrap(), 7);
+        assert!(row.is_null_at(1));
+    }
+
     #[test]
     fn test_get_timestamp_non_compact() {
         let epoch_millis: i64 = 1_704_067_200_123;
diff --git a/crates/paimon/src/spec/core_options.rs 
b/crates/paimon/src/spec/core_options.rs
index 93c3fec..17993d9 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -26,6 +26,17 @@ const PARTITION_DEFAULT_NAME_OPTION: &str = 
"partition.default-name";
 const PARTITION_LEGACY_NAME_OPTION: &str = "partition.legacy-name";
 const BUCKET_KEY_OPTION: &str = "bucket-key";
 const BUCKET_FUNCTION_TYPE_OPTION: &str = "bucket-function.type";
+const BUCKET_OPTION: &str = "bucket";
+const DEFAULT_BUCKET: i32 = 1;
+const COMMIT_MAX_RETRIES_OPTION: &str = "commit.max-retries";
+const COMMIT_TIMEOUT_OPTION: &str = "commit.timeout";
+const COMMIT_MIN_RETRY_WAIT_OPTION: &str = "commit.min-retry-wait";
+const COMMIT_MAX_RETRY_WAIT_OPTION: &str = "commit.max-retry-wait";
+const ROW_TRACKING_ENABLED_OPTION: &str = "row-tracking.enabled";
+const DEFAULT_COMMIT_MAX_RETRIES: u32 = 10;
+const DEFAULT_COMMIT_TIMEOUT_MS: u64 = 120_000;
+const DEFAULT_COMMIT_MIN_RETRY_WAIT_MS: u64 = 1_000;
+const DEFAULT_COMMIT_MAX_RETRY_WAIT_MS: u64 = 10_000;
 pub const SCAN_SNAPSHOT_ID_OPTION: &str = "scan.snapshot-id";
 pub const SCAN_TIMESTAMP_MILLIS_OPTION: &str = "scan.timestamp-millis";
 pub const SCAN_TAG_NAME_OPTION: &str = "scan.tag-name";
@@ -200,6 +211,49 @@ impl<'a> CoreOptions<'a> {
             .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
     }
 
+    pub fn commit_max_retries(&self) -> u32 {
+        self.options
+            .get(COMMIT_MAX_RETRIES_OPTION)
+            .and_then(|v| v.parse().ok())
+            .unwrap_or(DEFAULT_COMMIT_MAX_RETRIES)
+    }
+
+    pub fn commit_timeout_ms(&self) -> u64 {
+        self.options
+            .get(COMMIT_TIMEOUT_OPTION)
+            .and_then(|v| v.parse().ok())
+            .unwrap_or(DEFAULT_COMMIT_TIMEOUT_MS)
+    }
+
+    pub fn commit_min_retry_wait_ms(&self) -> u64 {
+        self.options
+            .get(COMMIT_MIN_RETRY_WAIT_OPTION)
+            .and_then(|v| v.parse().ok())
+            .unwrap_or(DEFAULT_COMMIT_MIN_RETRY_WAIT_MS)
+    }
+
+    pub fn commit_max_retry_wait_ms(&self) -> u64 {
+        self.options
+            .get(COMMIT_MAX_RETRY_WAIT_OPTION)
+            .and_then(|v| v.parse().ok())
+            .unwrap_or(DEFAULT_COMMIT_MAX_RETRY_WAIT_MS)
+    }
+
+    pub fn row_tracking_enabled(&self) -> bool {
+        self.options
+            .get(ROW_TRACKING_ENABLED_OPTION)
+            .map(|v| v.eq_ignore_ascii_case("true"))
+            .unwrap_or(false)
+    }
+
+    /// Number of buckets for the table. Default is 1.
+    pub fn bucket(&self) -> i32 {
+        self.options
+            .get(BUCKET_OPTION)
+            .and_then(|v| v.parse().ok())
+            .unwrap_or(DEFAULT_BUCKET)
+    }
+
     /// Whether the bucket function type is the default hash-based function.
     ///
     /// Only the default function (`Math.abs(hash % numBuckets)`) is supported
@@ -363,6 +417,37 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_commit_options_defaults() {
+        let options = HashMap::new();
+        let core = CoreOptions::new(&options);
+        assert_eq!(core.bucket(), 1);
+        assert_eq!(core.commit_max_retries(), 10);
+        assert_eq!(core.commit_timeout_ms(), 120_000);
+        assert_eq!(core.commit_min_retry_wait_ms(), 1_000);
+        assert_eq!(core.commit_max_retry_wait_ms(), 10_000);
+        assert!(!core.row_tracking_enabled());
+    }
+
+    #[test]
+    fn test_commit_options_custom() {
+        let options = HashMap::from([
+            (BUCKET_OPTION.to_string(), "4".to_string()),
+            (COMMIT_MAX_RETRIES_OPTION.to_string(), "20".to_string()),
+            (COMMIT_TIMEOUT_OPTION.to_string(), "60000".to_string()),
+            (COMMIT_MIN_RETRY_WAIT_OPTION.to_string(), "500".to_string()),
+            (COMMIT_MAX_RETRY_WAIT_OPTION.to_string(), "5000".to_string()),
+            (ROW_TRACKING_ENABLED_OPTION.to_string(), "true".to_string()),
+        ]);
+        let core = CoreOptions::new(&options);
+        assert_eq!(core.bucket(), 4);
+        assert_eq!(core.commit_max_retries(), 20);
+        assert_eq!(core.commit_timeout_ms(), 60_000);
+        assert_eq!(core.commit_min_retry_wait_ms(), 500);
+        assert_eq!(core.commit_max_retry_wait_ms(), 5_000);
+        assert!(core.row_tracking_enabled());
+    }
+
     #[test]
     fn test_try_time_travel_selector_normalizes_valid_selector() {
         let tag_options = HashMap::from([(SCAN_TAG_NAME_OPTION.to_string(), 
"tag1".to_string())]);
diff --git a/crates/paimon/src/spec/manifest.rs 
b/crates/paimon/src/spec/manifest.rs
index 1893a7d..84f3629 100644
--- a/crates/paimon/src/spec/manifest.rs
+++ b/crates/paimon/src/spec/manifest.rs
@@ -17,6 +17,7 @@
 
 use crate::io::FileIO;
 use crate::spec::manifest_entry::ManifestEntry;
+use crate::spec::manifest_entry::MANIFEST_ENTRY_SCHEMA;
 use serde_avro_fast::object_container_file_encoding::Reader;
 use snafu::ResultExt;
 
@@ -32,13 +33,6 @@ pub struct Manifest;
 
 impl Manifest {
     /// Read manifest entries from a file.
-    ///
-    /// # Arguments
-    /// * `file_io` - FileIO instance for reading files
-    /// * `path` - Path to the manifest file
-    ///
-    /// # Returns
-    /// A vector of ManifestEntry records
     pub async fn read(file_io: &FileIO, path: &str) -> 
Result<Vec<ManifestEntry>> {
         let input_file = file_io.new_input(path)?;
 
@@ -51,12 +45,6 @@ impl Manifest {
     }
 
     /// Read manifest entries from bytes.
-    ///
-    /// # Arguments
-    /// * `bytes` - Avro-encoded manifest file content
-    ///
-    /// # Returns
-    /// A vector of ManifestEntry records
     fn read_from_bytes(bytes: &[u8]) -> Result<Vec<ManifestEntry>> {
         let mut reader =
             Reader::from_slice(bytes).whatever_context::<_, 
crate::Error>("read manifest avro")?;
@@ -65,6 +53,13 @@ impl Manifest {
             .collect::<std::result::Result<Vec<_>, _>>()
             .whatever_context::<_, crate::Error>("deserialize manifest entry")
     }
+
+    /// Write manifest entries to a file.
+    pub async fn write(file_io: &FileIO, path: &str, entries: 
&[ManifestEntry]) -> Result<()> {
+        let bytes = crate::spec::to_avro_bytes(MANIFEST_ENTRY_SCHEMA, 
entries)?;
+        let output = file_io.new_output(path)?;
+        output.write(bytes::Bytes::from(bytes)).await
+    }
 }
 
 #[cfg(test)]
diff --git a/crates/paimon/src/spec/manifest_entry.rs 
b/crates/paimon/src/spec/manifest_entry.rs
index d7aa32c..cbe295b 100644
--- a/crates/paimon/src/spec/manifest_entry.rs
+++ b/crates/paimon/src/spec/manifest_entry.rs
@@ -35,7 +35,7 @@ pub struct Identifier {
 
 /// Entry of a manifest file, representing an addition / deletion of a data 
file.
 /// Impl Reference: 
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java>
-#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct ManifestEntry {
     #[serde(rename = "_KIND")]
     kind: FileKind,
@@ -124,4 +124,79 @@ impl ManifestEntry {
             version,
         }
     }
+
+    /// Return a copy with a different kind.
+    pub fn with_kind(mut self, kind: FileKind) -> Self {
+        self.kind = kind;
+        self
+    }
+
+    /// Return a copy with sequence numbers set on the file.
+    pub fn with_sequence_number(mut self, min_seq: i64, max_seq: i64) -> Self {
+        self.file.min_sequence_number = min_seq;
+        self.file.max_sequence_number = max_seq;
+        self
+    }
+
+    /// Return a copy with first_row_id set on the file.
+    pub fn with_first_row_id(mut self, first_row_id: i64) -> Self {
+        self.file.first_row_id = Some(first_row_id);
+        self
+    }
 }
+
+/// Avro schema for ManifestEntry (used in manifest files).
+pub const MANIFEST_ENTRY_SCHEMA: &str = r#"["null", {
+    "type": "record",
+    "name": "record",
+    "namespace": "org.apache.paimon.avro.generated",
+    "fields": [
+        {"name": "_KIND", "type": "int"},
+        {"name": "_PARTITION", "type": "bytes"},
+        {"name": "_BUCKET", "type": "int"},
+        {"name": "_TOTAL_BUCKETS", "type": "int"},
+        {"name": "_FILE", "type": ["null", {
+            "type": "record",
+            "name": "record__FILE",
+            "fields": [
+                {"name": "_FILE_NAME", "type": "string"},
+                {"name": "_FILE_SIZE", "type": "long"},
+                {"name": "_ROW_COUNT", "type": "long"},
+                {"name": "_MIN_KEY", "type": "bytes"},
+                {"name": "_MAX_KEY", "type": "bytes"},
+                {"name": "_KEY_STATS", "type": ["null", {
+                    "type": "record",
+                    "name": "record__FILE__KEY_STATS",
+                    "fields": [
+                        {"name": "_MIN_VALUES", "type": "bytes"},
+                        {"name": "_MAX_VALUES", "type": "bytes"},
+                        {"name": "_NULL_COUNTS", "type": ["null", {"type": 
"array", "items": ["null", "long"]}], "default": null}
+                    ]
+                }], "default": null},
+                {"name": "_VALUE_STATS", "type": ["null", {
+                    "type": "record",
+                    "name": "record__FILE__VALUE_STATS",
+                    "fields": [
+                        {"name": "_MIN_VALUES", "type": "bytes"},
+                        {"name": "_MAX_VALUES", "type": "bytes"},
+                        {"name": "_NULL_COUNTS", "type": ["null", {"type": 
"array", "items": ["null", "long"]}], "default": null}
+                    ]
+                }], "default": null},
+                {"name": "_MIN_SEQUENCE_NUMBER", "type": "long"},
+                {"name": "_MAX_SEQUENCE_NUMBER", "type": "long"},
+                {"name": "_SCHEMA_ID", "type": "long"},
+                {"name": "_LEVEL", "type": "int"},
+                {"name": "_EXTRA_FILES", "type": {"type": "array", "items": 
"string"}},
+                {"name": "_CREATION_TIME", "type": ["null", {"type": "long", 
"logicalType": "timestamp-millis"}], "default": null},
+                {"name": "_DELETE_ROW_COUNT", "type": ["null", "long"], 
"default": null},
+                {"name": "_EMBEDDED_FILE_INDEX", "type": ["null", "bytes"], 
"default": null},
+                {"name": "_FILE_SOURCE", "type": ["null", "int"], "default": 
null},
+                {"name": "_VALUE_STATS_COLS", "type": ["null", {"type": 
"array", "items": "string"}], "default": null},
+                {"name": "_EXTERNAL_PATH", "type": ["null", "string"], 
"default": null},
+                {"name": "_FIRST_ROW_ID", "type": ["null", "long"], "default": 
null},
+                {"name": "_WRITE_COLS", "type": ["null", {"type": "array", 
"items": "string"}], "default": null}
+            ]
+        }], "default": null},
+        {"name": "_VERSION", "type": "int"}
+    ]
+}]"#;
diff --git a/crates/paimon/src/spec/manifest_file_meta.rs 
b/crates/paimon/src/spec/manifest_file_meta.rs
index 36f92b9..b74389e 100644
--- a/crates/paimon/src/spec/manifest_file_meta.rs
+++ b/crates/paimon/src/spec/manifest_file_meta.rs
@@ -115,6 +115,30 @@ impl ManifestFileMeta {
     }
 }
 
+/// Avro schema for ManifestFileMeta (used in manifest-list files).
+pub const MANIFEST_FILE_META_SCHEMA: &str = r#"["null", {
+    "type": "record",
+    "name": "record",
+    "namespace": "org.apache.paimon.avro.generated",
+    "fields": [
+        {"name": "_VERSION", "type": "int"},
+        {"name": "_FILE_NAME", "type": "string"},
+        {"name": "_FILE_SIZE", "type": "long"},
+        {"name": "_NUM_ADDED_FILES", "type": "long"},
+        {"name": "_NUM_DELETED_FILES", "type": "long"},
+        {"name": "_PARTITION_STATS", "type": ["null", {
+            "type": "record",
+            "name": "record__PARTITION_STATS",
+            "fields": [
+                {"name": "_MIN_VALUES", "type": "bytes"},
+                {"name": "_MAX_VALUES", "type": "bytes"},
+                {"name": "_NULL_COUNTS", "type": ["null", {"type": "array", 
"items": ["null", "long"]}], "default": null}
+            ]
+        }], "default": null},
+        {"name": "_SCHEMA_ID", "type": "long"}
+    ]
+}]"#;
+
 impl Display for ManifestFileMeta {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
diff --git a/crates/paimon/src/spec/manifest_list.rs 
b/crates/paimon/src/spec/manifest_list.rs
new file mode 100644
index 0000000..050edf4
--- /dev/null
+++ b/crates/paimon/src/spec/manifest_list.rs
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::io::FileIO;
+use crate::spec::manifest_file_meta::MANIFEST_FILE_META_SCHEMA;
+use crate::spec::ManifestFileMeta;
+use crate::Result;
+
+/// Manifest list file reader and writer.
+///
+/// A manifest list file contains a list of ManifestFileMeta records in Avro 
format.
+/// Each record describes a manifest file.
+///
+/// Impl Reference: 
<https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java>
+pub struct ManifestList;
+
+impl ManifestList {
+    /// Read manifest file metas from a manifest list file.
+    pub async fn read(file_io: &FileIO, path: &str) -> 
Result<Vec<ManifestFileMeta>> {
+        let input = file_io.new_input(path)?;
+        if !input.exists().await? {
+            return Ok(Vec::new());
+        }
+        let content = input.read().await?;
+        crate::spec::from_avro_bytes(&content)
+    }
+
+    /// Write manifest file metas to a manifest list file.
+    pub async fn write(file_io: &FileIO, path: &str, metas: 
&[ManifestFileMeta]) -> Result<()> {
+        let bytes = crate::spec::to_avro_bytes(MANIFEST_FILE_META_SCHEMA, 
metas)?;
+        let output = file_io.new_output(path)?;
+        output.write(bytes::Bytes::from(bytes)).await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::io::FileIOBuilder;
+    use crate::spec::stats::BinaryTableStats;
+
+    fn test_file_io() -> FileIO {
+        FileIOBuilder::new("memory").build().unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_manifest_list_roundtrip() {
+        let file_io = test_file_io();
+        let path = "memory:/test_manifest_list_roundtrip/manifest-list-0";
+        file_io
+            .mkdirs("memory:/test_manifest_list_roundtrip/")
+            .await
+            .unwrap();
+
+        let value_bytes = vec![
+            0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0, 
0, 0, 0, 0, 0, 129,
+        ];
+        let original = vec![
+            ManifestFileMeta::new(
+                "manifest-a".to_string(),
+                1024,
+                5,
+                2,
+                BinaryTableStats::new(value_bytes.clone(), 
value_bytes.clone(), vec![Some(1)]),
+                0,
+            ),
+            ManifestFileMeta::new(
+                "manifest-b".to_string(),
+                2048,
+                10,
+                0,
+                BinaryTableStats::new(value_bytes.clone(), 
value_bytes.clone(), vec![Some(3)]),
+                1,
+            ),
+        ];
+
+        ManifestList::write(&file_io, path, &original)
+            .await
+            .unwrap();
+        let decoded = ManifestList::read(&file_io, path).await.unwrap();
+        assert_eq!(original, decoded);
+    }
+
+    #[tokio::test]
+    async fn test_manifest_list_read_nonexistent() {
+        let file_io = test_file_io();
+        let result = ManifestList::read(&file_io, 
"memory:/nonexistent/manifest-list")
+            .await
+            .unwrap();
+        assert!(result.is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_manifest_list_write_empty() {
+        let file_io = test_file_io();
+        let path = "memory:/test_manifest_list_empty/manifest-list-0";
+        file_io
+            .mkdirs("memory:/test_manifest_list_empty/")
+            .await
+            .unwrap();
+
+        ManifestList::write(&file_io, path, &[]).await.unwrap();
+        let decoded = ManifestList::read(&file_io, path).await.unwrap();
+        assert!(decoded.is_empty());
+    }
+}
diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs
index 50402a3..f30df9f 100644
--- a/crates/paimon/src/spec/mod.rs
+++ b/crates/paimon/src/spec/mod.rs
@@ -53,8 +53,11 @@ pub use manifest_common::FileKind;
 mod manifest_entry;
 pub use manifest_entry::Identifier;
 pub use manifest_entry::ManifestEntry;
+mod manifest_list;
+pub use manifest_list::ManifestList;
 mod objects_file;
 pub use objects_file::from_avro_bytes;
+pub use objects_file::to_avro_bytes;
 pub(crate) mod stats;
 mod types;
 pub use types::*;
@@ -67,3 +70,5 @@ pub use predicate::{
     field_idx_to_partition_idx, Datum, Predicate, PredicateBuilder, 
PredicateOperator,
 };
 pub(crate) mod murmur_hash;
+mod partition_statistics;
+pub use partition_statistics::PartitionStatistics;
diff --git a/crates/paimon/src/spec/objects_file.rs 
b/crates/paimon/src/spec/objects_file.rs
index 171de1d..864ea00 100644
--- a/crates/paimon/src/spec/objects_file.rs
+++ b/crates/paimon/src/spec/objects_file.rs
@@ -16,7 +16,8 @@
 // under the License.
 
 use serde::de::DeserializeOwned;
-use serde_avro_fast::object_container_file_encoding::Reader;
+use serde::Serialize;
+use serde_avro_fast::object_container_file_encoding::{Compression, Reader};
 use snafu::ResultExt;
 
 pub fn from_avro_bytes<T: DeserializeOwned>(bytes: &[u8]) -> 
crate::Result<Vec<T>> {
@@ -28,15 +29,112 @@ pub fn from_avro_bytes<T: DeserializeOwned>(bytes: &[u8]) 
-> crate::Result<Vec<T
         .whatever_context::<_, crate::Error>("deserialize avro records")
 }
 
+/// Encode `records` as the bytes of an Avro Object Container File.
+///
+/// `schema_json` is parsed as an Avro schema; it must describe the same
+/// layout that serde produces for `T`. The container is written with
+/// `Compression::Null`.
+///
+/// # Errors
+///
+/// Returns `Error::DataInvalid` when `schema_json` cannot be parsed or when
+/// writing the records fails.
+pub fn to_avro_bytes<T: Serialize>(schema_json: &str, records: &[T]) -> crate::Result<Vec<u8>> {
+    use serde_avro_fast::object_container_file_encoding::write_all;
+
+    let schema = match schema_json.parse::<serde_avro_fast::Schema>() {
+        Ok(parsed) => parsed,
+        Err(e) => {
+            return Err(crate::Error::DataInvalid {
+                message: format!("invalid avro schema: {e}"),
+                source: Some(Box::new(e)),
+            });
+        }
+    };
+
+    match write_all(&schema, Compression::Null, Vec::new(), records.iter()) {
+        Ok(encoded) => Ok(encoded),
+        Err(e) => Err(crate::Error::DataInvalid {
+            message: format!("avro serialization failed: {e}"),
+            source: Some(Box::new(e)),
+        }),
+    }
+}
+
 #[cfg(test)]
 mod tests {
+    use super::*;
     use crate::spec::manifest_common::FileKind;
-    use crate::spec::manifest_entry::ManifestEntry;
-    use crate::spec::objects_file::from_avro_bytes;
+    use crate::spec::manifest_entry::{ManifestEntry, MANIFEST_ENTRY_SCHEMA};
+    use crate::spec::manifest_file_meta::MANIFEST_FILE_META_SCHEMA;
     use crate::spec::stats::BinaryTableStats;
     use crate::spec::{DataFileMeta, ManifestFileMeta};
     use chrono::{DateTime, Utc};
 
+    /// A `ManifestFileMeta` written with `to_avro_bytes` must decode back to an
+    /// equal value through `from_avro_bytes`.
+    #[test]
+    fn test_roundtrip_manifest_file_meta() {
+        let stats_bytes = vec![
+            0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129,
+        ];
+        let stats = BinaryTableStats::new(stats_bytes.clone(), stats_bytes.clone(), vec![Some(1)]);
+        let meta = ManifestFileMeta::new("manifest-test-0".to_string(), 1024, 5, 2, stats, 0);
+        let expected = vec![meta];
+
+        let encoded = to_avro_bytes(MANIFEST_FILE_META_SCHEMA, &expected).unwrap();
+        let actual: Vec<ManifestFileMeta> = from_avro_bytes(&encoded).unwrap();
+        assert_eq!(expected, actual);
+    }
+
+    /// A `ManifestEntry` written with `to_avro_bytes` must decode back to an
+    /// equal value through `from_avro_bytes`.
+    #[test]
+    fn test_roundtrip_manifest_entry() {
+        let stats_bytes = vec![
+            0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129, 1, 0, 0, 0, 0, 0, 0, 0,
+        ];
+        let key_bytes = vec![0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0];
+
+        // Key and value stats use the same payload in this fixture.
+        let make_stats = || {
+            BinaryTableStats::new(
+                stats_bytes.clone(),
+                stats_bytes.clone(),
+                vec![Some(1), Some(2)],
+            )
+        };
+        let created_at: DateTime<Utc> = "2024-09-06T07:45:55.039+00:00".parse().unwrap();
+
+        let file = DataFileMeta {
+            file_name: "test.parquet".to_string(),
+            file_size: 100,
+            row_count: 50,
+            min_key: key_bytes.clone(),
+            max_key: key_bytes.clone(),
+            key_stats: make_stats(),
+            value_stats: make_stats(),
+            min_sequence_number: 1,
+            max_sequence_number: 50,
+            schema_id: 0,
+            level: 0,
+            extra_files: vec![],
+            creation_time: Some(created_at),
+            delete_row_count: Some(0),
+            embedded_index: None,
+            first_row_id: None,
+            write_cols: None,
+            external_path: None,
+            file_source: None,
+            value_stats_cols: None,
+        };
+        let entry = ManifestEntry::new(FileKind::Add, key_bytes.clone(), 1, 10, file, 2);
+        let expected = vec![entry];
+
+        let encoded = to_avro_bytes(MANIFEST_ENTRY_SCHEMA, &expected).unwrap();
+        let actual: Vec<ManifestEntry> = from_avro_bytes(&encoded).unwrap();
+        assert_eq!(expected, actual);
+    }
+
     #[tokio::test]
     async fn test_read_manifest_list() {
         let workdir =
diff --git a/crates/paimon/src/lib.rs 
b/crates/paimon/src/spec/partition_statistics.rs
similarity index 53%
copy from crates/paimon/src/lib.rs
copy to crates/paimon/src/spec/partition_statistics.rs
index a68ba6b..30f0579 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/spec/partition_statistics.rs
@@ -15,33 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod error;
-pub use error::Error;
-pub use error::Result;
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
 
-pub mod common;
-pub use common::{CatalogOptions, Options};
-
-pub mod api;
-pub use api::rest_api::RESTApi;
-
-pub mod arrow;
-pub mod btree;
-pub mod catalog;
-mod deletion_vector;
-pub mod file_index;
-pub mod io;
-mod predicate_stats;
-pub mod spec;
-pub mod table;
-#[cfg(feature = "fulltext")]
-pub mod tantivy;
-
-pub use catalog::Catalog;
-pub use catalog::CatalogFactory;
-pub use catalog::FileSystemCatalog;
-
-pub use table::{
-    DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, 
ReadBuilder, RowRange,
-    SnapshotManager, Table, TableRead, TableScan, TagManager,
-};
+/// Partition-level statistics for snapshot commits.
+///
+/// Serialized through serde with camelCase keys (for example `recordCount`,
+/// `fileSizeInBytes`, `lastFileCreationTime`, `totalBuckets`).
+///
+/// Reference: 
[org.apache.paimon.partition.PartitionStatistics](https://github.com/apache/paimon)
+/// and [pypaimon snapshot_commit.py 
PartitionStatistics](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/snapshot/snapshot_commit.py)
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct PartitionStatistics {
+    /// Partition spec; presumably partition column name -> value (TODO confirm).
+    pub spec: HashMap<String, String>,
+    /// Record count reported for the partition.
+    pub record_count: i64,
+    /// File size in bytes reported for the partition.
+    pub file_size_in_bytes: i64,
+    /// File count reported for the partition.
+    pub file_count: i64,
+    /// NOTE(review): unit is not visible here (likely epoch millis) — confirm.
+    pub last_file_creation_time: u64,
+    /// Total bucket count reported for the partition.
+    pub total_buckets: i32,
+}
diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs
index 39d6aee..62ab3fe 100644
--- a/crates/paimon/src/spec/schema.rs
+++ b/crates/paimon/src/spec/schema.rs
@@ -86,6 +86,13 @@ impl TableSchema {
         &self.partition_keys
     }
 
+    /// Resolve the partition keys to their field definitions, in
+    /// partition-key order.
+    ///
+    /// For each key the first field with a matching name is cloned; keys
+    /// without a matching field are skipped.
+    pub fn partition_fields(&self) -> Vec<DataField> {
+        let mut resolved = Vec::new();
+        for key in self.partition_keys.iter() {
+            if let Some(field) = self.fields.iter().find(|f| f.name() == key) {
+                resolved.push(field.clone());
+            }
+        }
+        resolved
+    }
+
     pub fn primary_keys(&self) -> &[String] {
         &self.primary_keys
     }
diff --git a/crates/paimon/src/spec/snapshot.rs 
b/crates/paimon/src/spec/snapshot.rs
index 28dc92d..013211b 100644
--- a/crates/paimon/src/spec/snapshot.rs
+++ b/crates/paimon/src/spec/snapshot.rs
@@ -92,6 +92,10 @@ pub struct Snapshot {
     #[builder(default = None)]
     #[serde(skip_serializing_if = "Option::is_none")]
     statistics: Option<String>,
+    /// next row id for row tracking
+    #[builder(default = None)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    next_row_id: Option<i64>,
 }
 
 impl Snapshot {
@@ -190,6 +194,18 @@ impl Snapshot {
     pub fn statistics(&self) -> Option<&str> {
         self.statistics.as_deref()
     }
+
+    /// Next row id stored in this snapshot (used for row tracking); `None` when the field is unset.
+    #[inline]
+    pub fn next_row_id(&self) -> Option<i64> {
+        self.next_row_id
+    }
+
+    /// Borrow the commit kind recorded in this snapshot.
+    #[inline]
+    pub fn commit_kind(&self) -> &CommitKind {
+        &self.commit_kind
+    }
 }
 
 #[cfg(test)]
diff --git a/crates/paimon/src/lib.rs 
b/crates/paimon/src/table/commit_message.rs
similarity index 51%
copy from crates/paimon/src/lib.rs
copy to crates/paimon/src/table/commit_message.rs
index a68ba6b..4e9c4ed 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/table/commit_message.rs
@@ -15,33 +15,27 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod error;
-pub use error::Error;
-pub use error::Result;
+use crate::spec::DataFileMeta;
 
-pub mod common;
-pub use common::{CatalogOptions, Options};
+/// A commit message representing new files to be committed for a specific 
partition and bucket.
+///
+/// Plain data holder: all fields are public and `CommitMessage::new` stores
+/// its arguments unchanged.
+///
+/// Reference: 
[org.apache.paimon.table.sink.CommitMessage](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java)
+#[derive(Debug, Clone)]
+pub struct CommitMessage {
+    /// Binary row bytes for the partition.
+    pub partition: Vec<u8>,
+    /// Bucket id.
+    pub bucket: i32,
+    /// New data files to be added.
+    pub new_files: Vec<DataFileMeta>,
+}
 
-pub mod api;
-pub use api::rest_api::RESTApi;
-
-pub mod arrow;
-pub mod btree;
-pub mod catalog;
-mod deletion_vector;
-pub mod file_index;
-pub mod io;
-mod predicate_stats;
-pub mod spec;
-pub mod table;
-#[cfg(feature = "fulltext")]
-pub mod tantivy;
-
-pub use catalog::Catalog;
-pub use catalog::CatalogFactory;
-pub use catalog::FileSystemCatalog;
-
-pub use table::{
-    DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, 
ReadBuilder, RowRange,
-    SnapshotManager, Table, TableRead, TableScan, TagManager,
-};
+impl CommitMessage {
+    /// Build a commit message from the partition bytes, the bucket id and the
+    /// new data files; the arguments are stored as given.
+    pub fn new(partition: Vec<u8>, bucket: i32, new_files: Vec<DataFileMeta>) -> Self {
+        CommitMessage {
+            partition,
+            bucket,
+            new_files,
+        }
+    }
+}
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index 1b34324..c17ebbc 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -19,31 +19,41 @@
 
 pub(crate) mod bin_pack;
 mod bucket_filter;
+mod commit_message;
 #[cfg(feature = "fulltext")]
 mod full_text_search_builder;
 pub(crate) mod global_index_scanner;
 mod read_builder;
+pub(crate) mod rest_env;
 pub(crate) mod row_id_predicate;
 pub(crate) mod schema_manager;
+pub(crate) mod snapshot_commit;
 mod snapshot_manager;
 mod source;
 mod stats_filter;
+pub(crate) mod table_commit;
 mod table_scan;
 mod tag_manager;
+mod write_builder;
 
 use crate::Result;
 use arrow_array::RecordBatch;
+pub use commit_message::CommitMessage;
 #[cfg(feature = "fulltext")]
 pub use full_text_search_builder::FullTextSearchBuilder;
 use futures::stream::BoxStream;
 pub use read_builder::{ReadBuilder, TableRead};
+pub use rest_env::RESTEnv;
 pub use schema_manager::SchemaManager;
+pub use snapshot_commit::{RESTSnapshotCommit, RenamingSnapshotCommit, 
SnapshotCommit};
 pub use snapshot_manager::SnapshotManager;
 pub use source::{
     merge_row_ranges, DataSplit, DataSplitBuilder, DeletionFile, 
PartitionBucket, Plan, RowRange,
 };
+pub use table_commit::TableCommit;
 pub use table_scan::TableScan;
 pub use tag_manager::TagManager;
+pub use write_builder::WriteBuilder;
 
 use crate::catalog::Identifier;
 use crate::io::FileIO;
@@ -58,6 +68,7 @@ pub struct Table {
     location: String,
     schema: TableSchema,
     schema_manager: SchemaManager,
+    rest_env: Option<RESTEnv>,
 }
 
 impl Table {
@@ -67,6 +78,7 @@ impl Table {
         identifier: Identifier,
         location: String,
         schema: TableSchema,
+        rest_env: Option<RESTEnv>,
     ) -> Self {
         let schema_manager = SchemaManager::new(file_io.clone(), 
location.clone());
         Self {
@@ -75,6 +87,7 @@ impl Table {
             location,
             schema,
             schema_manager,
+            rest_env,
         }
     }
 
@@ -118,6 +131,13 @@ impl Table {
         FullTextSearchBuilder::new(self)
     }
 
+    /// Create a write builder for write/commit.
+    ///
+    /// The returned builder borrows this table (`WriteBuilder<'_>`), so it
+    /// cannot outlive `self`.
+    ///
+    /// Reference: [pypaimon 
FileStoreTable.new_write_builder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/table/file_store_table.py).
+    pub fn new_write_builder(&self) -> WriteBuilder<'_> {
+        WriteBuilder::new(self)
+    }
+
     /// Create a copy of this table with extra options merged into the schema.
     pub fn copy_with_options(&self, extra: HashMap<String, String>) -> Self {
         Self {
@@ -126,6 +146,7 @@ impl Table {
             location: self.location.clone(),
             schema: self.schema.copy_with_options(extra),
             schema_manager: self.schema_manager.clone(),
+            rest_env: self.rest_env.clone(),
         }
     }
 }
diff --git a/crates/paimon/src/table/read_builder.rs 
b/crates/paimon/src/table/read_builder.rs
index 6712b0e..63d9917 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -410,6 +410,7 @@ mod tests {
             Identifier::new("default", "t"),
             table_path,
             table_schema,
+            None,
         );
 
         let split = DataSplitBuilder::new()
@@ -469,6 +470,7 @@ mod tests {
             Identifier::new("default", "t"),
             table_path,
             table_schema,
+            None,
         );
 
         let split = DataSplitBuilder::new()
@@ -526,6 +528,7 @@ mod tests {
             Identifier::new("default", "t"),
             table_path,
             table_schema,
+            None,
         );
 
         let split = DataSplitBuilder::new()
@@ -586,6 +589,7 @@ mod tests {
             Identifier::new("default", "t"),
             table_path,
             table_schema,
+            None,
         );
 
         let split = DataSplitBuilder::new()
@@ -637,6 +641,7 @@ mod tests {
             Identifier::new("default", "t"),
             "/tmp/test".to_string(),
             table_schema,
+            None,
         );
 
         let mut builder = table.new_read_builder();
@@ -692,6 +697,7 @@ mod tests {
             Identifier::new("default", "t"),
             "/tmp/test".to_string(),
             table_schema,
+            None,
         );
 
         let mut builder = table.new_read_builder();
diff --git a/crates/paimon/src/table/rest_env.rs 
b/crates/paimon/src/table/rest_env.rs
new file mode 100644
index 0000000..bd1b42a
--- /dev/null
+++ b/crates/paimon/src/table/rest_env.rs
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! REST environment for creating RESTSnapshotCommit instances.
+
+use crate::api::rest_api::RESTApi;
+use crate::catalog::Identifier;
+use crate::table::snapshot_commit::{RESTSnapshotCommit, SnapshotCommit};
+use std::sync::Arc;
+
+/// REST environment that holds the REST API client, identifier, and uuid
+/// needed to create a `RESTSnapshotCommit`.
+///
+/// Cloning is cheap for the client: `api` is shared through an `Arc`.
+#[derive(Clone)]
+pub struct RESTEnv {
+    /// Identifier passed on to every `RESTSnapshotCommit` created here.
+    identifier: Identifier,
+    /// Uuid passed on to every `RESTSnapshotCommit` created here.
+    uuid: String,
+    /// Shared REST API client.
+    api: Arc<RESTApi>,
+}
+
+impl std::fmt::Debug for RESTEnv {
+    /// Formats `identifier` and `uuid` only.
+    ///
+    /// The `api` client is left out, so the struct is finished with
+    /// `finish_non_exhaustive()` to render a trailing `..` and make the
+    /// omission visible in the output.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RESTEnv")
+            .field("identifier", &self.identifier)
+            .field("uuid", &self.uuid)
+            .finish_non_exhaustive()
+    }
+}
+
+impl RESTEnv {
+    /// Create a new RESTEnv from the identifier, uuid and shared API client.
+    pub fn new(identifier: Identifier, uuid: String, api: Arc<RESTApi>) -> Self {
+        Self {
+            identifier,
+            uuid,
+            api,
+        }
+    }
+
+    /// Create a `RESTSnapshotCommit` from this environment.
+    ///
+    /// The committer receives its own clones of the identifier and uuid and
+    /// another handle to the shared API client.
+    pub fn snapshot_commit(&self) -> Arc<dyn SnapshotCommit> {
+        Arc::new(RESTSnapshotCommit::new(
+            Arc::clone(&self.api),
+            self.identifier.clone(),
+            self.uuid.clone(),
+        ))
+    }
+}
diff --git a/crates/paimon/src/table/snapshot_commit.rs 
b/crates/paimon/src/table/snapshot_commit.rs
new file mode 100644
index 0000000..9d141f7
--- /dev/null
+++ b/crates/paimon/src/table/snapshot_commit.rs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! SnapshotCommit abstraction for atomic snapshot commits.
+//!
+//! Reference: [pypaimon 
snapshot_commit.py](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/snapshot/snapshot_commit.py)
+
+use crate::api::rest_api::RESTApi;
+use crate::catalog::Identifier;
+use crate::spec::{PartitionStatistics, Snapshot};
+use crate::table::SnapshotManager;
+use crate::Result;
+use async_trait::async_trait;
+use std::sync::Arc;
+
+/// Interface to commit a snapshot atomically.
+///
+/// Two implementations:
+/// - `RenamingSnapshotCommit` — file system atomic rename
+/// - `RESTSnapshotCommit` — via Catalog API (e.g. REST)
+#[async_trait]
+pub trait SnapshotCommit: Send + Sync {
+    /// Commit the given snapshot. Returns `Ok(true)` if successful and
+    /// `Ok(false)` if another writer won the race.
+    ///
+    /// `statistics` carries partition statistics for the commit; an
+    /// implementation may ignore it (`RenamingSnapshotCommit` does).
+    async fn commit(&self, snapshot: &Snapshot, statistics: 
&[PartitionStatistics])
+        -> Result<bool>;
+}
+
+/// A SnapshotCommit using file renaming to commit.
+///
+/// Reference: [pypaimon 
RenamingSnapshotCommit](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/snapshot/renaming_snapshot_commit.py)
+pub struct RenamingSnapshotCommit {
+    snapshot_manager: SnapshotManager,
+}
+
+impl RenamingSnapshotCommit {
+    pub fn new(snapshot_manager: SnapshotManager) -> Self {
+        Self { snapshot_manager }
+    }
+}
+
+#[async_trait]
+impl SnapshotCommit for RenamingSnapshotCommit {
+    async fn commit(
+        &self,
+        snapshot: &Snapshot,
+        _statistics: &[PartitionStatistics],
+    ) -> Result<bool> {
+        // statistics are not used in file system mode (same as Python)
+        self.snapshot_manager.commit_snapshot(snapshot).await
+    }
+}
+
+/// A SnapshotCommit using REST API to commit (e.g. REST Catalog).
+///
+/// Reference: [pypaimon 
RESTSnapshotCommit](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/snapshot/catalog_snapshot_commit.py)
+pub struct RESTSnapshotCommit {
+    api: Arc<RESTApi>,
+    identifier: Identifier,
+    uuid: String,
+}
+
+impl RESTSnapshotCommit {
+    pub fn new(api: Arc<RESTApi>, identifier: Identifier, uuid: String) -> 
Self {
+        Self {
+            api,
+            identifier,
+            uuid,
+        }
+    }
+}
+
+#[async_trait]
+impl SnapshotCommit for RESTSnapshotCommit {
+    async fn commit(
+        &self,
+        snapshot: &Snapshot,
+        statistics: &[PartitionStatistics],
+    ) -> Result<bool> {
+        self.api
+            .commit_snapshot(&self.identifier, &self.uuid, snapshot, 
statistics)
+            .await
+    }
+}
diff --git a/crates/paimon/src/table/snapshot_manager.rs 
b/crates/paimon/src/table/snapshot_manager.rs
index 6d8eab2..abff248 100644
--- a/crates/paimon/src/table/snapshot_manager.rs
+++ b/crates/paimon/src/table/snapshot_manager.rs
@@ -45,6 +45,10 @@ impl SnapshotManager {
         }
     }
 
+    /// Borrow the `FileIO` this manager was constructed with.
+    pub fn file_io(&self) -> &FileIO {
+        &self.file_io
+    }
+
     /// Path to the snapshot directory (e.g. `table_path/snapshot`).
     pub fn snapshot_dir(&self) -> String {
         format!("{}/{}", self.table_path, SNAPSHOT_DIR)
@@ -65,14 +69,22 @@ impl SnapshotManager {
         format!("{}/snapshot-{}", self.snapshot_dir(), snapshot_id)
     }
 
+    /// Path to the manifest directory (`<table_path>/manifest`).
+    pub fn manifest_dir(&self) -> String {
+        let table_root = &self.table_path;
+        format!("{table_root}/manifest")
+    }
+
+    /// Path to the manifest file `manifest_name` inside the manifest directory.
+    pub fn manifest_path(&self, manifest_name: &str) -> String {
+        let dir = self.manifest_dir();
+        format!("{dir}/{manifest_name}")
+    }
+
     /// Read a hint file and return the id, or None if the file does not exist,
     /// is being deleted, or contains invalid content.
     ///
     /// Reference: 
[HintFileUtils.readHint](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java)
     async fn read_hint(&self, path: &str) -> Option<i64> {
         let input = self.file_io.new_input(path).ok()?;
-        // Try to read directly without exists() check to avoid TOCTOU race.
-        // The file may be deleted or overwritten concurrently.
         let content = input.read().await.ok()?;
         let id_str = str::from_utf8(&content).ok()?;
         id_str.trim().parse().ok()
@@ -138,7 +150,7 @@ impl SnapshotManager {
         self.find_by_list_files(i64::min).await
     }
 
-    /// Get a snapshot by id. Returns an error if the snapshot file does not 
exist.
+    /// Get a snapshot by id.
     pub async fn get_snapshot(&self, snapshot_id: i64) -> 
crate::Result<Snapshot> {
         let snapshot_path = self.snapshot_path(snapshot_id);
         let snap_input = self.file_io.new_input(&snapshot_path)?;
@@ -176,6 +188,68 @@ impl SnapshotManager {
         Ok(Some(snapshot))
     }
 
+    /// Atomically commit a snapshot.
+    ///
+    /// Writes the snapshot JSON to the target path. Returns `false` if the
+    /// target already exists (another writer won the race).
+    ///
+    /// On file systems that support atomic rename, we write to a temp file
+    /// first then rename. On backends where rename is not supported (e.g.
+    /// memory, object stores), we fall back to a direct write after an
+    /// existence check.
+    pub async fn commit_snapshot(&self, snapshot: &Snapshot) -> 
crate::Result<bool> {
+        let target_path = self.snapshot_path(snapshot.id());
+
+        let json = serde_json::to_string(snapshot).map_err(|e| 
crate::Error::DataInvalid {
+            message: format!("failed to serialize snapshot: {e}"),
+            source: Some(Box::new(e)),
+        })?;
+
+        // Try rename-based atomic commit first, fall back to check-and-write.
+        //
+        // TODO: opendal's rename uses POSIX semantics which silently 
overwrites the target.
+        //  The exists() check below narrows the race window but does not 
eliminate it.
+        //  Java Paimon uses `lock.runWithLock(() -> !fileIO.exists(newPath) 
&& callable.call())`
+        //  for full mutual exclusion. We need an external lock mechanism 
(like Java's Lock
+        //  interface) for backends without atomic rename-no-replace support.
+        let tmp_path = format!("{}.tmp-{}", target_path, uuid::Uuid::new_v4());
+        let output = self.file_io.new_output(&tmp_path)?;
+        output.write(bytes::Bytes::from(json.clone())).await?;
+
+        // Check before rename to avoid silent overwrite (opendal uses POSIX 
rename semantics)
+        if self.file_io.exists(&target_path).await? {
+            let _ = self.file_io.delete_file(&tmp_path).await;
+            return Ok(false);
+        }
+
+        match self.file_io.rename(&tmp_path, &target_path).await {
+            Ok(()) => {}
+            Err(_) => {
+                // Rename not supported (e.g. memory/object store).
+                // Clean up temp file, then check-and-write.
+                let _ = self.file_io.delete_file(&tmp_path).await;
+                if self.file_io.exists(&target_path).await? {
+                    return Ok(false);
+                }
+                let output = self.file_io.new_output(&target_path)?;
+                output.write(bytes::Bytes::from(json)).await?;
+            }
+        }
+
+        // Update LATEST hint (best-effort)
+        let _ = self.write_latest_hint(snapshot.id()).await;
+        Ok(true)
+    }
+
+    /// Overwrite the LATEST hint file with `snapshot_id` rendered as decimal text.
+    pub async fn write_latest_hint(&self, snapshot_id: i64) -> crate::Result<()> {
+        let contents = bytes::Bytes::from(snapshot_id.to_string());
+        let hint_path = self.latest_hint_path();
+        let hint_file = self.file_io.new_output(&hint_path)?;
+        hint_file.write(contents).await
+    }
+
     /// Returns the snapshot whose commit time is earlier than or equal to the 
given
     /// `timestamp_millis`. If no such snapshot exists, returns None.
     ///
@@ -196,7 +270,6 @@ impl SnapshotManager {
             None => return Ok(None),
         };
 
-        // If the earliest snapshot is already after the timestamp, no match.
         if (earliest_snapshot.time_millis() as i64) > timestamp_millis {
             return Ok(None);
         }
@@ -220,3 +293,77 @@ impl SnapshotManager {
         Ok(result)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::io::FileIOBuilder;
+    use crate::spec::CommitKind;
+
+    /// `FileIO` built for the `memory` scheme.
+    fn test_file_io() -> FileIO {
+        FileIOBuilder::new("memory").build().unwrap()
+    }
+
+    /// Create `<table_path>/snapshot/` and a manager rooted at `table_path`.
+    async fn setup(table_path: &str) -> (FileIO, SnapshotManager) {
+        let io = test_file_io();
+        let snapshot_dir = format!("{table_path}/snapshot/");
+        io.mkdirs(&snapshot_dir).await.unwrap();
+        let manager = SnapshotManager::new(io.clone(), table_path.to_string());
+        (io, manager)
+    }
+
+    /// Minimal APPEND snapshot with the given id and a commit time of `1000 * id`.
+    fn test_snapshot(id: i64) -> Snapshot {
+        let builder = Snapshot::builder()
+            .version(3)
+            .id(id)
+            .schema_id(0)
+            .base_manifest_list("base-list".to_string())
+            .delta_manifest_list("delta-list".to_string())
+            .commit_user("test-user".to_string())
+            .commit_identifier(0)
+            .commit_kind(CommitKind::APPEND)
+            .time_millis(1000 * id as u64);
+        builder.build()
+    }
+
+    /// The first commit of an id succeeds and the snapshot can be read back.
+    #[tokio::test]
+    async fn test_commit_snapshot_first() {
+        let (_, manager) = setup("memory:/test_commit_first").await;
+        let committed = manager.commit_snapshot(&test_snapshot(1)).await.unwrap();
+        assert!(committed);
+
+        let reloaded = manager.get_snapshot(1).await.unwrap();
+        assert_eq!(reloaded.id(), 1);
+    }
+
+    /// Committing the same id twice reports `false` the second time.
+    #[tokio::test]
+    async fn test_commit_snapshot_already_exists() {
+        let (_, manager) = setup("memory:/test_commit_exists").await;
+        let snapshot = test_snapshot(1);
+
+        let first = manager.commit_snapshot(&snapshot).await.unwrap();
+        assert!(first);
+
+        // Second commit to same id should return false
+        let second = manager.commit_snapshot(&snapshot).await.unwrap();
+        assert!(!second);
+    }
+
+    /// A successful commit makes the committed id the latest snapshot id.
+    #[tokio::test]
+    async fn test_commit_updates_latest_hint() {
+        let (_, manager) = setup("memory:/test_commit_hint").await;
+        manager.commit_snapshot(&test_snapshot(1)).await.unwrap();
+
+        let latest = manager.get_latest_snapshot_id().await.unwrap();
+        assert_eq!(latest, Some(1));
+    }
+
+    /// The hint written by `write_latest_hint` is what `read_hint` returns.
+    #[tokio::test]
+    async fn test_write_latest_hint() {
+        let (_, manager) = setup("memory:/test_write_hint").await;
+        manager.write_latest_hint(42).await.unwrap();
+
+        let hint_path = manager.latest_hint_path();
+        let stored = manager.read_hint(&hint_path).await;
+        assert_eq!(stored, Some(42));
+    }
+}
diff --git a/crates/paimon/src/table/table_commit.rs 
b/crates/paimon/src/table/table_commit.rs
new file mode 100644
index 0000000..bc7f5c1
--- /dev/null
+++ b/crates/paimon/src/table/table_commit.rs
@@ -0,0 +1,995 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Table commit logic for Paimon write operations.
+//!
+//! Reference: [org.apache.paimon.operation.FileStoreCommitImpl](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java)
+//! and [pypaimon table_commit.py / file_store_commit.py](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/)
+
+use crate::io::FileIO;
+use crate::spec::stats::BinaryTableStats;
+use crate::spec::FileKind;
+use crate::spec::{
+    datums_to_binary_row, extract_datum, BinaryRow, CommitKind, CoreOptions, 
Datum, Manifest,
+    ManifestEntry, ManifestFileMeta, ManifestList, PartitionStatistics, 
Predicate,
+    PredicateBuilder, Snapshot,
+};
+use crate::table::commit_message::CommitMessage;
+use crate::table::snapshot_commit::SnapshotCommit;
+use crate::table::{SnapshotManager, Table, TableScan};
+use crate::Result;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};
+
+/// Commit identifier stamped on every snapshot built by [`TableCommit`].
+///
+/// Fixed at `i64::MAX`, same as Python's `BATCH_COMMIT_IDENTIFIER`.
+const BATCH_COMMIT_IDENTIFIER: i64 = i64::MAX;
+
+/// Table commit logic for Paimon write operations.
+///
+/// Provides atomic commit functionality including append, overwrite and
+/// truncate.
+pub struct TableCommit {
+    // Table being committed to; supplies the schema, file IO and location.
+    table: Table,
+    // Reads existing snapshots and exposes the manifest directory.
+    snapshot_manager: SnapshotManager,
+    // Strategy that publishes a finished snapshot: taken from the table's
+    // REST environment when present, otherwise a `RenamingSnapshotCommit`.
+    snapshot_commit: Arc<dyn SnapshotCommit>,
+    // Recorded on each snapshot and compared during duplicate detection.
+    commit_user: String,
+    // Bucket count from the table options; written into new manifest entries.
+    total_buckets: i32,
+    // `Some` switches `commit` to OVERWRITE mode; an empty map means no
+    // partition filter is applied when selecting files to delete.
+    overwrite_partition: Option<HashMap<String, Datum>>,
+    // commit config
+    // Retry budget and overall time limit checked after each failed attempt.
+    commit_max_retries: u32,
+    commit_timeout_ms: u64,
+    // Lower and upper bounds of the backoff delay between attempts.
+    commit_min_retry_wait_ms: u64,
+    commit_max_retry_wait_ms: u64,
+    // When set, entries get sequence numbers and first-row ids on commit.
+    row_tracking_enabled: bool,
+}
+
+impl TableCommit {
+    /// Build a committer for `table`.
+    ///
+    /// `commit_user` is recorded on every snapshot this committer produces.
+    /// Passing `Some` for `overwrite_partition` makes [`TableCommit::commit`]
+    /// use OVERWRITE instead of APPEND. Retry, bucket and row-tracking
+    /// settings are read from the table schema's options.
+    pub fn new(
+        table: Table,
+        commit_user: String,
+        overwrite_partition: Option<HashMap<String, Datum>>,
+    ) -> Self {
+        let snapshot_manager = SnapshotManager::new(table.file_io.clone(), table.location.clone());
+
+        // REST-backed tables bring their own commit strategy; everything else
+        // publishes snapshots through the snapshot manager.
+        let snapshot_commit: Arc<dyn SnapshotCommit> = match &table.rest_env {
+            Some(env) => env.snapshot_commit(),
+            None => Arc::new(crate::table::snapshot_commit::RenamingSnapshotCommit::new(
+                snapshot_manager.clone(),
+            )),
+        };
+
+        // Read every option up front so the borrow of `table` ends before it
+        // is moved into the struct.
+        let (
+            total_buckets,
+            commit_max_retries,
+            commit_timeout_ms,
+            commit_min_retry_wait_ms,
+            commit_max_retry_wait_ms,
+            row_tracking_enabled,
+        ) = {
+            let options = CoreOptions::new(table.schema().options());
+            (
+                options.bucket(),
+                options.commit_max_retries(),
+                options.commit_timeout_ms(),
+                options.commit_min_retry_wait_ms(),
+                options.commit_max_retry_wait_ms(),
+                options.row_tracking_enabled(),
+            )
+        };
+
+        Self {
+            table,
+            snapshot_manager,
+            snapshot_commit,
+            commit_user,
+            total_buckets,
+            overwrite_partition,
+            commit_max_retries,
+            commit_timeout_ms,
+            commit_min_retry_wait_ms,
+            commit_max_retry_wait_ms,
+            row_tracking_enabled,
+        }
+    }
+
+    /// Commit the files described by `commit_messages`.
+    ///
+    /// An empty message list is a no-op. The commit kind is OVERWRITE when an
+    /// overwrite partition was given to the constructor (an empty map
+    /// overwrites without a partition filter) and APPEND otherwise.
+    pub async fn commit(&self, commit_messages: Vec<CommitMessage>) -> Result<()> {
+        if commit_messages.is_empty() {
+            return Ok(());
+        }
+
+        let new_entries = self.messages_to_entries(&commit_messages);
+
+        let (kind, plan) = match &self.overwrite_partition {
+            None => (CommitKind::APPEND, CommitEntriesPlan::Static(new_entries)),
+            Some(target) => {
+                let partition_predicate = if target.is_empty() {
+                    None
+                } else {
+                    Some(self.build_partition_predicate(target)?)
+                };
+                (
+                    CommitKind::OVERWRITE,
+                    CommitEntriesPlan::Overwrite {
+                        partition_predicate,
+                        new_entries,
+                    },
+                )
+            }
+        };
+
+        self.try_commit(kind, plan).await
+    }
+
+    /// Turn a partition spec (`column -> value`) into a single predicate that
+    /// requires every listed column to equal its value.
+    fn build_partition_predicate(&self, partition: &HashMap<String, Datum>) -> Result<Predicate> {
+        let builder = PredicateBuilder::new(&self.table.schema().partition_fields());
+        let mut conjuncts: Vec<Predicate> = Vec::with_capacity(partition.len());
+        for (column, value) in partition.iter() {
+            conjuncts.push(builder.equal(column, value.clone())?);
+        }
+        Ok(Predicate::and(conjuncts))
+    }
+
+    /// Drop the given partitions via an OVERWRITE commit that adds no files.
+    ///
+    /// Each map is one partition spec; a file is selected for deletion when it
+    /// matches any of them. An empty list is a no-op.
+    pub async fn truncate_partitions(&self, partitions: Vec<HashMap<String, Datum>>) -> Result<()> {
+        if partitions.is_empty() {
+            return Ok(());
+        }
+
+        let mut per_partition: Vec<Predicate> = Vec::with_capacity(partitions.len());
+        for spec in partitions.iter() {
+            per_partition.push(self.build_partition_predicate(spec)?);
+        }
+
+        let plan = CommitEntriesPlan::Overwrite {
+            partition_predicate: Some(Predicate::or(per_partition)),
+            new_entries: vec![],
+        };
+        self.try_commit(CommitKind::OVERWRITE, plan).await
+    }
+
+    /// Remove every file of the table via an OVERWRITE commit that has no
+    /// partition filter and adds no new files.
+    pub async fn truncate_table(&self) -> Result<()> {
+        let plan = CommitEntriesPlan::Overwrite {
+            partition_predicate: None,
+            new_entries: vec![],
+        };
+        self.try_commit(CommitKind::OVERWRITE, plan).await
+    }
+
+    /// Try to commit with retries.
+    ///
+    /// Each round re-reads the latest snapshot, re-resolves the entries for
+    /// `plan` against it and makes one commit attempt. The loop ends
+    /// successfully when:
+    /// - there is nothing to commit (the resolved entries are empty),
+    /// - a snapshot matching this commit is found among those that appeared
+    ///   since the previous failed attempt, or
+    /// - the attempt itself succeeds.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Error::DataInvalid` once the elapsed time exceeds
+    /// `commit_timeout_ms` or `commit_max_retries` retries have been used.
+    /// Errors from reading snapshots or writing manifests are propagated.
+    async fn try_commit(&self, commit_kind: CommitKind, plan: CommitEntriesPlan) -> Result<()> {
+        let mut retry_count = 0u32;
+        let mut last_snapshot_for_dup_check: Option<Snapshot> = None;
+        let start_time_ms = current_time_millis();
+
+        loop {
+            let latest_snapshot = self.snapshot_manager.get_latest_snapshot().await?;
+            let commit_entries = self.resolve_commit_entries(&plan, &latest_snapshot).await?;
+
+            if commit_entries.is_empty() {
+                return Ok(());
+            }
+
+            // Check for duplicate commit (idempotency on retry)
+            if self
+                .is_duplicate_commit(&last_snapshot_for_dup_check, &latest_snapshot, &commit_kind)
+                .await
+            {
+                return Ok(());
+            }
+
+            if self
+                .try_commit_once(&commit_kind, commit_entries, &latest_snapshot)
+                .await?
+            {
+                return Ok(());
+            }
+            // Remember what we raced against so the next round only scans
+            // snapshots created after it.
+            last_snapshot_for_dup_check = latest_snapshot;
+
+            // Wall-clock time may step backwards; saturate instead of
+            // underflowing (debug panic, or a bogus huge elapsed in release).
+            let elapsed_ms = current_time_millis().saturating_sub(start_time_ms);
+            if elapsed_ms > self.commit_timeout_ms || retry_count >= self.commit_max_retries {
+                let snap_id = last_snapshot_for_dup_check
+                    .as_ref()
+                    .map(|s| s.id() + 1)
+                    .unwrap_or(1);
+                return Err(crate::Error::DataInvalid {
+                    message: format!(
+                        "Commit failed for snapshot {} after {} millis with {} retries, \
+                         there may exist commit conflicts between multiple jobs.",
+                        snap_id, elapsed_ms, retry_count
+                    ),
+                    source: None,
+                });
+            }
+
+            self.commit_retry_wait(retry_count).await;
+            retry_count += 1;
+        }
+    }
+
+    /// Single commit attempt.
+    ///
+    /// Writes a new manifest file, a delta manifest list pointing at it, and a
+    /// base manifest list carrying over the previous snapshot's base and delta
+    /// manifests. It then builds the snapshot `latest + 1` (or `1` when there
+    /// is no snapshot yet) and hands it to the configured `SnapshotCommit`.
+    ///
+    /// Returns the `bool` reported by `SnapshotCommit::commit`; the caller
+    /// treats `true` as success and `false` as a reason to retry.
+    ///
+    /// NOTE(review): nothing here removes the manifest and manifest-list files
+    /// written for an attempt that ends up returning `false` or an error;
+    /// confirm whether cleanup happens elsewhere.
+    async fn try_commit_once(
+        &self,
+        commit_kind: &CommitKind,
+        mut commit_entries: Vec<ManifestEntry>,
+        latest_snapshot: &Option<Snapshot>,
+    ) -> Result<bool> {
+        let new_snapshot_id = latest_snapshot.as_ref().map(|s| s.id() + 
1).unwrap_or(1);
+
+        // Row tracking
+        // When enabled, entries are stamped with the new snapshot id as their
+        // sequence number and row ids continue from the previous snapshot's
+        // `next_row_id` (0 when absent).
+        let mut next_row_id: Option<i64> = None;
+        if self.row_tracking_enabled {
+            commit_entries = self.assign_snapshot_id(new_snapshot_id, 
commit_entries);
+            let first_row_id_start = latest_snapshot
+                .as_ref()
+                .and_then(|s| s.next_row_id())
+                .unwrap_or(0);
+            let (assigned, nrid) =
+                self.assign_row_tracking_meta(first_row_id_start, 
commit_entries);
+            commit_entries = assigned;
+            next_row_id = Some(nrid);
+        }
+
+        let file_io = self.snapshot_manager.file_io();
+        let manifest_dir = self.snapshot_manager.manifest_dir();
+
+        // Both manifest lists share one UUID and differ by suffix (-0 base,
+        // -1 delta); the manifest file gets its own UUID.
+        let unique_id = uuid::Uuid::new_v4();
+        let base_manifest_list_name = format!("manifest-list-{unique_id}-0");
+        let delta_manifest_list_name = format!("manifest-list-{unique_id}-1");
+        let new_manifest_name = format!("manifest-{}-0", uuid::Uuid::new_v4());
+
+        let base_manifest_list_path = 
format!("{manifest_dir}/{base_manifest_list_name}");
+        let delta_manifest_list_path = 
format!("{manifest_dir}/{delta_manifest_list_name}");
+        let new_manifest_path = format!("{manifest_dir}/{new_manifest_name}");
+
+        // Write manifest file
+        let new_manifest_file_meta = self
+            .write_manifest_file(
+                file_io,
+                &new_manifest_path,
+                &new_manifest_name,
+                &commit_entries,
+            )
+            .await?;
+
+        // Write delta manifest list
+        ManifestList::write(
+            file_io,
+            &delta_manifest_list_path,
+            &[new_manifest_file_meta],
+        )
+        .await?;
+
+        // Read existing manifests (base + delta from previous snapshot) and
+        // write base manifest list
+        let mut total_record_count: i64 = 0;
+        let existing_manifest_files = if let Some(snap) = latest_snapshot {
+            let base_path = format!("{manifest_dir}/{}", 
snap.base_manifest_list());
+            let delta_path = format!("{manifest_dir}/{}", 
snap.delta_manifest_list());
+            let base_files = ManifestList::read(file_io, &base_path).await?;
+            let delta_files = ManifestList::read(file_io, &delta_path).await?;
+            // Start from the previous total when the snapshot recorded one.
+            if let Some(prev) = snap.total_record_count() {
+                total_record_count += prev;
+            }
+            let mut all = base_files;
+            all.extend(delta_files);
+            all
+        } else {
+            vec![]
+        };
+
+        ManifestList::write(file_io, &base_manifest_list_path, 
&existing_manifest_files).await?;
+
+        // Calculate delta record count
+        // ADD entries contribute their row count, DELETE entries subtract it.
+        let mut delta_record_count: i64 = 0;
+        for entry in &commit_entries {
+            match entry.kind() {
+                FileKind::Add => delta_record_count += entry.file().row_count,
+                FileKind::Delete => delta_record_count -= 
entry.file().row_count,
+            }
+        }
+        total_record_count += delta_record_count;
+
+        let snapshot = Snapshot::builder()
+            .version(3)
+            .id(new_snapshot_id)
+            .schema_id(self.table.schema().id())
+            .base_manifest_list(base_manifest_list_name)
+            .delta_manifest_list(delta_manifest_list_name)
+            .commit_user(self.commit_user.clone())
+            .commit_identifier(BATCH_COMMIT_IDENTIFIER)
+            .commit_kind(commit_kind.clone())
+            .time_millis(current_time_millis())
+            .total_record_count(Some(total_record_count))
+            .delta_record_count(Some(delta_record_count))
+            .next_row_id(next_row_id)
+            .build();
+
+        // Per-partition deltas are passed along with the snapshot.
+        let statistics = self.generate_partition_statistics(&commit_entries)?;
+
+        self.snapshot_commit.commit(&snapshot, &statistics).await
+    }
+
+    /// Persist `entries` as a manifest at `path` and describe the result.
+    ///
+    /// The returned metadata carries `file_name`, the on-disk size reported by
+    /// `file_io`, the number of ADD and DELETE entries, the aggregated
+    /// partition stats and the table's current schema id.
+    async fn write_manifest_file(
+        &self,
+        file_io: &FileIO,
+        path: &str,
+        file_name: &str,
+        entries: &[ManifestEntry],
+    ) -> Result<ManifestFileMeta> {
+        Manifest::write(file_io, path, entries).await?;
+
+        // Tally both entry kinds in one pass.
+        let (added_file_count, deleted_file_count) =
+            entries
+                .iter()
+                .fold((0i64, 0i64), |(added, deleted), entry| match entry.kind() {
+                    FileKind::Add => (added + 1, deleted),
+                    FileKind::Delete => (added, deleted + 1),
+                });
+
+        // The size is only known after the file has been written.
+        let file_size = file_io.get_status(path).await?.size as i64;
+        let partition_stats = self.compute_partition_stats(entries)?;
+        let schema_id = self.table.schema().id();
+
+        Ok(ManifestFileMeta::new(
+            file_name.to_string(),
+            file_size,
+            added_file_count,
+            deleted_file_count,
+            partition_stats,
+            schema_id,
+        ))
+    }
+
+    /// Report whether a snapshot created after `last_snapshot_for_dup_check`,
+    /// up to and including `latest_snapshot`, carries this committer's user
+    /// and the same commit kind.
+    ///
+    /// Returns `false` when either snapshot is absent. Snapshots that cannot
+    /// be loaded are skipped.
+    async fn is_duplicate_commit(
+        &self,
+        last_snapshot_for_dup_check: &Option<Snapshot>,
+        latest_snapshot: &Option<Snapshot>,
+        commit_kind: &CommitKind,
+    ) -> bool {
+        let (previous, latest) = match (last_snapshot_for_dup_check, latest_snapshot) {
+            (Some(previous), Some(latest)) => (previous, latest),
+            _ => return false,
+        };
+
+        for candidate_id in (previous.id() + 1)..=latest.id() {
+            let candidate = match self.snapshot_manager.get_snapshot(candidate_id).await {
+                Ok(snapshot) => snapshot,
+                Err(_) => continue,
+            };
+            if candidate.commit_user() == self.commit_user
+                && candidate.commit_kind() == commit_kind
+            {
+                return true;
+            }
+        }
+
+        false
+    }
+
+    /// Produce the concrete manifest entries for one commit attempt.
+    ///
+    /// Static plans return a copy of their entries. Overwrite plans are
+    /// re-evaluated against `latest_snapshot` so deletions reflect the current
+    /// table state.
+    async fn resolve_commit_entries(
+        &self,
+        plan: &CommitEntriesPlan,
+        latest_snapshot: &Option<Snapshot>,
+    ) -> Result<Vec<ManifestEntry>> {
+        match plan {
+            CommitEntriesPlan::Overwrite {
+                partition_predicate,
+                new_entries,
+            } => {
+                let filter = partition_predicate.as_ref();
+                self.generate_overwrite_entries(latest_snapshot, filter, new_entries)
+                    .await
+            }
+            CommitEntriesPlan::Static(entries) => Ok(entries.to_vec()),
+        }
+    }
+
+    /// Build the entry list for an overwrite: a DELETE for every existing
+    /// entry selected by `partition_predicate` in `latest_snapshot`, followed
+    /// by `new_entries`.
+    ///
+    /// With no snapshot yet there is nothing to delete and only `new_entries`
+    /// are returned.
+    async fn generate_overwrite_entries(
+        &self,
+        latest_snapshot: &Option<Snapshot>,
+        partition_predicate: Option<&Predicate>,
+        new_entries: &[ManifestEntry],
+    ) -> Result<Vec<ManifestEntry>> {
+        let mut entries: Vec<ManifestEntry> = match latest_snapshot {
+            None => Vec::new(),
+            Some(snap) => {
+                let scan = TableScan::new(
+                    &self.table,
+                    partition_predicate.cloned(),
+                    vec![],
+                    None,
+                    None,
+                    None,
+                );
+                scan.plan_manifest_entries(snap)
+                    .await?
+                    .into_iter()
+                    .map(|existing| existing.with_kind(FileKind::Delete))
+                    .collect()
+            }
+        };
+
+        entries.extend_from_slice(new_entries);
+        Ok(entries)
+    }
+
+    /// Stamp every entry with `snapshot_id` as both arguments of
+    /// `with_sequence_number`.
+    fn assign_snapshot_id(
+        &self,
+        snapshot_id: i64,
+        entries: Vec<ManifestEntry>,
+    ) -> Vec<ManifestEntry> {
+        let mut stamped = Vec::with_capacity(entries.len());
+        for entry in entries {
+            stamped.push(entry.with_sequence_number(snapshot_id, snapshot_id));
+        }
+        stamped
+    }
+
+    /// Hand out consecutive first-row ids, starting at `first_row_id_start`.
+    ///
+    /// Only ADD entries whose file has `file_source == Some(0)` (APPEND) and
+    /// no `first_row_id` yet receive one; each consumes `row_count` ids. All
+    /// other entries pass through untouched. Returns the entries in their
+    /// original order together with the next unused row id.
+    fn assign_row_tracking_meta(
+        &self,
+        first_row_id_start: i64,
+        entries: Vec<ManifestEntry>,
+    ) -> (Vec<ManifestEntry>, i64) {
+        let mut next_id = first_row_id_start;
+
+        let assigned: Vec<ManifestEntry> = entries
+            .into_iter()
+            .map(|entry| {
+                let needs_row_id = matches!(entry.kind(), FileKind::Add)
+                    && entry.file().file_source == Some(0) // APPEND
+                    && entry.file().first_row_id.is_none();
+                if !needs_row_id {
+                    return entry;
+                }
+                let row_count = entry.file().row_count;
+                let first_row_id = next_id;
+                next_id += row_count;
+                entry.with_first_row_id(first_row_id)
+            })
+            .collect();
+
+        (assigned, next_id)
+    }
+
+    /// Exponential backoff with jitter.
+    ///
+    /// Sleeps for `commit_min_retry_wait_ms * 2^retry_count`, capped at
+    /// `commit_max_retry_wait_ms`, plus up to 20% of that value as jitter.
+    /// All integer arithmetic saturates, so very large configured waits or
+    /// retry counts cannot overflow.
+    async fn commit_retry_wait(&self, retry_count: u32) {
+        let base_wait = self
+            .commit_min_retry_wait_ms
+            .saturating_mul(2u64.saturating_pow(retry_count));
+        let wait = base_wait.min(self.commit_max_retry_wait_ms);
+        // Simple jitter: add up to 20% of wait time
+        let jitter = (wait as f64 * 0.2 * rand_f64()) as u64;
+        // `wait` may already sit at the top of the u64 range.
+        let total_wait = wait.saturating_add(jitter);
+        tokio::time::sleep(std::time::Duration::from_millis(total_wait)).await;
+    }
+
+    /// Aggregate per-field minimum, maximum and null count of the partition
+    /// values across `entries`.
+    ///
+    /// Returns empty stats when the table has no partition fields or there
+    /// are no entries. Entries with empty partition bytes are ignored.
+    fn compute_partition_stats(&self, entries: &[ManifestEntry]) -> Result<BinaryTableStats> {
+        let partition_fields = self.table.schema().partition_fields();
+        let num_fields = partition_fields.len();
+
+        if num_fields == 0 || entries.is_empty() {
+            return Ok(BinaryTableStats::new(vec![], vec![], vec![]));
+        }
+
+        let data_types: Vec<_> = partition_fields
+            .iter()
+            .map(|f| f.data_type().clone())
+            .collect();
+        let mut mins: Vec<Option<Datum>> = vec![None; num_fields];
+        let mut maxs: Vec<Option<Datum>> = vec![None; num_fields];
+        let mut null_counts: Vec<i64> = vec![0; num_fields];
+
+        for entry in entries {
+            let raw = entry.partition();
+            if raw.is_empty() {
+                continue;
+            }
+            let row = BinaryRow::from_serialized_bytes(raw)?;
+            for (i, data_type) in data_types.iter().enumerate() {
+                let datum = match extract_datum(&row, i, data_type)? {
+                    Some(datum) => datum,
+                    None => {
+                        null_counts[i] += 1;
+                        continue;
+                    }
+                };
+                // Keep the stored bound only when it already covers `datum`.
+                let min_holds = matches!(&mins[i], Some(cur) if *cur <= datum);
+                if !min_holds {
+                    mins[i] = Some(datum.clone());
+                }
+                let max_holds = matches!(&maxs[i], Some(cur) if *cur >= datum);
+                if !max_holds {
+                    maxs[i] = Some(datum);
+                }
+            }
+        }
+
+        let min_datums: Vec<_> = mins.iter().zip(data_types.iter()).collect();
+        let max_datums: Vec<_> = maxs.iter().zip(data_types.iter()).collect();
+
+        Ok(BinaryTableStats::new(
+            datums_to_binary_row(&min_datums),
+            datums_to_binary_row(&max_datums),
+            null_counts.into_iter().map(Some).collect(),
+        ))
+    }
+
+    /// Generate per-partition statistics from commit entries.
+    ///
+    /// Reference: [pypaimon 
FileStoreCommit._generate_partition_statistics](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/file_store_commit.py)
+    fn generate_partition_statistics(
+        &self,
+        entries: &[ManifestEntry],
+    ) -> Result<Vec<PartitionStatistics>> {
+        let partition_fields = self.table.schema().partition_fields();
+        let data_types: Vec<_> = partition_fields
+            .iter()
+            .map(|f| f.data_type().clone())
+            .collect();
+        let partition_keys: Vec<_> = self
+            .table
+            .schema()
+            .partition_keys()
+            .iter()
+            .map(|s| s.to_string())
+            .collect();
+
+        let mut stats_map: HashMap<Vec<u8>, PartitionStatistics> = 
HashMap::new();
+
+        for entry in entries {
+            let partition_bytes = entry.partition().to_vec();
+            let is_add = *entry.kind() == FileKind::Add;
+            let sign: i64 = if is_add { 1 } else { -1 };
+
+            let file = entry.file();
+            let file_creation_time = file
+                .creation_time
+                .map(|t| t.timestamp_millis() as u64)
+                .unwrap_or_else(current_time_millis);
+
+            let stats = 
stats_map.entry(partition_bytes.clone()).or_insert_with(|| {
+                // Parse partition spec from BinaryRow
+                let spec = self
+                    .parse_partition_spec(&partition_bytes, &partition_keys, 
&data_types)
+                    .unwrap_or_default();
+                PartitionStatistics {
+                    spec,
+                    record_count: 0,
+                    file_size_in_bytes: 0,
+                    file_count: 0,
+                    last_file_creation_time: 0,
+                    total_buckets: entry.total_buckets(),
+                }
+            });
+
+            stats.record_count += sign * file.row_count;
+            stats.file_size_in_bytes += sign * file.file_size;
+            stats.file_count += sign;
+            stats.last_file_creation_time = 
stats.last_file_creation_time.max(file_creation_time);
+        }
+
+        Ok(stats_map.into_values().collect())
+    }
+
+    /// Decode serialized partition bytes into a `key -> value string` map.
+    ///
+    /// Field `i` of the row is read with `data_types[i]` and stored under
+    /// `partition_keys[i]`; null fields are left out. Empty bytes or an empty
+    /// key list yield an empty map.
+    fn parse_partition_spec(
+        &self,
+        partition_bytes: &[u8],
+        partition_keys: &[String],
+        data_types: &[crate::spec::DataType],
+    ) -> Result<HashMap<String, String>> {
+        if partition_bytes.is_empty() || partition_keys.is_empty() {
+            return Ok(HashMap::new());
+        }
+
+        let row = BinaryRow::from_serialized_bytes(partition_bytes)?;
+        let mut spec = HashMap::new();
+        for (i, key) in partition_keys.iter().enumerate() {
+            let value = match extract_datum(&row, i, &data_types[i])? {
+                Some(datum) => datum.to_string(),
+                None => continue,
+            };
+            spec.insert(key.clone(), value);
+        }
+        Ok(spec)
+    }
+
+    /// Expand commit messages into ADD manifest entries, one per new file,
+    /// keeping message order and then file order within each message.
+    fn messages_to_entries(&self, messages: &[CommitMessage]) -> Vec<ManifestEntry> {
+        let mut entries = Vec::new();
+        for msg in messages {
+            for file in msg.new_files.iter() {
+                entries.push(ManifestEntry::new(
+                    FileKind::Add,
+                    msg.partition.clone(),
+                    msg.bucket,
+                    self.total_buckets,
+                    file.clone(),
+                    2,
+                ));
+            }
+        }
+        entries
+    }
+}
+
+/// Plan for resolving commit entries.
+///
+/// The plan is resolved again on every commit attempt, against the latest
+/// snapshot at that time.
+enum CommitEntriesPlan {
+    /// Static entries (for APPEND).
+    ///
+    /// Used as-is on every attempt.
+    Static(Vec<ManifestEntry>),
+    /// Overwrite with optional partition predicate.
+    ///
+    /// Existing entries selected by the predicate (all entries when `None`)
+    /// are turned into deletes, and `new_entries` are appended after them.
+    Overwrite {
+        partition_predicate: Option<Predicate>,
+        new_entries: Vec<ManifestEntry>,
+    },
+}
+
+/// Milliseconds since the Unix epoch according to the system clock.
+///
+/// Yields 0 if the clock reports a time before the epoch.
+fn current_time_millis() -> u64 {
+    let since_epoch = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .unwrap_or_default();
+    since_epoch.as_millis() as u64
+}
+
+/// Random f64 in [0, 1), derived from a freshly keyed `RandomState` hasher
+/// fed with the current time in nanoseconds.
+///
+/// Only the top 53 bits of the hash are used. Dividing the full 64-bit hash
+/// by `u64::MAX as f64` could round up to exactly 1.0, violating the
+/// half-open range.
+fn rand_f64() -> f64 {
+    use std::collections::hash_map::RandomState;
+    use std::hash::{BuildHasher, Hasher};
+    let mut hasher = RandomState::new().build_hasher();
+    hasher.write_u64(
+        SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_nanos() as u64,
+    );
+    // 53 bits fit an f64 mantissa exactly, so the quotient is always < 1.0.
+    ((hasher.finish() >> 11) as f64) / ((1u64 << 53) as f64)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::catalog::Identifier;
+    use crate::io::FileIOBuilder;
+    use crate::spec::stats::BinaryTableStats;
+    use crate::spec::{BinaryRowBuilder, DataFileMeta, ManifestList, 
TableSchema};
+    use chrono::{DateTime, Utc};
+
+    /// Build a `FileIO` for the `memory` scheme used by the tests below.
+    fn test_file_io() -> FileIO {
+        FileIOBuilder::new("memory").build().unwrap()
+    }
+
+    /// Unpartitioned schema (id 0) with columns `id: Int` and `name: VarChar`.
+    fn test_schema() -> TableSchema {
+        use crate::spec::{DataType, IntType, Schema, VarCharType};
+        let schema = Schema::builder()
+            .column("id", DataType::Int(IntType::new()))
+            .column("name", DataType::VarChar(VarCharType::string_type()))
+            .build()
+            .unwrap();
+        TableSchema::new(0, &schema)
+    }
+
+    /// Schema (id 0) with columns `pt: VarChar` and `id: Int`, partitioned
+    /// by `pt`.
+    fn test_partitioned_schema() -> TableSchema {
+        use crate::spec::{DataType, IntType, Schema, VarCharType};
+        let schema = Schema::builder()
+            .column("pt", DataType::VarChar(VarCharType::string_type()))
+            .column("id", DataType::Int(IntType::new()))
+            .partition_keys(["pt"])
+            .build()
+            .unwrap();
+        TableSchema::new(0, &schema)
+    }
+
+    /// Unpartitioned `default.test_table` at `table_path`; the last
+    /// constructor argument is `None`.
+    fn test_table(file_io: &FileIO, table_path: &str) -> Table {
+        Table::new(
+            file_io.clone(),
+            Identifier::new("default", "test_table"),
+            table_path.to_string(),
+            test_schema(),
+            None,
+        )
+    }
+
+    /// Same as `test_table`, but using the schema partitioned by `pt`.
+    fn test_partitioned_table(file_io: &FileIO, table_path: &str) -> Table {
+        Table::new(
+            file_io.clone(),
+            Identifier::new("default", "test_table"),
+            table_path.to_string(),
+            test_partitioned_schema(),
+            None,
+        )
+    }
+
+    /// Data file metadata fixture with the given name and row count.
+    ///
+    /// The file size is fixed at 1024 bytes, stats are empty, the creation
+    /// time is a fixed timestamp, and `file_source` / `first_row_id` are
+    /// unset.
+    fn test_data_file(name: &str, row_count: i64) -> DataFileMeta {
+        DataFileMeta {
+            file_name: name.to_string(),
+            file_size: 1024,
+            row_count,
+            min_key: vec![],
+            max_key: vec![],
+            key_stats: BinaryTableStats::new(vec![], vec![], vec![]),
+            value_stats: BinaryTableStats::new(vec![], vec![], vec![]),
+            min_sequence_number: 0,
+            max_sequence_number: 0,
+            schema_id: 0,
+            level: 0,
+            extra_files: vec![],
+            creation_time: Some(
+                "2024-09-06T07:45:55.039+00:00"
+                    .parse::<DateTime<Utc>>()
+                    .unwrap(),
+            ),
+            delete_row_count: Some(0),
+            embedded_index: None,
+            first_row_id: None,
+            write_cols: None,
+            external_path: None,
+            file_source: None,
+            value_stats_cols: None,
+        }
+    }
+
+    /// Append-mode committer (no overwrite partition) for the unpartitioned
+    /// test table, committing as `test-user`.
+    fn setup_commit(file_io: &FileIO, table_path: &str) -> TableCommit {
+        let table = test_table(file_io, table_path);
+        TableCommit::new(table, "test-user".to_string(), None)
+    }
+
+    /// Append-mode committer (no overwrite partition) for the partitioned
+    /// test table, committing as `test-user`.
+    fn setup_partitioned_commit(file_io: &FileIO, table_path: &str) -> 
TableCommit {
+        let table = test_partitioned_table(file_io, table_path);
+        TableCommit::new(table, "test-user".to_string(), None)
+    }
+
+    /// Serialized single-field partition row holding the string `pt`.
+    ///
+    /// Strings of at most 7 bytes are written with `write_string_inline`,
+    /// longer ones with `write_string`.
+    fn partition_bytes(pt: &str) -> Vec<u8> {
+        use crate::spec::{DataType, VarCharType};
+        let datum = Datum::String(pt.to_string());
+        let dt = DataType::VarChar(VarCharType::string_type());
+        let datums = vec![(&datum, &dt)];
+        // NOTE(review): the result of `from_datums` is discarded; presumably
+        // this only checks that the conversion succeeds. Confirm or remove.
+        BinaryRow::from_datums(&datums).unwrap();
+        let mut builder = BinaryRowBuilder::new(1);
+        if pt.len() <= 7 {
+            builder.write_string_inline(0, pt);
+        } else {
+            builder.write_string(0, pt);
+        }
+        builder.build_serialized()
+    }
+
+    /// Create the `snapshot/` and `manifest/` directories under `table_path`.
+    async fn setup_dirs(file_io: &FileIO, table_path: &str) {
+        file_io
+            .mkdirs(&format!("{table_path}/snapshot/"))
+            .await
+            .unwrap();
+        file_io
+            .mkdirs(&format!("{table_path}/manifest/"))
+            .await
+            .unwrap();
+    }
+
+    /// A single append commit creates snapshot 1 with the batch commit
+    /// identifier and matching record counts. Its delta manifest list points
+    /// at one manifest containing one ADD entry for the committed file.
+    #[tokio::test]
+    async fn test_append_commit() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_append_commit";
+        setup_dirs(&file_io, table_path).await;
+
+        let commit = setup_commit(&file_io, table_path);
+
+        let messages = vec![CommitMessage::new(
+            vec![],
+            0,
+            vec![test_data_file("data-0.parquet", 100)],
+        )];
+
+        commit.commit(messages).await.unwrap();
+
+        // Verify snapshot was created
+        let snap_manager = SnapshotManager::new(file_io.clone(), 
table_path.to_string());
+        let snapshot = 
snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+        assert_eq!(snapshot.id(), 1);
+        assert_eq!(snapshot.commit_identifier(), BATCH_COMMIT_IDENTIFIER);
+        assert_eq!(snapshot.total_record_count(), Some(100));
+        assert_eq!(snapshot.delta_record_count(), Some(100));
+
+        // Verify manifest list was written
+        let manifest_dir = format!("{table_path}/manifest");
+        let delta_path = format!("{manifest_dir}/{}", 
snapshot.delta_manifest_list());
+        let delta_metas = ManifestList::read(&file_io, 
&delta_path).await.unwrap();
+        assert_eq!(delta_metas.len(), 1);
+        assert_eq!(delta_metas[0].num_added_files(), 1);
+
+        // Verify manifest entries
+        let manifest_path = format!("{manifest_dir}/{}", 
delta_metas[0].file_name());
+        let entries = Manifest::read(&file_io, &manifest_path).await.unwrap();
+        assert_eq!(entries.len(), 1);
+        assert_eq!(*entries[0].kind(), FileKind::Add);
+        assert_eq!(entries[0].file().file_name, "data-0.parquet");
+    }
+
+    /// Two consecutive appends (100 then 200 rows) yield snapshot 2 with a
+    /// total of 300 records and a delta of 200.
+    #[tokio::test]
+    async fn test_multiple_appends() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_multiple_appends";
+        setup_dirs(&file_io, table_path).await;
+
+        let commit = setup_commit(&file_io, table_path);
+
+        // First commit
+        commit
+            .commit(vec![CommitMessage::new(
+                vec![],
+                0,
+                vec![test_data_file("data-0.parquet", 100)],
+            )])
+            .await
+            .unwrap();
+
+        // Second commit
+        commit
+            .commit(vec![CommitMessage::new(
+                vec![],
+                0,
+                vec![test_data_file("data-1.parquet", 200)],
+            )])
+            .await
+            .unwrap();
+
+        let snap_manager = SnapshotManager::new(file_io.clone(), 
table_path.to_string());
+        let snapshot = 
snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+        assert_eq!(snapshot.id(), 2);
+        assert_eq!(snapshot.total_record_count(), Some(300));
+        assert_eq!(snapshot.delta_record_count(), Some(200));
+    }
+
+    #[tokio::test]
+    async fn test_empty_commit_is_noop() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_empty_commit";
+        setup_dirs(&file_io, table_path).await;
+
+        let commit = setup_commit(&file_io, table_path);
+        commit.commit(vec![]).await.unwrap();
+
+        let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
+        let snapshot = snap_manager.get_latest_snapshot().await.unwrap();
+        assert!(snapshot.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_truncate_table() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_truncate";
+        setup_dirs(&file_io, table_path).await;
+
+        let commit = setup_commit(&file_io, table_path);
+
+        // Append some data first
+        commit
+            .commit(vec![CommitMessage::new(
+                vec![],
+                0,
+                vec![test_data_file("data-0.parquet", 100)],
+            )])
+            .await
+            .unwrap();
+
+        // Truncate
+        commit.truncate_table().await.unwrap();
+
+        let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
+        let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+        assert_eq!(snapshot.id(), 2);
+        assert_eq!(snapshot.commit_kind(), &CommitKind::OVERWRITE);
+        assert_eq!(snapshot.total_record_count(), Some(0));
+        assert_eq!(snapshot.delta_record_count(), Some(-100));
+    }
+
+    #[tokio::test]
+    async fn test_overwrite_partition() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_overwrite_partition";
+        setup_dirs(&file_io, table_path).await;
+
+        let commit = setup_partitioned_commit(&file_io, table_path);
+
+        // Append data for partition "a" and "b"
+        commit
+            .commit(vec![
+                CommitMessage::new(
+                    partition_bytes("a"),
+                    0,
+                    vec![test_data_file("data-a.parquet", 100)],
+                ),
+                CommitMessage::new(
+                    partition_bytes("b"),
+                    0,
+                    vec![test_data_file("data-b.parquet", 200)],
+                ),
+            ])
+            .await
+            .unwrap();
+
+        // Overwrite partition "a" with new data
+        let mut overwrite_partition = HashMap::new();
+        overwrite_partition.insert("pt".to_string(), Datum::String("a".to_string()));
+
+        let table = test_partitioned_table(&file_io, table_path);
+        let overwrite_commit =
+            TableCommit::new(table, "test-user".to_string(), Some(overwrite_partition));
+
+        overwrite_commit
+            .commit(vec![CommitMessage::new(
+                partition_bytes("a"),
+                0,
+                vec![test_data_file("data-a2.parquet", 50)],
+            )])
+            .await
+            .unwrap();
+
+        let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
+        let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+        assert_eq!(snapshot.id(), 2);
+        assert_eq!(snapshot.commit_kind(), &CommitKind::OVERWRITE);
+        // 300 - 100 (delete a) + 50 (add a2) = 250
+        assert_eq!(snapshot.total_record_count(), Some(250));
+    }
+
+    #[tokio::test]
+    async fn test_drop_partitions() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_drop_partitions";
+        setup_dirs(&file_io, table_path).await;
+
+        let commit = setup_partitioned_commit(&file_io, table_path);
+
+        // Append data for partitions "a", "b", "c"
+        commit
+            .commit(vec![
+                CommitMessage::new(
+                    partition_bytes("a"),
+                    0,
+                    vec![test_data_file("data-a.parquet", 100)],
+                ),
+                CommitMessage::new(
+                    partition_bytes("b"),
+                    0,
+                    vec![test_data_file("data-b.parquet", 200)],
+                ),
+                CommitMessage::new(
+                    partition_bytes("c"),
+                    0,
+                    vec![test_data_file("data-c.parquet", 300)],
+                ),
+            ])
+            .await
+            .unwrap();
+
+        // Drop partitions "a" and "c"
+        let partitions = vec![
+            HashMap::from([("pt".to_string(), Datum::String("a".to_string()))]),
+            HashMap::from([("pt".to_string(), Datum::String("c".to_string()))]),
+        ];
+        commit.truncate_partitions(partitions).await.unwrap();
+
+        let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
+        let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+        assert_eq!(snapshot.id(), 2);
+        assert_eq!(snapshot.commit_kind(), &CommitKind::OVERWRITE);
+        // 600 - 100 (a) - 300 (c) = 200
+        assert_eq!(snapshot.total_record_count(), Some(200));
+    }
+}
diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs
index e21f00b..06fe87a 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -440,33 +440,23 @@ impl<'a> TableScan<'a> {
         limited_splits
     }
 
-    async fn plan_snapshot(&self, snapshot: Snapshot) -> crate::Result<Plan> {
+    /// Read all manifest entries from a snapshot, applying filters and merging.
+    ///
+    /// This is the shared entry point used by both `plan_snapshot` (scan) and
+    /// `TableCommit` (overwrite). Filters include partition predicate, data
+    /// predicates, and bucket predicate.
+    pub(crate) async fn plan_manifest_entries(
+        &self,
+        snapshot: &Snapshot,
+    ) -> crate::Result<Vec<ManifestEntry>> {
         let file_io = self.table.file_io();
         let table_path = self.table.location();
         let core_options = CoreOptions::new(self.table.schema().options());
         let deletion_vectors_enabled = core_options.deletion_vectors_enabled();
         let data_evolution_enabled = core_options.data_evolution_enabled();
-        let target_split_size = core_options.source_split_target_size();
-        let open_file_cost = core_options.source_split_open_file_cost();
 
-        // Resolve partition fields for manifest-file-level stats pruning.
-        let partition_keys = self.table.schema().partition_keys();
-        let partition_fields: Vec<DataField> = self
-            .table
-            .schema()
-            .partition_keys()
-            .iter()
-            .filter_map(|key| {
-                self.table
-                    .schema()
-                    .fields()
-                    .iter()
-                    .find(|f| f.name() == key)
-                    .cloned()
-            })
-            .collect();
+        let partition_fields = self.table.schema().partition_fields();
 
-        // Data-evolution tables must not prune data files independently.
         let pushdown_data_predicates = if data_evolution_enabled {
             &[][..]
         } else {
@@ -475,8 +465,6 @@ impl<'a> TableScan<'a> {
 
         let has_primary_keys = !self.table.schema().primary_keys().is_empty();
 
-        // Compute bucket predicate and key fields for per-entry bucket pruning.
-        // Only supported for the default bucket function (MurmurHash3-based).
         let bucket_key_fields: Vec<DataField> =
             if self.bucket_predicate.is_none() || !core_options.is_default_bucket_function() {
                 Vec::new()
@@ -509,7 +497,7 @@ impl<'a> TableScan<'a> {
         let entries = read_all_manifest_entries(
             file_io,
             table_path,
-            &snapshot,
+            snapshot,
             deletion_vectors_enabled,
             has_primary_keys,
             self.partition_predicate.as_ref(),
@@ -521,7 +509,19 @@ impl<'a> TableScan<'a> {
             &bucket_key_fields,
         )
         .await?;
-        let entries = merge_manifest_entries(entries);
+        Ok(merge_manifest_entries(entries))
+    }
+
+    async fn plan_snapshot(&self, snapshot: Snapshot) -> crate::Result<Plan> {
+        let file_io = self.table.file_io();
+        let table_path = self.table.location();
+        let core_options = CoreOptions::new(self.table.schema().options());
+        let data_evolution_enabled = core_options.data_evolution_enabled();
+        let target_split_size = core_options.source_split_target_size();
+        let open_file_cost = core_options.source_split_open_file_cost();
+        let partition_keys = self.table.schema().partition_keys();
+
+        let entries = self.plan_manifest_entries(&snapshot).await?;
         if entries.is_empty() {
             return Ok(Plan::new(Vec::new()));
         }
diff --git a/crates/paimon/src/table/write_builder.rs b/crates/paimon/src/table/write_builder.rs
new file mode 100644
index 0000000..d6458cf
--- /dev/null
+++ b/crates/paimon/src/table/write_builder.rs
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! WriteBuilder for table write API.
+//!
+//! Reference: [pypaimon WriteBuilder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/write_builder.py)
+
+use crate::spec::Datum;
+use crate::table::{Table, TableCommit};
+use std::collections::HashMap;
+use uuid::Uuid;
+
+/// Builder for creating table writers and committers.
+///
+/// Provides `new_write` (TODO) and `new_commit` methods, with optional
+/// `overwrite` support for partition-level overwrites.
+pub struct WriteBuilder<'a> {
+    table: &'a Table,
+    commit_user: String,
+    overwrite_partition: Option<HashMap<String, Datum>>,
+}
+
+impl<'a> WriteBuilder<'a> {
+    pub fn new(table: &'a Table) -> Self {
+        Self {
+            table,
+            commit_user: Uuid::new_v4().to_string(),
+            overwrite_partition: None,
+        }
+    }
+
+    /// Set overwrite mode. If `partition` is None, overwrites the entire table.
+    /// If `partition` is Some, overwrites only the specified partition.
+    pub fn overwrite(&mut self, partition: Option<HashMap<String, Datum>>) -> &mut Self {
+        self.overwrite_partition = Some(partition.unwrap_or_default());
+        self
+    }
+
+    /// Create a new TableCommit for committing write results.
+    pub fn new_commit(&self) -> TableCommit {
+        TableCommit::new(
+            self.table.clone(),
+            self.commit_user.clone(),
+            self.overwrite_partition.clone(),
+        )
+    }
+
+    // TODO: pub fn new_write(&self) -> TableWrite { ... }
+}
diff --git a/crates/paimon/src/tantivy/directory.rs b/crates/paimon/src/tantivy/directory.rs
index 32c3058..22440a6 100644
--- a/crates/paimon/src/tantivy/directory.rs
+++ b/crates/paimon/src/tantivy/directory.rs
@@ -368,9 +368,14 @@ mod tests {
     #[tokio::test]
     async fn test_read_file_from_archive() {
         let dir = make_test_dir().await;
-        // Read any file from the archive and verify it's non-empty.
-        let first_path = dir.files.keys().next().unwrap().clone();
-        let handle = dir.get_file_handle(&first_path).unwrap();
+        // Find a non-empty file (some tantivy index files can be 0 bytes).
+        let non_empty_path = dir
+            .files
+            .iter()
+            .find(|(_, meta)| meta.length > 0)
+            .map(|(p, _)| p.clone())
+            .expect("archive should contain at least one non-empty file");
+        let handle = dir.get_file_handle(&non_empty_path).unwrap();
         assert!(handle.len() > 0);
         let data = handle.read_bytes(0..handle.len()).unwrap();
         assert_eq!(data.len(), handle.len());

Reply via email to