This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new d5dd8fc  feat: add postpone bucket (bucket=-2) write support for primary-key tables (#252)
d5dd8fc is described below

commit d5dd8fc61930e87355a07c1727e6b6da88ef7433
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Apr 16 16:48:42 2026 +0800

    feat: add postpone bucket (bucket=-2) write support for primary-key tables (#252)
    
    Postpone bucket mode writes data in KV format without sorting or
    deduplication, deferring bucket assignment to background compaction.
    Files are written to the `bucket-postpone` directory and are invisible
    to normal reads until compacted.
---
 crates/integrations/datafusion/tests/pk_tables.rs | 121 +++++++
 crates/paimon/src/spec/core_options.rs            |  15 +
 crates/paimon/src/table/data_file_writer.rs       |  10 +-
 crates/paimon/src/table/kv_file_writer.rs         |  57 ++--
 crates/paimon/src/table/mod.rs                    |   1 +
 crates/paimon/src/table/postpone_file_writer.rs   | 305 ++++++++++++++++++
 crates/paimon/src/table/table_scan.rs             |  13 +-
 crates/paimon/src/table/table_write.rs            | 374 +++++++++++++++++++---
 crates/paimon/src/table/write_builder.rs          |   2 +-
 9 files changed, 829 insertions(+), 69 deletions(-)

diff --git a/crates/integrations/datafusion/tests/pk_tables.rs 
b/crates/integrations/datafusion/tests/pk_tables.rs
index 753963d..b2fae22 100644
--- a/crates/integrations/datafusion/tests/pk_tables.rs
+++ b/crates/integrations/datafusion/tests/pk_tables.rs
@@ -1259,3 +1259,124 @@ async fn test_pk_first_row_insert_overwrite() {
         "After second OVERWRITE: still 2 files (no stale level-0 files 
accumulated)"
     );
 }
+
+// ======================= Postpone Bucket (bucket = -2) 
=======================
+
+/// Postpone bucket files are invisible to normal SELECT but visible via 
scan_all_files.
+#[tokio::test]
+async fn test_postpone_write_invisible_to_select() {
+    let (_tmp, catalog) = create_test_env();
+    let handler = create_handler(catalog.clone());
+    handler
+        .sql("CREATE SCHEMA paimon.test_db")
+        .await
+        .expect("CREATE SCHEMA failed");
+
+    handler
+        .sql(
+            "CREATE TABLE paimon.test_db.t_postpone (
+                id INT NOT NULL, value INT,
+                PRIMARY KEY (id)
+            ) WITH ('bucket' = '-2')",
+        )
+        .await
+        .unwrap();
+
+    // Write data
+    handler
+        .sql("INSERT INTO paimon.test_db.t_postpone VALUES (1, 10), (2, 20), 
(3, 30)")
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    // scan_all_files should find the postpone file
+    let table = catalog
+        .get_table(&Identifier::new("test_db", "t_postpone"))
+        .await
+        .unwrap();
+    let plan = table
+        .new_read_builder()
+        .new_scan()
+        .with_scan_all_files()
+        .plan()
+        .await
+        .unwrap();
+    let file_count: usize = plan.splits().iter().map(|s| 
s.data_files().len()).sum();
+    assert_eq!(file_count, 1, "scan_all_files should find 1 postpone file");
+
+    // Normal SELECT should return 0 rows (postpone files are invisible)
+    let count = row_count(&handler, "SELECT * FROM 
paimon.test_db.t_postpone").await;
+    assert_eq!(count, 0, "SELECT should return 0 rows for postpone table");
+}
+
+/// INSERT OVERWRITE on a postpone table should replace old files with new 
ones.
+#[tokio::test]
+async fn test_postpone_insert_overwrite() {
+    let (_tmp, catalog) = create_test_env();
+    let handler = create_handler(catalog.clone());
+    handler
+        .sql("CREATE SCHEMA paimon.test_db")
+        .await
+        .expect("CREATE SCHEMA failed");
+
+    handler
+        .sql(
+            "CREATE TABLE paimon.test_db.t_postpone_ow (
+                id INT NOT NULL, value INT,
+                PRIMARY KEY (id)
+            ) WITH ('bucket' = '-2')",
+        )
+        .await
+        .unwrap();
+
+    // First commit
+    handler
+        .sql("INSERT INTO paimon.test_db.t_postpone_ow VALUES (1, 10), (2, 
20)")
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    let table = catalog
+        .get_table(&Identifier::new("test_db", "t_postpone_ow"))
+        .await
+        .unwrap();
+    let plan = table
+        .new_read_builder()
+        .new_scan()
+        .with_scan_all_files()
+        .plan()
+        .await
+        .unwrap();
+    let file_count: usize = plan.splits().iter().map(|s| 
s.data_files().len()).sum();
+    assert_eq!(file_count, 1, "After INSERT: 1 postpone file");
+
+    // INSERT OVERWRITE should replace old file
+    handler
+        .sql("INSERT OVERWRITE paimon.test_db.t_postpone_ow VALUES (3, 30)")
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    let table = catalog
+        .get_table(&Identifier::new("test_db", "t_postpone_ow"))
+        .await
+        .unwrap();
+    let plan = table
+        .new_read_builder()
+        .new_scan()
+        .with_scan_all_files()
+        .plan()
+        .await
+        .unwrap();
+    let file_count: usize = plan.splits().iter().map(|s| 
s.data_files().len()).sum();
+    assert_eq!(
+        file_count, 1,
+        "After OVERWRITE: only 1 new file (old file deleted)"
+    );
+}
diff --git a/crates/paimon/src/spec/core_options.rs 
b/crates/paimon/src/spec/core_options.rs
index ae9d7cc..85aa801 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -28,6 +28,11 @@ const BUCKET_KEY_OPTION: &str = "bucket-key";
 const BUCKET_FUNCTION_TYPE_OPTION: &str = "bucket-function.type";
 const BUCKET_OPTION: &str = "bucket";
 const DEFAULT_BUCKET: i32 = -1;
+/// Postpone bucket mode: data is written to `bucket-postpone` directory
+/// and is invisible to readers until compaction assigns real bucket numbers.
+pub const POSTPONE_BUCKET: i32 = -2;
+/// Directory name for postpone bucket files.
+pub const POSTPONE_BUCKET_DIR: &str = "bucket-postpone";
 const COMMIT_MAX_RETRIES_OPTION: &str = "commit.max-retries";
 const COMMIT_TIMEOUT_OPTION: &str = "commit.timeout";
 const COMMIT_MIN_RETRY_WAIT_OPTION: &str = "commit.min-retry-wait";
@@ -63,6 +68,16 @@ pub enum MergeEngine {
     FirstRow,
 }
 
+/// Format the bucket directory name for a given bucket number.
+/// Returns `"bucket-postpone"` for `POSTPONE_BUCKET` (-2), otherwise 
`"bucket-{N}"`.
+pub fn bucket_dir_name(bucket: i32) -> String {
+    if bucket == POSTPONE_BUCKET {
+        POSTPONE_BUCKET_DIR.to_string()
+    } else {
+        format!("bucket-{bucket}")
+    }
+}
+
 /// Typed accessors for common table options.
 ///
 /// This mirrors pypaimon's `CoreOptions` pattern while staying lightweight.
diff --git a/crates/paimon/src/table/data_file_writer.rs 
b/crates/paimon/src/table/data_file_writer.rs
index cfbc55a..e58e0e1 100644
--- a/crates/paimon/src/table/data_file_writer.rs
+++ b/crates/paimon/src/table/data_file_writer.rs
@@ -25,7 +25,7 @@
 use crate::arrow::format::{create_format_writer, FormatFileWriter};
 use crate::io::FileIO;
 use crate::spec::stats::BinaryTableStats;
-use crate::spec::{DataFileMeta, EMPTY_SERIALIZED_ROW};
+use crate::spec::{bucket_dir_name, DataFileMeta, EMPTY_SERIALIZED_ROW};
 use crate::Result;
 use arrow_array::RecordBatch;
 use chrono::Utc;
@@ -133,11 +133,13 @@ impl DataFileWriter {
         );
 
         let bucket_dir = if self.partition_path.is_empty() {
-            format!("{}/bucket-{}", self.table_location, self.bucket)
+            format!("{}/{}", self.table_location, bucket_dir_name(self.bucket))
         } else {
             format!(
-                "{}/{}/bucket-{}",
-                self.table_location, self.partition_path, self.bucket
+                "{}/{}/{}",
+                self.table_location,
+                self.partition_path,
+                bucket_dir_name(self.bucket)
             )
         };
         self.file_io.mkdirs(&format!("{bucket_dir}/")).await?;
diff --git a/crates/paimon/src/table/kv_file_writer.rs 
b/crates/paimon/src/table/kv_file_writer.rs
index 3309820..ef74015 100644
--- a/crates/paimon/src/table/kv_file_writer.rs
+++ b/crates/paimon/src/table/kv_file_writer.rs
@@ -74,9 +74,6 @@ pub(crate) struct KeyValueWriteConfig {
     pub sequence_field_indices: Vec<usize>,
     /// Merge engine for deduplication.
     pub merge_engine: MergeEngine,
-    /// Column index in user schema that provides the row kind value.
-    /// Resolved from: `rowkind.field` option > `_VALUE_KIND` column > None 
(all INSERT).
-    pub value_kind_col_index: Option<usize>,
 }
 
 impl KeyValueFileWriter {
@@ -200,23 +197,8 @@ impl KeyValueFileWriter {
         let min_key = self.extract_key_binary_row(&combined, first_row)?;
         let max_key = self.extract_key_binary_row(&combined, last_row)?;
 
-        // Build physical schema (thin-mode): [_SEQUENCE_NUMBER, _VALUE_KIND, 
all_user_cols...]
-        let user_fields = user_schema.fields();
-        let mut physical_fields: Vec<Arc<ArrowField>> = Vec::new();
-        physical_fields.push(Arc::new(ArrowField::new(
-            SEQUENCE_NUMBER_FIELD_NAME,
-            ArrowDataType::Int64,
-            false,
-        )));
-        physical_fields.push(Arc::new(ArrowField::new(
-            VALUE_KIND_FIELD_NAME,
-            ArrowDataType::Int8,
-            false,
-        )));
-        for field in user_fields.iter() {
-            physical_fields.push(field.clone());
-        }
-        let physical_schema = Arc::new(ArrowSchema::new(physical_fields));
+        // Build physical schema and open writer.
+        let physical_schema = build_physical_schema(&user_schema);
 
         // Open parquet writer.
         let file_name = format!(
@@ -262,8 +244,13 @@ impl KeyValueFileWriter {
                     },
                 )?,
             );
-            // Value kind column.
-            match self.config.value_kind_col_index {
+            // Value kind column — resolve from batch schema.
+            let vk_idx = combined
+                .schema()
+                .fields()
+                .iter()
+                .position(|f| f.name() == crate::spec::VALUE_KIND_FIELD_NAME);
+            match vk_idx {
                 Some(vk_idx) => {
                     physical_columns.push(
                         arrow_select::take::take(
@@ -282,8 +269,11 @@ impl KeyValueFileWriter {
                     physical_columns.push(Arc::new(Int8Array::from(vec![0i8; 
chunk_len])));
                 }
             }
-            // All user columns.
+            // All user columns (skip _VALUE_KIND if present — already handled 
above).
             for idx in 0..combined.num_columns() {
+                if Some(idx) == vk_idx {
+                    continue;
+                }
                 physical_columns.push(
                     arrow_select::take::take(combined.column(idx).as_ref(), 
&chunk_indices, None)
                         .map_err(|e| crate::Error::DataInvalid {
@@ -459,3 +449,24 @@ impl KeyValueFileWriter {
         Ok(builder.build_serialized())
     }
 }
+
+/// Build the physical schema: [_SEQUENCE_NUMBER, _VALUE_KIND, user_cols 
(excluding _VALUE_KIND)...]
+pub(crate) fn build_physical_schema(user_schema: &ArrowSchema) -> 
Arc<ArrowSchema> {
+    let mut physical_fields: Vec<Arc<ArrowField>> = Vec::new();
+    physical_fields.push(Arc::new(ArrowField::new(
+        SEQUENCE_NUMBER_FIELD_NAME,
+        ArrowDataType::Int64,
+        false,
+    )));
+    physical_fields.push(Arc::new(ArrowField::new(
+        VALUE_KIND_FIELD_NAME,
+        ArrowDataType::Int8,
+        false,
+    )));
+    for field in user_schema.fields().iter() {
+        if field.name() != VALUE_KIND_FIELD_NAME {
+            physical_fields.push(field.clone());
+        }
+    }
+    Arc::new(ArrowSchema::new(physical_fields))
+}
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index 8fb8ef3..e63beaf 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -29,6 +29,7 @@ mod full_text_search_builder;
 pub(crate) mod global_index_scanner;
 mod kv_file_reader;
 mod kv_file_writer;
+mod postpone_file_writer;
 mod read_builder;
 pub(crate) mod rest_env;
 pub(crate) mod row_id_predicate;
diff --git a/crates/paimon/src/table/postpone_file_writer.rs 
b/crates/paimon/src/table/postpone_file_writer.rs
new file mode 100644
index 0000000..af7cb06
--- /dev/null
+++ b/crates/paimon/src/table/postpone_file_writer.rs
@@ -0,0 +1,305 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Postpone bucket file writer for primary-key tables with `bucket = -2`.
+//!
+//! Writes data in KV format (`_SEQUENCE_NUMBER`, `_VALUE_KIND` + user columns)
+//! but without sorting or deduplication — compaction assigns real buckets 
later.
+//!
+//! Uses a special file naming prefix: `data-u-{commitUser}-s-0-w-`.
+//!
+//! Reference: 
[PostponeBucketWriter](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/sink/PostponeBucketWriter.java)
+
+use crate::arrow::format::{create_format_writer, FormatFileWriter};
+use crate::io::FileIO;
+use crate::spec::stats::BinaryTableStats;
+use crate::spec::{bucket_dir_name, DataFileMeta, EMPTY_SERIALIZED_ROW, 
VALUE_KIND_FIELD_NAME};
+use crate::table::kv_file_writer::build_physical_schema;
+use crate::Result;
+use arrow_array::{Int64Array, Int8Array, RecordBatch};
+use chrono::{DateTime, Utc};
+use std::sync::Arc;
+use tokio::task::JoinSet;
+
+/// Configuration for [`PostponeFileWriter`].
+pub(crate) struct PostponeWriteConfig {
+    pub table_location: String,
+    pub partition_path: String,
+    pub bucket: i32,
+    pub schema_id: i64,
+    pub target_file_size: i64,
+    pub file_compression: String,
+    pub file_compression_zstd_level: i32,
+    pub write_buffer_size: i64,
+    /// Data file name prefix: `"data-u-{commitUser}-s-0-w-"`.
+    pub data_file_prefix: String,
+}
+
+/// Writer for postpone bucket mode (`bucket = -2`).
+///
+/// Streams data directly to a FormatFileWriter in arrival order (no 
sort/dedup),
+/// prepending `_SEQUENCE_NUMBER` and `_VALUE_KIND` columns to each batch.
+/// Rolls to a new file when `target_file_size` is reached.
+pub(crate) struct PostponeFileWriter {
+    file_io: FileIO,
+    config: PostponeWriteConfig,
+    next_sequence_number: i64,
+    current_writer: Option<Box<dyn FormatFileWriter>>,
+    current_file_name: Option<String>,
+    current_row_count: i64,
+    /// Sequence number at which the current file started.
+    current_file_start_seq: i64,
+    /// Timestamp captured when the current file was opened (used for 
deterministic replay order).
+    current_file_creation_time: DateTime<Utc>,
+    written_files: Vec<DataFileMeta>,
+    /// Background file close tasks spawned during rolling.
+    in_flight_closes: JoinSet<Result<DataFileMeta>>,
+}
+
+impl PostponeFileWriter {
+    pub(crate) fn new(file_io: FileIO, config: PostponeWriteConfig) -> Self {
+        Self {
+            file_io,
+            config,
+            next_sequence_number: 0,
+            current_writer: None,
+            current_file_name: None,
+            current_row_count: 0,
+            current_file_start_seq: 0,
+            current_file_creation_time: Utc::now(),
+            written_files: Vec::new(),
+            in_flight_closes: JoinSet::new(),
+        }
+    }
+
+    pub(crate) async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        if batch.num_rows() == 0 {
+            return Ok(());
+        }
+
+        if self.current_writer.is_none() {
+            self.open_new_file(batch.schema()).await?;
+        }
+
+        let num_rows = batch.num_rows();
+        let start_seq = self.next_sequence_number;
+        let end_seq = start_seq + num_rows as i64 - 1;
+        self.next_sequence_number = end_seq + 1;
+
+        // Build physical batch: [_SEQUENCE_NUMBER, _VALUE_KIND, 
all_user_cols...]
+        let mut physical_columns: Vec<Arc<dyn arrow_array::Array>> = 
Vec::new();
+        physical_columns.push(Arc::new(Int64Array::from(
+            (start_seq..=end_seq).collect::<Vec<_>>(),
+        )));
+        let vk_idx = batch
+            .schema()
+            .fields()
+            .iter()
+            .position(|f| f.name() == VALUE_KIND_FIELD_NAME);
+        match vk_idx {
+            Some(vk_idx) => 
physical_columns.push(batch.column(vk_idx).clone()),
+            None => physical_columns.push(Arc::new(Int8Array::from(vec![0i8; 
num_rows]))),
+        }
+        // All user columns (skip _VALUE_KIND if present — already handled 
above).
+        for (i, col) in batch.columns().iter().enumerate() {
+            if Some(i) == vk_idx {
+                continue;
+            }
+            physical_columns.push(col.clone());
+        }
+
+        let physical_schema = build_physical_schema(&batch.schema());
+        let physical_batch =
+            RecordBatch::try_new(physical_schema, 
physical_columns).map_err(|e| {
+                crate::Error::DataInvalid {
+                    message: format!("Failed to create physical batch: {e}"),
+                    source: None,
+                }
+            })?;
+
+        self.current_row_count += num_rows as i64;
+        self.current_writer
+            .as_mut()
+            .unwrap()
+            .write(&physical_batch)
+            .await?;
+
+        // Roll to a new file if target size is reached — close in background
+        if self.current_writer.as_ref().unwrap().num_bytes() as i64 >= 
self.config.target_file_size
+        {
+            self.roll_file();
+        }
+
+        // Flush row group if in-progress buffer exceeds write_buffer_size
+        if let Some(w) = self.current_writer.as_mut() {
+            if w.in_progress_size() as i64 >= self.config.write_buffer_size {
+                w.flush().await?;
+            }
+        }
+
+        Ok(())
+    }
+
+    pub(crate) async fn prepare_commit(&mut self) -> Result<Vec<DataFileMeta>> 
{
+        self.close_current_file().await?;
+        while let Some(result) = self.in_flight_closes.join_next().await {
+            let meta = result.map_err(|e| crate::Error::DataInvalid {
+                message: format!("Background file close task panicked: {e}"),
+                source: None,
+            })??;
+            self.written_files.push(meta);
+        }
+        Ok(std::mem::take(&mut self.written_files))
+    }
+
+    /// Spawn the current writer's close in the background for non-blocking 
rolling.
+    fn roll_file(&mut self) {
+        let writer = match self.current_writer.take() {
+            Some(w) => w,
+            None => return,
+        };
+        let file_name = self.current_file_name.take().unwrap();
+        let row_count = self.current_row_count;
+        let min_seq = self.current_file_start_seq;
+        let max_seq = self.next_sequence_number - 1;
+        self.current_row_count = 0;
+        let schema_id = self.config.schema_id;
+        // Capture creation_time from when the file was opened, not when the 
async close finishes.
+        // Java's postpone compaction sorts by creationTime for replay order.
+        let creation_time = self.current_file_creation_time;
+
+        self.in_flight_closes.spawn(async move {
+            let file_size = writer.close().await? as i64;
+            Ok(build_meta(
+                file_name,
+                file_size,
+                row_count,
+                min_seq,
+                max_seq,
+                schema_id,
+                creation_time,
+            ))
+        });
+    }
+
+    async fn open_new_file(&mut self, user_schema: arrow_schema::SchemaRef) -> 
Result<()> {
+        let file_name = format!(
+            "{}{}-{}.parquet",
+            self.config.data_file_prefix,
+            uuid::Uuid::new_v4(),
+            self.written_files.len()
+        );
+        let bucket_dir = if self.config.partition_path.is_empty() {
+            format!(
+                "{}/{}",
+                self.config.table_location,
+                bucket_dir_name(self.config.bucket)
+            )
+        } else {
+            format!(
+                "{}/{}/{}",
+                self.config.table_location,
+                self.config.partition_path,
+                bucket_dir_name(self.config.bucket)
+            )
+        };
+        self.file_io.mkdirs(&format!("{bucket_dir}/")).await?;
+        let physical_schema = build_physical_schema(&user_schema);
+        let file_path = format!("{bucket_dir}/{file_name}");
+        let output = self.file_io.new_output(&file_path)?;
+        let writer = create_format_writer(
+            &output,
+            physical_schema,
+            &self.config.file_compression,
+            self.config.file_compression_zstd_level,
+        )
+        .await?;
+        self.current_writer = Some(writer);
+        self.current_file_name = Some(file_name);
+        self.current_row_count = 0;
+        self.current_file_start_seq = self.next_sequence_number;
+        self.current_file_creation_time = Utc::now();
+        Ok(())
+    }
+
+    async fn close_current_file(&mut self) -> Result<()> {
+        let writer = match self.current_writer.take() {
+            Some(w) => w,
+            None => return Ok(()),
+        };
+        let file_name = self.current_file_name.take().unwrap();
+        let row_count = self.current_row_count;
+        self.current_row_count = 0;
+        let file_size = writer.close().await? as i64;
+
+        let min_seq = self.current_file_start_seq;
+        let max_seq = self.next_sequence_number - 1;
+
+        let meta = build_meta(
+            file_name,
+            file_size,
+            row_count,
+            min_seq,
+            max_seq,
+            self.config.schema_id,
+            self.current_file_creation_time,
+        );
+        self.written_files.push(meta);
+        Ok(())
+    }
+}
+
+fn build_meta(
+    file_name: String,
+    file_size: i64,
+    row_count: i64,
+    min_seq: i64,
+    max_seq: i64,
+    schema_id: i64,
+    creation_time: DateTime<Utc>,
+) -> DataFileMeta {
+    DataFileMeta {
+        file_name,
+        file_size,
+        row_count,
+        min_key: EMPTY_SERIALIZED_ROW.clone(),
+        max_key: EMPTY_SERIALIZED_ROW.clone(),
+        key_stats: BinaryTableStats::new(
+            EMPTY_SERIALIZED_ROW.clone(),
+            EMPTY_SERIALIZED_ROW.clone(),
+            vec![],
+        ),
+        value_stats: BinaryTableStats::new(
+            EMPTY_SERIALIZED_ROW.clone(),
+            EMPTY_SERIALIZED_ROW.clone(),
+            vec![],
+        ),
+        min_sequence_number: min_seq,
+        max_sequence_number: max_seq,
+        schema_id,
+        level: 0,
+        extra_files: vec![],
+        creation_time: Some(creation_time),
+        delete_row_count: Some(0),
+        embedded_index: None,
+        file_source: Some(0), // FileSource.APPEND
+        value_stats_cols: Some(vec![]),
+        external_path: None,
+        first_row_id: None,
+        write_cols: None,
+    }
+}
diff --git a/crates/paimon/src/table/table_scan.rs 
b/crates/paimon/src/table/table_scan.rs
index 9b2a7e5..e71aae0 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -30,8 +30,9 @@ use super::Table;
 use crate::io::FileIO;
 use crate::predicate_stats::data_leaf_may_match;
 use crate::spec::{
-    eval_row, BinaryRow, CoreOptions, DataField, DataFileMeta, FileKind, 
IndexManifest,
-    ManifestEntry, ManifestFileMeta, PartitionComputer, Predicate, Snapshot, 
TimeTravelSelector,
+    bucket_dir_name, eval_row, BinaryRow, CoreOptions, DataField, 
DataFileMeta, FileKind,
+    IndexManifest, ManifestEntry, ManifestFileMeta, PartitionComputer, 
Predicate, Snapshot,
+    TimeTravelSelector,
 };
 use crate::table::bin_pack::split_for_batch;
 use crate::table::source::{
@@ -135,6 +136,7 @@ async fn read_all_manifest_entries(
     table_path: &str,
     snapshot: &Snapshot,
     skip_level_zero: bool,
+    scan_all_files: bool,
     has_primary_keys: bool,
     partition_predicate: Option<&Predicate>,
     partition_fields: &[DataField],
@@ -173,7 +175,7 @@ async fn read_all_manifest_entries(
                         if skip_level_zero && has_primary_keys && 
entry.file().level == 0 {
                             return false;
                         }
-                        if has_primary_keys && entry.bucket() < 0 {
+                        if has_primary_keys && !scan_all_files && 
entry.bucket() < 0 {
                             return false;
                         }
                         if let Some(pred) = bucket_predicate {
@@ -537,6 +539,7 @@ impl<'a> TableScan<'a> {
             table_path,
             snapshot,
             skip_level_zero,
+            self.scan_all_files,
             has_primary_keys,
             self.partition_predicate.as_ref(),
             &partition_fields,
@@ -673,9 +676,9 @@ impl<'a> TableScan<'a> {
 
             let bucket_path = if let Some(ref computer) = partition_computer {
                 let partition_path = 
computer.generate_partition_path(&partition_row)?;
-                format!("{base_path}/{partition_path}bucket-{bucket}")
+                format!("{base_path}/{partition_path}{}", 
bucket_dir_name(bucket))
             } else {
-                format!("{base_path}/bucket-{bucket}")
+                format!("{base_path}/{}", bucket_dir_name(bucket))
             };
 
             // Original `partition` Vec consumed by PartitionBucket for DV map 
lookup.
diff --git a/crates/paimon/src/table/table_write.rs 
b/crates/paimon/src/table/table_write.rs
index de7e0fa..5c6145d 100644
--- a/crates/paimon/src/table/table_write.rs
+++ b/crates/paimon/src/table/table_write.rs
@@ -24,11 +24,12 @@ use crate::spec::DataFileMeta;
 use crate::spec::PartitionComputer;
 use crate::spec::{
     extract_datum_from_arrow, BinaryRow, BinaryRowBuilder, CoreOptions, 
DataField, DataType, Datum,
-    MergeEngine, Predicate, PredicateBuilder, EMPTY_SERIALIZED_ROW,
+    MergeEngine, Predicate, PredicateBuilder, EMPTY_SERIALIZED_ROW, 
POSTPONE_BUCKET,
 };
 use crate::table::commit_message::CommitMessage;
 use crate::table::data_file_writer::DataFileWriter;
 use crate::table::kv_file_writer::{KeyValueFileWriter, KeyValueWriteConfig};
+use crate::table::postpone_file_writer::{PostponeFileWriter, 
PostponeWriteConfig};
 use crate::table::{SnapshotManager, Table, TableScan};
 use crate::Result;
 use arrow_array::RecordBatch;
@@ -43,10 +44,11 @@ fn schema_contains_blob_type(fields: &[DataField]) -> bool {
         .any(|field| field.data_type().contains_blob_type())
 }
 
-/// Enum to hold either an append-only writer or a key-value writer.
+/// Enum to hold either an append-only writer, a key-value writer, or a 
postpone writer.
 enum FileWriter {
     Append(DataFileWriter),
     KeyValue(KeyValueFileWriter),
+    Postpone(PostponeFileWriter),
 }
 
 impl FileWriter {
@@ -54,6 +56,7 @@ impl FileWriter {
         match self {
             FileWriter::Append(w) => w.write(batch).await,
             FileWriter::KeyValue(w) => w.write(batch).await,
+            FileWriter::Postpone(w) => w.write(batch).await,
         }
     }
 
@@ -61,6 +64,7 @@ impl FileWriter {
         match self {
             FileWriter::Append(ref mut w) => w.prepare_commit().await,
             FileWriter::KeyValue(ref mut w) => w.prepare_commit().await,
+            FileWriter::Postpone(ref mut w) => w.prepare_commit().await,
         }
     }
 }
@@ -95,14 +99,14 @@ pub struct TableWrite {
     sequence_field_indices: Vec<usize>,
     /// Merge engine for primary-key tables.
     merge_engine: MergeEngine,
-    /// Column index in user schema for row kind (resolved from rowkind.field 
or _VALUE_KIND).
-    value_kind_col_index: Option<usize>,
     /// Cache of per-partition bucket→max_sequence_number, lazily populated on 
first write.
     partition_seq_cache: HashMap<Vec<u8>, HashMap<i32, i64>>,
+    /// Commit user identifier, used for postpone file naming.
+    commit_user: String,
 }
 
 impl TableWrite {
-    pub(crate) fn new(table: &Table) -> crate::Result<Self> {
+    pub(crate) fn new(table: &Table, commit_user: String) -> 
crate::Result<Self> {
         let schema = table.schema();
         let core_options = CoreOptions::new(schema.options());
 
@@ -124,16 +128,17 @@ impl TableWrite {
         let has_primary_keys = !schema.primary_keys().is_empty();
 
         if has_primary_keys {
-            if total_buckets < 1 {
+            if total_buckets < 1 && total_buckets != POSTPONE_BUCKET {
                 return Err(crate::Error::Unsupported {
                     message: format!(
-                        "KeyValueFileWriter does not support 
bucket={total_buckets}, only fixed bucket (>= 1) is supported"
+                        "KeyValueFileWriter does not support 
bucket={total_buckets}, only fixed bucket (>= 1) or postpone bucket (= -2) is 
supported"
                     ),
                 });
             }
-            if core_options
-                .changelog_producer()
-                .eq_ignore_ascii_case("input")
+            if total_buckets != POSTPONE_BUCKET
+                && core_options
+                    .changelog_producer()
+                    .eq_ignore_ascii_case("input")
             {
                 return Err(crate::Error::Unsupported {
                     message: "KeyValueFileWriter does not support 
changelog-producer=input"
@@ -200,11 +205,6 @@ impl TableWrite {
             });
         }
 
-        // Resolve value_kind column from _VALUE_KIND in user schema, if 
present.
-        let value_kind_col_index = fields
-            .iter()
-            .position(|f| f.name() == crate::spec::VALUE_KIND_FIELD_NAME);
-
         Ok(Self {
             table: table.clone(),
             partition_writers: HashMap::new(),
@@ -222,8 +222,8 @@ impl TableWrite {
             primary_key_types,
             sequence_field_indices,
             merge_engine,
-            value_kind_col_index,
             partition_seq_cache: HashMap::new(),
+            commit_user,
         })
     }
 
@@ -297,7 +297,15 @@ impl TableWrite {
     ) -> Result<Vec<(PartitionBucketKey, RecordBatch)>> {
         // Fast path: no partitions and single bucket — skip per-row routing
         if self.partition_field_indices.is_empty() && self.total_buckets <= 1 {
-            return Ok(vec![((EMPTY_SERIALIZED_ROW.clone(), 0), 
batch.clone())]);
+            let bucket = if self.total_buckets == POSTPONE_BUCKET {
+                POSTPONE_BUCKET
+            } else {
+                0
+            };
+            return Ok(vec![(
+                (EMPTY_SERIALIZED_ROW.clone(), bucket),
+                batch.clone(),
+            )]);
         }
 
         let fields = self.table.schema().fields();
@@ -412,7 +420,9 @@ impl TableWrite {
         };
 
         // Compute bucket
-        let bucket = if self.total_buckets <= 1 || self.bucket_key_indices.is_empty() {
+        let bucket = if self.total_buckets == POSTPONE_BUCKET {
+            POSTPONE_BUCKET
+        } else if self.total_buckets <= 1 || self.bucket_key_indices.is_empty() {
             0
         } else {
             let mut datums: Vec<(Option<Datum>, DataType)> = Vec::new();
@@ -438,6 +448,7 @@ impl TableWrite {
         };
 
         let writer = if self.primary_key_indices.is_empty() {
+            // Append-only writer for non-PK tables.
             FileWriter::Append(DataFileWriter::new(
                 self.table.file_io().clone(),
                 self.table.location().to_string(),
@@ -452,6 +463,23 @@ impl TableWrite {
                 None,    // first_row_id: assigned by commit
                 None,    // write_cols: full-row write
             ))
+        } else if bucket == POSTPONE_BUCKET {
+            // Postpone bucket: KV format but no sorting/dedup, special file naming.
+            let data_file_prefix = format!("data-u-{}-s-0-w-", self.commit_user);
+            FileWriter::Postpone(PostponeFileWriter::new(
+                self.table.file_io().clone(),
+                PostponeWriteConfig {
+                    table_location: self.table.location().to_string(),
+                    partition_path,
+                    bucket,
+                    schema_id: self.schema_id,
+                    target_file_size: self.target_file_size,
+                    file_compression: self.file_compression.clone(),
+                    file_compression_zstd_level: self.file_compression_zstd_level,
+                    write_buffer_size: self.write_buffer_size,
+                    data_file_prefix,
+                },
+            ))
         } else {
             // Lazily scan partition sequence numbers on first writer creation per partition.
             if !self.partition_seq_cache.contains_key(&partition_bytes) {
@@ -481,7 +509,6 @@ impl TableWrite {
                     primary_key_types: self.primary_key_types.clone(),
                     sequence_field_indices: self.sequence_field_indices.clone(),
                     merge_engine: self.merge_engine,
-                    value_kind_col_index: self.value_kind_col_index,
                 },
                 next_seq,
             ))
@@ -504,6 +531,7 @@ mod tests {
     };
     use crate::table::{SnapshotManager, TableCommit};
     use arrow_array::Int32Array;
+    use arrow_array::RecordBatchReader as _;
     use arrow_schema::{
         DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, TimeUnit,
     };
@@ -625,7 +653,7 @@ mod tests {
         setup_dirs(&file_io, table_path).await;
 
         let table = test_table(&file_io, table_path);
-        let mut table_write = TableWrite::new(&table).unwrap();
+        let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
 
         let batch = make_batch(vec![1, 2, 3], vec![10, 20, 30]);
         table_write.write_arrow_batch(&batch).await.unwrap();
@@ -656,7 +684,9 @@ mod tests {
             None,
         );
 
-        let err = TableWrite::new(&table).err().unwrap();
+        let err = TableWrite::new(&table, "test-user".to_string())
+            .err()
+            .unwrap();
         assert!(
             matches!(err, crate::Error::Unsupported { message } if message.contains("BlobType"))
         );
@@ -672,7 +702,9 @@ mod tests {
             None,
         );
 
-        let err = TableWrite::new(&table).err().unwrap();
+        let err = TableWrite::new(&table, "test-user".to_string())
+            .err()
+            .unwrap();
         assert!(
             matches!(err, crate::Error::Unsupported { message } if message.contains("BlobType"))
         );
@@ -685,7 +717,7 @@ mod tests {
         setup_dirs(&file_io, table_path).await;
 
         let table = test_partitioned_table(&file_io, table_path);
-        let mut table_write = TableWrite::new(&table).unwrap();
+        let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
 
         let batch = make_partitioned_batch(vec!["a", "b", "a"], vec![1, 2, 3]);
         table_write.write_arrow_batch(&batch).await.unwrap();
@@ -716,7 +748,7 @@ mod tests {
         let file_io = test_file_io();
         let table_path = "memory:/test_table_write_empty";
         let table = test_table(&file_io, table_path);
-        let mut table_write = TableWrite::new(&table).unwrap();
+        let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
 
         let batch = make_batch(vec![], vec![]);
         table_write.write_arrow_batch(&batch).await.unwrap();
@@ -732,7 +764,7 @@ mod tests {
         setup_dirs(&file_io, table_path).await;
 
         let table = test_table(&file_io, table_path);
-        let mut table_write = TableWrite::new(&table).unwrap();
+        let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
 
         // First write + prepare_commit
         table_write
@@ -764,7 +796,7 @@ mod tests {
         setup_dirs(&file_io, table_path).await;
 
         let table = test_table(&file_io, table_path);
-        let mut table_write = TableWrite::new(&table).unwrap();
+        let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
 
         table_write
             .write_arrow_batch(&make_batch(vec![1, 2], vec![10, 20]))
@@ -828,7 +860,7 @@ mod tests {
         setup_dirs(&file_io, table_path).await;
 
         let table = test_bucketed_table(&file_io, table_path);
-        let mut table_write = TableWrite::new(&table).unwrap();
+        let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
 
         // Row with NULL bucket key should not panic
         let batch = make_nullable_id_batch(vec![None, Some(1), None], vec![10, 20, 30]);
@@ -850,7 +882,7 @@ mod tests {
         setup_dirs(&file_io, table_path).await;
 
         let table = test_bucketed_table(&file_io, table_path);
-        let mut table_write = TableWrite::new(&table).unwrap();
+        let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
 
         // Two NULLs should land in the same bucket
         let batch = make_nullable_id_batch(vec![None, None], vec![10, 20]);
@@ -878,7 +910,7 @@ mod tests {
 
         // Compute bucket for NULL key
         let fields = table.schema().fields().to_vec();
-        let tw = TableWrite::new(&table).unwrap();
+        let tw = TableWrite::new(&table, "test-user".to_string()).unwrap();
 
         let batch_null = make_nullable_id_batch(vec![None], vec![10]);
         let (_, bucket_null) = tw
@@ -943,7 +975,7 @@ mod tests {
                 None,
             );
 
-            let tw = TableWrite::new(&table).unwrap();
+            let tw = TableWrite::new(&table, "test-user".to_string()).unwrap();
             let fields = table.schema().fields().to_vec();
 
             // Build a batch: d=NULL, ltz=NULL, ntz=NULL, k=1
@@ -1017,7 +1049,7 @@ mod tests {
             None,
         );
 
-        let mut table_write = TableWrite::new(&table).unwrap();
+        let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
 
         // Write multiple batches — each should roll to a new file
         table_write
@@ -1070,7 +1102,7 @@ mod tests {
         setup_dirs(&file_io, table_path).await;
 
         let table = test_pk_table(&file_io, table_path);
-        let mut table_write = TableWrite::new(&table).unwrap();
+        let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
 
         let batch = make_batch(vec![3, 1, 2], vec![30, 10, 20]);
         table_write.write_arrow_batch(&batch).await.unwrap();
@@ -1105,7 +1137,7 @@ mod tests {
         setup_dirs(&file_io, table_path).await;
 
         let table = test_pk_table(&file_io, table_path);
-        let mut table_write = TableWrite::new(&table).unwrap();
+        let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
 
         // Write unsorted data
         let batch = make_batch(vec![5, 2, 4, 1, 3], vec![50, 20, 40, 10, 30]);
@@ -1163,7 +1195,7 @@ mod tests {
         let table = test_pk_table(&file_io, table_path);
 
         // First commit: id=1,2,3
-        let mut tw1 = TableWrite::new(&table).unwrap();
+        let mut tw1 = TableWrite::new(&table, "test-user".to_string()).unwrap();
         tw1.write_arrow_batch(&make_batch(vec![1, 2, 3], vec![10, 20, 30]))
             .await
             .unwrap();
@@ -1172,7 +1204,7 @@ mod tests {
         commit.commit(msgs1).await.unwrap();
 
         // Second commit: id=2,3,4 with updated values, higher sequence numbers
-        let mut tw2 = TableWrite::new(&table).unwrap();
+        let mut tw2 = TableWrite::new(&table, "test-user".to_string()).unwrap();
         tw2.write_arrow_batch(&make_batch(vec![2, 3, 4], vec![200, 300, 400]))
             .await
             .unwrap();
@@ -1225,7 +1257,7 @@ mod tests {
         setup_dirs(&file_io, table_path).await;
 
         let table = test_pk_table(&file_io, table_path);
-        let mut table_write = TableWrite::new(&table).unwrap();
+        let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
 
         let batch = make_batch(vec![1, 2], vec![10, 20]);
         table_write.write_arrow_batch(&batch).await.unwrap();
@@ -1236,4 +1268,274 @@ mod tests {
         assert_eq!(file.min_sequence_number, 0);
         assert_eq!(file.max_sequence_number, 1);
     }
+
+    // -----------------------------------------------------------------------
+    // Postpone bucket (bucket = -2) write tests
+    // -----------------------------------------------------------------------
+
+    fn test_postpone_pk_schema() -> TableSchema {
+        let schema = Schema::builder()
+            .column("id", DataType::Int(IntType::new()))
+            .column("value", DataType::Int(IntType::new()))
+            .primary_key(["id"])
+            .option("bucket", "-2")
+            .build()
+            .unwrap();
+        TableSchema::new(0, &schema)
+    }
+
+    fn test_postpone_pk_table(file_io: &FileIO, table_path: &str) -> Table {
+        Table::new(
+            file_io.clone(),
+            Identifier::new("default", "test_postpone_table"),
+            table_path.to_string(),
+            test_postpone_pk_schema(),
+            None,
+        )
+    }
+
+    fn test_postpone_partitioned_schema() -> TableSchema {
+        let schema = Schema::builder()
+            .column("pt", DataType::VarChar(VarCharType::string_type()))
+            .column("id", DataType::Int(IntType::new()))
+            .column("value", DataType::Int(IntType::new()))
+            .primary_key(["pt", "id"])
+            .partition_keys(["pt"])
+            .option("bucket", "-2")
+            .build()
+            .unwrap();
+        TableSchema::new(0, &schema)
+    }
+
+    fn test_postpone_partitioned_table(file_io: &FileIO, table_path: &str) -> Table {
+        Table::new(
+            file_io.clone(),
+            Identifier::new("default", "test_postpone_table"),
+            table_path.to_string(),
+            test_postpone_partitioned_schema(),
+            None,
+        )
+    }
+
+    fn make_partitioned_batch_3col(pts: Vec<&str>, ids: Vec<i32>, values: Vec<i32>) -> RecordBatch {
+        let schema = Arc::new(ArrowSchema::new(vec![
+            ArrowField::new("pt", ArrowDataType::Utf8, false),
+            ArrowField::new("id", ArrowDataType::Int32, false),
+            ArrowField::new("value", ArrowDataType::Int32, false),
+        ]));
+        RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(arrow_array::StringArray::from(pts)),
+                Arc::new(Int32Array::from(ids)),
+                Arc::new(Int32Array::from(values)),
+            ],
+        )
+        .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_postpone_write_and_commit() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_postpone_write";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = test_postpone_pk_table(&file_io, table_path);
+        let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
+
+        let batch = make_batch(vec![3, 1, 2], vec![30, 10, 20]);
+        table_write.write_arrow_batch(&batch).await.unwrap();
+
+        let messages = table_write.prepare_commit().await.unwrap();
+        assert_eq!(messages.len(), 1);
+        assert_eq!(messages[0].bucket, POSTPONE_BUCKET);
+        assert_eq!(messages[0].new_files.len(), 1);
+        assert_eq!(messages[0].new_files[0].row_count, 3);
+
+        // Commit and verify snapshot
+        let commit = TableCommit::new(table, "test-user".to_string());
+        commit.commit(messages).await.unwrap();
+
+        let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
+        let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+        assert_eq!(snapshot.id(), 1);
+        assert_eq!(snapshot.total_record_count(), Some(3));
+    }
+
+    #[tokio::test]
+    async fn test_postpone_write_empty_batch() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_postpone_empty";
+        let table = test_postpone_pk_table(&file_io, table_path);
+        let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
+
+        let batch = make_batch(vec![], vec![]);
+        table_write.write_arrow_batch(&batch).await.unwrap();
+
+        let messages = table_write.prepare_commit().await.unwrap();
+        assert!(messages.is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_postpone_write_multiple_batches() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_postpone_multi";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = test_postpone_pk_table(&file_io, table_path);
+        let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
+
+        table_write
+            .write_arrow_batch(&make_batch(vec![1, 2], vec![10, 20]))
+            .await
+            .unwrap();
+        table_write
+            .write_arrow_batch(&make_batch(vec![3, 4], vec![30, 40]))
+            .await
+            .unwrap();
+
+        let messages = table_write.prepare_commit().await.unwrap();
+        assert_eq!(messages.len(), 1);
+        assert_eq!(messages[0].bucket, POSTPONE_BUCKET);
+
+        let total_rows: i64 = messages[0].new_files.iter().map(|f| f.row_count).sum();
+        assert_eq!(total_rows, 4);
+    }
+
+    #[tokio::test]
+    async fn test_postpone_write_partitioned() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_postpone_partitioned";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = test_postpone_partitioned_table(&file_io, table_path);
+        let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
+
+        let batch =
+            make_partitioned_batch_3col(vec!["a", "b", "a"], vec![1, 2, 3], vec![10, 20, 30]);
+        table_write.write_arrow_batch(&batch).await.unwrap();
+
+        let messages = table_write.prepare_commit().await.unwrap();
+        // 2 partitions
+        assert_eq!(messages.len(), 2);
+        // All messages should use POSTPONE_BUCKET
+        for msg in &messages {
+            assert_eq!(msg.bucket, POSTPONE_BUCKET);
+        }
+
+        let total_rows: i64 = messages
+            .iter()
+            .flat_map(|m| &m.new_files)
+            .map(|f| f.row_count)
+            .sum();
+        assert_eq!(total_rows, 3);
+
+        // Commit and verify
+        let commit = TableCommit::new(table, "test-user".to_string());
+        commit.commit(messages).await.unwrap();
+
+        let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
+        let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+        assert_eq!(snapshot.id(), 1);
+        assert_eq!(snapshot.total_record_count(), Some(3));
+    }
+
+    #[tokio::test]
+    async fn test_postpone_write_reusable() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_postpone_reuse";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = test_postpone_pk_table(&file_io, table_path);
+        let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
+
+        // First write + prepare_commit
+        table_write
+            .write_arrow_batch(&make_batch(vec![1, 2], vec![10, 20]))
+            .await
+            .unwrap();
+        let messages1 = table_write.prepare_commit().await.unwrap();
+        assert_eq!(messages1.len(), 1);
+        assert_eq!(messages1[0].new_files[0].row_count, 2);
+
+        // Second write + prepare_commit (reuse)
+        table_write
+            .write_arrow_batch(&make_batch(vec![3, 4, 5], vec![30, 40, 50]))
+            .await
+            .unwrap();
+        let messages2 = table_write.prepare_commit().await.unwrap();
+        assert_eq!(messages2.len(), 1);
+        assert_eq!(messages2[0].new_files[0].row_count, 3);
+
+        // Empty prepare_commit
+        let messages3 = table_write.prepare_commit().await.unwrap();
+        assert!(messages3.is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_postpone_write_file_naming_and_kv_format() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_postpone_kv";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = test_postpone_pk_table(&file_io, table_path);
+        let mut table_write = TableWrite::new(&table, "my-commit-user".to_string()).unwrap();
+
+        let batch = make_batch(vec![3, 1, 2], vec![30, 10, 20]);
+        table_write.write_arrow_batch(&batch).await.unwrap();
+
+        let messages = table_write.prepare_commit().await.unwrap();
+        let file = &messages[0].new_files[0];
+
+        // Verify postpone file naming: data-u-{commitUser}-s-{writeId}-w-{uuid}-{index}.parquet
+        assert!(
+            file.file_name.starts_with("data-u-my-commit-user-s-"),
+            "Expected postpone file prefix, got: {}",
+            file.file_name
+        );
+        assert!(
+            file.file_name.contains("-w-"),
+            "Expected -w- in file name, got: {}",
+            file.file_name
+        );
+
+        // Verify KV format: read the parquet file and check physical columns
+        let bucket_dir = format!("{table_path}/bucket-postpone");
+        let file_path = format!("{bucket_dir}/{}", file.file_name);
+        let input = file_io.new_input(&file_path).unwrap();
+        let data = input.read().await.unwrap();
+        let reader =
+            parquet::arrow::arrow_reader::ParquetRecordBatchReader::try_new(data, 1024).unwrap();
+        let schema = reader.schema();
+        // Physical schema: [_SEQUENCE_NUMBER, _VALUE_KIND, id, value]
+        assert_eq!(schema.fields().len(), 4);
+        assert_eq!(schema.field(0).name(), "_SEQUENCE_NUMBER");
+        assert_eq!(schema.field(1).name(), "_VALUE_KIND");
+        assert_eq!(schema.field(2).name(), "id");
+        assert_eq!(schema.field(3).name(), "value");
+
+        // Data should be in arrival order (not sorted by PK): 3, 1, 2
+        let batches: Vec<RecordBatch> = reader.into_iter().map(|r| r.unwrap()).collect();
+        let ids: Vec<i32> = batches
+            .iter()
+            .flat_map(|b| {
+                b.column(2)
+                    .as_any()
+                    .downcast_ref::<Int32Array>()
+                    .unwrap()
+                    .values()
+                    .iter()
+                    .copied()
+            })
+            .collect();
+        assert_eq!(
+            ids,
+            vec![3, 1, 2],
+            "Postpone mode should preserve arrival order"
+        );
+
+        // Empty key stats for postpone mode
+        assert_eq!(file.min_key, EMPTY_SERIALIZED_ROW.clone());
+        assert_eq!(file.max_key, EMPTY_SERIALIZED_ROW.clone());
+    }
 }
diff --git a/crates/paimon/src/table/write_builder.rs b/crates/paimon/src/table/write_builder.rs
index 57a4820..9f238d6 100644
--- a/crates/paimon/src/table/write_builder.rs
+++ b/crates/paimon/src/table/write_builder.rs
@@ -49,6 +49,6 @@ impl<'a> WriteBuilder<'a> {
     /// For primary-key tables, sequence numbers are lazily scanned per partition
     /// when the first writer for that partition is created.
     pub fn new_write(&self) -> crate::Result<TableWrite> {
-        TableWrite::new(self.table)
+        TableWrite::new(self.table, self.commit_user.clone())
     }
 }

Reply via email to