This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new d5dd8fc feat: add postpone bucket (bucket=-2) write support for
primary-key tables (#252)
d5dd8fc is described below
commit d5dd8fc61930e87355a07c1727e6b6da88ef7433
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Apr 16 16:48:42 2026 +0800
feat: add postpone bucket (bucket=-2) write support for primary-key tables
(#252)
Postpone bucket mode writes data in KV format without sorting or
deduplication, deferring bucket assignment to background compaction.
Files are written to `bucket-postpone` directory and are invisible
to normal reads until compacted.
---
crates/integrations/datafusion/tests/pk_tables.rs | 121 +++++++
crates/paimon/src/spec/core_options.rs | 15 +
crates/paimon/src/table/data_file_writer.rs | 10 +-
crates/paimon/src/table/kv_file_writer.rs | 57 ++--
crates/paimon/src/table/mod.rs | 1 +
crates/paimon/src/table/postpone_file_writer.rs | 305 ++++++++++++++++++
crates/paimon/src/table/table_scan.rs | 13 +-
crates/paimon/src/table/table_write.rs | 374 +++++++++++++++++++---
crates/paimon/src/table/write_builder.rs | 2 +-
9 files changed, 829 insertions(+), 69 deletions(-)
diff --git a/crates/integrations/datafusion/tests/pk_tables.rs
b/crates/integrations/datafusion/tests/pk_tables.rs
index 753963d..b2fae22 100644
--- a/crates/integrations/datafusion/tests/pk_tables.rs
+++ b/crates/integrations/datafusion/tests/pk_tables.rs
@@ -1259,3 +1259,124 @@ async fn test_pk_first_row_insert_overwrite() {
"After second OVERWRITE: still 2 files (no stale level-0 files
accumulated)"
);
}
+
+// ======================= Postpone Bucket (bucket = -2)
=======================
+
+/// Postpone bucket files are invisible to normal SELECT but visible via
scan_all_files.
+#[tokio::test]
+async fn test_postpone_write_invisible_to_select() {
+ let (_tmp, catalog) = create_test_env();
+ let handler = create_handler(catalog.clone());
+ handler
+ .sql("CREATE SCHEMA paimon.test_db")
+ .await
+ .expect("CREATE SCHEMA failed");
+
+ handler
+ .sql(
+ "CREATE TABLE paimon.test_db.t_postpone (
+ id INT NOT NULL, value INT,
+ PRIMARY KEY (id)
+ ) WITH ('bucket' = '-2')",
+ )
+ .await
+ .unwrap();
+
+ // Write data
+ handler
+ .sql("INSERT INTO paimon.test_db.t_postpone VALUES (1, 10), (2, 20),
(3, 30)")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ // scan_all_files should find the postpone file
+ let table = catalog
+ .get_table(&Identifier::new("test_db", "t_postpone"))
+ .await
+ .unwrap();
+ let plan = table
+ .new_read_builder()
+ .new_scan()
+ .with_scan_all_files()
+ .plan()
+ .await
+ .unwrap();
+ let file_count: usize = plan.splits().iter().map(|s|
s.data_files().len()).sum();
+ assert_eq!(file_count, 1, "scan_all_files should find 1 postpone file");
+
+ // Normal SELECT should return 0 rows (postpone files are invisible)
+ let count = row_count(&handler, "SELECT * FROM
paimon.test_db.t_postpone").await;
+ assert_eq!(count, 0, "SELECT should return 0 rows for postpone table");
+}
+
+/// INSERT OVERWRITE on a postpone table should replace old files with new
ones.
+#[tokio::test]
+async fn test_postpone_insert_overwrite() {
+ let (_tmp, catalog) = create_test_env();
+ let handler = create_handler(catalog.clone());
+ handler
+ .sql("CREATE SCHEMA paimon.test_db")
+ .await
+ .expect("CREATE SCHEMA failed");
+
+ handler
+ .sql(
+ "CREATE TABLE paimon.test_db.t_postpone_ow (
+ id INT NOT NULL, value INT,
+ PRIMARY KEY (id)
+ ) WITH ('bucket' = '-2')",
+ )
+ .await
+ .unwrap();
+
+ // First commit
+ handler
+ .sql("INSERT INTO paimon.test_db.t_postpone_ow VALUES (1, 10), (2,
20)")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ let table = catalog
+ .get_table(&Identifier::new("test_db", "t_postpone_ow"))
+ .await
+ .unwrap();
+ let plan = table
+ .new_read_builder()
+ .new_scan()
+ .with_scan_all_files()
+ .plan()
+ .await
+ .unwrap();
+ let file_count: usize = plan.splits().iter().map(|s|
s.data_files().len()).sum();
+ assert_eq!(file_count, 1, "After INSERT: 1 postpone file");
+
+ // INSERT OVERWRITE should replace old file
+ handler
+ .sql("INSERT OVERWRITE paimon.test_db.t_postpone_ow VALUES (3, 30)")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ let table = catalog
+ .get_table(&Identifier::new("test_db", "t_postpone_ow"))
+ .await
+ .unwrap();
+ let plan = table
+ .new_read_builder()
+ .new_scan()
+ .with_scan_all_files()
+ .plan()
+ .await
+ .unwrap();
+ let file_count: usize = plan.splits().iter().map(|s|
s.data_files().len()).sum();
+ assert_eq!(
+ file_count, 1,
+ "After OVERWRITE: only 1 new file (old file deleted)"
+ );
+}
diff --git a/crates/paimon/src/spec/core_options.rs
b/crates/paimon/src/spec/core_options.rs
index ae9d7cc..85aa801 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -28,6 +28,11 @@ const BUCKET_KEY_OPTION: &str = "bucket-key";
const BUCKET_FUNCTION_TYPE_OPTION: &str = "bucket-function.type";
const BUCKET_OPTION: &str = "bucket";
const DEFAULT_BUCKET: i32 = -1;
+/// Postpone bucket mode: data is written to `bucket-postpone` directory
+/// and is invisible to readers until compaction assigns real bucket numbers.
+pub const POSTPONE_BUCKET: i32 = -2;
+/// Directory name for postpone bucket files.
+pub const POSTPONE_BUCKET_DIR: &str = "bucket-postpone";
const COMMIT_MAX_RETRIES_OPTION: &str = "commit.max-retries";
const COMMIT_TIMEOUT_OPTION: &str = "commit.timeout";
const COMMIT_MIN_RETRY_WAIT_OPTION: &str = "commit.min-retry-wait";
@@ -63,6 +68,16 @@ pub enum MergeEngine {
FirstRow,
}
+/// Format the bucket directory name for a given bucket number.
+/// Returns `"bucket-postpone"` for `POSTPONE_BUCKET` (-2), otherwise
`"bucket-{N}"`.
+pub fn bucket_dir_name(bucket: i32) -> String {
+ if bucket == POSTPONE_BUCKET {
+ POSTPONE_BUCKET_DIR.to_string()
+ } else {
+ format!("bucket-{bucket}")
+ }
+}
+
/// Typed accessors for common table options.
///
/// This mirrors pypaimon's `CoreOptions` pattern while staying lightweight.
diff --git a/crates/paimon/src/table/data_file_writer.rs
b/crates/paimon/src/table/data_file_writer.rs
index cfbc55a..e58e0e1 100644
--- a/crates/paimon/src/table/data_file_writer.rs
+++ b/crates/paimon/src/table/data_file_writer.rs
@@ -25,7 +25,7 @@
use crate::arrow::format::{create_format_writer, FormatFileWriter};
use crate::io::FileIO;
use crate::spec::stats::BinaryTableStats;
-use crate::spec::{DataFileMeta, EMPTY_SERIALIZED_ROW};
+use crate::spec::{bucket_dir_name, DataFileMeta, EMPTY_SERIALIZED_ROW};
use crate::Result;
use arrow_array::RecordBatch;
use chrono::Utc;
@@ -133,11 +133,13 @@ impl DataFileWriter {
);
let bucket_dir = if self.partition_path.is_empty() {
- format!("{}/bucket-{}", self.table_location, self.bucket)
+ format!("{}/{}", self.table_location, bucket_dir_name(self.bucket))
} else {
format!(
- "{}/{}/bucket-{}",
- self.table_location, self.partition_path, self.bucket
+ "{}/{}/{}",
+ self.table_location,
+ self.partition_path,
+ bucket_dir_name(self.bucket)
)
};
self.file_io.mkdirs(&format!("{bucket_dir}/")).await?;
diff --git a/crates/paimon/src/table/kv_file_writer.rs
b/crates/paimon/src/table/kv_file_writer.rs
index 3309820..ef74015 100644
--- a/crates/paimon/src/table/kv_file_writer.rs
+++ b/crates/paimon/src/table/kv_file_writer.rs
@@ -74,9 +74,6 @@ pub(crate) struct KeyValueWriteConfig {
pub sequence_field_indices: Vec<usize>,
/// Merge engine for deduplication.
pub merge_engine: MergeEngine,
- /// Column index in user schema that provides the row kind value.
- /// Resolved from: `rowkind.field` option > `_VALUE_KIND` column > None
(all INSERT).
- pub value_kind_col_index: Option<usize>,
}
impl KeyValueFileWriter {
@@ -200,23 +197,8 @@ impl KeyValueFileWriter {
let min_key = self.extract_key_binary_row(&combined, first_row)?;
let max_key = self.extract_key_binary_row(&combined, last_row)?;
- // Build physical schema (thin-mode): [_SEQUENCE_NUMBER, _VALUE_KIND,
all_user_cols...]
- let user_fields = user_schema.fields();
- let mut physical_fields: Vec<Arc<ArrowField>> = Vec::new();
- physical_fields.push(Arc::new(ArrowField::new(
- SEQUENCE_NUMBER_FIELD_NAME,
- ArrowDataType::Int64,
- false,
- )));
- physical_fields.push(Arc::new(ArrowField::new(
- VALUE_KIND_FIELD_NAME,
- ArrowDataType::Int8,
- false,
- )));
- for field in user_fields.iter() {
- physical_fields.push(field.clone());
- }
- let physical_schema = Arc::new(ArrowSchema::new(physical_fields));
+ // Build physical schema and open writer.
+ let physical_schema = build_physical_schema(&user_schema);
// Open parquet writer.
let file_name = format!(
@@ -262,8 +244,13 @@ impl KeyValueFileWriter {
},
)?,
);
- // Value kind column.
- match self.config.value_kind_col_index {
+ // Value kind column — resolve from batch schema.
+ let vk_idx = combined
+ .schema()
+ .fields()
+ .iter()
+ .position(|f| f.name() == crate::spec::VALUE_KIND_FIELD_NAME);
+ match vk_idx {
Some(vk_idx) => {
physical_columns.push(
arrow_select::take::take(
@@ -282,8 +269,11 @@ impl KeyValueFileWriter {
physical_columns.push(Arc::new(Int8Array::from(vec![0i8;
chunk_len])));
}
}
- // All user columns.
+ // All user columns (skip _VALUE_KIND if present — already handled
above).
for idx in 0..combined.num_columns() {
+ if Some(idx) == vk_idx {
+ continue;
+ }
physical_columns.push(
arrow_select::take::take(combined.column(idx).as_ref(),
&chunk_indices, None)
.map_err(|e| crate::Error::DataInvalid {
@@ -459,3 +449,24 @@ impl KeyValueFileWriter {
Ok(builder.build_serialized())
}
}
+
+/// Build the physical schema: [_SEQUENCE_NUMBER, _VALUE_KIND, user_cols
(excluding _VALUE_KIND)...]
+pub(crate) fn build_physical_schema(user_schema: &ArrowSchema) ->
Arc<ArrowSchema> {
+ let mut physical_fields: Vec<Arc<ArrowField>> = Vec::new();
+ physical_fields.push(Arc::new(ArrowField::new(
+ SEQUENCE_NUMBER_FIELD_NAME,
+ ArrowDataType::Int64,
+ false,
+ )));
+ physical_fields.push(Arc::new(ArrowField::new(
+ VALUE_KIND_FIELD_NAME,
+ ArrowDataType::Int8,
+ false,
+ )));
+ for field in user_schema.fields().iter() {
+ if field.name() != VALUE_KIND_FIELD_NAME {
+ physical_fields.push(field.clone());
+ }
+ }
+ Arc::new(ArrowSchema::new(physical_fields))
+}
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index 8fb8ef3..e63beaf 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -29,6 +29,7 @@ mod full_text_search_builder;
pub(crate) mod global_index_scanner;
mod kv_file_reader;
mod kv_file_writer;
+mod postpone_file_writer;
mod read_builder;
pub(crate) mod rest_env;
pub(crate) mod row_id_predicate;
diff --git a/crates/paimon/src/table/postpone_file_writer.rs
b/crates/paimon/src/table/postpone_file_writer.rs
new file mode 100644
index 0000000..af7cb06
--- /dev/null
+++ b/crates/paimon/src/table/postpone_file_writer.rs
@@ -0,0 +1,305 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Postpone bucket file writer for primary-key tables with `bucket = -2`.
+//!
+//! Writes data in KV format (`_SEQUENCE_NUMBER`, `_VALUE_KIND` + user columns)
+//! but without sorting or deduplication — compaction assigns real buckets
later.
+//!
+//! Uses a special file naming prefix: `data-u-{commitUser}-s-0-w-`.
+//!
+//! Reference:
[PostponeBucketWriter](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/sink/PostponeBucketWriter.java)
+
+use crate::arrow::format::{create_format_writer, FormatFileWriter};
+use crate::io::FileIO;
+use crate::spec::stats::BinaryTableStats;
+use crate::spec::{bucket_dir_name, DataFileMeta, EMPTY_SERIALIZED_ROW,
VALUE_KIND_FIELD_NAME};
+use crate::table::kv_file_writer::build_physical_schema;
+use crate::Result;
+use arrow_array::{Int64Array, Int8Array, RecordBatch};
+use chrono::{DateTime, Utc};
+use std::sync::Arc;
+use tokio::task::JoinSet;
+
+/// Configuration for [`PostponeFileWriter`].
+pub(crate) struct PostponeWriteConfig {
+ pub table_location: String,
+ pub partition_path: String,
+ pub bucket: i32,
+ pub schema_id: i64,
+ pub target_file_size: i64,
+ pub file_compression: String,
+ pub file_compression_zstd_level: i32,
+ pub write_buffer_size: i64,
+ /// Data file name prefix: `"data-u-{commitUser}-s-0-w-"`.
+ pub data_file_prefix: String,
+}
+
+/// Writer for postpone bucket mode (`bucket = -2`).
+///
+/// Streams data directly to a FormatFileWriter in arrival order (no
sort/dedup),
+/// prepending `_SEQUENCE_NUMBER` and `_VALUE_KIND` columns to each batch.
+/// Rolls to a new file when `target_file_size` is reached.
+pub(crate) struct PostponeFileWriter {
+ file_io: FileIO,
+ config: PostponeWriteConfig,
+ next_sequence_number: i64,
+ current_writer: Option<Box<dyn FormatFileWriter>>,
+ current_file_name: Option<String>,
+ current_row_count: i64,
+ /// Sequence number at which the current file started.
+ current_file_start_seq: i64,
+ /// Timestamp captured when the current file was opened (used for
deterministic replay order).
+ current_file_creation_time: DateTime<Utc>,
+ written_files: Vec<DataFileMeta>,
+ /// Background file close tasks spawned during rolling.
+ in_flight_closes: JoinSet<Result<DataFileMeta>>,
+}
+
+impl PostponeFileWriter {
+ pub(crate) fn new(file_io: FileIO, config: PostponeWriteConfig) -> Self {
+ Self {
+ file_io,
+ config,
+ next_sequence_number: 0,
+ current_writer: None,
+ current_file_name: None,
+ current_row_count: 0,
+ current_file_start_seq: 0,
+ current_file_creation_time: Utc::now(),
+ written_files: Vec::new(),
+ in_flight_closes: JoinSet::new(),
+ }
+ }
+
+ pub(crate) async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+ if batch.num_rows() == 0 {
+ return Ok(());
+ }
+
+ if self.current_writer.is_none() {
+ self.open_new_file(batch.schema()).await?;
+ }
+
+ let num_rows = batch.num_rows();
+ let start_seq = self.next_sequence_number;
+ let end_seq = start_seq + num_rows as i64 - 1;
+ self.next_sequence_number = end_seq + 1;
+
+ // Build physical batch: [_SEQUENCE_NUMBER, _VALUE_KIND,
all_user_cols...]
+ let mut physical_columns: Vec<Arc<dyn arrow_array::Array>> =
Vec::new();
+ physical_columns.push(Arc::new(Int64Array::from(
+ (start_seq..=end_seq).collect::<Vec<_>>(),
+ )));
+ let vk_idx = batch
+ .schema()
+ .fields()
+ .iter()
+ .position(|f| f.name() == VALUE_KIND_FIELD_NAME);
+ match vk_idx {
+ Some(vk_idx) =>
physical_columns.push(batch.column(vk_idx).clone()),
+ None => physical_columns.push(Arc::new(Int8Array::from(vec![0i8;
num_rows]))),
+ }
+ // All user columns (skip _VALUE_KIND if present — already handled
above).
+ for (i, col) in batch.columns().iter().enumerate() {
+ if Some(i) == vk_idx {
+ continue;
+ }
+ physical_columns.push(col.clone());
+ }
+
+ let physical_schema = build_physical_schema(&batch.schema());
+ let physical_batch =
+ RecordBatch::try_new(physical_schema,
physical_columns).map_err(|e| {
+ crate::Error::DataInvalid {
+ message: format!("Failed to create physical batch: {e}"),
+ source: None,
+ }
+ })?;
+
+ self.current_row_count += num_rows as i64;
+ self.current_writer
+ .as_mut()
+ .unwrap()
+ .write(&physical_batch)
+ .await?;
+
+ // Roll to a new file if target size is reached — close in background
+ if self.current_writer.as_ref().unwrap().num_bytes() as i64 >=
self.config.target_file_size
+ {
+ self.roll_file();
+ }
+
+ // Flush row group if in-progress buffer exceeds write_buffer_size
+ if let Some(w) = self.current_writer.as_mut() {
+ if w.in_progress_size() as i64 >= self.config.write_buffer_size {
+ w.flush().await?;
+ }
+ }
+
+ Ok(())
+ }
+
+ pub(crate) async fn prepare_commit(&mut self) -> Result<Vec<DataFileMeta>>
{
+ self.close_current_file().await?;
+ while let Some(result) = self.in_flight_closes.join_next().await {
+ let meta = result.map_err(|e| crate::Error::DataInvalid {
+ message: format!("Background file close task panicked: {e}"),
+ source: None,
+ })??;
+ self.written_files.push(meta);
+ }
+ Ok(std::mem::take(&mut self.written_files))
+ }
+
+ /// Spawn the current writer's close in the background for non-blocking
rolling.
+ fn roll_file(&mut self) {
+ let writer = match self.current_writer.take() {
+ Some(w) => w,
+ None => return,
+ };
+ let file_name = self.current_file_name.take().unwrap();
+ let row_count = self.current_row_count;
+ let min_seq = self.current_file_start_seq;
+ let max_seq = self.next_sequence_number - 1;
+ self.current_row_count = 0;
+ let schema_id = self.config.schema_id;
+ // Capture creation_time from when the file was opened, not when the
async close finishes.
+ // Java's postpone compaction sorts by creationTime for replay order.
+ let creation_time = self.current_file_creation_time;
+
+ self.in_flight_closes.spawn(async move {
+ let file_size = writer.close().await? as i64;
+ Ok(build_meta(
+ file_name,
+ file_size,
+ row_count,
+ min_seq,
+ max_seq,
+ schema_id,
+ creation_time,
+ ))
+ });
+ }
+
+ async fn open_new_file(&mut self, user_schema: arrow_schema::SchemaRef) ->
Result<()> {
+ let file_name = format!(
+ "{}{}-{}.parquet",
+ self.config.data_file_prefix,
+ uuid::Uuid::new_v4(),
+ self.written_files.len()
+ );
+ let bucket_dir = if self.config.partition_path.is_empty() {
+ format!(
+ "{}/{}",
+ self.config.table_location,
+ bucket_dir_name(self.config.bucket)
+ )
+ } else {
+ format!(
+ "{}/{}/{}",
+ self.config.table_location,
+ self.config.partition_path,
+ bucket_dir_name(self.config.bucket)
+ )
+ };
+ self.file_io.mkdirs(&format!("{bucket_dir}/")).await?;
+ let physical_schema = build_physical_schema(&user_schema);
+ let file_path = format!("{bucket_dir}/{file_name}");
+ let output = self.file_io.new_output(&file_path)?;
+ let writer = create_format_writer(
+ &output,
+ physical_schema,
+ &self.config.file_compression,
+ self.config.file_compression_zstd_level,
+ )
+ .await?;
+ self.current_writer = Some(writer);
+ self.current_file_name = Some(file_name);
+ self.current_row_count = 0;
+ self.current_file_start_seq = self.next_sequence_number;
+ self.current_file_creation_time = Utc::now();
+ Ok(())
+ }
+
+ async fn close_current_file(&mut self) -> Result<()> {
+ let writer = match self.current_writer.take() {
+ Some(w) => w,
+ None => return Ok(()),
+ };
+ let file_name = self.current_file_name.take().unwrap();
+ let row_count = self.current_row_count;
+ self.current_row_count = 0;
+ let file_size = writer.close().await? as i64;
+
+ let min_seq = self.current_file_start_seq;
+ let max_seq = self.next_sequence_number - 1;
+
+ let meta = build_meta(
+ file_name,
+ file_size,
+ row_count,
+ min_seq,
+ max_seq,
+ self.config.schema_id,
+ self.current_file_creation_time,
+ );
+ self.written_files.push(meta);
+ Ok(())
+ }
+}
+
+fn build_meta(
+ file_name: String,
+ file_size: i64,
+ row_count: i64,
+ min_seq: i64,
+ max_seq: i64,
+ schema_id: i64,
+ creation_time: DateTime<Utc>,
+) -> DataFileMeta {
+ DataFileMeta {
+ file_name,
+ file_size,
+ row_count,
+ min_key: EMPTY_SERIALIZED_ROW.clone(),
+ max_key: EMPTY_SERIALIZED_ROW.clone(),
+ key_stats: BinaryTableStats::new(
+ EMPTY_SERIALIZED_ROW.clone(),
+ EMPTY_SERIALIZED_ROW.clone(),
+ vec![],
+ ),
+ value_stats: BinaryTableStats::new(
+ EMPTY_SERIALIZED_ROW.clone(),
+ EMPTY_SERIALIZED_ROW.clone(),
+ vec![],
+ ),
+ min_sequence_number: min_seq,
+ max_sequence_number: max_seq,
+ schema_id,
+ level: 0,
+ extra_files: vec![],
+ creation_time: Some(creation_time),
+ delete_row_count: Some(0),
+ embedded_index: None,
+ file_source: Some(0), // FileSource.APPEND
+ value_stats_cols: Some(vec![]),
+ external_path: None,
+ first_row_id: None,
+ write_cols: None,
+ }
+}
diff --git a/crates/paimon/src/table/table_scan.rs
b/crates/paimon/src/table/table_scan.rs
index 9b2a7e5..e71aae0 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -30,8 +30,9 @@ use super::Table;
use crate::io::FileIO;
use crate::predicate_stats::data_leaf_may_match;
use crate::spec::{
- eval_row, BinaryRow, CoreOptions, DataField, DataFileMeta, FileKind,
IndexManifest,
- ManifestEntry, ManifestFileMeta, PartitionComputer, Predicate, Snapshot,
TimeTravelSelector,
+ bucket_dir_name, eval_row, BinaryRow, CoreOptions, DataField,
DataFileMeta, FileKind,
+ IndexManifest, ManifestEntry, ManifestFileMeta, PartitionComputer,
Predicate, Snapshot,
+ TimeTravelSelector,
};
use crate::table::bin_pack::split_for_batch;
use crate::table::source::{
@@ -135,6 +136,7 @@ async fn read_all_manifest_entries(
table_path: &str,
snapshot: &Snapshot,
skip_level_zero: bool,
+ scan_all_files: bool,
has_primary_keys: bool,
partition_predicate: Option<&Predicate>,
partition_fields: &[DataField],
@@ -173,7 +175,7 @@ async fn read_all_manifest_entries(
if skip_level_zero && has_primary_keys &&
entry.file().level == 0 {
return false;
}
- if has_primary_keys && entry.bucket() < 0 {
+ if has_primary_keys && !scan_all_files &&
entry.bucket() < 0 {
return false;
}
if let Some(pred) = bucket_predicate {
@@ -537,6 +539,7 @@ impl<'a> TableScan<'a> {
table_path,
snapshot,
skip_level_zero,
+ self.scan_all_files,
has_primary_keys,
self.partition_predicate.as_ref(),
&partition_fields,
@@ -673,9 +676,9 @@ impl<'a> TableScan<'a> {
let bucket_path = if let Some(ref computer) = partition_computer {
let partition_path =
computer.generate_partition_path(&partition_row)?;
- format!("{base_path}/{partition_path}bucket-{bucket}")
+ format!("{base_path}/{partition_path}{}",
bucket_dir_name(bucket))
} else {
- format!("{base_path}/bucket-{bucket}")
+ format!("{base_path}/{}", bucket_dir_name(bucket))
};
// Original `partition` Vec consumed by PartitionBucket for DV map
lookup.
diff --git a/crates/paimon/src/table/table_write.rs
b/crates/paimon/src/table/table_write.rs
index de7e0fa..5c6145d 100644
--- a/crates/paimon/src/table/table_write.rs
+++ b/crates/paimon/src/table/table_write.rs
@@ -24,11 +24,12 @@ use crate::spec::DataFileMeta;
use crate::spec::PartitionComputer;
use crate::spec::{
extract_datum_from_arrow, BinaryRow, BinaryRowBuilder, CoreOptions,
DataField, DataType, Datum,
- MergeEngine, Predicate, PredicateBuilder, EMPTY_SERIALIZED_ROW,
+ MergeEngine, Predicate, PredicateBuilder, EMPTY_SERIALIZED_ROW,
POSTPONE_BUCKET,
};
use crate::table::commit_message::CommitMessage;
use crate::table::data_file_writer::DataFileWriter;
use crate::table::kv_file_writer::{KeyValueFileWriter, KeyValueWriteConfig};
+use crate::table::postpone_file_writer::{PostponeFileWriter,
PostponeWriteConfig};
use crate::table::{SnapshotManager, Table, TableScan};
use crate::Result;
use arrow_array::RecordBatch;
@@ -43,10 +44,11 @@ fn schema_contains_blob_type(fields: &[DataField]) -> bool {
.any(|field| field.data_type().contains_blob_type())
}
-/// Enum to hold either an append-only writer or a key-value writer.
+/// Enum to hold either an append-only writer, a key-value writer, or a
postpone writer.
enum FileWriter {
Append(DataFileWriter),
KeyValue(KeyValueFileWriter),
+ Postpone(PostponeFileWriter),
}
impl FileWriter {
@@ -54,6 +56,7 @@ impl FileWriter {
match self {
FileWriter::Append(w) => w.write(batch).await,
FileWriter::KeyValue(w) => w.write(batch).await,
+ FileWriter::Postpone(w) => w.write(batch).await,
}
}
@@ -61,6 +64,7 @@ impl FileWriter {
match self {
FileWriter::Append(ref mut w) => w.prepare_commit().await,
FileWriter::KeyValue(ref mut w) => w.prepare_commit().await,
+ FileWriter::Postpone(ref mut w) => w.prepare_commit().await,
}
}
}
@@ -95,14 +99,14 @@ pub struct TableWrite {
sequence_field_indices: Vec<usize>,
/// Merge engine for primary-key tables.
merge_engine: MergeEngine,
- /// Column index in user schema for row kind (resolved from rowkind.field
or _VALUE_KIND).
- value_kind_col_index: Option<usize>,
/// Cache of per-partition bucket→max_sequence_number, lazily populated on
first write.
partition_seq_cache: HashMap<Vec<u8>, HashMap<i32, i64>>,
+ /// Commit user identifier, used for postpone file naming.
+ commit_user: String,
}
impl TableWrite {
- pub(crate) fn new(table: &Table) -> crate::Result<Self> {
+ pub(crate) fn new(table: &Table, commit_user: String) ->
crate::Result<Self> {
let schema = table.schema();
let core_options = CoreOptions::new(schema.options());
@@ -124,16 +128,17 @@ impl TableWrite {
let has_primary_keys = !schema.primary_keys().is_empty();
if has_primary_keys {
- if total_buckets < 1 {
+ if total_buckets < 1 && total_buckets != POSTPONE_BUCKET {
return Err(crate::Error::Unsupported {
message: format!(
- "KeyValueFileWriter does not support
bucket={total_buckets}, only fixed bucket (>= 1) is supported"
+ "KeyValueFileWriter does not support
bucket={total_buckets}, only fixed bucket (>= 1) or postpone bucket (= -2) is
supported"
),
});
}
- if core_options
- .changelog_producer()
- .eq_ignore_ascii_case("input")
+ if total_buckets != POSTPONE_BUCKET
+ && core_options
+ .changelog_producer()
+ .eq_ignore_ascii_case("input")
{
return Err(crate::Error::Unsupported {
message: "KeyValueFileWriter does not support
changelog-producer=input"
@@ -200,11 +205,6 @@ impl TableWrite {
});
}
- // Resolve value_kind column from _VALUE_KIND in user schema, if
present.
- let value_kind_col_index = fields
- .iter()
- .position(|f| f.name() == crate::spec::VALUE_KIND_FIELD_NAME);
-
Ok(Self {
table: table.clone(),
partition_writers: HashMap::new(),
@@ -222,8 +222,8 @@ impl TableWrite {
primary_key_types,
sequence_field_indices,
merge_engine,
- value_kind_col_index,
partition_seq_cache: HashMap::new(),
+ commit_user,
})
}
@@ -297,7 +297,15 @@ impl TableWrite {
) -> Result<Vec<(PartitionBucketKey, RecordBatch)>> {
// Fast path: no partitions and single bucket — skip per-row routing
if self.partition_field_indices.is_empty() && self.total_buckets <= 1 {
- return Ok(vec![((EMPTY_SERIALIZED_ROW.clone(), 0),
batch.clone())]);
+ let bucket = if self.total_buckets == POSTPONE_BUCKET {
+ POSTPONE_BUCKET
+ } else {
+ 0
+ };
+ return Ok(vec![(
+ (EMPTY_SERIALIZED_ROW.clone(), bucket),
+ batch.clone(),
+ )]);
}
let fields = self.table.schema().fields();
@@ -412,7 +420,9 @@ impl TableWrite {
};
// Compute bucket
- let bucket = if self.total_buckets <= 1 ||
self.bucket_key_indices.is_empty() {
+ let bucket = if self.total_buckets == POSTPONE_BUCKET {
+ POSTPONE_BUCKET
+ } else if self.total_buckets <= 1 ||
self.bucket_key_indices.is_empty() {
0
} else {
let mut datums: Vec<(Option<Datum>, DataType)> = Vec::new();
@@ -438,6 +448,7 @@ impl TableWrite {
};
let writer = if self.primary_key_indices.is_empty() {
+ // Append-only writer for non-PK tables.
FileWriter::Append(DataFileWriter::new(
self.table.file_io().clone(),
self.table.location().to_string(),
@@ -452,6 +463,23 @@ impl TableWrite {
None, // first_row_id: assigned by commit
None, // write_cols: full-row write
))
+ } else if bucket == POSTPONE_BUCKET {
+ // Postpone bucket: KV format but no sorting/dedup, special file
naming.
+ let data_file_prefix = format!("data-u-{}-s-0-w-",
self.commit_user);
+ FileWriter::Postpone(PostponeFileWriter::new(
+ self.table.file_io().clone(),
+ PostponeWriteConfig {
+ table_location: self.table.location().to_string(),
+ partition_path,
+ bucket,
+ schema_id: self.schema_id,
+ target_file_size: self.target_file_size,
+ file_compression: self.file_compression.clone(),
+ file_compression_zstd_level:
self.file_compression_zstd_level,
+ write_buffer_size: self.write_buffer_size,
+ data_file_prefix,
+ },
+ ))
} else {
// Lazily scan partition sequence numbers on first writer creation
per partition.
if !self.partition_seq_cache.contains_key(&partition_bytes) {
@@ -481,7 +509,6 @@ impl TableWrite {
primary_key_types: self.primary_key_types.clone(),
sequence_field_indices:
self.sequence_field_indices.clone(),
merge_engine: self.merge_engine,
- value_kind_col_index: self.value_kind_col_index,
},
next_seq,
))
@@ -504,6 +531,7 @@ mod tests {
};
use crate::table::{SnapshotManager, TableCommit};
use arrow_array::Int32Array;
+ use arrow_array::RecordBatchReader as _;
use arrow_schema::{
DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
TimeUnit,
};
@@ -625,7 +653,7 @@ mod tests {
setup_dirs(&file_io, table_path).await;
let table = test_table(&file_io, table_path);
- let mut table_write = TableWrite::new(&table).unwrap();
+ let mut table_write = TableWrite::new(&table,
"test-user".to_string()).unwrap();
let batch = make_batch(vec![1, 2, 3], vec![10, 20, 30]);
table_write.write_arrow_batch(&batch).await.unwrap();
@@ -656,7 +684,9 @@ mod tests {
None,
);
- let err = TableWrite::new(&table).err().unwrap();
+ let err = TableWrite::new(&table, "test-user".to_string())
+ .err()
+ .unwrap();
assert!(
matches!(err, crate::Error::Unsupported { message } if
message.contains("BlobType"))
);
@@ -672,7 +702,9 @@ mod tests {
None,
);
- let err = TableWrite::new(&table).err().unwrap();
+ let err = TableWrite::new(&table, "test-user".to_string())
+ .err()
+ .unwrap();
assert!(
matches!(err, crate::Error::Unsupported { message } if
message.contains("BlobType"))
);
@@ -685,7 +717,7 @@ mod tests {
setup_dirs(&file_io, table_path).await;
let table = test_partitioned_table(&file_io, table_path);
- let mut table_write = TableWrite::new(&table).unwrap();
+ let mut table_write = TableWrite::new(&table,
"test-user".to_string()).unwrap();
let batch = make_partitioned_batch(vec!["a", "b", "a"], vec![1, 2, 3]);
table_write.write_arrow_batch(&batch).await.unwrap();
@@ -716,7 +748,7 @@ mod tests {
let file_io = test_file_io();
let table_path = "memory:/test_table_write_empty";
let table = test_table(&file_io, table_path);
- let mut table_write = TableWrite::new(&table).unwrap();
+ let mut table_write = TableWrite::new(&table,
"test-user".to_string()).unwrap();
let batch = make_batch(vec![], vec![]);
table_write.write_arrow_batch(&batch).await.unwrap();
@@ -732,7 +764,7 @@ mod tests {
setup_dirs(&file_io, table_path).await;
let table = test_table(&file_io, table_path);
- let mut table_write = TableWrite::new(&table).unwrap();
+ let mut table_write = TableWrite::new(&table,
"test-user".to_string()).unwrap();
// First write + prepare_commit
table_write
@@ -764,7 +796,7 @@ mod tests {
setup_dirs(&file_io, table_path).await;
let table = test_table(&file_io, table_path);
- let mut table_write = TableWrite::new(&table).unwrap();
+ let mut table_write = TableWrite::new(&table,
"test-user".to_string()).unwrap();
table_write
.write_arrow_batch(&make_batch(vec![1, 2], vec![10, 20]))
@@ -828,7 +860,7 @@ mod tests {
setup_dirs(&file_io, table_path).await;
let table = test_bucketed_table(&file_io, table_path);
- let mut table_write = TableWrite::new(&table).unwrap();
+ let mut table_write = TableWrite::new(&table,
"test-user".to_string()).unwrap();
// Row with NULL bucket key should not panic
let batch = make_nullable_id_batch(vec![None, Some(1), None], vec![10,
20, 30]);
@@ -850,7 +882,7 @@ mod tests {
setup_dirs(&file_io, table_path).await;
let table = test_bucketed_table(&file_io, table_path);
- let mut table_write = TableWrite::new(&table).unwrap();
+ let mut table_write = TableWrite::new(&table,
"test-user".to_string()).unwrap();
// Two NULLs should land in the same bucket
let batch = make_nullable_id_batch(vec![None, None], vec![10, 20]);
@@ -878,7 +910,7 @@ mod tests {
// Compute bucket for NULL key
let fields = table.schema().fields().to_vec();
- let tw = TableWrite::new(&table).unwrap();
+ let tw = TableWrite::new(&table, "test-user".to_string()).unwrap();
let batch_null = make_nullable_id_batch(vec![None], vec![10]);
let (_, bucket_null) = tw
@@ -943,7 +975,7 @@ mod tests {
None,
);
- let tw = TableWrite::new(&table).unwrap();
+ let tw = TableWrite::new(&table, "test-user".to_string()).unwrap();
let fields = table.schema().fields().to_vec();
// Build a batch: d=NULL, ltz=NULL, ntz=NULL, k=1
@@ -1017,7 +1049,7 @@ mod tests {
None,
);
- let mut table_write = TableWrite::new(&table).unwrap();
+ let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
// Write multiple batches — each should roll to a new file
table_write
@@ -1070,7 +1102,7 @@ mod tests {
setup_dirs(&file_io, table_path).await;
let table = test_pk_table(&file_io, table_path);
- let mut table_write = TableWrite::new(&table).unwrap();
+ let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
let batch = make_batch(vec![3, 1, 2], vec![30, 10, 20]);
table_write.write_arrow_batch(&batch).await.unwrap();
@@ -1105,7 +1137,7 @@ mod tests {
setup_dirs(&file_io, table_path).await;
let table = test_pk_table(&file_io, table_path);
- let mut table_write = TableWrite::new(&table).unwrap();
+ let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
// Write unsorted data
let batch = make_batch(vec![5, 2, 4, 1, 3], vec![50, 20, 40, 10, 30]);
@@ -1163,7 +1195,7 @@ mod tests {
let table = test_pk_table(&file_io, table_path);
// First commit: id=1,2,3
- let mut tw1 = TableWrite::new(&table).unwrap();
+ let mut tw1 = TableWrite::new(&table, "test-user".to_string()).unwrap();
tw1.write_arrow_batch(&make_batch(vec![1, 2, 3], vec![10, 20, 30]))
.await
.unwrap();
@@ -1172,7 +1204,7 @@ mod tests {
commit.commit(msgs1).await.unwrap();
// Second commit: id=2,3,4 with updated values, higher sequence numbers
- let mut tw2 = TableWrite::new(&table).unwrap();
+ let mut tw2 = TableWrite::new(&table, "test-user".to_string()).unwrap();
tw2.write_arrow_batch(&make_batch(vec![2, 3, 4], vec![200, 300, 400]))
.await
.unwrap();
@@ -1225,7 +1257,7 @@ mod tests {
setup_dirs(&file_io, table_path).await;
let table = test_pk_table(&file_io, table_path);
- let mut table_write = TableWrite::new(&table).unwrap();
+ let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
let batch = make_batch(vec![1, 2], vec![10, 20]);
table_write.write_arrow_batch(&batch).await.unwrap();
@@ -1236,4 +1268,274 @@ mod tests {
assert_eq!(file.min_sequence_number, 0);
assert_eq!(file.max_sequence_number, 1);
}
+
+ // -----------------------------------------------------------------------
+ // Postpone bucket (bucket = -2) write tests
+ // -----------------------------------------------------------------------
+
+ fn test_postpone_pk_schema() -> TableSchema {
+ let schema = Schema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column("value", DataType::Int(IntType::new()))
+ .primary_key(["id"])
+ .option("bucket", "-2")
+ .build()
+ .unwrap();
+ TableSchema::new(0, &schema)
+ }
+
+ fn test_postpone_pk_table(file_io: &FileIO, table_path: &str) -> Table {
+ Table::new(
+ file_io.clone(),
+ Identifier::new("default", "test_postpone_table"),
+ table_path.to_string(),
+ test_postpone_pk_schema(),
+ None,
+ )
+ }
+
+ fn test_postpone_partitioned_schema() -> TableSchema {
+ let schema = Schema::builder()
+ .column("pt", DataType::VarChar(VarCharType::string_type()))
+ .column("id", DataType::Int(IntType::new()))
+ .column("value", DataType::Int(IntType::new()))
+ .primary_key(["pt", "id"])
+ .partition_keys(["pt"])
+ .option("bucket", "-2")
+ .build()
+ .unwrap();
+ TableSchema::new(0, &schema)
+ }
+
+ fn test_postpone_partitioned_table(file_io: &FileIO, table_path: &str) -> Table {
+ Table::new(
+ file_io.clone(),
+ Identifier::new("default", "test_postpone_table"),
+ table_path.to_string(),
+ test_postpone_partitioned_schema(),
+ None,
+ )
+ }
+
+ fn make_partitioned_batch_3col(pts: Vec<&str>, ids: Vec<i32>, values: Vec<i32>) -> RecordBatch {
+ let schema = Arc::new(ArrowSchema::new(vec![
+ ArrowField::new("pt", ArrowDataType::Utf8, false),
+ ArrowField::new("id", ArrowDataType::Int32, false),
+ ArrowField::new("value", ArrowDataType::Int32, false),
+ ]));
+ RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(arrow_array::StringArray::from(pts)),
+ Arc::new(Int32Array::from(ids)),
+ Arc::new(Int32Array::from(values)),
+ ],
+ )
+ .unwrap()
+ }
+
+ #[tokio::test]
+ async fn test_postpone_write_and_commit() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_postpone_write";
+ setup_dirs(&file_io, table_path).await;
+
+ let table = test_postpone_pk_table(&file_io, table_path);
+ let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
+
+ let batch = make_batch(vec![3, 1, 2], vec![30, 10, 20]);
+ table_write.write_arrow_batch(&batch).await.unwrap();
+
+ let messages = table_write.prepare_commit().await.unwrap();
+ assert_eq!(messages.len(), 1);
+ assert_eq!(messages[0].bucket, POSTPONE_BUCKET);
+ assert_eq!(messages[0].new_files.len(), 1);
+ assert_eq!(messages[0].new_files[0].row_count, 3);
+
+ // Commit and verify snapshot
+ let commit = TableCommit::new(table, "test-user".to_string());
+ commit.commit(messages).await.unwrap();
+
+ let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
+ let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+ assert_eq!(snapshot.id(), 1);
+ assert_eq!(snapshot.total_record_count(), Some(3));
+ }
+
+ #[tokio::test]
+ async fn test_postpone_write_empty_batch() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_postpone_empty";
+ let table = test_postpone_pk_table(&file_io, table_path);
+ let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
+
+ let batch = make_batch(vec![], vec![]);
+ table_write.write_arrow_batch(&batch).await.unwrap();
+
+ let messages = table_write.prepare_commit().await.unwrap();
+ assert!(messages.is_empty());
+ }
+
+ #[tokio::test]
+ async fn test_postpone_write_multiple_batches() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_postpone_multi";
+ setup_dirs(&file_io, table_path).await;
+
+ let table = test_postpone_pk_table(&file_io, table_path);
+ let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
+
+ table_write
+ .write_arrow_batch(&make_batch(vec![1, 2], vec![10, 20]))
+ .await
+ .unwrap();
+ table_write
+ .write_arrow_batch(&make_batch(vec![3, 4], vec![30, 40]))
+ .await
+ .unwrap();
+
+ let messages = table_write.prepare_commit().await.unwrap();
+ assert_eq!(messages.len(), 1);
+ assert_eq!(messages[0].bucket, POSTPONE_BUCKET);
+
+ let total_rows: i64 = messages[0].new_files.iter().map(|f| f.row_count).sum();
+ assert_eq!(total_rows, 4);
+ }
+
+ #[tokio::test]
+ async fn test_postpone_write_partitioned() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_postpone_partitioned";
+ setup_dirs(&file_io, table_path).await;
+
+ let table = test_postpone_partitioned_table(&file_io, table_path);
+ let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
+
+ let batch =
+ make_partitioned_batch_3col(vec!["a", "b", "a"], vec![1, 2, 3], vec![10, 20, 30]);
+ table_write.write_arrow_batch(&batch).await.unwrap();
+
+ let messages = table_write.prepare_commit().await.unwrap();
+ // 2 partitions
+ assert_eq!(messages.len(), 2);
+ // All messages should use POSTPONE_BUCKET
+ for msg in &messages {
+ assert_eq!(msg.bucket, POSTPONE_BUCKET);
+ }
+
+ let total_rows: i64 = messages
+ .iter()
+ .flat_map(|m| &m.new_files)
+ .map(|f| f.row_count)
+ .sum();
+ assert_eq!(total_rows, 3);
+
+ // Commit and verify
+ let commit = TableCommit::new(table, "test-user".to_string());
+ commit.commit(messages).await.unwrap();
+
+ let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
+ let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+ assert_eq!(snapshot.id(), 1);
+ assert_eq!(snapshot.total_record_count(), Some(3));
+ }
+
+ #[tokio::test]
+ async fn test_postpone_write_reusable() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_postpone_reuse";
+ setup_dirs(&file_io, table_path).await;
+
+ let table = test_postpone_pk_table(&file_io, table_path);
+ let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap();
+
+ // First write + prepare_commit
+ table_write
+ .write_arrow_batch(&make_batch(vec![1, 2], vec![10, 20]))
+ .await
+ .unwrap();
+ let messages1 = table_write.prepare_commit().await.unwrap();
+ assert_eq!(messages1.len(), 1);
+ assert_eq!(messages1[0].new_files[0].row_count, 2);
+
+ // Second write + prepare_commit (reuse)
+ table_write
+ .write_arrow_batch(&make_batch(vec![3, 4, 5], vec![30, 40, 50]))
+ .await
+ .unwrap();
+ let messages2 = table_write.prepare_commit().await.unwrap();
+ assert_eq!(messages2.len(), 1);
+ assert_eq!(messages2[0].new_files[0].row_count, 3);
+
+ // Empty prepare_commit
+ let messages3 = table_write.prepare_commit().await.unwrap();
+ assert!(messages3.is_empty());
+ }
+
+ #[tokio::test]
+ async fn test_postpone_write_file_naming_and_kv_format() {
+ let file_io = test_file_io();
+ let table_path = "memory:/test_postpone_kv";
+ setup_dirs(&file_io, table_path).await;
+
+ let table = test_postpone_pk_table(&file_io, table_path);
+ let mut table_write = TableWrite::new(&table, "my-commit-user".to_string()).unwrap();
+
+ let batch = make_batch(vec![3, 1, 2], vec![30, 10, 20]);
+ table_write.write_arrow_batch(&batch).await.unwrap();
+
+ let messages = table_write.prepare_commit().await.unwrap();
+ let file = &messages[0].new_files[0];
+
+ // Verify postpone file naming: data-u-{commitUser}-s-{writeId}-w-{uuid}-{index}.parquet
+ assert!(
+ file.file_name.starts_with("data-u-my-commit-user-s-"),
+ "Expected postpone file prefix, got: {}",
+ file.file_name
+ );
+ assert!(
+ file.file_name.contains("-w-"),
+ "Expected -w- in file name, got: {}",
+ file.file_name
+ );
+
+ // Verify KV format: read the parquet file and check physical columns
+ let bucket_dir = format!("{table_path}/bucket-postpone");
+ let file_path = format!("{bucket_dir}/{}", file.file_name);
+ let input = file_io.new_input(&file_path).unwrap();
+ let data = input.read().await.unwrap();
+ let reader =
+ parquet::arrow::arrow_reader::ParquetRecordBatchReader::try_new(data, 1024).unwrap();
+ let schema = reader.schema();
+ // Physical schema: [_SEQUENCE_NUMBER, _VALUE_KIND, id, value]
+ assert_eq!(schema.fields().len(), 4);
+ assert_eq!(schema.field(0).name(), "_SEQUENCE_NUMBER");
+ assert_eq!(schema.field(1).name(), "_VALUE_KIND");
+ assert_eq!(schema.field(2).name(), "id");
+ assert_eq!(schema.field(3).name(), "value");
+
+ // Data should be in arrival order (not sorted by PK): 3, 1, 2
+ let batches: Vec<RecordBatch> = reader.into_iter().map(|r| r.unwrap()).collect();
+ let ids: Vec<i32> = batches
+ .iter()
+ .flat_map(|b| {
+ b.column(2)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap()
+ .values()
+ .iter()
+ .copied()
+ })
+ .collect();
+ assert_eq!(
+ ids,
+ vec![3, 1, 2],
+ "Postpone mode should preserve arrival order"
+ );
+
+ // Empty key stats for postpone mode
+ assert_eq!(file.min_key, EMPTY_SERIALIZED_ROW.clone());
+ assert_eq!(file.max_key, EMPTY_SERIALIZED_ROW.clone());
+ }
}
diff --git a/crates/paimon/src/table/write_builder.rs b/crates/paimon/src/table/write_builder.rs
index 57a4820..9f238d6 100644
--- a/crates/paimon/src/table/write_builder.rs
+++ b/crates/paimon/src/table/write_builder.rs
@@ -49,6 +49,6 @@ impl<'a> WriteBuilder<'a> {
/// For primary-key tables, sequence numbers are lazily scanned per partition
/// when the first writer for that partition is created.
pub fn new_write(&self) -> crate::Result<TableWrite> {
- TableWrite::new(self.table)
+ TableWrite::new(self.table, self.commit_user.clone())
}
}