This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new ecbf458  feat(write): add write pipeline with DataFusion INSERT INTO/OVERWRITE support (#234)
ecbf458 is described below

commit ecbf45864177544733818737add939a5f532a58c
Author: Jingsong Lee <[email protected]>
AuthorDate: Sun Apr 12 19:39:24 2026 +0800

    feat(write): add write pipeline with DataFusion INSERT INTO/OVERWRITE support (#234)
    
    Add TableWrite for writing Arrow RecordBatches to Paimon append-only
    tables. Each (partition, bucket) pair gets its own DataFileWriter with
    direct writes (matching the delta-rs DeltaWriter pattern). File rolling
    uses tokio::spawn for background close, and prepare_commit uses
    try_join_all for parallel finalization across partition writers.
    
    Key components:
    - TableWrite: routes batches by partition/bucket, holds DataFileWriters
    - DataFileWriter: manages parquet file lifecycle with rolling support
    - WriteBuilder: creates TableWrite and TableCommit instances
    - PaimonDataSink: DataFusion DataSink integration for INSERT/OVERWRITE
    - FormatFileWriter: extended with flush() and in_progress_size()
    
    Configurable options via CoreOptions:
    - file.compression (default: zstd)
    - target-file-size (default: 256MB)
    - write.parquet-buffer-size (default: 256MB)
    
    Includes E2E integration tests for unpartitioned, partitioned,
    fixed-bucket, multi-commit, column projection, and bucket filtering.
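    
    For orientation, a minimal sketch of the new write path, pieced together
    from the included integration tests (`table` and `batch` are placeholders
    for an append-only Table and an Arrow RecordBatch matching its schema):
    
        // build writer, buffer batches per (partition, bucket), then commit
        let write_builder = table.new_write_builder();
        let mut write = write_builder.new_write()?;
        write.write_arrow_batch(&batch).await?;
        let messages = write.prepare_commit().await?;
        write_builder.new_commit().commit(messages).await?;
    
    Through DataFusion, PaimonDataSink drives the same pipeline once the
    provider is registered, e.g. INSERT INTO t VALUES (1, 10) or
    INSERT OVERWRITE t VALUES ('a', 10).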
---
 Cargo.toml                                         |   2 +
 crates/integration_tests/Cargo.toml                |   1 +
 crates/integration_tests/tests/append_tables.rs    | 614 +++++++++++++
 crates/integrations/datafusion/src/lib.rs          |   2 +-
 .../datafusion/src/physical_plan/mod.rs            |   2 +
 .../datafusion/src/physical_plan/sink.rs           | 109 +++
 crates/integrations/datafusion/src/table/mod.rs    | 271 +++++-
 crates/paimon/Cargo.toml                           |   4 +-
 crates/paimon/src/arrow/format/mod.rs              |  47 +-
 crates/paimon/src/arrow/format/parquet.rs          | 173 +++-
 crates/paimon/src/io/file_io.rs                    |  20 +-
 crates/paimon/src/lib.rs                           |   3 +-
 crates/paimon/src/spec/binary_row.rs               | 244 +++--
 crates/paimon/src/spec/core_options.rs             |  61 +-
 crates/paimon/src/spec/partition_utils.rs          |   1 +
 crates/paimon/src/spec/schema.rs                   |  20 +
 crates/paimon/src/table/bucket_filter.rs           |  26 +-
 crates/paimon/src/table/mod.rs                     |   2 +
 crates/paimon/src/table/table_commit.rs            | 209 +++--
 crates/paimon/src/table/table_scan.rs              |  60 ++
 crates/paimon/src/table/table_write.rs             | 998 +++++++++++++++++++++
 crates/paimon/src/table/write_builder.rs           |  24 +-
 22 files changed, 2731 insertions(+), 162 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 5677a5e..83fa44b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,7 +34,9 @@ arrow-buffer = "57.0"
 arrow-schema = "57.0"
 arrow-cast = "57.0"
 arrow-ord = "57.0"
+arrow-select = "57.0"
 datafusion = "52.3.0"
 datafusion-ffi = "52.3.0"
 parquet = "57.0"
 tokio = "1.39.2"
+tokio-util = "0.7"
diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml
index 092ad94..7c60a53 100644
--- a/crates/integration_tests/Cargo.toml
+++ b/crates/integration_tests/Cargo.toml
@@ -26,6 +26,7 @@ homepage.workspace = true
 [dependencies]
 paimon = { path = "../paimon" }
 arrow-array = { workspace = true }
+arrow-schema = { workspace = true }
 tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
 futures = "0.3"
 
diff --git a/crates/integration_tests/tests/append_tables.rs b/crates/integration_tests/tests/append_tables.rs
new file mode 100644
index 0000000..b2185de
--- /dev/null
+++ b/crates/integration_tests/tests/append_tables.rs
@@ -0,0 +1,614 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! E2E integration tests for append-only (no primary key) tables.
+//!
+//! Covers: unpartitioned, partitioned, bucket=-1, fixed bucket,
+//! multiple commits, column projection, and bucket predicate filtering.
+
+use arrow_array::{Array, Int32Array, RecordBatch, StringArray};
+use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema};
+use futures::TryStreamExt;
+use paimon::catalog::Identifier;
+use paimon::io::FileIOBuilder;
+use paimon::spec::{DataType, IntType, Schema, TableSchema, VarCharType};
+use paimon::table::Table;
+use std::sync::Arc;
+
+// ---------------------------------------------------------------------------
+// Helpers
+// ---------------------------------------------------------------------------
+
+fn memory_file_io() -> paimon::io::FileIO {
+    FileIOBuilder::new("memory").build().unwrap()
+}
+
+async fn setup_dirs(file_io: &paimon::io::FileIO, table_path: &str) {
+    file_io
+        .mkdirs(&format!("{table_path}/snapshot/"))
+        .await
+        .unwrap();
+    file_io
+        .mkdirs(&format!("{table_path}/manifest/"))
+        .await
+        .unwrap();
+}
+
+fn make_table(file_io: &paimon::io::FileIO, table_path: &str, schema: TableSchema) -> Table {
+    Table::new(
+        file_io.clone(),
+        Identifier::new("default", "test"),
+        table_path.to_string(),
+        schema,
+        None,
+    )
+}
+
+fn int_batch(ids: Vec<i32>, values: Vec<i32>) -> RecordBatch {
+    let schema = Arc::new(ArrowSchema::new(vec![
+        ArrowField::new("id", ArrowDataType::Int32, false),
+        ArrowField::new("value", ArrowDataType::Int32, false),
+    ]));
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(Int32Array::from(ids)),
+            Arc::new(Int32Array::from(values)),
+        ],
+    )
+    .unwrap()
+}
+
+fn partitioned_batch(pts: Vec<&str>, ids: Vec<i32>) -> RecordBatch {
+    let schema = Arc::new(ArrowSchema::new(vec![
+        ArrowField::new("pt", ArrowDataType::Utf8, false),
+        ArrowField::new("id", ArrowDataType::Int32, false),
+    ]));
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(StringArray::from(pts)),
+            Arc::new(Int32Array::from(ids)),
+        ],
+    )
+    .unwrap()
+}
+
+fn collect_int_col(batches: &[RecordBatch], col: &str) -> Vec<i32> {
+    let mut vals: Vec<i32> = batches
+        .iter()
+        .flat_map(|b| {
+            let idx = b.schema().index_of(col).unwrap();
+            b.column(idx)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .values()
+                .to_vec()
+        })
+        .collect();
+    vals.sort();
+    vals
+}
+
+fn collect_string_col(batches: &[RecordBatch], col: &str) -> Vec<String> {
+    let mut vals: Vec<String> = batches
+        .iter()
+        .flat_map(|b| {
+            let idx = b.schema().index_of(col).unwrap();
+            let arr = b
+                .column(idx)
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .unwrap();
+            (0..arr.len())
+                .map(|i| arr.value(i).to_string())
+                .collect::<Vec<_>>()
+        })
+        .collect();
+    vals.sort();
+    vals
+}
+
+/// Write batches → commit → scan → read, return all batches.
+async fn write_commit_read(table: &Table, batches: Vec<RecordBatch>) -> Vec<RecordBatch> {
+    let wb = table.new_write_builder();
+    let mut tw = wb.new_write().unwrap();
+    for batch in &batches {
+        tw.write_arrow_batch(batch).await.unwrap();
+    }
+    wb.new_commit()
+        .commit(tw.prepare_commit().await.unwrap())
+        .await
+        .unwrap();
+
+    let rb = table.new_read_builder();
+    let plan = rb.new_scan().plan().await.unwrap();
+    let read = rb.new_read().unwrap();
+    read.to_arrow(plan.splits())
+        .unwrap()
+        .try_collect()
+        .await
+        .unwrap()
+}
+
+// ---------------------------------------------------------------------------
+// Unpartitioned, bucket = -1 (default)
+// ---------------------------------------------------------------------------
+
+fn unpartitioned_schema() -> TableSchema {
+    let schema = Schema::builder()
+        .column("id", DataType::Int(IntType::new()))
+        .column("value", DataType::Int(IntType::new()))
+        .build()
+        .unwrap();
+    TableSchema::new(0, &schema)
+}
+
+#[tokio::test]
+async fn test_unpartitioned_single_batch() {
+    let file_io = memory_file_io();
+    let path = "memory:/append_unpart_single";
+    setup_dirs(&file_io, path).await;
+    let table = make_table(&file_io, path, unpartitioned_schema());
+
+    let result = write_commit_read(&table, vec![int_batch(vec![1, 2, 3], vec![10, 20, 30])]).await;
+
+    assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3]);
+    assert_eq!(collect_int_col(&result, "value"), vec![10, 20, 30]);
+}
+
+#[tokio::test]
+async fn test_unpartitioned_multiple_batches() {
+    let file_io = memory_file_io();
+    let path = "memory:/append_unpart_multi";
+    setup_dirs(&file_io, path).await;
+    let table = make_table(&file_io, path, unpartitioned_schema());
+
+    let result = write_commit_read(
+        &table,
+        vec![
+            int_batch(vec![1, 2], vec![10, 20]),
+            int_batch(vec![3, 4, 5], vec![30, 40, 50]),
+        ],
+    )
+    .await;
+
+    assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3, 4, 5]);
+}
+
+#[tokio::test]
+async fn test_unpartitioned_two_commits() {
+    let file_io = memory_file_io();
+    let path = "memory:/append_unpart_two_commits";
+    setup_dirs(&file_io, path).await;
+    let table = make_table(&file_io, path, unpartitioned_schema());
+
+    // First commit
+    let wb = table.new_write_builder();
+    let mut tw = wb.new_write().unwrap();
+    tw.write_arrow_batch(&int_batch(vec![1, 2], vec![10, 20]))
+        .await
+        .unwrap();
+    wb.new_commit()
+        .commit(tw.prepare_commit().await.unwrap())
+        .await
+        .unwrap();
+
+    // Second commit
+    let mut tw2 = wb.new_write().unwrap();
+    tw2.write_arrow_batch(&int_batch(vec![3, 4], vec![30, 40]))
+        .await
+        .unwrap();
+    wb.new_commit()
+        .commit(tw2.prepare_commit().await.unwrap())
+        .await
+        .unwrap();
+
+    // Read all
+    let rb = table.new_read_builder();
+    let plan = rb.new_scan().plan().await.unwrap();
+    let read = rb.new_read().unwrap();
+    let result: Vec<RecordBatch> = read
+        .to_arrow(plan.splits())
+        .unwrap()
+        .try_collect()
+        .await
+        .unwrap();
+
+    assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3, 4]);
+}
+
+#[tokio::test]
+async fn test_unpartitioned_projection() {
+    let file_io = memory_file_io();
+    let path = "memory:/append_unpart_proj";
+    setup_dirs(&file_io, path).await;
+    let table = make_table(&file_io, path, unpartitioned_schema());
+
+    // Write
+    let wb = table.new_write_builder();
+    let mut tw = wb.new_write().unwrap();
+    tw.write_arrow_batch(&int_batch(vec![1, 2, 3], vec![10, 20, 30]))
+        .await
+        .unwrap();
+    wb.new_commit()
+        .commit(tw.prepare_commit().await.unwrap())
+        .await
+        .unwrap();
+
+    // Read with projection
+    let mut rb = table.new_read_builder();
+    rb.with_projection(&["value"]);
+    let plan = rb.new_scan().plan().await.unwrap();
+    let read = rb.new_read().unwrap();
+    let result: Vec<RecordBatch> = read
+        .to_arrow(plan.splits())
+        .unwrap()
+        .try_collect()
+        .await
+        .unwrap();
+
+    assert_eq!(result[0].schema().fields().len(), 1);
+    assert_eq!(result[0].schema().field(0).name(), "value");
+    assert_eq!(collect_int_col(&result, "value"), vec![10, 20, 30]);
+}
+
+// ---------------------------------------------------------------------------
+// Unpartitioned, fixed bucket
+// ---------------------------------------------------------------------------
+
+fn fixed_bucket_schema(buckets: i32) -> TableSchema {
+    let schema = Schema::builder()
+        .column("id", DataType::Int(IntType::new()))
+        .column("value", DataType::Int(IntType::new()))
+        .option("bucket", buckets.to_string())
+        .option("bucket-key", "id")
+        .build()
+        .unwrap();
+    TableSchema::new(0, &schema)
+}
+
+#[tokio::test]
+async fn test_fixed_bucket_write_read() {
+    let file_io = memory_file_io();
+    let path = "memory:/append_fixed_bucket";
+    setup_dirs(&file_io, path).await;
+    let table = make_table(&file_io, path, fixed_bucket_schema(4));
+
+    let result = write_commit_read(
+        &table,
+        vec![int_batch(
+            vec![1, 2, 3, 4, 5, 6, 7, 8],
+            vec![10, 20, 30, 40, 50, 60, 70, 80],
+        )],
+    )
+    .await;
+
+    assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3, 4, 5, 6, 7, 8]);
+}
+
+#[tokio::test]
+async fn test_fixed_bucket_scan_filters_by_bucket() {
+    use paimon::spec::{Datum, PredicateBuilder};
+
+    let file_io = memory_file_io();
+    let path = "memory:/append_bucket_filter";
+    setup_dirs(&file_io, path).await;
+    let table = make_table(&file_io, path, fixed_bucket_schema(4));
+
+    // Write enough data to spread across buckets
+    let wb = table.new_write_builder();
+    let mut tw = wb.new_write().unwrap();
+    tw.write_arrow_batch(&int_batch(
+        vec![1, 2, 3, 4, 5, 6, 7, 8],
+        vec![10, 20, 30, 40, 50, 60, 70, 80],
+    ))
+    .await
+    .unwrap();
+    wb.new_commit()
+        .commit(tw.prepare_commit().await.unwrap())
+        .await
+        .unwrap();
+
+    // Full scan — should have multiple buckets
+    let full_rb = table.new_read_builder();
+    let full_plan = full_rb.new_scan().plan().await.unwrap();
+    let all_buckets: std::collections::HashSet<i32> =
+        full_plan.splits().iter().map(|s| s.bucket()).collect();
+
+    if all_buckets.len() <= 1 {
+        // All rows hashed to same bucket — can't test filtering
+        return;
+    }
+
+    // Filter by id = 1 — should narrow to one bucket
+    let pb = PredicateBuilder::new(table.schema().fields());
+    let filter = pb.equal("id", Datum::Int(1)).unwrap();
+
+    let mut rb = table.new_read_builder();
+    rb.with_filter(filter);
+    let plan = rb.new_scan().plan().await.unwrap();
+    let filtered_buckets: std::collections::HashSet<i32> =
+        plan.splits().iter().map(|s| s.bucket()).collect();
+
+    assert_eq!(
+        filtered_buckets.len(),
+        1,
+        "Bucket predicate should narrow to one bucket, got: 
{filtered_buckets:?}"
+    );
+    assert!(filtered_buckets.is_subset(&all_buckets));
+
+    // Read and verify id=1 is in the result
+    let read = rb.new_read().unwrap();
+    let result: Vec<RecordBatch> = read
+        .to_arrow(plan.splits())
+        .unwrap()
+        .try_collect()
+        .await
+        .unwrap();
+    let ids = collect_int_col(&result, "id");
+    assert!(ids.contains(&1));
+}
+
+// ---------------------------------------------------------------------------
+// Partitioned, bucket = -1
+// ---------------------------------------------------------------------------
+
+fn partitioned_schema() -> TableSchema {
+    let schema = Schema::builder()
+        .column("pt", DataType::VarChar(VarCharType::string_type()))
+        .column("id", DataType::Int(IntType::new()))
+        .partition_keys(["pt"])
+        .build()
+        .unwrap();
+    TableSchema::new(0, &schema)
+}
+
+#[tokio::test]
+async fn test_partitioned_write_read() {
+    let file_io = memory_file_io();
+    let path = "memory:/append_partitioned";
+    setup_dirs(&file_io, path).await;
+    let table = make_table(&file_io, path, partitioned_schema());
+
+    let result = write_commit_read(
+        &table,
+        vec![partitioned_batch(
+            vec!["a", "b", "a", "b"],
+            vec![1, 2, 3, 4],
+        )],
+    )
+    .await;
+
+    let total: usize = result.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(total, 4);
+    assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3, 4]);
+    assert_eq!(collect_string_col(&result, "pt"), vec!["a", "a", "b", "b"]);
+}
+
+#[tokio::test]
+async fn test_partitioned_two_commits() {
+    let file_io = memory_file_io();
+    let path = "memory:/append_part_two_commits";
+    setup_dirs(&file_io, path).await;
+    let table = make_table(&file_io, path, partitioned_schema());
+
+    let wb = table.new_write_builder();
+
+    // First commit: partition "a"
+    let mut tw1 = wb.new_write().unwrap();
+    tw1.write_arrow_batch(&partitioned_batch(vec!["a", "a"], vec![1, 2]))
+        .await
+        .unwrap();
+    wb.new_commit()
+        .commit(tw1.prepare_commit().await.unwrap())
+        .await
+        .unwrap();
+
+    // Second commit: partition "b"
+    let mut tw2 = wb.new_write().unwrap();
+    tw2.write_arrow_batch(&partitioned_batch(vec!["b", "b"], vec![3, 4]))
+        .await
+        .unwrap();
+    wb.new_commit()
+        .commit(tw2.prepare_commit().await.unwrap())
+        .await
+        .unwrap();
+
+    // Read all
+    let rb = table.new_read_builder();
+    let plan = rb.new_scan().plan().await.unwrap();
+    let read = rb.new_read().unwrap();
+    let result: Vec<RecordBatch> = read
+        .to_arrow(plan.splits())
+        .unwrap()
+        .try_collect()
+        .await
+        .unwrap();
+
+    assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3, 4]);
+    assert_eq!(collect_string_col(&result, "pt"), vec!["a", "a", "b", "b"]);
+}
+
+#[tokio::test]
+async fn test_partitioned_scan_partition_filter() {
+    use paimon::spec::{Datum, PredicateBuilder};
+
+    let file_io = memory_file_io();
+    let path = "memory:/append_part_filter";
+    setup_dirs(&file_io, path).await;
+    let table = make_table(&file_io, path, partitioned_schema());
+
+    // Write data to two partitions
+    let wb = table.new_write_builder();
+    let mut tw = wb.new_write().unwrap();
+    tw.write_arrow_batch(&partitioned_batch(
+        vec!["a", "b", "a", "b"],
+        vec![1, 2, 3, 4],
+    ))
+    .await
+    .unwrap();
+    wb.new_commit()
+        .commit(tw.prepare_commit().await.unwrap())
+        .await
+        .unwrap();
+
+    // Filter by pt = "a"
+    let pb = PredicateBuilder::new(table.schema().fields());
+    let filter = pb.equal("pt", Datum::String("a".into())).unwrap();
+
+    let mut rb = table.new_read_builder();
+    rb.with_filter(filter);
+    let plan = rb.new_scan().plan().await.unwrap();
+
+    // Only partition "a" splits should survive
+    for split in plan.splits() {
+        let pt = split.partition().get_string(0).unwrap().to_string();
+        assert_eq!(pt, "a");
+    }
+
+    let read = rb.new_read().unwrap();
+    let result: Vec<RecordBatch> = read
+        .to_arrow(plan.splits())
+        .unwrap()
+        .try_collect()
+        .await
+        .unwrap();
+
+    assert_eq!(collect_int_col(&result, "id"), vec![1, 3]);
+    assert_eq!(collect_string_col(&result, "pt"), vec!["a", "a"]);
+}
+
+// ---------------------------------------------------------------------------
+// Partitioned + fixed bucket
+// ---------------------------------------------------------------------------
+
+fn partitioned_bucket_schema(buckets: i32) -> TableSchema {
+    let schema = Schema::builder()
+        .column("pt", DataType::VarChar(VarCharType::string_type()))
+        .column("id", DataType::Int(IntType::new()))
+        .column("value", DataType::Int(IntType::new()))
+        .partition_keys(["pt"])
+        .option("bucket", buckets.to_string())
+        .option("bucket-key", "id")
+        .build()
+        .unwrap();
+    TableSchema::new(0, &schema)
+}
+
+fn partitioned_value_batch(pts: Vec<&str>, ids: Vec<i32>, values: Vec<i32>) -> RecordBatch {
+    let schema = Arc::new(ArrowSchema::new(vec![
+        ArrowField::new("pt", ArrowDataType::Utf8, false),
+        ArrowField::new("id", ArrowDataType::Int32, false),
+        ArrowField::new("value", ArrowDataType::Int32, false),
+    ]));
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(StringArray::from(pts)),
+            Arc::new(Int32Array::from(ids)),
+            Arc::new(Int32Array::from(values)),
+        ],
+    )
+    .unwrap()
+}
+
+#[tokio::test]
+async fn test_partitioned_fixed_bucket_write_read() {
+    let file_io = memory_file_io();
+    let path = "memory:/append_part_bucket";
+    setup_dirs(&file_io, path).await;
+    let table = make_table(&file_io, path, partitioned_bucket_schema(2));
+
+    let wb = table.new_write_builder();
+    let mut tw = wb.new_write().unwrap();
+    tw.write_arrow_batch(&partitioned_value_batch(
+        vec!["a", "a", "b", "b"],
+        vec![1, 2, 3, 4],
+        vec![10, 20, 30, 40],
+    ))
+    .await
+    .unwrap();
+    wb.new_commit()
+        .commit(tw.prepare_commit().await.unwrap())
+        .await
+        .unwrap();
+
+    let rb = table.new_read_builder();
+    let plan = rb.new_scan().plan().await.unwrap();
+    let read = rb.new_read().unwrap();
+    let result: Vec<RecordBatch> = read
+        .to_arrow(plan.splits())
+        .unwrap()
+        .try_collect()
+        .await
+        .unwrap();
+
+    assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3, 4]);
+    assert_eq!(collect_int_col(&result, "value"), vec![10, 20, 30, 40]);
+}
+
+// ---------------------------------------------------------------------------
+// Unsupported: primary key table should be rejected
+// ---------------------------------------------------------------------------
+
+#[tokio::test]
+async fn test_reject_primary_key_table() {
+    let schema = Schema::builder()
+        .column("id", DataType::Int(IntType::new()))
+        .column("value", DataType::Int(IntType::new()))
+        .primary_key(["id"])
+        .build()
+        .unwrap();
+    let table_schema = TableSchema::new(0, &schema);
+
+    let file_io = memory_file_io();
+    let path = "memory:/append_reject_pk";
+    let table = make_table(&file_io, path, table_schema);
+
+    let result = table.new_write_builder().new_write();
+    assert!(result.is_err());
+    let err = result.err().unwrap();
+    assert!(
+        matches!(&err, paimon::Error::Unsupported { message } if message.contains("primary keys")),
+        "Expected Unsupported error for PK table, got: {err:?}"
+    );
+}
+
+#[tokio::test]
+async fn test_reject_fixed_bucket_without_bucket_key() {
+    let schema = Schema::builder()
+        .column("id", DataType::Int(IntType::new()))
+        .column("value", DataType::Int(IntType::new()))
+        .option("bucket", "4")
+        .build()
+        .unwrap();
+    let table_schema = TableSchema::new(0, &schema);
+
+    let file_io = memory_file_io();
+    let path = "memory:/append_reject_no_bucket_key";
+    let table = make_table(&file_io, path, table_schema);
+
+    let result = table.new_write_builder().new_write();
+    assert!(result.is_err());
+    let err = result.err().unwrap();
+    assert!(
+        matches!(&err, paimon::Error::Unsupported { message } if message.contains("bucket-key")),
+        "Expected Unsupported error for missing bucket-key, got: {err:?}"
+    );
+}
diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs
index abcf744..4e9fdb3 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Apache Paimon DataFusion Integration (read-only).
+//! Apache Paimon DataFusion Integration.
 //!
 //! Register a Paimon table as a DataFusion table provider to query it with SQL or DataFrame API.
 //!
diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs
index 48aa546..2fa35bf 100644
--- a/crates/integrations/datafusion/src/physical_plan/mod.rs
+++ b/crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -16,5 +16,7 @@
 // under the License.
 
 pub(crate) mod scan;
+pub(crate) mod sink;
 
 pub use scan::PaimonTableScan;
+pub use sink::PaimonDataSink;
diff --git a/crates/integrations/datafusion/src/physical_plan/sink.rs b/crates/integrations/datafusion/src/physical_plan/sink.rs
new file mode 100644
index 0000000..a0c5c9b
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/sink.rs
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DataSink implementation for writing to Paimon tables via DataFusion.
+
+use std::any::Any;
+use std::fmt;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::datasource::sink::DataSink;
+use datafusion::error::Result as DFResult;
+use datafusion::execution::SendableRecordBatchStream;
+use datafusion::execution::TaskContext;
+use datafusion::physical_plan::DisplayAs;
+use futures::StreamExt;
+use paimon::table::Table;
+
+use crate::error::to_datafusion_error;
+
+/// DataSink that writes RecordBatches to a Paimon table.
+///
+/// Uses the Paimon write pipeline: `WriteBuilder` → `TableWrite` → `TableCommit`.
+/// Internal parallelism is handled by `TableWrite` which routes each
+/// (partition, bucket) to its own background tokio task.
+#[derive(Debug)]
+pub struct PaimonDataSink {
+    table: Table,
+    schema: ArrowSchemaRef,
+    overwrite: bool,
+}
+
+impl PaimonDataSink {
+    pub fn new(table: Table, schema: ArrowSchemaRef, overwrite: bool) -> Self {
+        Self {
+            table,
+            schema,
+            overwrite,
+        }
+    }
+}
+
+impl DisplayAs for PaimonDataSink {
+    fn fmt_as(
+        &self,
+        _t: datafusion::physical_plan::DisplayFormatType,
+        f: &mut fmt::Formatter,
+    ) -> fmt::Result {
+        write!(f, "PaimonDataSink: table={}", self.table.identifier())
+    }
+}
+
+#[async_trait]
+impl DataSink for PaimonDataSink {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> &ArrowSchemaRef {
+        &self.schema
+    }
+
+    async fn write_all(
+        &self,
+        mut data: SendableRecordBatchStream,
+        _context: &Arc<TaskContext>,
+    ) -> DFResult<u64> {
+        let wb = self.table.new_write_builder();
+        let mut tw = wb.new_write().map_err(to_datafusion_error)?;
+        let mut row_count = 0u64;
+
+        while let Some(batch) = data.next().await {
+            let batch = batch?;
+            row_count += batch.num_rows() as u64;
+            tw.write_arrow_batch(&batch)
+                .await
+                .map_err(to_datafusion_error)?;
+        }
+
+        let messages = tw.prepare_commit().await.map_err(to_datafusion_error)?;
+        let commit = wb.new_commit();
+
+        if self.overwrite {
+            commit
+                .overwrite(messages)
+                .await
+                .map_err(to_datafusion_error)?;
+        } else {
+            commit.commit(messages).await.map_err(to_datafusion_error)?;
+        }
+
+        Ok(row_count)
+    }
+}
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index 65eb07f..275f943 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Paimon table provider for DataFusion (read-only).
+//! Paimon table provider for DataFusion.
 
 use std::any::Any;
 use std::sync::Arc;
@@ -23,12 +23,16 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::{Field, Schema, SchemaRef as ArrowSchemaRef};
 use datafusion::catalog::Session;
+use datafusion::datasource::sink::DataSinkExec;
 use datafusion::datasource::{TableProvider, TableType};
 use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::dml::InsertOp;
 use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
 use datafusion::physical_plan::ExecutionPlan;
 use paimon::table::Table;
 
+use crate::physical_plan::PaimonDataSink;
+
 use crate::error::to_datafusion_error;
 use crate::filter_pushdown::{build_pushed_predicate, classify_filter_pushdown};
 use crate::physical_plan::PaimonTableScan;
@@ -178,6 +182,25 @@ impl TableProvider for PaimonTableProvider {
         )
     }
 
+    async fn insert_into(
+        &self,
+        _state: &dyn Session,
+        input: Arc<dyn ExecutionPlan>,
+        insert_op: InsertOp,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        let overwrite = match insert_op {
+            InsertOp::Append => false,
+            InsertOp::Overwrite => true,
+            other => {
+                return Err(datafusion::error::DataFusionError::NotImplemented(format!(
+                    "{other} is not supported for Paimon tables"
+                )));
+            }
+        };
+        let sink = PaimonDataSink::new(self.table.clone(), self.schema.clone(), overwrite);
+        Ok(Arc::new(DataSinkExec::new(input, Arc::new(sink), None)))
+    }
+
     fn supports_filters_pushdown(
         &self,
         filters: &[&Expr],
@@ -373,4 +396,250 @@ mod tests {
 
         assert_eq!(scan.pushed_predicate(), Some(&expected));
     }
+
+    #[tokio::test]
+    async fn test_insert_into_and_read_back() {
+        use paimon::io::FileIOBuilder;
+        use paimon::spec::{DataType, IntType, Schema as PaimonSchema, TableSchema};
+
+        let file_io = FileIOBuilder::new("memory").build().unwrap();
+        let table_path = "memory:/test_df_insert_into";
+        file_io
+            .mkdirs(&format!("{table_path}/snapshot/"))
+            .await
+            .unwrap();
+        file_io
+            .mkdirs(&format!("{table_path}/manifest/"))
+            .await
+            .unwrap();
+
+        let schema = PaimonSchema::builder()
+            .column("id", DataType::Int(IntType::new()))
+            .column("value", DataType::Int(IntType::new()))
+            .build()
+            .unwrap();
+        let table_schema = TableSchema::new(0, &schema);
+        let table = paimon::table::Table::new(
+            file_io,
+            Identifier::new("default", "test_insert"),
+            table_path.to_string(),
+            table_schema,
+            None,
+        );
+
+        let provider = PaimonTableProvider::try_new(table).unwrap();
+        let ctx = SessionContext::new();
+        ctx.register_table("t", Arc::new(provider)).unwrap();
+
+        // INSERT INTO
+        let result = ctx
+            .sql("INSERT INTO t VALUES (1, 10), (2, 20), (3, 30)")
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        // Verify count output
+        let count_array = result[0]
+            .column(0)
+            .as_any()
+            .downcast_ref::<datafusion::arrow::array::UInt64Array>()
+            .unwrap();
+        assert_eq!(count_array.value(0), 3);
+
+        // Read back
+        let batches = ctx
+            .sql("SELECT id, value FROM t ORDER BY id")
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        let mut rows = Vec::new();
+        for batch in &batches {
+            let ids = batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<datafusion::arrow::array::Int32Array>()
+                .unwrap();
+            let vals = batch
+                .column(1)
+                .as_any()
+                .downcast_ref::<datafusion::arrow::array::Int32Array>()
+                .unwrap();
+            for i in 0..batch.num_rows() {
+                rows.push((ids.value(i), vals.value(i)));
+            }
+        }
+        assert_eq!(rows, vec![(1, 10), (2, 20), (3, 30)]);
+    }
+
+    #[tokio::test]
+    async fn test_insert_overwrite() {
+        use paimon::io::FileIOBuilder;
+        use paimon::spec::{DataType, IntType, Schema as PaimonSchema, TableSchema, VarCharType};
+
+        let file_io = FileIOBuilder::new("memory").build().unwrap();
+        let table_path = "memory:/test_df_insert_overwrite";
+        file_io
+            .mkdirs(&format!("{table_path}/snapshot/"))
+            .await
+            .unwrap();
+        file_io
+            .mkdirs(&format!("{table_path}/manifest/"))
+            .await
+            .unwrap();
+
+        let schema = PaimonSchema::builder()
+            .column("pt", DataType::VarChar(VarCharType::string_type()))
+            .column("id", DataType::Int(IntType::new()))
+            .partition_keys(["pt"])
+            .build()
+            .unwrap();
+        let table_schema = TableSchema::new(0, &schema);
+        let table = paimon::table::Table::new(
+            file_io,
+            Identifier::new("default", "test_overwrite"),
+            table_path.to_string(),
+            table_schema,
+            None,
+        );
+
+        let provider = PaimonTableProvider::try_new(table).unwrap();
+        let ctx = SessionContext::new();
+        ctx.register_table("t", Arc::new(provider)).unwrap();
+
+        // Initial INSERT: partition "a" and "b"
+        ctx.sql("INSERT INTO t VALUES ('a', 1), ('a', 2), ('b', 3), ('b', 4)")
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        // INSERT OVERWRITE with only partition "a" data
+        // Should overwrite partition "a" but leave partition "b" intact
+        ctx.sql("INSERT OVERWRITE t VALUES ('a', 10), ('a', 20)")
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        // Read back
+        let batches = ctx
+            .sql("SELECT pt, id FROM t ORDER BY pt, id")
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        let mut rows = Vec::new();
+        for batch in &batches {
+            let pts = batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<datafusion::arrow::array::StringArray>()
+                .unwrap();
+            let ids = batch
+                .column(1)
+                .as_any()
+                .downcast_ref::<datafusion::arrow::array::Int32Array>()
+                .unwrap();
+            for i in 0..batch.num_rows() {
+                rows.push((pts.value(i).to_string(), ids.value(i)));
+            }
+        }
+        // Partition "a" overwritten with new data, partition "b" untouched
+        assert_eq!(
+            rows,
+            vec![
+                ("a".to_string(), 10),
+                ("a".to_string(), 20),
+                ("b".to_string(), 3),
+                ("b".to_string(), 4),
+            ]
+        );
+    }
+
+    #[tokio::test]
+    async fn test_insert_overwrite_unpartitioned() {
+        use paimon::io::FileIOBuilder;
+        use paimon::spec::{DataType, IntType, Schema as PaimonSchema, TableSchema};
+
+        let file_io = FileIOBuilder::new("memory").build().unwrap();
+        let table_path = "memory:/test_df_insert_overwrite_unpart";
+        file_io
+            .mkdirs(&format!("{table_path}/snapshot/"))
+            .await
+            .unwrap();
+        file_io
+            .mkdirs(&format!("{table_path}/manifest/"))
+            .await
+            .unwrap();
+
+        let schema = PaimonSchema::builder()
+            .column("id", DataType::Int(IntType::new()))
+            .column("value", DataType::Int(IntType::new()))
+            .build()
+            .unwrap();
+        let table_schema = TableSchema::new(0, &schema);
+        let table = paimon::table::Table::new(
+            file_io,
+            Identifier::new("default", "test_overwrite_unpart"),
+            table_path.to_string(),
+            table_schema,
+            None,
+        );
+
+        let provider = PaimonTableProvider::try_new(table).unwrap();
+        let ctx = SessionContext::new();
+        ctx.register_table("t", Arc::new(provider)).unwrap();
+
+        // Initial INSERT
+        ctx.sql("INSERT INTO t VALUES (1, 10), (2, 20), (3, 30)")
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        // INSERT OVERWRITE on unpartitioned table — full table overwrite
+        ctx.sql("INSERT OVERWRITE t VALUES (4, 40), (5, 50)")
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        let batches = ctx
+            .sql("SELECT id, value FROM t ORDER BY id")
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        let mut rows = Vec::new();
+        for batch in &batches {
+            let ids = batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<datafusion::arrow::array::Int32Array>()
+                .unwrap();
+            let vals = batch
+                .column(1)
+                .as_any()
+                .downcast_ref::<datafusion::arrow::array::Int32Array>()
+                .unwrap();
+            for i in 0..batch.num_rows() {
+                rows.push((ids.value(i), vals.value(i)));
+            }
+        }
+        // Old data fully replaced
+        assert_eq!(rows, vec![(4, 40), (5, 50)]);
+    }
 }
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 10e5fa0..908781f 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -64,8 +64,10 @@ arrow-buffer = { workspace = true }
 arrow-cast = { workspace = true }
 arrow-ord = { workspace = true }
 arrow-schema = { workspace = true }
+arrow-select = { workspace = true }
 futures = "0.3"
-parquet = { workspace = true, features = ["async", "zstd"] }
+tokio-util = { workspace = true, features = ["compat"] }
+parquet = { workspace = true, features = ["async", "zstd", "lz4", "snap"] }
 orc-rust = "0.7.0"
 async-stream = "0.3.6"
 reqwest = { version = "0.12", features = ["json"] }
diff --git a/crates/paimon/src/arrow/format/mod.rs b/crates/paimon/src/arrow/format/mod.rs
index 63fd4e0..454e621 100644
--- a/crates/paimon/src/arrow/format/mod.rs
+++ b/crates/paimon/src/arrow/format/mod.rs
@@ -19,10 +19,12 @@ mod avro;
 mod orc;
 mod parquet;
 
-use crate::io::FileRead;
+use crate::io::{FileRead, OutputFile};
 use crate::spec::{DataField, Predicate};
 use crate::table::{ArrowRecordBatchStream, RowRange};
 use crate::Error;
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
 use async_trait::async_trait;
 
 /// Predicates with the file-level field context needed for pushdown.
@@ -58,6 +60,30 @@ pub(crate) trait FormatFileReader: Send + Sync {
     ) -> crate::Result<ArrowRecordBatchStream>;
 }
 
+/// Format-agnostic file writer that streams Arrow RecordBatches directly to storage.
+///
+/// Each implementation (Parquet, ORC, ...) handles format-specific encoding.
+/// Usage: create via [`create_format_writer`], call [`write`](FormatFileWriter::write)
+/// for each batch, then [`close`](FormatFileWriter::close) to finalize the file.
+#[async_trait]
+pub(crate) trait FormatFileWriter: Send {
+    /// Write a RecordBatch to the underlying storage.
+    async fn write(&mut self, batch: &RecordBatch) -> crate::Result<()>;
+
+    /// Number of bytes written so far (approximate, before close).
+    fn num_bytes(&self) -> usize;
+
+    /// Number of bytes buffered in the current row group (not yet flushed).
+    fn in_progress_size(&self) -> usize;
+
+    /// Flush the current row group to storage without closing the file.
+    async fn flush(&mut self) -> crate::Result<()>;
+
+    /// Flush and close the writer, finalizing the file on storage.
+    /// Returns the total number of bytes written.
+    async fn close(self: Box<Self>) -> crate::Result<u64>;
+}
+
 /// Create a format reader based on the file extension.
 pub(crate) fn create_format_reader(path: &str) -> crate::Result<Box<dyn FormatFileReader>> {
     if path.to_ascii_lowercase().ends_with(".parquet") {
@@ -74,3 +100,22 @@ pub(crate) fn create_format_reader(path: &str) -> crate::Result<Box<dyn FormatFi
         })
     }
 }
+
+/// Create a format writer that streams directly to storage.
+pub(crate) async fn create_format_writer(
+    output: &OutputFile,
+    schema: SchemaRef,
+    compression: &str,
+    zstd_level: i32,
+) -> crate::Result<Box<dyn FormatFileWriter>> {
+    let path = output.location();
+    if path.to_ascii_lowercase().ends_with(".parquet") {
+        Ok(Box::new(
+            parquet::ParquetFormatWriter::new(output, schema, compression, zstd_level).await?,
+        ))
+    } else {
+        Err(Error::Unsupported {
+            message: format!("unsupported write format: expected .parquet, got: {path}"),
+        })
+    }
+}
diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs
index b0aa0ec..74de8a2 100644
--- a/crates/paimon/src/arrow/format/parquet.rs
+++ b/crates/paimon/src/arrow/format/parquet.rs
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use super::{FilePredicates, FormatFileReader};
+use super::{FilePredicates, FormatFileReader, FormatFileWriter};
 use crate::arrow::filtering::{predicates_may_match_with_schema, StatsAccessor};
-use crate::io::FileRead;
+use crate::io::{FileRead, OutputFile};
 use crate::spec::{DataField, DataType, Datum, Predicate, PredicateOperator};
 use crate::table::{ArrowRecordBatchStream, RowRange};
 use crate::Error;
@@ -38,9 +38,11 @@ use parquet::arrow::arrow_reader::{
     ArrowPredicate, ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
 };
 use parquet::arrow::async_reader::{AsyncFileReader, MetadataFetch};
-use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::basic::{Compression, ZstdLevel};
 use parquet::file::metadata::ParquetMetaDataReader;
 use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::properties::WriterProperties;
 use parquet::file::statistics::Statistics as ParquetStatistics;
 use std::collections::HashMap;
 use std::ops::Range;
@@ -48,6 +50,89 @@ use std::sync::Arc;
 
 pub(crate) struct ParquetFormatReader;
 
+/// Parquet implementation of [`FormatFileWriter`].
+/// Streams data directly to storage via `AsyncArrowWriter` + opendal.
+pub(crate) struct ParquetFormatWriter {
+    inner: AsyncArrowWriter<Box<dyn crate::io::AsyncFileWrite>>,
+}
+
+impl ParquetFormatWriter {
+    pub(crate) async fn new(
+        output: &OutputFile,
+        schema: arrow_schema::SchemaRef,
+        compression: &str,
+        zstd_level: i32,
+    ) -> crate::Result<Self> {
+        let async_write = output.async_writer().await?;
+        let codec = parse_compression(compression, zstd_level);
+        let props = WriterProperties::builder().set_compression(codec).build();
+        let inner = AsyncArrowWriter::try_new(async_write, schema, Some(props)).map_err(|e| {
+            crate::Error::DataInvalid {
+                message: format!("Failed to create parquet writer: {e}"),
+                source: None,
+            }
+        })?;
+        Ok(Self { inner })
+    }
+}
+
+/// Map Paimon `file.compression` value to parquet [`Compression`].
+fn parse_compression(codec: &str, zstd_level: i32) -> Compression {
+    match codec.to_ascii_lowercase().as_str() {
+        "zstd" => {
+            let level = ZstdLevel::try_new(zstd_level).unwrap_or_default();
+            Compression::ZSTD(level)
+        }
+        "lz4" => Compression::LZ4_RAW,
+        "snappy" => Compression::SNAPPY,
+        "gzip" | "gz" => Compression::GZIP(Default::default()),
+        "none" | "uncompressed" => Compression::UNCOMPRESSED,
+        _ => Compression::UNCOMPRESSED,
+    }
+}
+
+#[async_trait]
+impl FormatFileWriter for ParquetFormatWriter {
+    async fn write(&mut self, batch: &RecordBatch) -> crate::Result<()> {
+        self.inner
+            .write(batch)
+            .await
+            .map_err(|e| crate::Error::DataInvalid {
+                message: format!("Failed to write parquet batch: {e}"),
+                source: None,
+            })
+    }
+
+    fn num_bytes(&self) -> usize {
+        self.inner.bytes_written() + self.inner.in_progress_size()
+    }
+
+    fn in_progress_size(&self) -> usize {
+        self.inner.in_progress_size()
+    }
+
+    async fn flush(&mut self) -> crate::Result<()> {
+        self.inner
+            .flush()
+            .await
+            .map_err(|e| crate::Error::DataInvalid {
+                message: format!("Failed to flush parquet writer: {e}"),
+                source: None,
+            })
+    }
+
+    async fn close(mut self: Box<Self>) -> crate::Result<u64> {
+        self.inner
+            .finish()
+            .await
+            .map_err(|e| crate::Error::DataInvalid {
+                message: format!("Failed to close parquet writer: {e}"),
+                source: None,
+            })?;
+        Ok(self.inner.bytes_written() as u64)
+    }
+}
+
 #[async_trait]
 impl FormatFileReader for ParquetFormatReader {
     async fn read_batch_stream(
@@ -1050,7 +1135,12 @@ fn split_ranges_for_concurrency(merged: Vec<Range<u64>>, concurrency: usize) ->
 #[cfg(test)]
 mod tests {
     use super::build_parquet_row_filter;
+    use super::ParquetFormatWriter;
+    use crate::arrow::format::FormatFileWriter;
+    use crate::io::FileIOBuilder;
     use crate::spec::{DataField, DataType, Datum, IntType, PredicateBuilder};
+    use arrow_array::{Int32Array, RecordBatch};
+    use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema};
     use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};
     use std::sync::Arc;
 
@@ -1241,4 +1331,81 @@ mod tests {
         let result = super::split_ranges_for_concurrency(merged, 4);
         assert!(result.is_empty());
     }
+
+    fn writer_arrow_schema() -> Arc<ArrowSchema> {
+        Arc::new(ArrowSchema::new(vec![
+            ArrowField::new("id", ArrowDataType::Int32, false),
+            ArrowField::new("value", ArrowDataType::Int32, false),
+        ]))
+    }
+
+    fn writer_test_batch(
+        schema: &Arc<ArrowSchema>,
+        ids: Vec<i32>,
+        values: Vec<i32>,
+    ) -> RecordBatch {
+        RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(ids)),
+                Arc::new(Int32Array::from(values)),
+            ],
+        )
+        .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_parquet_writer_write_and_close() {
+        let file_io = FileIOBuilder::new("memory").build().unwrap();
+        let path = "memory:/test_parquet_writer_write_close.parquet";
+        let output = file_io.new_output(path).unwrap();
+        let schema = writer_arrow_schema();
+
+        let mut writer: Box<dyn FormatFileWriter> = Box::new(
+            ParquetFormatWriter::new(&output, schema.clone(), "zstd", 1)
+                .await
+                .unwrap(),
+        );
+
+        let batch = writer_test_batch(&schema, vec![1, 2, 3], vec![10, 20, 30]);
+        writer.write(&batch).await.unwrap();
+        writer.close().await.unwrap();
+
+        // Verify valid parquet by reading back
+        let bytes = file_io.new_input(path).unwrap().read().await.unwrap();
+        let reader =
+            parquet::arrow::arrow_reader::ParquetRecordBatchReader::try_new(bytes, 1024).unwrap();
+        let total_rows: usize = reader.into_iter().map(|r| r.unwrap().num_rows()).sum();
+        assert_eq!(total_rows, 3);
+    }
+
+    #[tokio::test]
+    async fn test_parquet_writer_multiple_batches() {
+        let file_io = FileIOBuilder::new("memory").build().unwrap();
+        let path = "memory:/test_parquet_writer_multi.parquet";
+        let output = file_io.new_output(path).unwrap();
+        let schema = writer_arrow_schema();
+
+        let mut writer: Box<dyn FormatFileWriter> = Box::new(
+            ParquetFormatWriter::new(&output, schema.clone(), "zstd", 1)
+                .await
+                .unwrap(),
+        );
+
+        writer
+            .write(&writer_test_batch(&schema, vec![1, 2], vec![10, 20]))
+            .await
+            .unwrap();
+        writer
+            .write(&writer_test_batch(&schema, vec![3, 4, 5], vec![30, 40, 50]))
+            .await
+            .unwrap();
+        writer.close().await.unwrap();
+
+        let bytes = file_io.new_input(path).unwrap().read().await.unwrap();
+        let reader =
+            parquet::arrow::arrow_reader::ParquetRecordBatchReader::try_new(bytes, 1024).unwrap();
+        let total_rows: usize = reader.into_iter().map(|r| r.unwrap().num_rows()).sum();
+        assert_eq!(total_rows, 5);
+    }
 }
diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs
index 6f41f11..93758e8 100644
--- a/crates/paimon/src/io/file_io.rs
+++ b/crates/paimon/src/io/file_io.rs
@@ -26,6 +26,7 @@ use chrono::{DateTime, Utc};
 use opendal::raw::normalize_root;
 use opendal::Operator;
 use snafu::ResultExt;
+use tokio_util::compat::FuturesAsyncWriteCompatExt;
 use url::Url;
 
 use super::Storage;
@@ -309,6 +310,11 @@ impl FileWrite for opendal::Writer {
     }
 }
 
+/// Async streaming writer trait for format-level writers (e.g. parquet).
+pub trait AsyncFileWrite: tokio::io::AsyncWrite + Unpin + Send {}
+
+impl<T: tokio::io::AsyncWrite + Unpin + Send> AsyncFileWrite for T {}
+
 #[derive(Clone, Debug)]
 pub struct FileStatus {
     pub size: u64,
@@ -390,10 +396,22 @@ impl OutputFile {
     }
 
     pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
+        Ok(Box::new(self.opendal_writer().await?))
+    }
+
+    /// Get an async streaming writer for format-level writes (e.g. parquet).
+    pub(crate) async fn async_writer(&self) -> crate::Result<Box<dyn AsyncFileWrite>> {
         Ok(Box::new(
-            self.op.writer(&self.path[self.relative_path_pos..]).await?,
+            self.opendal_writer()
+                .await?
+                .into_futures_async_write()
+                .compat_write(),
         ))
     }
+
+    async fn opendal_writer(&self) -> crate::Result<opendal::Writer> {
+        Ok(self.op.writer(&self.path[self.relative_path_pos..]).await?)
+    }
 }
 
 #[cfg(test)]
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs
index 3867d69..b322347 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/lib.rs
@@ -44,5 +44,6 @@ pub use catalog::FileSystemCatalog;
 pub use table::{
     CommitMessage, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, RESTEnv,
     RESTSnapshotCommit, ReadBuilder, RenamingSnapshotCommit, RowRange, SnapshotCommit,
-    SnapshotManager, Table, TableCommit, TableRead, TableScan, TagManager, WriteBuilder,
+    SnapshotManager, Table, TableCommit, TableRead, TableScan, TableWrite, TagManager,
+    WriteBuilder,
 };
diff --git a/crates/paimon/src/spec/binary_row.rs b/crates/paimon/src/spec/binary_row.rs
index 0ba6b78..6599484 100644
--- a/crates/paimon/src/spec/binary_row.rs
+++ b/crates/paimon/src/spec/binary_row.rs
@@ -20,10 +20,15 @@
 
 use crate::spec::murmur_hash::hash_by_words;
 use crate::spec::{DataType, Datum};
+use arrow_array::RecordBatch;
 use serde::{Deserialize, Serialize};
+use std::sync::LazyLock;
 
 pub const EMPTY_BINARY_ROW: BinaryRow = BinaryRow::new(0);
 
+pub static EMPTY_SERIALIZED_ROW: LazyLock<Vec<u8>> =
+    LazyLock::new(|| BinaryRowBuilder::new(0).build_serialized());
+
 /// Highest bit mask for detecting inline vs variable-length encoding.
 const HIGHEST_FIRST_BIT: u64 = 0x80 << 56;
 
@@ -276,82 +281,28 @@ impl BinaryRow {
     }
 
     /// Build a BinaryRow from typed Datum values using `BinaryRowBuilder`.
-    pub fn from_datums(datums: &[(&crate::spec::Datum, &crate::spec::DataType)]) -> Option<Self> {
+    /// `None` entries are written as null fields.
+    pub fn from_datums(datums: &[(Option<&crate::spec::Datum>, &crate::spec::DataType)]) -> Self {
         let arity = datums.len() as i32;
         let mut builder = BinaryRowBuilder::new(arity);
 
-        for (pos, (datum, data_type)) in datums.iter().enumerate() {
-            match datum {
-                crate::spec::Datum::Bool(v) => builder.write_boolean(pos, *v),
-                crate::spec::Datum::TinyInt(v) => builder.write_byte(pos, *v),
-                crate::spec::Datum::SmallInt(v) => builder.write_short(pos, *v),
-                crate::spec::Datum::Int(v)
-                | crate::spec::Datum::Date(v)
-                | crate::spec::Datum::Time(v) => builder.write_int(pos, *v),
-                crate::spec::Datum::Long(v) => builder.write_long(pos, *v),
-                crate::spec::Datum::Float(v) => builder.write_float(pos, *v),
-                crate::spec::Datum::Double(v) => builder.write_double(pos, *v),
-                crate::spec::Datum::Timestamp { millis, nanos } => {
-                    let precision = match data_type {
-                        crate::spec::DataType::Timestamp(ts) => ts.precision(),
-                        _ => 3,
-                    };
-                    if precision <= 3 {
-                        builder.write_timestamp_compact(pos, *millis);
-                    } else {
-                        builder.write_timestamp_non_compact(pos, *millis, *nanos);
-                    }
-                }
-                crate::spec::Datum::LocalZonedTimestamp { millis, nanos } => {
-                    let precision = match data_type {
-                        crate::spec::DataType::LocalZonedTimestamp(ts) => ts.precision(),
-                        _ => 3,
-                    };
-                    if precision <= 3 {
-                        builder.write_timestamp_compact(pos, *millis);
-                    } else {
-                        builder.write_timestamp_non_compact(pos, *millis, *nanos);
-                    }
-                }
-                crate::spec::Datum::Decimal {
-                    unscaled,
-                    precision,
-                    ..
-                } => {
-                    if *precision <= 18 {
-                        builder.write_decimal_compact(pos, *unscaled as i64);
-                    } else {
-                        builder.write_decimal_var_len(pos, *unscaled);
-                    }
-                }
-                crate::spec::Datum::String(s) => {
-                    if s.len() <= 7 {
-                        builder.write_string_inline(pos, s);
-                    } else {
-                        builder.write_string(pos, s);
-                    }
-                }
-                crate::spec::Datum::Bytes(b) => {
-                    if b.len() <= 7 {
-                        builder.write_binary_inline(pos, b);
-                    } else {
-                        builder.write_binary(pos, b);
-                    }
-                }
+        for (pos, (datum_opt, data_type)) in datums.iter().enumerate() {
+            match datum_opt {
+                Some(datum) => builder.write_datum(pos, datum, data_type),
+                None => builder.set_null_at(pos),
             }
         }
 
-        let row = builder.build();
-        Some(row)
+        builder.build()
     }
 
     pub fn compute_bucket_from_datums(
-        datums: &[(&crate::spec::Datum, &crate::spec::DataType)],
+        datums: &[(Option<&crate::spec::Datum>, &crate::spec::DataType)],
         total_buckets: i32,
-    ) -> Option<i32> {
-        let row = Self::from_datums(datums)?;
+    ) -> i32 {
+        let row = Self::from_datums(datums);
         let hash = row.hash_code();
-        Some((hash % total_buckets).abs())
+        (hash % total_buckets).abs()
     }
 }
 
@@ -607,6 +558,169 @@ pub fn datums_to_binary_row(datums: &[(&Option<Datum>, &DataType)]) -> Vec<u8> {
     builder.build_serialized()
 }
 
+/// Extract a Datum from an Arrow RecordBatch column at the given row index.
+pub fn extract_datum_from_arrow(
+    batch: &RecordBatch,
+    row_idx: usize,
+    col_idx: usize,
+    data_type: &DataType,
+) -> crate::Result<Option<Datum>> {
+    use arrow_array::Array;
+
+    let col = batch.column(col_idx);
+    if col.is_null(row_idx) {
+        return Ok(None);
+    }
+
+    let datum = match data_type {
+        DataType::Boolean(_) => {
+            let arr = col
+                .as_any()
+                .downcast_ref::<arrow_array::BooleanArray>()
+                .ok_or_else(|| type_mismatch_err("Boolean", col_idx))?;
+            Datum::Bool(arr.value(row_idx))
+        }
+        DataType::TinyInt(_) => {
+            let arr = col
+                .as_any()
+                .downcast_ref::<arrow_array::Int8Array>()
+                .ok_or_else(|| type_mismatch_err("TinyInt", col_idx))?;
+            Datum::TinyInt(arr.value(row_idx))
+        }
+        DataType::SmallInt(_) => {
+            let arr = col
+                .as_any()
+                .downcast_ref::<arrow_array::Int16Array>()
+                .ok_or_else(|| type_mismatch_err("SmallInt", col_idx))?;
+            Datum::SmallInt(arr.value(row_idx))
+        }
+        DataType::Int(_) => {
+            let arr = col
+                .as_any()
+                .downcast_ref::<arrow_array::Int32Array>()
+                .ok_or_else(|| type_mismatch_err("Int", col_idx))?;
+            Datum::Int(arr.value(row_idx))
+        }
+        DataType::BigInt(_) => {
+            let arr = col
+                .as_any()
+                .downcast_ref::<arrow_array::Int64Array>()
+                .ok_or_else(|| type_mismatch_err("BigInt", col_idx))?;
+            Datum::Long(arr.value(row_idx))
+        }
+        DataType::Float(_) => {
+            let arr = col
+                .as_any()
+                .downcast_ref::<arrow_array::Float32Array>()
+                .ok_or_else(|| type_mismatch_err("Float", col_idx))?;
+            Datum::Float(arr.value(row_idx))
+        }
+        DataType::Double(_) => {
+            let arr = col
+                .as_any()
+                .downcast_ref::<arrow_array::Float64Array>()
+                .ok_or_else(|| type_mismatch_err("Double", col_idx))?;
+            Datum::Double(arr.value(row_idx))
+        }
+        DataType::Char(_) | DataType::VarChar(_) => {
+            let arr = col
+                .as_any()
+                .downcast_ref::<arrow_array::StringArray>()
+                .ok_or_else(|| type_mismatch_err("String", col_idx))?;
+            Datum::String(arr.value(row_idx).to_string())
+        }
+        DataType::Date(_) => {
+            let arr = col
+                .as_any()
+                .downcast_ref::<arrow_array::Date32Array>()
+                .ok_or_else(|| type_mismatch_err("Date", col_idx))?;
+            Datum::Date(arr.value(row_idx))
+        }
+        DataType::Decimal(d) => {
+            let arr = col
+                .as_any()
+                .downcast_ref::<arrow_array::Decimal128Array>()
+                .ok_or_else(|| type_mismatch_err("Decimal", col_idx))?;
+            Datum::Decimal {
+                unscaled: arr.value(row_idx),
+                precision: d.precision(),
+                scale: d.scale(),
+            }
+        }
+        DataType::Binary(_) | DataType::VarBinary(_) => {
+            let arr = col
+                .as_any()
+                .downcast_ref::<arrow_array::BinaryArray>()
+                .ok_or_else(|| type_mismatch_err("Binary", col_idx))?;
+            Datum::Bytes(arr.value(row_idx).to_vec())
+        }
+        DataType::Timestamp(ts) => {
+            if ts.precision() <= 3 {
+                let arr = col
+                    .as_any()
+                    .downcast_ref::<arrow_array::TimestampMillisecondArray>()
+                    .ok_or_else(|| type_mismatch_err("Timestamp(ms)", col_idx))?;
+                Datum::Timestamp {
+                    millis: arr.value(row_idx),
+                    nanos: 0,
+                }
+            } else {
+                let arr = col
+                    .as_any()
+                    .downcast_ref::<arrow_array::TimestampMicrosecondArray>()
+                    .ok_or_else(|| type_mismatch_err("Timestamp(us)", col_idx))?;
+                let micros = arr.value(row_idx);
+                Datum::Timestamp {
+                    millis: micros / 1000,
+                    nanos: ((micros % 1000) * 1000) as i32,
+                }
+            }
+        }
+        DataType::LocalZonedTimestamp(ts) => {
+            if ts.precision() <= 3 {
+                let arr = col
+                    .as_any()
+                    .downcast_ref::<arrow_array::TimestampMillisecondArray>()
+                    .ok_or_else(|| type_mismatch_err("LocalZonedTimestamp(ms)", col_idx))?;
+                Datum::LocalZonedTimestamp {
+                    millis: arr.value(row_idx),
+                    nanos: 0,
+                }
+            } else {
+                let arr = col
+                    .as_any()
+                    .downcast_ref::<arrow_array::TimestampMicrosecondArray>()
+                    .ok_or_else(|| type_mismatch_err("LocalZonedTimestamp(us)", col_idx))?;
+                let micros = arr.value(row_idx);
+                Datum::LocalZonedTimestamp {
+                    millis: micros / 1000,
+                    nanos: ((micros % 1000) * 1000) as i32,
+                }
+            }
+        }
+        _ => {
+            return Err(crate::Error::Unsupported {
+                message: format!(
+                    "Unsupported data type {:?} for Arrow extraction at column 
{}",
+                    data_type, col_idx
+                ),
+            });
+        }
+    };
+
+    Ok(Some(datum))
+}
+
+fn type_mismatch_err(expected: &str, col_idx: usize) -> crate::Error {
+    crate::Error::DataInvalid {
+        message: format!(
+            "Arrow column {} type mismatch: expected {} compatible array",
+            col_idx, expected
+        ),
+        source: None,
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
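
For orientation, a minimal sketch of how the reworked nullable-datum helpers above compose (import paths assumed; illustrative only, not part of the patch):

    use paimon::spec::{BinaryRow, DataType, Datum, IntType, VarCharType}; // paths assumed

    fn route_example() {
        // Bind the field types first so the borrowed tuples below stay valid.
        let int_ty = DataType::Int(IntType::new());
        let str_ty = DataType::VarChar(VarCharType::string_type());
        let id = Datum::Int(42);

        // `None` entries become null fields instead of failing the build.
        let datums = vec![(Some(&id), &int_ty), (None, &str_ty)];
        let _row = BinaryRow::from_datums(&datums);

        // Bucket routing is infallible as well: hash the row, then mod by the bucket count.
        let bucket = BinaryRow::compute_bucket_from_datums(&datums, 4);
        assert!((0..4).contains(&bucket));
    }
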
diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs
index 17993d9..bb52022 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -27,12 +27,15 @@ const PARTITION_LEGACY_NAME_OPTION: &str = "partition.legacy-name";
 const BUCKET_KEY_OPTION: &str = "bucket-key";
 const BUCKET_FUNCTION_TYPE_OPTION: &str = "bucket-function.type";
 const BUCKET_OPTION: &str = "bucket";
-const DEFAULT_BUCKET: i32 = 1;
+const DEFAULT_BUCKET: i32 = -1;
 const COMMIT_MAX_RETRIES_OPTION: &str = "commit.max-retries";
 const COMMIT_TIMEOUT_OPTION: &str = "commit.timeout";
 const COMMIT_MIN_RETRY_WAIT_OPTION: &str = "commit.min-retry-wait";
 const COMMIT_MAX_RETRY_WAIT_OPTION: &str = "commit.max-retry-wait";
+const FILE_COMPRESSION_OPTION: &str = "file.compression";
+const FILE_COMPRESSION_ZSTD_LEVEL_OPTION: &str = "file.compression.zstd-level";
 const ROW_TRACKING_ENABLED_OPTION: &str = "row-tracking.enabled";
+const WRITE_PARQUET_BUFFER_SIZE_OPTION: &str = "write.parquet-buffer-size";
 const DEFAULT_COMMIT_MAX_RETRIES: u32 = 10;
 const DEFAULT_COMMIT_TIMEOUT_MS: u64 = 120_000;
 const DEFAULT_COMMIT_MIN_RETRY_WAIT_MS: u64 = 1_000;
@@ -43,6 +46,8 @@ pub const SCAN_TAG_NAME_OPTION: &str = "scan.tag-name";
 const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024;
 const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024;
 const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__";
+const DEFAULT_TARGET_FILE_SIZE: i64 = 256 * 1024 * 1024;
+const DEFAULT_WRITE_PARQUET_BUFFER_SIZE: i64 = 256 * 1024 * 1024;
 
 /// Typed accessors for common table options.
 ///
@@ -264,6 +269,41 @@ impl<'a> CoreOptions<'a> {
             .map(|v| v.eq_ignore_ascii_case("default"))
             .unwrap_or(true)
     }
+
+    /// Target file size for data files. Default is 256MB.
+    pub fn target_file_size(&self) -> i64 {
+        self.options
+            .get("target-file-size")
+            .and_then(|v| parse_memory_size(v))
+            .unwrap_or(DEFAULT_TARGET_FILE_SIZE)
+    }
+
+    /// File compression codec (e.g. "lz4", "zstd", "snappy", "none").
+    /// Default is "zstd".
+    pub fn file_compression(&self) -> &str {
+        self.options
+            .get(FILE_COMPRESSION_OPTION)
+            .map(String::as_str)
+            .unwrap_or("zstd")
+    }
+
+    /// Zstd compression level. Only meaningful when `file.compression` is `"zstd"`.
+    /// Default is 1 (matching Paimon Java).
+    pub fn file_compression_zstd_level(&self) -> i32 {
+        self.options
+            .get(FILE_COMPRESSION_ZSTD_LEVEL_OPTION)
+            .and_then(|v| v.parse().ok())
+            .unwrap_or(1)
+    }
+
+    /// Parquet writer in-progress buffer size limit. Default is 256MB.
+    /// When the buffered data exceeds this, the writer flushes the current row group.
+    pub fn write_parquet_buffer_size(&self) -> i64 {
+        self.options
+            .get(WRITE_PARQUET_BUFFER_SIZE_OPTION)
+            .and_then(|v| parse_memory_size(v))
+            .unwrap_or(DEFAULT_WRITE_PARQUET_BUFFER_SIZE)
+    }
 }
 
 /// Parse a memory size string to bytes using binary (1024-based) semantics.
@@ -421,7 +461,7 @@ mod tests {
     fn test_commit_options_defaults() {
         let options = HashMap::new();
         let core = CoreOptions::new(&options);
-        assert_eq!(core.bucket(), 1);
+        assert_eq!(core.bucket(), -1);
         assert_eq!(core.commit_max_retries(), 10);
         assert_eq!(core.commit_timeout_ms(), 120_000);
         assert_eq!(core.commit_min_retry_wait_ms(), 1_000);
@@ -477,4 +517,21 @@ mod tests {
             Some(TimeTravelSelector::TimestampMillis(1234))
         );
     }
+
+    #[test]
+    fn test_write_options_defaults() {
+        let options = HashMap::new();
+        let core = CoreOptions::new(&options);
+        assert_eq!(core.write_parquet_buffer_size(), 256 * 1024 * 1024);
+    }
+
+    #[test]
+    fn test_write_options_custom() {
+        let options = HashMap::from([(
+            WRITE_PARQUET_BUFFER_SIZE_OPTION.to_string(),
+            "32mb".to_string(),
+        )]);
+        let core = CoreOptions::new(&options);
+        assert_eq!(core.write_parquet_buffer_size(), 32 * 1024 * 1024);
+    }
 }
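
A hedged sketch of the new write-related accessors in use (CoreOptions path assumed; values mirror the defaults and the 1024-based parse_memory_size semantics above):

    use std::collections::HashMap;
    use paimon::spec::CoreOptions; // path assumed

    fn options_example() {
        let options = HashMap::from([
            ("file.compression".to_string(), "zstd".to_string()),
            ("target-file-size".to_string(), "128mb".to_string()),
        ]);
        let core = CoreOptions::new(&options);

        assert_eq!(core.file_compression(), "zstd");
        // "128mb" is parsed with binary (1024-based) units.
        assert_eq!(core.target_file_size(), 128 * 1024 * 1024);
        // Unset options fall back to the defaults above.
        assert_eq!(core.write_parquet_buffer_size(), 256 * 1024 * 1024);
        assert_eq!(core.file_compression_zstd_level(), 1);
    }
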
diff --git a/crates/paimon/src/spec/partition_utils.rs b/crates/paimon/src/spec/partition_utils.rs
index fa41cfc..5d05863 100644
--- a/crates/paimon/src/spec/partition_utils.rs
+++ b/crates/paimon/src/spec/partition_utils.rs
@@ -38,6 +38,7 @@ const MILLIS_PER_DAY: i64 = 86_400_000;
 /// (escaped directory path).
 ///
 /// Reference: `org.apache.paimon.utils.InternalRowPartitionComputer` in Java Paimon.
+#[derive(Debug)]
 pub(crate) struct PartitionComputer {
     partition_keys: Vec<String>,
     partition_fields: Vec<DataField>,
diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs
index 62ab3fe..b0baf66 100644
--- a/crates/paimon/src/spec/schema.rs
+++ b/crates/paimon/src/spec/schema.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::spec::core_options::CoreOptions;
 use crate::spec::types::{DataType, RowType};
 use serde::{Deserialize, Serialize};
 use serde_with::serde_as;
@@ -115,6 +116,25 @@ impl TableSchema {
     pub fn time_millis(&self) -> i64 {
         self.time_millis
     }
+
+    /// Compute the effective bucket key columns.
+    ///
+    /// Priority: explicit `bucket-key` option > primary keys > all non-partition fields.
+    pub fn bucket_keys(&self) -> Vec<String> {
+        let core_options = CoreOptions::new(&self.options);
+        if let Some(keys) = core_options.bucket_key() {
+            return keys;
+        }
+        if !self.primary_keys.is_empty() {
+            return self.primary_keys.clone();
+        }
+        let partition_set: HashSet<&str> =
+            self.partition_keys.iter().map(String::as_str).collect();
+        self.fields
+            .iter()
+            .filter(|f| !partition_set.contains(f.name()))
+            .map(|f| f.name().to_string())
+            .collect()
+    }
 }
 
 pub const ROW_ID_FIELD_NAME: &str = "_ROW_ID";
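
A small sketch of the resolution order, using the builder calls that appear in the unit tests later in this patch (crate paths assumed):

    use paimon::spec::{DataType, IntType, Schema, TableSchema, VarCharType}; // paths assumed

    fn bucket_keys_example() {
        // With an explicit bucket-key option, that option wins.
        let schema = Schema::builder()
            .column("pt", DataType::VarChar(VarCharType::string_type()))
            .column("id", DataType::Int(IntType::new()))
            .column("value", DataType::Int(IntType::new()))
            .partition_keys(["pt"])
            .option("bucket-key", "id")
            .build()
            .unwrap();
        let table_schema = TableSchema::new(0, &schema);
        assert_eq!(table_schema.bucket_keys(), vec!["id".to_string()]);
        // Without bucket-key and without primary keys, all non-partition
        // fields ("id", "value") would be used instead.
    }
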
diff --git a/crates/paimon/src/table/bucket_filter.rs b/crates/paimon/src/table/bucket_filter.rs
index ea300df..80942d3 100644
--- a/crates/paimon/src/table/bucket_filter.rs
+++ b/crates/paimon/src/table/bucket_filter.rs
@@ -101,14 +101,14 @@ pub(super) fn compute_target_buckets(
     }
 
     // Collect equal-value candidates per bucket key field (by projected index).
-    // Each field can have one value (Eq) or multiple values (In).
+    // Each field can have one value (Eq), multiple values (In), or NULL (IsNull).
     let num_keys = bucket_key_fields.len();
-    let mut field_candidates: Vec<Option<Vec<&Datum>>> = vec![None; num_keys];
+    let mut field_candidates: Vec<Option<Vec<Option<&Datum>>>> = vec![None; num_keys];
 
     collect_eq_candidates(bucket_predicate, &mut field_candidates);
 
     // All bucket key fields must have candidates.
-    let candidates: Vec<&Vec<&Datum>> =
+    let candidates: Vec<&Vec<Option<&Datum>>> =
         field_candidates.iter().filter_map(|c| c.as_ref()).collect();
     if candidates.len() != num_keys {
         return None;
@@ -118,18 +118,15 @@ pub(super) fn compute_target_buckets(
     let mut buckets = HashSet::new();
     let mut combo: Vec<usize> = vec![0; num_keys];
     loop {
-        let datums: Vec<(&Datum, &DataType)> = (0..num_keys)
+        let datums: Vec<(Option<&Datum>, &DataType)> = (0..num_keys)
             .map(|i| {
                 let vals = field_candidates[i].as_ref().unwrap();
                 (vals[combo[i]], bucket_key_fields[i].data_type())
             })
             .collect();
 
-        if let Some(bucket) = BinaryRow::compute_bucket_from_datums(&datums, total_buckets) {
-            buckets.insert(bucket);
-        } else {
-            return None;
-        }
+        let bucket = BinaryRow::compute_bucket_from_datums(&datums, total_buckets);
+        buckets.insert(bucket);
+        buckets.insert(bucket);
 
         // Advance the combination counter (rightmost first).
         let mut carry = true;
@@ -155,10 +152,10 @@ pub(super) fn compute_target_buckets(
     }
 }
 
-/// Recursively collect Eq/In literal candidates from a predicate for each bucket key field.
+/// Recursively collect Eq/In/IsNull literal candidates from a predicate for each bucket key field.
 fn collect_eq_candidates<'a>(
     predicate: &'a Predicate,
-    field_candidates: &mut Vec<Option<Vec<&'a Datum>>>,
+    field_candidates: &mut Vec<Option<Vec<Option<&'a Datum>>>>,
 ) {
     match predicate {
         Predicate::And(children) => {
@@ -176,14 +173,17 @@ fn collect_eq_candidates<'a>(
                 match op {
                     PredicateOperator::Eq => {
                         if let Some(lit) = literals.first() {
-                            field_candidates[*index] = Some(vec![lit]);
+                            field_candidates[*index] = Some(vec![Some(lit)]);
                         }
                     }
                     PredicateOperator::In => {
                         if !literals.is_empty() {
-                            field_candidates[*index] = Some(literals.iter().collect());
+                            field_candidates[*index] = Some(literals.iter().map(Some).collect());
                         }
                     }
+                    PredicateOperator::IsNull => {
+                        field_candidates[*index] = Some(vec![None]);
+                    }
                     _ => {}
                 }
             }
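
To make the candidate expansion concrete: for bucket keys (a, b) and a predicate like `a IN (1, 2) AND b IS NULL`, the scan enumerates [Some(1), Some(2)] x [None] and hashes each combination. A hedged sketch of that hashing step (crate paths assumed; the real pruning goes through compute_target_buckets above):

    use std::collections::HashSet;
    use paimon::spec::{BinaryRow, DataType, Datum, IntType}; // paths assumed

    fn candidate_expansion_example() {
        let int_ty = DataType::Int(IntType::new());
        let one = Datum::Int(1);
        let two = Datum::Int(2);

        // Two candidate rows: (1, NULL) and (2, NULL).
        let combos = [
            vec![(Some(&one), &int_ty), (None, &int_ty)],
            vec![(Some(&two), &int_ty), (None, &int_ty)],
        ];
        let buckets: HashSet<i32> = combos
            .iter()
            .map(|datums| BinaryRow::compute_bucket_from_datums(datums, 8))
            .collect();
        // The scan only needs to read at most two of the eight buckets.
        assert!(buckets.len() <= 2);
    }
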
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index c17ebbc..2fbd0a8 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -33,6 +33,7 @@ mod source;
 mod stats_filter;
 pub(crate) mod table_commit;
 mod table_scan;
+pub(crate) mod table_write;
 mod tag_manager;
 mod write_builder;
 
@@ -52,6 +53,7 @@ pub use source::{
 };
 pub use table_commit::TableCommit;
 pub use table_scan::TableScan;
+pub use table_write::TableWrite;
 pub use tag_manager::TagManager;
 pub use write_builder::WriteBuilder;
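
Taken together, the exports above give the batch append flow sketched below. It mirrors the unit tests in table_write.rs further down; note that TableWrite::new is pub(crate), so external callers go through WriteBuilder instead (crate paths assumed):

    use arrow_array::RecordBatch;
    use paimon::table::{Table, TableCommit, TableWrite}; // paths assumed
    use paimon::Result;

    // Crate-internal sketch: write a batch, collect commit messages, then commit.
    async fn append_once(table: Table, batch: RecordBatch) -> Result<()> {
        let mut write = TableWrite::new(&table)?;
        write.write_arrow_batch(&batch).await?;
        let messages = write.prepare_commit().await?;

        // APPEND mode; use `overwrite(messages)` for INSERT OVERWRITE semantics.
        TableCommit::new(table, "example-user".to_string())
            .commit(messages)
            .await
    }
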
 
diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs
index bc7f5c1..e14a4b7 100644
--- a/crates/paimon/src/table/table_commit.rs
+++ b/crates/paimon/src/table/table_commit.rs
@@ -48,21 +48,17 @@ pub struct TableCommit {
     snapshot_commit: Arc<dyn SnapshotCommit>,
     commit_user: String,
     total_buckets: i32,
-    overwrite_partition: Option<HashMap<String, Datum>>,
     // commit config
     commit_max_retries: u32,
     commit_timeout_ms: u64,
     commit_min_retry_wait_ms: u64,
     commit_max_retry_wait_ms: u64,
     row_tracking_enabled: bool,
+    partition_default_name: String,
 }
 
 impl TableCommit {
-    pub fn new(
-        table: Table,
-        commit_user: String,
-        overwrite_partition: Option<HashMap<String, Datum>>,
-    ) -> Self {
+    pub fn new(table: Table, commit_user: String) -> Self {
         let snapshot_manager = SnapshotManager::new(table.file_io.clone(), table.location.clone());
         let snapshot_commit = if let Some(env) = &table.rest_env {
             env.snapshot_commit()
@@ -78,65 +74,124 @@ impl TableCommit {
         let commit_min_retry_wait_ms = core_options.commit_min_retry_wait_ms();
         let commit_max_retry_wait_ms = core_options.commit_max_retry_wait_ms();
         let row_tracking_enabled = core_options.row_tracking_enabled();
+        let partition_default_name = core_options.partition_default_name().to_string();
         Self {
             table,
             snapshot_manager,
             snapshot_commit,
             commit_user,
             total_buckets,
-            overwrite_partition,
             commit_max_retries,
             commit_timeout_ms,
             commit_min_retry_wait_ms,
             commit_max_retry_wait_ms,
             row_tracking_enabled,
+            partition_default_name,
         }
     }
 
-    /// Commit new files. Uses OVERWRITE mode if overwrite_partition was set
-    /// in the constructor, otherwise uses APPEND mode.
+    /// Commit new files in APPEND mode.
     pub async fn commit(&self, commit_messages: Vec<CommitMessage>) -> Result<()> {
         if commit_messages.is_empty() {
             return Ok(());
         }
 
         let commit_entries = self.messages_to_entries(&commit_messages);
+        self.try_commit(
+            CommitKind::APPEND,
+            CommitEntriesPlan::Static(commit_entries),
+        )
+        .await
+    }
 
-        if let Some(overwrite_partition) = &self.overwrite_partition {
-            let partition_predicate = if overwrite_partition.is_empty() {
-                None
-            } else {
-                Some(self.build_partition_predicate(overwrite_partition)?)
-            };
-            self.try_commit(
-                CommitKind::OVERWRITE,
-                CommitEntriesPlan::Overwrite {
-                    partition_predicate,
-                    new_entries: commit_entries,
-                },
-            )
-            .await
-        } else {
-            self.try_commit(
-                CommitKind::APPEND,
-                CommitEntriesPlan::Static(commit_entries),
-            )
-            .await
+    /// Overwrite with dynamic partition detection.
+    ///
+    /// Extracts the set of partitions touched by `commit_messages` and overwrites
+    /// only those partitions. For unpartitioned tables this is a full table overwrite.
+    pub async fn overwrite(&self, commit_messages: Vec<CommitMessage>) -> Result<()> {
+        if commit_messages.is_empty() {
+            return Ok(());
+        }
+
+        let commit_entries = self.messages_to_entries(&commit_messages);
+        let partition_predicate = self.build_dynamic_partition_predicate(&commit_messages)?;
+        self.try_commit(
+            CommitKind::OVERWRITE,
+            CommitEntriesPlan::Overwrite {
+                partition_predicate,
+                new_entries: commit_entries,
+            },
+        )
+        .await
+    }
+
+    /// Build a dynamic partition predicate from the partitions present in commit messages.
+    ///
+    /// Returns `None` for unpartitioned tables (full table overwrite).
+    fn build_dynamic_partition_predicate(
+        &self,
+        commit_messages: &[CommitMessage],
+    ) -> Result<Option<Predicate>> {
+        let partition_fields = self.table.schema().partition_fields();
+        if partition_fields.is_empty() {
+            return Ok(None);
         }
+
+        let data_types: Vec<_> = partition_fields
+            .iter()
+            .map(|f| f.data_type().clone())
+            .collect();
+        let partition_keys: Vec<_> = self
+            .table
+            .schema()
+            .partition_keys()
+            .iter()
+            .map(|s| s.to_string())
+            .collect();
+
+        // Collect unique partition bytes
+        let mut seen = std::collections::HashSet::new();
+        let mut partition_specs: Vec<HashMap<String, Option<Datum>>> = Vec::new();
+        for msg in commit_messages {
+            if seen.insert(msg.partition.clone()) {
+                let row = BinaryRow::from_serialized_bytes(&msg.partition)?;
+                let mut spec = HashMap::new();
+                for (i, key) in partition_keys.iter().enumerate() {
+                    spec.insert(key.clone(), extract_datum(&row, i, &data_types[i])?);
+                }
+                partition_specs.push(spec);
+            }
+        }
+
+        let predicates: Vec<Predicate> = partition_specs
+            .iter()
+            .map(|p| self.build_partition_predicate(p))
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(Some(Predicate::or(predicates)))
     }
 
-    /// Build a partition predicate from key-value pairs.
-    fn build_partition_predicate(&self, partition: &HashMap<String, Datum>) -> Result<Predicate> {
+    /// Build a partition predicate from key-value pairs, handling NULL via IS NULL.
+    fn build_partition_predicate(
+        &self,
+        partition: &HashMap<String, Option<Datum>>,
+    ) -> Result<Predicate> {
         let pb = PredicateBuilder::new(&self.table.schema().partition_fields());
         let predicates: Vec<Predicate> = partition
             .iter()
-            .map(|(key, value)| pb.equal(key, value.clone()))
+            .map(|(key, value)| match value {
+                Some(v) => pb.equal(key, v.clone()),
+                None => pb.is_null(key),
+            })
             .collect::<Result<Vec<_>>>()?;
         Ok(Predicate::and(predicates))
     }
 
     /// Drop specific partitions (OVERWRITE with only deletes).
-    pub async fn truncate_partitions(&self, partitions: Vec<HashMap<String, Datum>>) -> Result<()> {
+    pub async fn truncate_partitions(
+        &self,
+        partitions: Vec<HashMap<String, Option<Datum>>>,
+    ) -> Result<()> {
         if partitions.is_empty() {
             return Ok(());
         }
@@ -603,9 +658,11 @@ impl TableCommit {
         }
         let row = BinaryRow::from_serialized_bytes(partition_bytes)?;
         for (i, key) in partition_keys.iter().enumerate() {
-            if let Some(datum) = extract_datum(&row, i, &data_types[i])? {
-                spec.insert(key.clone(), datum.to_string());
-            }
+            let value = match extract_datum(&row, i, &data_types[i])? {
+                Some(datum) => datum.to_string(),
+                None => self.partition_default_name.clone(),
+            };
+            spec.insert(key.clone(), value);
         }
         Ok(spec)
     }
@@ -747,20 +804,15 @@ mod tests {
 
     fn setup_commit(file_io: &FileIO, table_path: &str) -> TableCommit {
         let table = test_table(file_io, table_path);
-        TableCommit::new(table, "test-user".to_string(), None)
+        TableCommit::new(table, "test-user".to_string())
     }
 
     fn setup_partitioned_commit(file_io: &FileIO, table_path: &str) -> TableCommit {
         let table = test_partitioned_table(file_io, table_path);
-        TableCommit::new(table, "test-user".to_string(), None)
+        TableCommit::new(table, "test-user".to_string())
     }
 
     fn partition_bytes(pt: &str) -> Vec<u8> {
-        use crate::spec::{DataType, VarCharType};
-        let datum = Datum::String(pt.to_string());
-        let dt = DataType::VarChar(VarCharType::string_type());
-        let datums = vec![(&datum, &dt)];
-        BinaryRow::from_datums(&datums).unwrap();
         let mut builder = BinaryRowBuilder::new(1);
         if pt.len() <= 7 {
             builder.write_string_inline(0, pt);
@@ -923,16 +975,9 @@ mod tests {
             .await
             .unwrap();
 
-        // Overwrite partition "a" with new data
-        let mut overwrite_partition = HashMap::new();
-        overwrite_partition.insert("pt".to_string(), Datum::String("a".to_string()));
-
-        let table = test_partitioned_table(&file_io, table_path);
-        let overwrite_commit =
-            TableCommit::new(table, "test-user".to_string(), Some(overwrite_partition));
-
-        overwrite_commit
-            .commit(vec![CommitMessage::new(
+        // Overwrite partition "a" with new data (dynamic partition overwrite)
+        commit
+            .overwrite(vec![CommitMessage::new(
                 partition_bytes("a"),
                 0,
                 vec![test_data_file("data-a2.parquet", 50)],
@@ -980,8 +1025,8 @@ mod tests {
 
         // Drop partitions "a" and "c"
         let partitions = vec![
-            HashMap::from([("pt".to_string(), Datum::String("a".to_string()))]),
-            HashMap::from([("pt".to_string(), Datum::String("c".to_string()))]),
+            HashMap::from([("pt".to_string(), Some(Datum::String("a".to_string())))]),
+            HashMap::from([("pt".to_string(), Some(Datum::String("c".to_string())))]),
         ];
         commit.truncate_partitions(partitions).await.unwrap();
 
@@ -992,4 +1037,58 @@ mod tests {
         // 600 - 100 (a) - 300 (c) = 200
         assert_eq!(snapshot.total_record_count(), Some(200));
     }
+
+    fn null_partition_bytes() -> Vec<u8> {
+        let mut builder = BinaryRowBuilder::new(1);
+        builder.set_null_at(0);
+        builder.build_serialized()
+    }
+
+    #[tokio::test]
+    async fn test_overwrite_null_partition() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_overwrite_null_partition";
+        setup_dirs(&file_io, table_path).await;
+
+        let commit = setup_partitioned_commit(&file_io, table_path);
+
+        // Append data for partition "a", "b", and NULL
+        commit
+            .commit(vec![
+                CommitMessage::new(
+                    partition_bytes("a"),
+                    0,
+                    vec![test_data_file("data-a.parquet", 100)],
+                ),
+                CommitMessage::new(
+                    partition_bytes("b"),
+                    0,
+                    vec![test_data_file("data-b.parquet", 200)],
+                ),
+                CommitMessage::new(
+                    null_partition_bytes(),
+                    0,
+                    vec![test_data_file("data-null.parquet", 300)],
+                ),
+            ])
+            .await
+            .unwrap();
+
+        // Overwrite NULL partition only — should NOT affect "a" or "b"
+        commit
+            .overwrite(vec![CommitMessage::new(
+                null_partition_bytes(),
+                0,
+                vec![test_data_file("data-null2.parquet", 50)],
+            )])
+            .await
+            .unwrap();
+
+        let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
+        let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+        assert_eq!(snapshot.id(), 2);
+        assert_eq!(snapshot.commit_kind(), &CommitKind::OVERWRITE);
+        // 600 - 300 (delete null) + 50 (add null2) = 350
+        assert_eq!(snapshot.total_record_count(), Some(350));
+    }
 }
diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs
index 06fe87a..1d2f621 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -1375,4 +1375,64 @@ mod tests {
         let bucket = *buckets.iter().next().unwrap();
         assert!((0..4).contains(&bucket));
     }
+
+    #[test]
+    fn test_compute_target_buckets_is_null() {
+        let fields = bucket_key_fields();
+        let pred = Predicate::Leaf {
+            column: "id".into(),
+            index: 0,
+            data_type: DataType::Int(IntType::new()),
+            op: PredicateOperator::IsNull,
+            literals: vec![],
+        };
+
+        let buckets = compute_target_buckets(&pred, &fields, 4);
+        assert!(buckets.is_some(), "IsNull should determine a target bucket");
+        let buckets = buckets.unwrap();
+        assert_eq!(buckets.len(), 1);
+        let bucket = *buckets.iter().next().unwrap();
+        assert!((0..4).contains(&bucket));
+
+        // Verify it matches the expected bucket from a null BinaryRow
+        let mut builder = BinaryRowBuilder::new(1);
+        builder.set_null_at(0);
+        let expected = (builder.build().hash_code() % 4).abs();
+        assert_eq!(bucket, expected);
+    }
+
+    #[test]
+    fn test_compute_target_buckets_composite_key_with_null() {
+        let fields = vec![
+            DataField::new(0, "a".to_string(), DataType::Int(IntType::new())),
+            DataField::new(1, "b".to_string(), DataType::Int(IntType::new())),
+        ];
+        // a = 1 AND b IS NULL
+        let pred = Predicate::And(vec![
+            Predicate::Leaf {
+                column: "a".into(),
+                index: 0,
+                data_type: DataType::Int(IntType::new()),
+                op: PredicateOperator::Eq,
+                literals: vec![Datum::Int(1)],
+            },
+            Predicate::Leaf {
+                column: "b".into(),
+                index: 1,
+                data_type: DataType::Int(IntType::new()),
+                op: PredicateOperator::IsNull,
+                literals: vec![],
+            },
+        ]);
+
+        let buckets = compute_target_buckets(&pred, &fields, 8);
+        assert!(
+            buckets.is_some(),
+            "Composite key with IsNull should determine a target bucket"
+        );
+        let buckets = buckets.unwrap();
+        assert_eq!(buckets.len(), 1);
+        let bucket = *buckets.iter().next().unwrap();
+        assert!((0..8).contains(&bucket));
+    }
 }
diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs
new file mode 100644
index 0000000..7d5252e
--- /dev/null
+++ b/crates/paimon/src/table/table_write.rs
@@ -0,0 +1,998 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TableWrite for writing Arrow data to Paimon tables.
+//!
+//! Reference: [pypaimon TableWrite](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/table_write.py)
+//! and [pypaimon FileStoreWrite](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/file_store_write.py)
+
+use crate::arrow::format::{create_format_writer, FormatFileWriter};
+use crate::io::FileIO;
+use crate::spec::stats::BinaryTableStats;
+use crate::spec::PartitionComputer;
+use crate::spec::{
+    extract_datum_from_arrow, BinaryRow, BinaryRowBuilder, CoreOptions, DataField, DataFileMeta,
+    DataType, Datum, EMPTY_SERIALIZED_ROW,
+};
+use crate::table::commit_message::CommitMessage;
+use crate::table::Table;
+use crate::Result;
+use arrow_array::RecordBatch;
+use chrono::Utc;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::task::JoinSet;
+
+type PartitionBucketKey = (Vec<u8>, i32);
+
+/// TableWrite writes Arrow RecordBatches to Paimon data files.
+///
+/// Each (partition, bucket) pair gets its own `DataFileWriter` held in a HashMap.
+/// Batches are routed to the correct writer based on partition/bucket.
+///
+/// Call `prepare_commit()` to close all writers and collect
+/// `CommitMessage`s for use with `TableCommit`.
+///
+/// Reference: [pypaimon BatchTableWrite](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/table_write.py)
+pub struct TableWrite {
+    table: Table,
+    partition_writers: HashMap<PartitionBucketKey, DataFileWriter>,
+    partition_computer: PartitionComputer,
+    partition_keys: Vec<String>,
+    partition_field_indices: Vec<usize>,
+    bucket_key_indices: Vec<usize>,
+    total_buckets: i32,
+    schema_id: i64,
+    target_file_size: i64,
+    file_compression: String,
+    file_compression_zstd_level: i32,
+    write_buffer_size: i64,
+}
+
+impl TableWrite {
+    pub(crate) fn new(table: &Table) -> crate::Result<Self> {
+        let schema = table.schema();
+        let core_options = CoreOptions::new(schema.options());
+
+        if !schema.primary_keys().is_empty() {
+            return Err(crate::Error::Unsupported {
+                message: "TableWrite does not support tables with primary 
keys".to_string(),
+            });
+        }
+        if core_options.data_evolution_enabled() {
+            return Err(crate::Error::Unsupported {
+                message: "TableWrite does not support data-evolution.enabled 
mode".to_string(),
+            });
+        }
+
+        let total_buckets = core_options.bucket();
+        if total_buckets != -1 && core_options.bucket_key().is_none() {
+            return Err(crate::Error::Unsupported {
+                message: "Append tables with fixed bucket must configure 
'bucket-key'".to_string(),
+            });
+        }
+        let target_file_size = core_options.target_file_size();
+        let file_compression = core_options.file_compression().to_string();
+        let file_compression_zstd_level = core_options.file_compression_zstd_level();
+        let write_buffer_size = core_options.write_parquet_buffer_size();
+        let partition_keys: Vec<String> = schema.partition_keys().to_vec();
+        let fields = schema.fields();
+
+        let partition_field_indices: Vec<usize> = partition_keys
+            .iter()
+            .filter_map(|pk| fields.iter().position(|f| f.name() == pk))
+            .collect();
+
+        // Bucket keys: resolved by TableSchema
+        let bucket_keys = schema.bucket_keys();
+
+        let bucket_key_indices: Vec<usize> = bucket_keys
+            .iter()
+            .filter_map(|bk| fields.iter().position(|f| f.name() == bk))
+            .collect();
+
+        let partition_computer = PartitionComputer::new(
+            &partition_keys,
+            fields,
+            core_options.partition_default_name(),
+            core_options.legacy_partition_name(),
+        )
+        .unwrap();
+
+        Ok(Self {
+            table: table.clone(),
+            partition_writers: HashMap::new(),
+            partition_computer,
+            partition_keys,
+            partition_field_indices,
+            bucket_key_indices,
+            total_buckets,
+            schema_id: schema.id(),
+            target_file_size,
+            file_compression,
+            file_compression_zstd_level,
+            write_buffer_size,
+        })
+    }
+
+    /// Write an Arrow RecordBatch. Rows are routed to the correct partition and bucket.
+    pub async fn write_arrow_batch(&mut self, batch: &RecordBatch) -> Result<()> {
+        if batch.num_rows() == 0 {
+            return Ok(());
+        }
+
+        let grouped = self.divide_by_partition_bucket(batch)?;
+        for ((partition_bytes, bucket), sub_batch) in grouped {
+            self.write_bucket(partition_bytes, bucket, sub_batch)
+                .await?;
+        }
+        Ok(())
+    }
+
+    /// Group rows by (partition_bytes, bucket) and return sub-batches.
+    fn divide_by_partition_bucket(
+        &self,
+        batch: &RecordBatch,
+    ) -> Result<Vec<(PartitionBucketKey, RecordBatch)>> {
+        // Fast path: no partitions and single bucket — skip per-row routing
+        if self.partition_field_indices.is_empty() && self.total_buckets <= 1 {
+            return Ok(vec![((EMPTY_SERIALIZED_ROW.clone(), 0), batch.clone())]);
+        }
+
+        let fields = self.table.schema().fields();
+        let mut groups: HashMap<PartitionBucketKey, Vec<usize>> = HashMap::new();
+
+        for row_idx in 0..batch.num_rows() {
+            let (partition_bytes, bucket) =
+                self.extract_partition_bucket(batch, row_idx, fields)?;
+            groups
+                .entry((partition_bytes, bucket))
+                .or_default()
+                .push(row_idx);
+        }
+
+        let mut result = Vec::with_capacity(groups.len());
+        for (key, row_indices) in groups {
+            let sub_batch = if row_indices.len() == batch.num_rows() {
+                batch.clone()
+            } else {
+                let indices = arrow_array::UInt32Array::from(
+                    row_indices.iter().map(|&i| i as u32).collect::<Vec<_>>(),
+                );
+                let columns: Vec<Arc<dyn arrow_array::Array>> = batch
+                    .columns()
+                    .iter()
+                    .map(|col| arrow_select::take::take(col.as_ref(), &indices, None))
+                    .collect::<std::result::Result<Vec<_>, _>>()
+                    .map_err(|e| crate::Error::DataInvalid {
+                        message: format!("Failed to take rows: {e}"),
+                        source: None,
+                    })?;
+                RecordBatch::try_new(batch.schema(), columns).map_err(|e| {
+                    crate::Error::DataInvalid {
+                        message: format!("Failed to create sub-batch: {e}"),
+                        source: None,
+                    }
+                })?
+            };
+            result.push((key, sub_batch));
+        }
+        Ok(result)
+    }
+
+    /// Write a batch directly to the DataFileWriter for the given (partition, bucket).
+    async fn write_bucket(
+        &mut self,
+        partition_bytes: Vec<u8>,
+        bucket: i32,
+        batch: RecordBatch,
+    ) -> Result<()> {
+        let key = (partition_bytes, bucket);
+        if !self.partition_writers.contains_key(&key) {
+            self.create_writer(key.0.clone(), key.1)?;
+        }
+        let writer = self.partition_writers.get_mut(&key).unwrap();
+        writer.write(&batch).await
+    }
+
+    /// Write multiple Arrow RecordBatches.
+    pub async fn write_arrow(&mut self, batches: &[RecordBatch]) -> Result<()> {
+        for batch in batches {
+            self.write_arrow_batch(batch).await?;
+        }
+        Ok(())
+    }
+
+    /// Close all writers and collect CommitMessages for use with TableCommit.
+    /// Writers are cleared after this call, allowing the TableWrite to be reused.
+    pub async fn prepare_commit(&mut self) -> Result<Vec<CommitMessage>> {
+        let writers: Vec<(PartitionBucketKey, DataFileWriter)> =
+            self.partition_writers.drain().collect();
+
+        let futures: Vec<_> = writers
+            .into_iter()
+            .map(|((partition_bytes, bucket), mut writer)| async move {
+                let files = writer.prepare_commit().await?;
+                Ok::<_, crate::Error>((partition_bytes, bucket, files))
+            })
+            .collect();
+
+        let results = futures::future::try_join_all(futures).await?;
+
+        let mut messages = Vec::new();
+        for (partition_bytes, bucket, files) in results {
+            if !files.is_empty() {
+                messages.push(CommitMessage::new(partition_bytes, bucket, files));
+            }
+        }
+        Ok(messages)
+    }
+
+    /// Extract partition bytes and bucket for a single row.
+    fn extract_partition_bucket(
+        &self,
+        batch: &RecordBatch,
+        row_idx: usize,
+        fields: &[DataField],
+    ) -> Result<PartitionBucketKey> {
+        // Build partition BinaryRow
+        let partition_bytes = if self.partition_field_indices.is_empty() {
+            EMPTY_SERIALIZED_ROW.clone()
+        } else {
+            let mut builder = BinaryRowBuilder::new(self.partition_field_indices.len() as i32);
+            for (pos, &field_idx) in self.partition_field_indices.iter().enumerate() {
+                let field = &fields[field_idx];
+                match extract_datum_from_arrow(batch, row_idx, field_idx, field.data_type())? {
+                    Some(datum) => builder.write_datum(pos, &datum, field.data_type()),
+                    None => builder.set_null_at(pos),
+                }
+            }
+            builder.build_serialized()
+        };
+
+        // Compute bucket
+        let bucket = if self.total_buckets <= 1 || self.bucket_key_indices.is_empty() {
+            0
+        } else {
+            let mut datums: Vec<(Option<Datum>, DataType)> = Vec::new();
+            for &field_idx in &self.bucket_key_indices {
+                let field = &fields[field_idx];
+                let datum = extract_datum_from_arrow(batch, row_idx, field_idx, field.data_type())?;
+                datums.push((datum, field.data_type().clone()));
+            }
+            let refs: Vec<(Option<&Datum>, &DataType)> =
+                datums.iter().map(|(d, t)| (d.as_ref(), t)).collect();
+            BinaryRow::compute_bucket_from_datums(&refs, self.total_buckets)
+        };
+
+        Ok((partition_bytes, bucket))
+    }
+
+    fn create_writer(&mut self, partition_bytes: Vec<u8>, bucket: i32) -> Result<()> {
+        let partition_path = if self.partition_keys.is_empty() {
+            String::new()
+        } else {
+            let row = BinaryRow::from_serialized_bytes(&partition_bytes)?;
+            self.partition_computer.generate_partition_path(&row)?
+        };
+
+        let writer = DataFileWriter::new(
+            self.table.file_io().clone(),
+            self.table.location().to_string(),
+            partition_path,
+            bucket,
+            self.schema_id,
+            self.target_file_size,
+            self.file_compression.clone(),
+            self.file_compression_zstd_level,
+            self.write_buffer_size,
+        );
+
+        self.partition_writers
+            .insert((partition_bytes, bucket), writer);
+        Ok(())
+    }
+}
+
+/// Internal writer that produces parquet data files for a single (partition, bucket).
+///
+/// Batches are accumulated into a single `FormatFileWriter` that streams directly
+/// to storage. Call `prepare_commit()` to finalize and collect file metadata.
+struct DataFileWriter {
+    file_io: FileIO,
+    table_location: String,
+    partition_path: String,
+    bucket: i32,
+    schema_id: i64,
+    target_file_size: i64,
+    file_compression: String,
+    file_compression_zstd_level: i32,
+    write_buffer_size: i64,
+    written_files: Vec<DataFileMeta>,
+    /// Background file close tasks spawned during rolling.
+    in_flight_closes: JoinSet<Result<DataFileMeta>>,
+    /// Current open format writer, lazily created on first write.
+    current_writer: Option<Box<dyn FormatFileWriter>>,
+    current_file_name: Option<String>,
+    current_row_count: i64,
+}
+
+impl DataFileWriter {
+    #[allow(clippy::too_many_arguments)]
+    fn new(
+        file_io: FileIO,
+        table_location: String,
+        partition_path: String,
+        bucket: i32,
+        schema_id: i64,
+        target_file_size: i64,
+        file_compression: String,
+        file_compression_zstd_level: i32,
+        write_buffer_size: i64,
+    ) -> Self {
+        Self {
+            file_io,
+            table_location,
+            partition_path,
+            bucket,
+            schema_id,
+            target_file_size,
+            file_compression,
+            file_compression_zstd_level,
+            write_buffer_size,
+            written_files: Vec::new(),
+            in_flight_closes: JoinSet::new(),
+            current_writer: None,
+            current_file_name: None,
+            current_row_count: 0,
+        }
+    }
+
+    /// Write a RecordBatch. Rolls to a new file when target size is reached.
+    async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        if batch.num_rows() == 0 {
+            return Ok(());
+        }
+
+        if self.current_writer.is_none() {
+            self.open_new_file(batch.schema()).await?;
+        }
+
+        self.current_row_count += batch.num_rows() as i64;
+        self.current_writer.as_mut().unwrap().write(batch).await?;
+
+        // Roll to a new file if target size is reached — close in background
+        if self.current_writer.as_ref().unwrap().num_bytes() as i64 >= self.target_file_size {
+            self.roll_file();
+        }
+
+        // Flush row group if in-progress buffer exceeds write_buffer_size
+        if let Some(w) = self.current_writer.as_mut() {
+            if w.in_progress_size() as i64 >= self.write_buffer_size {
+                w.flush().await?;
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn open_new_file(&mut self, schema: arrow_schema::SchemaRef) -> Result<()> {
+        let file_name = format!(
+            "data-{}-{}.parquet",
+            uuid::Uuid::new_v4(),
+            self.written_files.len()
+        );
+
+        let bucket_dir = if self.partition_path.is_empty() {
+            format!("{}/bucket-{}", self.table_location, self.bucket)
+        } else {
+            format!(
+                "{}/{}/bucket-{}",
+                self.table_location, self.partition_path, self.bucket
+            )
+        };
+        self.file_io.mkdirs(&format!("{bucket_dir}/")).await?;
+
+        let file_path = format!("{}/{}", bucket_dir, file_name);
+        let output = self.file_io.new_output(&file_path)?;
+        let writer = create_format_writer(
+            &output,
+            schema,
+            &self.file_compression,
+            self.file_compression_zstd_level,
+        )
+        .await?;
+        self.current_writer = Some(writer);
+        self.current_file_name = Some(file_name);
+        self.current_row_count = 0;
+        Ok(())
+    }
+
+    /// Close the current file writer and record the file metadata.
+    async fn close_current_file(&mut self) -> Result<()> {
+        let writer = match self.current_writer.take() {
+            Some(w) => w,
+            None => return Ok(()),
+        };
+        let file_name = self.current_file_name.take().unwrap();
+
+        let row_count = self.current_row_count;
+        self.current_row_count = 0;
+        let file_size = writer.close().await? as i64;
+
+        let meta = Self::build_meta(file_name, file_size, row_count, self.schema_id);
+        self.written_files.push(meta);
+        Ok(())
+    }
+
+    /// Spawn the current writer's close in the background for non-blocking rolling.
+    fn roll_file(&mut self) {
+        let writer = match self.current_writer.take() {
+            Some(w) => w,
+            None => return,
+        };
+        let file_name = self.current_file_name.take().unwrap();
+        let row_count = self.current_row_count;
+        self.current_row_count = 0;
+        let schema_id = self.schema_id;
+
+        self.in_flight_closes.spawn(async move {
+            let file_size = writer.close().await? as i64;
+            Ok(Self::build_meta(file_name, file_size, row_count, schema_id))
+        });
+    }
+
+    /// Close the current writer and return all written file metadata.
+    async fn prepare_commit(&mut self) -> Result<Vec<DataFileMeta>> {
+        self.close_current_file().await?;
+        while let Some(result) = self.in_flight_closes.join_next().await {
+            let meta = result.map_err(|e| crate::Error::DataInvalid {
+                message: format!("Background file close task panicked: {e}"),
+                source: None,
+            })??;
+            self.written_files.push(meta);
+        }
+        Ok(std::mem::take(&mut self.written_files))
+    }
+
+    fn build_meta(
+        file_name: String,
+        file_size: i64,
+        row_count: i64,
+        schema_id: i64,
+    ) -> DataFileMeta {
+        DataFileMeta {
+            file_name,
+            file_size,
+            row_count,
+            min_key: EMPTY_SERIALIZED_ROW.clone(),
+            max_key: EMPTY_SERIALIZED_ROW.clone(),
+            key_stats: BinaryTableStats::new(
+                EMPTY_SERIALIZED_ROW.clone(),
+                EMPTY_SERIALIZED_ROW.clone(),
+                vec![],
+            ),
+            value_stats: BinaryTableStats::new(
+                EMPTY_SERIALIZED_ROW.clone(),
+                EMPTY_SERIALIZED_ROW.clone(),
+                vec![],
+            ),
+            min_sequence_number: 0,
+            max_sequence_number: 0,
+            schema_id,
+            level: 0,
+            extra_files: vec![],
+            creation_time: Some(Utc::now()),
+            delete_row_count: Some(0),
+            embedded_index: None,
+            file_source: Some(0), // APPEND
+            value_stats_cols: Some(vec![]),
+            external_path: None,
+            first_row_id: None,
+            write_cols: None,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::catalog::Identifier;
+    use crate::io::FileIOBuilder;
+    use crate::spec::{
+        DataType, DecimalType, IntType, LocalZonedTimestampType, Schema, TableSchema,
+        TimestampType, VarCharType,
+    };
+    use crate::table::{SnapshotManager, TableCommit};
+    use arrow_array::Int32Array;
+    use arrow_schema::{
+        DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, TimeUnit,
+    };
+    use std::sync::Arc;
+
+    fn test_file_io() -> FileIO {
+        FileIOBuilder::new("memory").build().unwrap()
+    }
+
+    fn test_schema() -> TableSchema {
+        let schema = Schema::builder()
+            .column("id", DataType::Int(IntType::new()))
+            .column("value", DataType::Int(IntType::new()))
+            .build()
+            .unwrap();
+        TableSchema::new(0, &schema)
+    }
+
+    fn test_partitioned_schema() -> TableSchema {
+        let schema = Schema::builder()
+            .column("pt", DataType::VarChar(VarCharType::string_type()))
+            .column("id", DataType::Int(IntType::new()))
+            .partition_keys(["pt"])
+            .build()
+            .unwrap();
+        TableSchema::new(0, &schema)
+    }
+
+    fn test_table(file_io: &FileIO, table_path: &str) -> Table {
+        Table::new(
+            file_io.clone(),
+            Identifier::new("default", "test_table"),
+            table_path.to_string(),
+            test_schema(),
+            None,
+        )
+    }
+
+    fn test_partitioned_table(file_io: &FileIO, table_path: &str) -> Table {
+        Table::new(
+            file_io.clone(),
+            Identifier::new("default", "test_table"),
+            table_path.to_string(),
+            test_partitioned_schema(),
+            None,
+        )
+    }
+
+    async fn setup_dirs(file_io: &FileIO, table_path: &str) {
+        file_io
+            .mkdirs(&format!("{table_path}/snapshot/"))
+            .await
+            .unwrap();
+        file_io
+            .mkdirs(&format!("{table_path}/manifest/"))
+            .await
+            .unwrap();
+    }
+
+    fn make_batch(ids: Vec<i32>, values: Vec<i32>) -> RecordBatch {
+        let schema = Arc::new(ArrowSchema::new(vec![
+            ArrowField::new("id", ArrowDataType::Int32, false),
+            ArrowField::new("value", ArrowDataType::Int32, false),
+        ]));
+        RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(Int32Array::from(ids)),
+                Arc::new(Int32Array::from(values)),
+            ],
+        )
+        .unwrap()
+    }
+
+    fn make_partitioned_batch(pts: Vec<&str>, ids: Vec<i32>) -> RecordBatch {
+        let schema = Arc::new(ArrowSchema::new(vec![
+            ArrowField::new("pt", ArrowDataType::Utf8, false),
+            ArrowField::new("id", ArrowDataType::Int32, false),
+        ]));
+        RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(arrow_array::StringArray::from(pts)),
+                Arc::new(Int32Array::from(ids)),
+            ],
+        )
+        .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_write_and_commit() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_table_write";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = test_table(&file_io, table_path);
+        let mut table_write = TableWrite::new(&table).unwrap();
+
+        let batch = make_batch(vec![1, 2, 3], vec![10, 20, 30]);
+        table_write.write_arrow_batch(&batch).await.unwrap();
+
+        let messages = table_write.prepare_commit().await.unwrap();
+        assert_eq!(messages.len(), 1);
+        assert_eq!(messages[0].bucket, 0);
+        assert_eq!(messages[0].new_files.len(), 1);
+        assert_eq!(messages[0].new_files[0].row_count, 3);
+
+        // Commit and verify snapshot
+        let commit = TableCommit::new(table, "test-user".to_string());
+        commit.commit(messages).await.unwrap();
+
+        let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
+        let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+        assert_eq!(snapshot.id(), 1);
+        assert_eq!(snapshot.total_record_count(), Some(3));
+    }
+
+    #[tokio::test]
+    async fn test_write_partitioned() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_table_write_partitioned";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = test_partitioned_table(&file_io, table_path);
+        let mut table_write = TableWrite::new(&table).unwrap();
+
+        let batch = make_partitioned_batch(vec!["a", "b", "a"], vec![1, 2, 3]);
+        table_write.write_arrow_batch(&batch).await.unwrap();
+
+        let messages = table_write.prepare_commit().await.unwrap();
+        // Should have 2 commit messages (one per partition)
+        assert_eq!(messages.len(), 2);
+
+        let total_rows: i64 = messages
+            .iter()
+            .flat_map(|m| &m.new_files)
+            .map(|f| f.row_count)
+            .sum();
+        assert_eq!(total_rows, 3);
+
+        // Commit and verify
+        let commit = TableCommit::new(table, "test-user".to_string());
+        commit.commit(messages).await.unwrap();
+
+        let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
+        let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap();
+        assert_eq!(snapshot.id(), 1);
+        assert_eq!(snapshot.total_record_count(), Some(3));
+    }
+
+    #[tokio::test]
+    async fn test_write_empty_batch() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_table_write_empty";
+        let table = test_table(&file_io, table_path);
+        let mut table_write = TableWrite::new(&table).unwrap();
+
+        let batch = make_batch(vec![], vec![]);
+        table_write.write_arrow_batch(&batch).await.unwrap();
+
+        let messages = table_write.prepare_commit().await.unwrap();
+        assert!(messages.is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_prepare_commit_reusable() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_table_write_reuse";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = test_table(&file_io, table_path);
+        let mut table_write = TableWrite::new(&table).unwrap();
+
+        // First write + prepare_commit
+        table_write
+            .write_arrow_batch(&make_batch(vec![1, 2], vec![10, 20]))
+            .await
+            .unwrap();
+        let messages1 = table_write.prepare_commit().await.unwrap();
+        assert_eq!(messages1.len(), 1);
+        assert_eq!(messages1[0].new_files[0].row_count, 2);
+
+        // Second write + prepare_commit (reuse)
+        table_write
+            .write_arrow_batch(&make_batch(vec![3, 4, 5], vec![30, 40, 50]))
+            .await
+            .unwrap();
+        let messages2 = table_write.prepare_commit().await.unwrap();
+        assert_eq!(messages2.len(), 1);
+        assert_eq!(messages2[0].new_files[0].row_count, 3);
+
+        // Empty prepare_commit is fine
+        let messages3 = table_write.prepare_commit().await.unwrap();
+        assert!(messages3.is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_write_multiple_batches() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_table_write_multi";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = test_table(&file_io, table_path);
+        let mut table_write = TableWrite::new(&table).unwrap();
+
+        table_write
+            .write_arrow_batch(&make_batch(vec![1, 2], vec![10, 20]))
+            .await
+            .unwrap();
+        table_write
+            .write_arrow_batch(&make_batch(vec![3, 4], vec![30, 40]))
+            .await
+            .unwrap();
+
+        let messages = table_write.prepare_commit().await.unwrap();
+        assert_eq!(messages.len(), 1);
+        // Multiple batches accumulate into a single file
+        assert_eq!(messages[0].new_files.len(), 1);
+
+        let total_rows: i64 = messages[0].new_files.iter().map(|f| f.row_count).sum();
+        assert_eq!(total_rows, 4);
+    }
+
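+    /// Schema for a fixed-bucket table: 4 buckets keyed on the "id" column.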
+    fn test_bucketed_schema() -> TableSchema {
+        let schema = Schema::builder()
+            .column("id", DataType::Int(IntType::new()))
+            .column("value", DataType::Int(IntType::new()))
+            .option("bucket", "4")
+            .option("bucket-key", "id")
+            .build()
+            .unwrap();
+        TableSchema::new(0, &schema)
+    }
+
+    fn test_bucketed_table(file_io: &FileIO, table_path: &str) -> Table {
+        Table::new(
+            file_io.clone(),
+            Identifier::new("default", "test_table"),
+            table_path.to_string(),
+            test_bucketed_schema(),
+            None,
+        )
+    }
+
+    /// Build a batch where the bucket-key column ("id") is nullable.
+    fn make_nullable_id_batch(ids: Vec<Option<i32>>, values: Vec<i32>) -> RecordBatch {
+        let schema = Arc::new(ArrowSchema::new(vec![
+            ArrowField::new("id", ArrowDataType::Int32, true),
+            ArrowField::new("value", ArrowDataType::Int32, false),
+        ]));
+        RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(Int32Array::from(ids)),
+                Arc::new(Int32Array::from(values)),
+            ],
+        )
+        .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_write_bucketed_with_null_bucket_key() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_table_write_null_bk";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = test_bucketed_table(&file_io, table_path);
+        let mut table_write = TableWrite::new(&table).unwrap();
+
+        // Row with NULL bucket key should not panic
+        let batch = make_nullable_id_batch(vec![None, Some(1), None], vec![10, 20, 30]);
+        table_write.write_arrow_batch(&batch).await.unwrap();
+
+        let messages = table_write.prepare_commit().await.unwrap();
+        let total_rows: i64 = messages
+            .iter()
+            .flat_map(|m| &m.new_files)
+            .map(|f| f.row_count)
+            .sum();
+        assert_eq!(total_rows, 3);
+    }
+
+    #[tokio::test]
+    async fn test_null_bucket_key_routes_consistently() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_table_write_null_bk_consistent";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = test_bucketed_table(&file_io, table_path);
+        let mut table_write = TableWrite::new(&table).unwrap();
+
+        // Two NULLs should land in the same bucket
+        let batch = make_nullable_id_batch(vec![None, None], vec![10, 20]);
+        table_write.write_arrow_batch(&batch).await.unwrap();
+
+        let messages = table_write.prepare_commit().await.unwrap();
+        // Both NULL-key rows must be in the same (partition, bucket) group
+        let null_bucket_rows: i64 = messages
+            .iter()
+            .flat_map(|m| &m.new_files)
+            .map(|f| f.row_count)
+            .sum();
+        assert_eq!(null_bucket_rows, 2);
+        // All NULL-key rows go to exactly one bucket
+        assert_eq!(messages.len(), 1);
+    }
+
+    #[tokio::test]
+    async fn test_null_vs_nonnull_bucket_key_differ() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_table_write_null_vs_nonnull";
+        setup_dirs(&file_io, table_path).await;
+
+        let table = test_bucketed_table(&file_io, table_path);
+
+        // Compute bucket for NULL key
+        let fields = table.schema().fields().to_vec();
+        let tw = TableWrite::new(&table).unwrap();
+
+        let batch_null = make_nullable_id_batch(vec![None], vec![10]);
+        let (_, bucket_null) = tw
+            .extract_partition_bucket(&batch_null, 0, &fields)
+            .unwrap();
+
+        // Compute bucket for key = 0 (the value a null field's fixed-width bytes happen to hold)
+        let batch_zero = make_nullable_id_batch(vec![Some(0)], vec![20]);
+        let (_, bucket_zero) = tw
+            .extract_partition_bucket(&batch_zero, 0, &fields)
+            .unwrap();
+
+        // A NULL bucket key must produce a BinaryRow with the null bit set,
+        // which hashes differently from a non-null 0 value.
+        // (With 4 buckets they could theoretically collide, but the hash codes differ.)
+        let mut builder_null = BinaryRowBuilder::new(1);
+        builder_null.set_null_at(0);
+        let hash_null = builder_null.build().hash_code();
+
+        let mut builder_zero = BinaryRowBuilder::new(1);
+        builder_zero.write_int(0, 0);
+        let hash_zero = builder_zero.build().hash_code();
+
+        assert_ne!(hash_null, hash_zero, "NULL and 0 should hash differently");
+        // If the hashes differ, the buckets will very likely differ as well, though with
+        // only 4 buckets they could still coincide; the hash difference is the invariant we verify.
+        let _ = (bucket_null, bucket_zero);
+    }
+
+    /// Mirrors Java's testUnCompactDecimalAndTimestampNullValueBucketNumber.
+    /// Non-compact types (Decimal(38,18), LocalZonedTimestamp(6), Timestamp(6))
+    /// use variable-length encoding in BinaryRow — NULL handling must still work.
+    #[tokio::test]
+    async fn test_non_compact_null_bucket_key() {
+        let file_io = test_file_io();
+
+        let bucket_cols = ["d", "ltz", "ntz"];
+        let total_buckets = 16;
+
+        for bucket_col in &bucket_cols {
+            let table_path = format!("memory:/test_null_bk_{bucket_col}");
+            setup_dirs(&file_io, &table_path).await;
+
+            let schema = Schema::builder()
+                .column("d", DataType::Decimal(DecimalType::new(38, 
18).unwrap()))
+                .column(
+                    "ltz",
+                    DataType::LocalZonedTimestamp(LocalZonedTimestampType::new(6).unwrap()),
+                )
+                .column("ntz", 
DataType::Timestamp(TimestampType::new(6).unwrap()))
+                .column("k", DataType::Int(IntType::new()))
+                .option("bucket", total_buckets.to_string())
+                .option("bucket-key", *bucket_col)
+                .build()
+                .unwrap();
+            let table_schema = TableSchema::new(0, &schema);
+            let table = Table::new(
+                file_io.clone(),
+                Identifier::new("default", "test_table"),
+                table_path.to_string(),
+                table_schema,
+                None,
+            );
+
+            let tw = TableWrite::new(&table).unwrap();
+            let fields = table.schema().fields().to_vec();
+
+            // Build a batch: d=NULL, ltz=NULL, ntz=NULL, k=1
+            let arrow_schema = Arc::new(ArrowSchema::new(vec![
+                ArrowField::new("d", ArrowDataType::Decimal128(38, 18), true),
+                ArrowField::new(
+                    "ltz",
+                    ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
+                    true,
+                ),
+                ArrowField::new(
+                    "ntz",
+                    ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
+                    true,
+                ),
+                ArrowField::new("k", ArrowDataType::Int32, false),
+            ]));
+            let batch = RecordBatch::try_new(
+                arrow_schema,
+                vec![
+                    Arc::new(
+                        arrow_array::Decimal128Array::from(vec![None::<i128>])
+                            .with_precision_and_scale(38, 18)
+                            .unwrap(),
+                    ),
+                    Arc::new(
+                        arrow_array::TimestampMicrosecondArray::from(vec![None::<i64>])
+                            .with_timezone("UTC"),
+                    ),
+                    Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![
+                        None::<i64>,
+                    ])),
+                    Arc::new(Int32Array::from(vec![1])),
+                ],
+            )
+            .unwrap();
+
+            let (_, bucket) = tw.extract_partition_bucket(&batch, 0, &fields).unwrap();
+
+            // Expected: BinaryRow with 1 field, null at pos 0
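+            // Assumed fixed-bucket rule: bucket = abs(hash_code % bucket_count),
+            // mirroring the computation the test expects TableWrite to apply.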
+            let mut builder = BinaryRowBuilder::new(1);
+            builder.set_null_at(0);
+            let expected_bucket = (builder.build().hash_code() % total_buckets).abs();
+
+            assert_eq!(
+                bucket, expected_bucket,
+                "NULL bucket-key '{bucket_col}' should produce bucket 
{expected_bucket}, got {bucket}"
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_write_rolling_on_target_file_size() {
+        let file_io = test_file_io();
+        let table_path = "memory:/test_table_write_rolling";
+        setup_dirs(&file_io, table_path).await;
+
+        // Create table with very small target-file-size to trigger rolling
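+        // "1b" is a one-byte target, so every flushed batch exceeds it and rolls to a new file.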
+        let schema = Schema::builder()
+            .column("id", DataType::Int(IntType::new()))
+            .column("value", DataType::Int(IntType::new()))
+            .option("target-file-size", "1b")
+            .build()
+            .unwrap();
+        let table_schema = TableSchema::new(0, &schema);
+        let table = Table::new(
+            file_io.clone(),
+            Identifier::new("default", "test_table"),
+            table_path.to_string(),
+            table_schema,
+            None,
+        );
+
+        let mut table_write = TableWrite::new(&table).unwrap();
+
+        // Write multiple batches — each should roll to a new file
+        table_write
+            .write_arrow_batch(&make_batch(vec![1, 2], vec![10, 20]))
+            .await
+            .unwrap();
+        table_write
+            .write_arrow_batch(&make_batch(vec![3, 4], vec![30, 40]))
+            .await
+            .unwrap();
+
+        let messages = table_write.prepare_commit().await.unwrap();
+        assert_eq!(messages.len(), 1);
+        // With 1-byte target, each batch should produce a separate file
+        assert_eq!(messages[0].new_files.len(), 2);
+
+        let total_rows: i64 = messages[0].new_files.iter().map(|f| f.row_count).sum();
+        assert_eq!(total_rows, 4);
+    }
+}
diff --git a/crates/paimon/src/table/write_builder.rs b/crates/paimon/src/table/write_builder.rs
index d6458cf..45db333 100644
--- a/crates/paimon/src/table/write_builder.rs
+++ b/crates/paimon/src/table/write_builder.rs
@@ -19,9 +19,7 @@
 //!
 //! Reference: [pypaimon WriteBuilder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/write_builder.py)
 
-use crate::spec::Datum;
-use crate::table::{Table, TableCommit};
-use std::collections::HashMap;
+use crate::table::{Table, TableCommit, TableWrite};
 use uuid::Uuid;
 
 /// Builder for creating table writers and committers.
@@ -31,7 +29,6 @@ use uuid::Uuid;
 pub struct WriteBuilder<'a> {
     table: &'a Table,
     commit_user: String,
-    overwrite_partition: Option<HashMap<String, Datum>>,
 }
 
 impl<'a> WriteBuilder<'a> {
@@ -39,25 +36,16 @@ impl<'a> WriteBuilder<'a> {
         Self {
             table,
             commit_user: Uuid::new_v4().to_string(),
-            overwrite_partition: None,
         }
     }
 
-    /// Set overwrite mode. If `partition` is None, overwrites the entire table.
-    /// If `partition` is Some, overwrites only the specified partition.
-    pub fn overwrite(&mut self, partition: Option<HashMap<String, Datum>>) -> &mut Self {
-        self.overwrite_partition = Some(partition.unwrap_or_default());
-        self
-    }
-
     /// Create a new TableCommit for committing write results.
     pub fn new_commit(&self) -> TableCommit {
-        TableCommit::new(
-            self.table.clone(),
-            self.commit_user.clone(),
-            self.overwrite_partition.clone(),
-        )
+        TableCommit::new(self.table.clone(), self.commit_user.clone())
     }
 
-    // TODO: pub fn new_write(&self) -> TableWrite { ... }
+    /// Create a new TableWrite for writing Arrow data.
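+    ///
+    /// A rough sketch of the intended write-then-commit flow, based on the methods
+    /// exercised in this change; `table` and `batch` stand in for an existing [`Table`]
+    /// and an Arrow `RecordBatch` (not a verbatim doctest, hence `ignore`):
+    ///
+    /// ```ignore
+    /// let builder = WriteBuilder::new(&table);
+    /// let mut write = builder.new_write()?;
+    /// write.write_arrow_batch(&batch).await?;          // route rows by partition/bucket
+    /// let messages = write.prepare_commit().await?;    // finalize data files
+    /// builder.new_commit().commit(messages).await?;    // create a new snapshot
+    /// ```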
+    pub fn new_write(&self) -> crate::Result<TableWrite> {
+        TableWrite::new(self.table)
+    }
 }
