This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new cc13352  feat(go): add filter push-down and predicate API for Go binding (#216)
cc13352 is described below

commit cc1335207d5755f4b20e71c08c0209ea13135b88
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Apr 8 12:13:33 2026 +0800

    feat(go): add filter push-down and predicate API for Go binding (#216)
    
    Add complete predicate support to the Go binding, enabling filter
    push-down for table scans. This includes:
    
    - C FFI layer: predicate construction functions (equal, not_equal,
      less_than, greater_than, is_null, is_in, etc.), combinators
      (and/or/not), with_filter on ReadBuilder, and fix scan to
      propagate filters via TableScanState
    - Go binding: Predicate type, Go-native value types (Date, Time,
      Timestamp, LocalZonedTimestamp, Decimal, Bytes), automatic
      type inference from Go literals (int, string, bool, etc.),
      and ReadBuilder.WithFilter method
    - Tests: TestReadWithFilter with shared test helpers
---
 bindings/c/src/result.rs                |   6 +
 bindings/c/src/table.rs                 | 586 ++++++++++++++++++++++++++++++-
 bindings/c/src/types.rs                 |  61 +++-
 bindings/go/predicate.go                | 595 ++++++++++++++++++++++++++++++++
 bindings/go/predicate/predicate.go      |  70 ++++
 bindings/go/read_builder.go             |  60 ++++
 bindings/go/table.go                    |   5 +
 bindings/go/tests/paimon_test.go        | 186 ++++++----
 bindings/go/types.go                    |  47 +++
 crates/paimon/src/table/read_builder.rs |   5 +
 docs/mkdocs.yml                         |   1 +
 docs/src/go-binding.md                  | 397 +++++++++++++++++++++
 12 files changed, 1925 insertions(+), 94 deletions(-)

diff --git a/bindings/c/src/result.rs b/bindings/c/src/result.rs
index a4fd62b..19d523c 100644
--- a/bindings/c/src/result.rs
+++ b/bindings/c/src/result.rs
@@ -66,6 +66,12 @@ pub struct paimon_result_record_batch_reader {
     pub error: *mut paimon_error,
 }
 
+#[repr(C)]
+pub struct paimon_result_predicate {
+    pub predicate: *mut paimon_predicate,
+    pub error: *mut paimon_error,
+}
+
 #[repr(C)]
 pub struct paimon_result_next_batch {
     pub batch: paimon_arrow_batch,
diff --git a/bindings/c/src/table.rs b/bindings/c/src/table.rs
index 6f6637d..c6496f6 100644
--- a/bindings/c/src/table.rs
+++ b/bindings/c/src/table.rs
@@ -20,23 +20,18 @@ use std::ffi::c_void;
 use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
 use arrow_array::{Array, StructArray};
 use futures::StreamExt;
+use paimon::spec::{DataField, DataType, Datum, Predicate, PredicateBuilder};
 use paimon::table::{ArrowRecordBatchStream, Table};
 use paimon::Plan;
 
-use crate::error::{check_non_null, paimon_error};
+use crate::error::{check_non_null, paimon_error, validate_cstr, PaimonErrorCode};
 use crate::result::{
-    paimon_result_new_read, paimon_result_next_batch, paimon_result_plan,
+    paimon_result_new_read, paimon_result_next_batch, paimon_result_plan, paimon_result_predicate,
     paimon_result_read_builder, paimon_result_record_batch_reader, paimon_result_table_scan,
 };
 use crate::runtime;
 use crate::types::*;
 
-// Helper to box a Table clone into a wrapper struct and return a raw pointer.
-unsafe fn box_table_wrapper<T>(table: &Table, make: impl FnOnce(*mut c_void) -> T) -> *mut T {
-    let inner = Box::into_raw(Box::new(table.clone())) as *mut c_void;
-    Box::into_raw(Box::new(make(inner)))
-}
-
 // Helper to free a wrapper struct that contains a Table clone.
 unsafe fn free_table_wrapper<T>(ptr: *mut T, get_inner: impl FnOnce(&T) -> *mut c_void) {
     if !ptr.is_null() {
@@ -89,6 +84,7 @@ pub unsafe extern "C" fn paimon_table_new_read_builder(
     let state = ReadBuilderState {
         table: table_ref.clone(),
         projected_columns: None,
+        filter: None,
     };
     paimon_result_read_builder {
         read_builder: box_read_builder_state(state),
@@ -157,6 +153,36 @@ pub unsafe extern "C" fn paimon_read_builder_with_projection(
     std::ptr::null_mut()
 }
 
+/// Set a filter predicate for scan planning.
+///
+/// The predicate is consumed (ownership transferred to the read builder).
+/// Pass null to clear any previously set filter.
+///
+/// # Safety
+/// `rb` must be a valid pointer from `paimon_table_new_read_builder`, or null (returns error).
+/// `predicate` must be a valid pointer from a `paimon_predicate_*` function, or null.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_read_builder_with_filter(
+    rb: *mut paimon_read_builder,
+    predicate: *mut paimon_predicate,
+) -> *mut paimon_error {
+    if let Err(e) = check_non_null(rb, "rb") {
+        return e;
+    }
+
+    let state = &mut *((*rb).inner as *mut ReadBuilderState);
+
+    if predicate.is_null() {
+        state.filter = None;
+        return std::ptr::null_mut();
+    }
+
+    let pred_wrapper = Box::from_raw(predicate);
+    let pred = Box::from_raw(pred_wrapper.inner as *mut Predicate);
+    state.filter = Some(*pred);
+    std::ptr::null_mut()
+}
+
 /// Create a new TableScan from a ReadBuilder.
 ///
 /// # Safety
@@ -172,8 +198,13 @@ pub unsafe extern "C" fn paimon_read_builder_new_scan(
         };
     }
     let state = &*((*rb).inner as *const ReadBuilderState);
+    let scan_state = TableScanState {
+        table: state.table.clone(),
+        filter: state.filter.clone(),
+    };
+    let inner = Box::into_raw(Box::new(scan_state)) as *mut c_void;
     paimon_result_table_scan {
-        scan: box_table_wrapper(&state.table, |inner| paimon_table_scan { inner }),
+        scan: Box::into_raw(Box::new(paimon_table_scan { inner })),
         error: std::ptr::null_mut(),
     }
 }
@@ -201,11 +232,17 @@ pub unsafe extern "C" fn paimon_read_builder_new_read(
         rb_rust.with_projection(&col_refs);
     }
 
+    // Apply filter if set
+    if let Some(ref filter) = state.filter {
+        rb_rust.with_filter(filter.clone());
+    }
+
     match rb_rust.new_read() {
         Ok(table_read) => {
             let read_state = TableReadState {
                 table: state.table.clone(),
                 read_type: table_read.read_type().to_vec(),
+                data_predicates: table_read.data_predicates().to_vec(),
             };
             paimon_result_new_read {
                 read: box_table_read_state(read_state),
@@ -227,7 +264,12 @@ pub unsafe extern "C" fn paimon_read_builder_new_read(
 /// Only call with a scan returned from `paimon_read_builder_new_scan`.
 #[no_mangle]
 pub unsafe extern "C" fn paimon_table_scan_free(scan: *mut paimon_table_scan) {
-    free_table_wrapper(scan, |s| s.inner);
+    if !scan.is_null() {
+        let wrapper = Box::from_raw(scan);
+        if !wrapper.inner.is_null() {
+            drop(Box::from_raw(wrapper.inner as *mut TableScanState));
+        }
+    }
 }
 
 /// Execute a scan plan to get splits.
@@ -244,8 +286,11 @@ pub unsafe extern "C" fn paimon_table_scan_plan(
             error: e,
         };
     }
-    let table = &*((*scan).inner as *const Table);
-    let rb = table.new_read_builder();
+    let scan_state = &*((*scan).inner as *const TableScanState);
+    let mut rb = scan_state.table.new_read_builder();
+    if let Some(ref filter) = scan_state.filter {
+        rb.with_filter(filter.clone());
+    }
     let table_scan = rb.new_scan();
 
     match runtime().block_on(table_scan.plan()) {
@@ -349,10 +394,11 @@ pub unsafe extern "C" fn paimon_table_read_to_arrow(
     let end = (offset.saturating_add(length)).min(all_splits.len());
     let selected = &all_splits[start..end];
 
-    // C bindings currently persist only the projection, so reconstructing the
-    // read uses an empty predicate set.
-    let table_read =
-        paimon::table::TableRead::new(&state.table, state.read_type.clone(), Vec::new());
+    let table_read = paimon::table::TableRead::new(
+        &state.table,
+        state.read_type.clone(),
+        state.data_predicates.clone(),
+    );
 
     match table_read.to_arrow(selected) {
         Ok(stream) => {
@@ -476,3 +522,511 @@ pub unsafe extern "C" fn paimon_arrow_batch_free(batch: paimon_arrow_batch) {
         drop(Box::from_raw(batch.schema as *mut FFI_ArrowSchema));
     }
 }
+
+// ======================= Predicate ===============================
+
+/// Convert a C datum to a Rust Datum.
+unsafe fn datum_from_c(d: &paimon_datum) -> Result<Datum, *mut paimon_error> {
+    match d.tag {
+        0 => Ok(Datum::Bool(d.int_val != 0)),
+        1 => Ok(Datum::TinyInt(d.int_val as i8)),
+        2 => Ok(Datum::SmallInt(d.int_val as i16)),
+        3 => Ok(Datum::Int(d.int_val as i32)),
+        4 => Ok(Datum::Long(d.int_val)),
+        5 => Ok(Datum::Float(d.double_val as f32)),
+        6 => Ok(Datum::Double(d.double_val)),
+        7 => {
+            if d.str_len == 0 {
+                return Ok(Datum::String(String::new()));
+            }
+            if d.str_data.is_null() {
+                return Err(paimon_error::new(
+                    PaimonErrorCode::InvalidInput,
+                    "null string data in datum with non-zero length".to_string(),
+                ));
+            }
+            let bytes = std::slice::from_raw_parts(d.str_data, d.str_len);
+            let s = std::str::from_utf8(bytes).map_err(|e| {
+                paimon_error::new(
+                    PaimonErrorCode::InvalidInput,
+                    format!("invalid UTF-8 in datum string: {e}"),
+                )
+            })?;
+            Ok(Datum::String(s.to_string()))
+        }
+        8 => Ok(Datum::Date(d.int_val as i32)),
+        9 => Ok(Datum::Time(d.int_val as i32)),
+        10 => Ok(Datum::Timestamp {
+            millis: d.int_val,
+            nanos: d.int_val2 as i32,
+        }),
+        11 => Ok(Datum::LocalZonedTimestamp {
+            millis: d.int_val,
+            nanos: d.int_val2 as i32,
+        }),
+        12 => {
+            let unscaled = ((d.int_val2 as i128) << 64) | (d.int_val as u64 as i128);
+            Ok(Datum::Decimal {
+                unscaled,
+                precision: d.uint_val,
+                scale: d.uint_val2,
+            })
+        }
+        13 => {
+            if d.str_data.is_null() && d.str_len > 0 {
+                return Err(paimon_error::new(
+                    PaimonErrorCode::InvalidInput,
+                    "null bytes data in datum".to_string(),
+                ));
+            }
+            let bytes = if d.str_len > 0 {
+                std::slice::from_raw_parts(d.str_data, d.str_len).to_vec()
+            } else {
+                Vec::new()
+            };
+            Ok(Datum::Bytes(bytes))
+        }
+        _ => Err(paimon_error::new(
+            PaimonErrorCode::InvalidInput,
+            format!("unknown datum tag: {}", d.tag),
+        )),
+    }
+}
+
+/// Coerce an integer-family datum to match the target column's integer type.
+///
+/// FFI callers (e.g. Go) often pass a narrower integer literal (Int) for a
+/// wider column (BigInt). This function widens or narrows the datum to match,
+/// checking range for narrowing conversions.
+///
+/// Non-integer datums or non-integer columns are returned as-is.
+fn coerce_integer_datum(
+    datum: Datum,
+    fields: &[DataField],
+    column: &str,
+) -> Result<Datum, *mut paimon_error> {
+    let val = match &datum {
+        Datum::TinyInt(v) => *v as i64,
+        Datum::SmallInt(v) => *v as i64,
+        Datum::Int(v) => *v as i64,
+        Datum::Long(v) => *v,
+        _ => return Ok(datum),
+    };
+
+    let Some(field) = fields.iter().find(|f| f.name() == column) else {
+        // Column not found; let PredicateBuilder produce the proper error.
+        return Ok(datum);
+    };
+
+    match field.data_type() {
+        DataType::TinyInt(_) if !matches!(datum, Datum::TinyInt(_)) => {
+            if val < i8::MIN as i64 || val > i8::MAX as i64 {
+                Err(paimon_error::new(
+                    PaimonErrorCode::InvalidInput,
+                    format!("value {val} out of range for TinyInt column '{column}'"),
+                ))
+            } else {
+                Ok(Datum::TinyInt(val as i8))
+            }
+        }
+        DataType::SmallInt(_) if !matches!(datum, Datum::SmallInt(_)) => {
+            if val < i16::MIN as i64 || val > i16::MAX as i64 {
+                Err(paimon_error::new(
+                    PaimonErrorCode::InvalidInput,
+                    format!("value {val} out of range for SmallInt column '{column}'"),
+                ))
+            } else {
+                Ok(Datum::SmallInt(val as i16))
+            }
+        }
+        DataType::Int(_) if !matches!(datum, Datum::Int(_)) => {
+            if val < i32::MIN as i64 || val > i32::MAX as i64 {
+                Err(paimon_error::new(
+                    PaimonErrorCode::InvalidInput,
+                    format!("value {val} out of range for Int column '{column}'"),
+                ))
+            } else {
+                Ok(Datum::Int(val as i32))
+            }
+        }
+        DataType::BigInt(_) if !matches!(datum, Datum::Long(_)) => Ok(Datum::Long(val)),
+        _ => Ok(datum),
+    }
+}
+
+/// Helper to build a leaf predicate that takes a datum, via PredicateBuilder.
+unsafe fn build_leaf_predicate_datum(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: &paimon_datum,
+    build_fn: impl FnOnce(&PredicateBuilder, &str, Datum) -> paimon::Result<Predicate>,
+) -> paimon_result_predicate {
+    if let Err(e) = check_non_null(table, "table") {
+        return paimon_result_predicate {
+            predicate: std::ptr::null_mut(),
+            error: e,
+        };
+    }
+    let col_name = match validate_cstr(column, "column") {
+        Ok(s) => s,
+        Err(e) => {
+            return paimon_result_predicate {
+                predicate: std::ptr::null_mut(),
+                error: e,
+            }
+        }
+    };
+
+    let d = match datum_from_c(datum) {
+        Ok(d) => d,
+        Err(e) => {
+            return paimon_result_predicate {
+                predicate: std::ptr::null_mut(),
+                error: e,
+            }
+        }
+    };
+
+    let table_ref = &*((*table).inner as *const Table);
+    let fields = table_ref.schema().fields();
+
+    let d = match coerce_integer_datum(d, fields, &col_name) {
+        Ok(d) => d,
+        Err(e) => {
+            return paimon_result_predicate {
+                predicate: std::ptr::null_mut(),
+                error: e,
+            }
+        }
+    };
+
+    let pb = PredicateBuilder::new(fields);
+    match build_fn(&pb, &col_name, d) {
+        Ok(pred) => {
+            let inner = Box::into_raw(Box::new(pred)) as *mut c_void;
+            paimon_result_predicate {
+                predicate: Box::into_raw(Box::new(paimon_predicate { inner })),
+                error: std::ptr::null_mut(),
+            }
+        }
+        Err(e) => paimon_result_predicate {
+            predicate: std::ptr::null_mut(),
+            error: paimon_error::from_paimon(e),
+        },
+    }
+}
+
+/// Helper to build a leaf predicate without a datum (IS NULL / IS NOT NULL).
+unsafe fn build_leaf_predicate(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    build_fn: impl FnOnce(&PredicateBuilder, &str) -> paimon::Result<Predicate>,
+) -> paimon_result_predicate {
+    if let Err(e) = check_non_null(table, "table") {
+        return paimon_result_predicate {
+            predicate: std::ptr::null_mut(),
+            error: e,
+        };
+    }
+    let col_name = match validate_cstr(column, "column") {
+        Ok(s) => s,
+        Err(e) => {
+            return paimon_result_predicate {
+                predicate: std::ptr::null_mut(),
+                error: e,
+            }
+        }
+    };
+    let table_ref = &*((*table).inner as *const Table);
+    let pb = PredicateBuilder::new(table_ref.schema().fields());
+    match build_fn(&pb, &col_name) {
+        Ok(pred) => {
+            let inner = Box::into_raw(Box::new(pred)) as *mut c_void;
+            paimon_result_predicate {
+                predicate: Box::into_raw(Box::new(paimon_predicate { inner })),
+                error: std::ptr::null_mut(),
+            }
+        }
+        Err(e) => paimon_result_predicate {
+            predicate: std::ptr::null_mut(),
+            error: paimon_error::from_paimon(e),
+        },
+    }
+}
+
+/// Create an equality predicate: `column = datum`.
+///
+/// # Safety
+/// `table` and `column` must be valid pointers.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_equal(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: paimon_datum,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datum(table, column, &datum, |pb, col, d| pb.equal(col, d))
+}
+
+/// Create a not-equal predicate: `column != datum`.
+///
+/// # Safety
+/// `table` and `column` must be valid pointers.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_not_equal(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: paimon_datum,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datum(table, column, &datum, |pb, col, d| pb.not_equal(col, d))
+}
+
+/// Create a less-than predicate: `column < datum`.
+///
+/// # Safety
+/// `table` and `column` must be valid pointers.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_less_than(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: paimon_datum,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datum(table, column, &datum, |pb, col, d| pb.less_than(col, d))
+}
+
+/// Create a less-or-equal predicate: `column <= datum`.
+///
+/// # Safety
+/// `table` and `column` must be valid pointers.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_less_or_equal(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: paimon_datum,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datum(table, column, &datum, |pb, col, d| pb.less_or_equal(col, d))
+}
+
+/// Create a greater-than predicate: `column > datum`.
+///
+/// # Safety
+/// `table` and `column` must be valid pointers.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_greater_than(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: paimon_datum,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datum(table, column, &datum, |pb, col, d| pb.greater_than(col, d))
+}
+
+/// Create a greater-or-equal predicate: `column >= datum`.
+///
+/// # Safety
+/// `table` and `column` must be valid pointers.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_greater_or_equal(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: paimon_datum,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datum(table, column, &datum, |pb, col, d| {
+        pb.greater_or_equal(col, d)
+    })
+}
+
+/// Create an IS NULL predicate.
+///
+/// # Safety
+/// `table` and `column` must be valid pointers.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_is_null(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+) -> paimon_result_predicate {
+    build_leaf_predicate(table, column, |pb, col| pb.is_null(col))
+}
+
+/// Create an IS NOT NULL predicate.
+///
+/// # Safety
+/// `table` and `column` must be valid pointers.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_is_not_null(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+) -> paimon_result_predicate {
+    build_leaf_predicate(table, column, |pb, col| pb.is_not_null(col))
+}
+
+/// Create an IN predicate: `column IN (datum1, datum2, ...)`.
+///
+/// # Safety
+/// `table`, `column`, and `datums` must be valid pointers. `datums_len` must be the length.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_is_in(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datums: *const paimon_datum,
+    datums_len: usize,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datums(table, column, datums, datums_len, |pb, col, values| {
+        pb.is_in(col, values)
+    })
+}
+
+/// Create a NOT IN predicate: `column NOT IN (datum1, datum2, ...)`.
+///
+/// # Safety
+/// `table`, `column`, and `datums` must be valid pointers. `datums_len` must be the length.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_is_not_in(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datums: *const paimon_datum,
+    datums_len: usize,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datums(table, column, datums, datums_len, |pb, col, values| {
+        pb.is_not_in(col, values)
+    })
+}
+
+/// Helper to build an IN/NOT IN predicate with a datum array.
+unsafe fn build_leaf_predicate_datums(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datums: *const paimon_datum,
+    datums_len: usize,
+    build_fn: impl FnOnce(&PredicateBuilder, &str, Vec<Datum>) -> paimon::Result<Predicate>,
+) -> paimon_result_predicate {
+    if let Err(e) = check_non_null(table, "table") {
+        return paimon_result_predicate {
+            predicate: std::ptr::null_mut(),
+            error: e,
+        };
+    }
+    let col_name = match validate_cstr(column, "column") {
+        Ok(s) => s,
+        Err(e) => {
+            return paimon_result_predicate {
+                predicate: std::ptr::null_mut(),
+                error: e,
+            }
+        }
+    };
+
+    if datums.is_null() && datums_len > 0 {
+        return paimon_result_predicate {
+            predicate: std::ptr::null_mut(),
+            error: paimon_error::new(
+                PaimonErrorCode::InvalidInput,
+                "null datums pointer with non-zero length".to_string(),
+            ),
+        };
+    }
+
+    let slice = if datums_len > 0 {
+        std::slice::from_raw_parts(datums, datums_len)
+    } else {
+        &[]
+    };
+    let values: Result<Vec<Datum>, _> = slice.iter().map(|d| datum_from_c(d)).collect();
+    let values = match values {
+        Ok(v) => v,
+        Err(e) => {
+            return paimon_result_predicate {
+                predicate: std::ptr::null_mut(),
+                error: e,
+            }
+        }
+    };
+
+    let table_ref = &*((*table).inner as *const Table);
+    let fields = table_ref.schema().fields();
+
+    let values: Result<Vec<Datum>, _> = values
+        .into_iter()
+        .map(|d| coerce_integer_datum(d, fields, &col_name))
+        .collect();
+    let values = match values {
+        Ok(v) => v,
+        Err(e) => {
+            return paimon_result_predicate {
+                predicate: std::ptr::null_mut(),
+                error: e,
+            }
+        }
+    };
+
+    let pb = PredicateBuilder::new(fields);
+    match build_fn(&pb, &col_name, values) {
+        Ok(pred) => {
+            let inner = Box::into_raw(Box::new(pred)) as *mut c_void;
+            paimon_result_predicate {
+                predicate: Box::into_raw(Box::new(paimon_predicate { inner })),
+                error: std::ptr::null_mut(),
+            }
+        }
+        Err(e) => paimon_result_predicate {
+            predicate: std::ptr::null_mut(),
+            error: paimon_error::from_paimon(e),
+        },
+    }
+}
+
+/// Combine two predicates with AND. Consumes both inputs.
+///
+/// # Safety
+/// `a` and `b` must be valid pointers from predicate functions.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_and(
+    a: *mut paimon_predicate,
+    b: *mut paimon_predicate,
+) -> *mut paimon_predicate {
+    let pred_a = *Box::from_raw(Box::from_raw(a).inner as *mut Predicate);
+    let pred_b = *Box::from_raw(Box::from_raw(b).inner as *mut Predicate);
+    let combined = Predicate::and(vec![pred_a, pred_b]);
+    let inner = Box::into_raw(Box::new(combined)) as *mut c_void;
+    Box::into_raw(Box::new(paimon_predicate { inner }))
+}
+
+/// Combine two predicates with OR. Consumes both inputs.
+///
+/// # Safety
+/// `a` and `b` must be valid pointers from predicate functions.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_or(
+    a: *mut paimon_predicate,
+    b: *mut paimon_predicate,
+) -> *mut paimon_predicate {
+    let pred_a = *Box::from_raw(Box::from_raw(a).inner as *mut Predicate);
+    let pred_b = *Box::from_raw(Box::from_raw(b).inner as *mut Predicate);
+    let combined = Predicate::or(vec![pred_a, pred_b]);
+    let inner = Box::into_raw(Box::new(combined)) as *mut c_void;
+    Box::into_raw(Box::new(paimon_predicate { inner }))
+}
+
+/// Negate a predicate with NOT. Consumes the input.
+///
+/// # Safety
+/// `p` must be a valid pointer from a predicate function.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_not(p: *mut paimon_predicate) -> *mut paimon_predicate {
+    let pred = *Box::from_raw(Box::from_raw(p).inner as *mut Predicate);
+    let negated = Predicate::negate(pred);
+    let inner = Box::into_raw(Box::new(negated)) as *mut c_void;
+    Box::into_raw(Box::new(paimon_predicate { inner }))
+}
+
+/// Free a paimon_predicate.
+///
+/// # Safety
+/// Only call with a predicate returned from paimon predicate functions.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_free(p: *mut paimon_predicate) {
+    if !p.is_null() {
+        let wrapper = Box::from_raw(p);
+        if !wrapper.inner.is_null() {
+            drop(Box::from_raw(wrapper.inner as *mut Predicate));
+        }
+    }
+}
diff --git a/bindings/c/src/types.rs b/bindings/c/src/types.rs
index 6f09042..4df28de 100644
--- a/bindings/c/src/types.rs
+++ b/bindings/c/src/types.rs
@@ -17,7 +17,7 @@
 
 use std::ffi::c_void;
 
-use paimon::spec::DataField;
+use paimon::spec::{DataField, Predicate};
 use paimon::table::Table;
 
 /// C-compatible key-value pair for options.
@@ -78,10 +78,17 @@ pub struct paimon_read_builder {
     pub inner: *mut c_void,
 }
 
-/// Internal state for ReadBuilder that stores table and projection columns.
+/// Internal state for ReadBuilder that stores table, projection columns, and filter.
 pub(crate) struct ReadBuilderState {
     pub table: Table,
     pub projected_columns: Option<Vec<String>>,
+    pub filter: Option<Predicate>,
+}
+
+/// Internal state for TableScan that stores table and filter.
+pub(crate) struct TableScanState {
+    pub table: Table,
+    pub filter: Option<Predicate>,
 }
 
 #[repr(C)]
@@ -94,10 +101,11 @@ pub struct paimon_table_read {
     pub inner: *mut c_void,
 }
 
-/// Internal state for TableRead that stores table and projected read type.
+/// Internal state for TableRead that stores table, projected read type, and data predicates.
 pub(crate) struct TableReadState {
     pub table: Table,
     pub read_type: Vec<DataField>,
+    pub data_predicates: Vec<Predicate>,
 }
 
 #[repr(C)]
@@ -110,6 +118,53 @@ pub struct paimon_record_batch_reader {
     pub inner: *mut c_void,
 }
 
+/// Opaque wrapper around a Predicate.
+#[repr(C)]
+pub struct paimon_predicate {
+    pub inner: *mut c_void,
+}
+
+/// A typed literal value for predicate comparison, passed across FFI.
+///
+/// # Design
+///
+/// We use a tagged flat struct instead of opaque heap-allocated handles
+/// (like DuckDB's `duckdb_value`). The trade-off:
+///
+/// - **Pro**: Zero allocation — the entire datum is passed by value on the
+///   stack, with no heap round-trips or free calls needed. This keeps the
+///   FFI surface minimal and the Go/C caller simple.
+/// - **Con**: The struct is larger than any single variant needs, wasting
+///   some bytes per datum (currently ~56 bytes vs. ~16 for the largest
+///   single variant).
+///
+/// Since datums are only used for predicate construction (not a hot path),
+/// the extra size is acceptable.
+///
+/// # Tags
+///
+/// - 0: Bool, 1: TinyInt, 2: SmallInt, 3: Int, 4: Long
+/// - 5: Float, 6: Double, 7: String, 8: Date, 9: Time
+/// - 10: Timestamp, 11: LocalZonedTimestamp, 12: Decimal, 13: Bytes
+///
+/// `tag` determines which value fields are valid:
+/// - `Bool`/`TinyInt`/`SmallInt`/`Int`/`Long`/`Date`/`Time` → `int_val`
+/// - `Float`/`Double` → `double_val`
+/// - `String`/`Bytes` → `str_data` + `str_len`
+/// - `Timestamp`/`LocalZonedTimestamp` → `int_val` (millis) + `int_val2` (nanos)
+/// - `Decimal` → `int_val` + `int_val2` (unscaled i128) + `uint_val` (precision) + `uint_val2` (scale)
+#[repr(C)]
+pub struct paimon_datum {
+    pub tag: i32,
+    pub int_val: i64,
+    pub double_val: f64,
+    pub str_data: *const u8,
+    pub str_len: usize,
+    pub int_val2: i64,
+    pub uint_val: u32,
+    pub uint_val2: u32,
+}
+
 /// A single Arrow record batch exported via the Arrow C Data Interface.
 ///
 /// `array` and `schema` point to heap-allocated ArrowArray and ArrowSchema
diff --git a/bindings/go/predicate.go b/bindings/go/predicate.go
new file mode 100644
index 0000000..d39b684
--- /dev/null
+++ b/bindings/go/predicate.go
@@ -0,0 +1,595 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package paimon
+
+import (
+       "context"
+       "fmt"
+       "math"
+       "runtime"
+       "sync"
+       "unsafe"
+
+       "github.com/jupiterrider/ffi"
+)
+
+// Datum type tags (must match paimon-c datum_from_c).
+const (
+       datumTagBool                int32 = 0
+       datumTagTinyInt             int32 = 1
+       datumTagSmallInt            int32 = 2
+       datumTagInt                 int32 = 3
+       datumTagLong                int32 = 4
+       datumTagFloat               int32 = 5
+       datumTagDouble              int32 = 6
+       datumTagString              int32 = 7
+       datumTagDate                int32 = 8
+       datumTagTime                int32 = 9
+       datumTagTimestamp           int32 = 10
+       datumTagLocalZonedTimestamp int32 = 11
+       datumTagDecimal             int32 = 12
+       datumTagBytes               int32 = 13
+)
+
+// Datum is a typed literal value for predicate comparison.
+// The internal representation is hidden to allow future changes
+// (e.g. switching to opaque handles) without breaking callers.
+type Datum struct {
+       inner paimonDatumC
+}
+
+// BoolDatum creates a boolean datum.
+func BoolDatum(v bool) Datum {
+       var iv int64
+       if v {
+               iv = 1
+       }
+       return Datum{inner: paimonDatumC{tag: datumTagBool, intVal: iv}}
+}
+
+// TinyIntDatum creates a tinyint datum.
+func TinyIntDatum(v int8) Datum {
+       return Datum{inner: paimonDatumC{tag: datumTagTinyInt, intVal: 
int64(v)}}
+}
+
+// SmallIntDatum creates a smallint datum.
+func SmallIntDatum(v int16) Datum {
+       return Datum{inner: paimonDatumC{tag: datumTagSmallInt, intVal: 
int64(v)}}
+}
+
+// IntDatum creates an int datum.
+func IntDatum(v int32) Datum {
+       return Datum{inner: paimonDatumC{tag: datumTagInt, intVal: int64(v)}}
+}
+
+// LongDatum creates a long (bigint) datum.
+func LongDatum(v int64) Datum {
+       return Datum{inner: paimonDatumC{tag: datumTagLong, intVal: v}}
+}
+
+// FloatDatum creates a float datum.
+func FloatDatum(v float32) Datum {
+       return Datum{inner: paimonDatumC{tag: datumTagFloat, dblVal: 
float64(v)}}
+}
+
+// DoubleDatum creates a double datum.
+func DoubleDatum(v float64) Datum {
+       return Datum{inner: paimonDatumC{tag: datumTagDouble, dblVal: v}}
+}
+
+// StringDatum creates a string datum.
+func StringDatum(v string) Datum {
+       b := []byte(v)
+       d := paimonDatumC{tag: datumTagString, strLen: uintptr(len(b))}
+       if len(b) > 0 {
+               d.strData = &b[0]
+       }
+       return Datum{inner: d}
+}
+
+// Date represents a date value as epoch days since 1970-01-01.
+// Usage: table.PredicateBuilder().Eq("dt", paimon.Date(19000))
+type Date int32
+
+// Time represents a time-of-day value as milliseconds since midnight.
+// Usage: table.PredicateBuilder().Eq("t", paimon.Time(3600000))
+type Time int32
+
+// Timestamp represents a timestamp without timezone (millis + sub-millis nanos).
+// Usage: table.PredicateBuilder().Eq("ts", paimon.Timestamp{Millis: 1700000000000, Nanos: 0})
+type Timestamp struct {
+       Millis int64
+       Nanos  int32
+}
+
+// LocalZonedTimestamp represents a timestamp with local timezone semantics.
+// Usage: table.PredicateBuilder().Eq("lzts", paimon.LocalZonedTimestamp{Millis: 1700000000000, Nanos: 0})
+type LocalZonedTimestamp struct {
+       Millis int64
+       Nanos  int32
+}
+
+// Decimal represents a fixed-precision decimal value up to DECIMAL(38, s).
+//
+// The unscaled value is stored as a little-endian i128 split into two int64
+// halves: Lo (low 64 bits, unsigned interpretation) and Hi (high 64 bits,
+// sign-extended). For values that fit in int64, use [NewDecimal].
+//
+// Usage:
+//
+//     paimon.NewDecimal(12345, 10, 2)          // 123.45 as DECIMAL(10,2)
+//     paimon.Decimal{Lo: lo, Hi: hi, ...}      // full i128
+type Decimal struct {
+       Lo        int64 // low 64 bits of unscaled i128 (unsigned 
interpretation)
+       Hi        int64 // high 64 bits of unscaled i128 (sign extension)
+       Precision uint32
+       Scale     uint32
+}
+
+// NewDecimal creates a Decimal from an int64 unscaled value.
+// For unscaled values that exceed int64 range, construct Decimal directly
+// with Lo/Hi fields.
+func NewDecimal(unscaled int64, precision, scale uint32) Decimal {
+       hi := int64(0)
+       if unscaled < 0 {
+               hi = -1
+       }
+       return Decimal{Lo: unscaled, Hi: hi, Precision: precision, Scale: scale}
+}
+
+// Bytes represents a binary value.
+// Usage: table.PredicateBuilder().Eq("data", paimon.Bytes(someSlice))
+type Bytes []byte
+
+// toDatum converts a Go value to a Datum for predicate comparison.
+//
+// Supported Go types and their Paimon mappings:
+//   - bool                 → Bool
+//   - int8                 → TinyInt
+//   - int16                → SmallInt
+//   - int32                → Int
+//   - int                  → Int (if fits int32) or Long
+//   - int64                → Long
+//   - float32              → Float
+//   - float64              → Double
+//   - string               → String
+//   - Date                 → Date
+//   - Time                 → Time
+//   - Timestamp            → Timestamp
+//   - LocalZonedTimestamp  → LocalZonedTimestamp
+//   - Decimal              → Decimal
+//   - Bytes                → Bytes
+//   - Datum                → passed through
+func toDatum(v any) (Datum, error) {
+       switch val := v.(type) {
+       case bool:
+               return BoolDatum(val), nil
+       case int8:
+               return TinyIntDatum(val), nil
+       case int16:
+               return SmallIntDatum(val), nil
+       case int32:
+               return IntDatum(val), nil
+       case int:
+               if val >= math.MinInt32 && val <= math.MaxInt32 {
+                       return IntDatum(int32(val)), nil
+               }
+               return LongDatum(int64(val)), nil
+       case int64:
+               return LongDatum(val), nil
+       case float32:
+               return FloatDatum(val), nil
+       case float64:
+               return DoubleDatum(val), nil
+       case string:
+               return StringDatum(val), nil
+       case Date:
+               return Datum{inner: paimonDatumC{tag: datumTagDate, intVal: 
int64(val)}}, nil
+       case Time:
+               return Datum{inner: paimonDatumC{tag: datumTagTime, intVal: 
int64(val)}}, nil
+       case Timestamp:
+               return Datum{inner: paimonDatumC{tag: datumTagTimestamp, 
intVal: val.Millis, intVal2: int64(val.Nanos)}}, nil
+       case LocalZonedTimestamp:
+               return Datum{inner: paimonDatumC{tag: 
datumTagLocalZonedTimestamp, intVal: val.Millis, intVal2: int64(val.Nanos)}}, 
nil
+       case Decimal:
+               return Datum{inner: paimonDatumC{
+                       tag: datumTagDecimal, intVal: val.Lo, intVal2: val.Hi,
+                       uintVal: val.Precision, uintVal2: val.Scale,
+               }}, nil
+       case Bytes:
+               d := paimonDatumC{tag: datumTagBytes, strLen: uintptr(len(val))}
+               if len(val) > 0 {
+                       d.strData = &val[0]
+               }
+               return Datum{inner: d}, nil
+       case Datum:
+               return val, nil
+       default:
+               return Datum{}, fmt.Errorf("unsupported datum type: %T", v)
+       }
+}
+
+// Predicate is an opaque filter predicate for scan planning.
+type Predicate struct {
+       ctx       context.Context
+       lib       *libRef
+       inner     *paimonPredicate
+       closeOnce sync.Once
+}
+
+// Close releases the predicate resources. Safe to call multiple times.
+// Note: predicates passed to WithFilter or combinators (And/Or/Not) are 
consumed
+// and should NOT be closed by the caller.
+func (p *Predicate) Close() {
+       p.closeOnce.Do(func() {
+               if p.inner != nil {
+                       ffiPredicateFree.symbol(p.ctx)(p.inner)
+                       p.inner = nil
+                       p.lib.release()
+               }
+       })
+}
+
+// errConsumedPredicate is returned when a consumed or nil predicate is reused.
+var errConsumedPredicate = fmt.Errorf("paimon: predicate already consumed or 
nil")
+
+// PredicateBuilder creates filter predicates for a table.
+// It holds a Go-level reference to the Table and does not own any C resources,
+// so there is no Close() method.
+type PredicateBuilder struct {
+       table *Table
+}
+
+// Eq creates an equality predicate: column = value.
+func (pb *PredicateBuilder) Eq(column string, value any) (*Predicate, error) {
+       datum, err := toDatum(value)
+       if err != nil {
+               return nil, err
+       }
+       return pb.buildLeafPredicate(ffiPredicateEqual, column, datum)
+}
+
+// NotEq creates a not-equal predicate: column != value.
+func (pb *PredicateBuilder) NotEq(column string, value any) (*Predicate, 
error) {
+       datum, err := toDatum(value)
+       if err != nil {
+               return nil, err
+       }
+       return pb.buildLeafPredicate(ffiPredicateNotEqual, column, datum)
+}
+
+// Lt creates a less-than predicate: column < value.
+func (pb *PredicateBuilder) Lt(column string, value any) (*Predicate, error) {
+       datum, err := toDatum(value)
+       if err != nil {
+               return nil, err
+       }
+       return pb.buildLeafPredicate(ffiPredicateLessThan, column, datum)
+}
+
+// Le creates a less-or-equal predicate: column <= value.
+func (pb *PredicateBuilder) Le(column string, value any) (*Predicate, error) {
+       datum, err := toDatum(value)
+       if err != nil {
+               return nil, err
+       }
+       return pb.buildLeafPredicate(ffiPredicateLessOrEqual, column, datum)
+}
+
+// Gt creates a greater-than predicate: column > value.
+func (pb *PredicateBuilder) Gt(column string, value any) (*Predicate, error) {
+       datum, err := toDatum(value)
+       if err != nil {
+               return nil, err
+       }
+       return pb.buildLeafPredicate(ffiPredicateGreaterThan, column, datum)
+}
+
+// Ge creates a greater-or-equal predicate: column >= value.
+func (pb *PredicateBuilder) Ge(column string, value any) (*Predicate, error) {
+       datum, err := toDatum(value)
+       if err != nil {
+               return nil, err
+       }
+       return pb.buildLeafPredicate(ffiPredicateGreaterOrEqual, column, datum)
+}
+
+// IsNull creates an IS NULL predicate.
+func (pb *PredicateBuilder) IsNull(column string) (*Predicate, error) {
+       return pb.buildNullPredicate(ffiPredicateIsNull, column)
+}
+
+// IsNotNull creates an IS NOT NULL predicate.
+func (pb *PredicateBuilder) IsNotNull(column string) (*Predicate, error) {
+       return pb.buildNullPredicate(ffiPredicateIsNotNull, column)
+}
+
+// In creates an IN predicate: column IN (values...).
+func (pb *PredicateBuilder) In(column string, values ...any) (*Predicate, 
error) {
+       return pb.buildInPredicate(ffiPredicateIsIn, column, values)
+}
+
+// NotIn creates a NOT IN predicate: column NOT IN (values...).
+func (pb *PredicateBuilder) NotIn(column string, values ...any) (*Predicate, 
error) {
+       return pb.buildInPredicate(ffiPredicateIsNotIn, column, values)
+}
+
+// buildLeafPredicate is a helper for comparison predicates that take (table, 
column, datum).
+func (pb *PredicateBuilder) buildLeafPredicate(
+       ffiVar *FFI[func(*paimonTable, *byte, paimonDatumC) (*paimonPredicate, 
error)],
+       column string, datum Datum,
+) (*Predicate, error) {
+       t := pb.table
+       if t.inner == nil {
+               return nil, ErrClosed
+       }
+       createFn := ffiVar.symbol(t.ctx)
+       cCol := append([]byte(column), 0)
+       inner, err := createFn(t.inner, &cCol[0], datum.inner)
+       runtime.KeepAlive(cCol)
+       runtime.KeepAlive(datum)
+       if err != nil {
+               return nil, err
+       }
+       t.lib.acquire()
+       return &Predicate{ctx: t.ctx, lib: t.lib, inner: inner}, nil
+}
+
+// buildNullPredicate is a helper for IS NULL / IS NOT NULL predicates.
+func (pb *PredicateBuilder) buildNullPredicate(
+       ffiVar *FFI[func(*paimonTable, *byte) (*paimonPredicate, error)],
+       column string,
+) (*Predicate, error) {
+       t := pb.table
+       if t.inner == nil {
+               return nil, ErrClosed
+       }
+       createFn := ffiVar.symbol(t.ctx)
+       cCol := append([]byte(column), 0)
+       inner, err := createFn(t.inner, &cCol[0])
+       runtime.KeepAlive(cCol)
+       if err != nil {
+               return nil, err
+       }
+       t.lib.acquire()
+       return &Predicate{ctx: t.ctx, lib: t.lib, inner: inner}, nil
+}
+
+// buildInPredicate is a helper for IS IN / IS NOT IN predicates.
+func (pb *PredicateBuilder) buildInPredicate(
+       ffiVar *FFI[func(*paimonTable, *byte, unsafe.Pointer, uintptr) 
(*paimonPredicate, error)],
+       column string, values []any,
+) (*Predicate, error) {
+       t := pb.table
+       if t.inner == nil {
+               return nil, ErrClosed
+       }
+       datums := make([]paimonDatumC, len(values))
+       for i, v := range values {
+               d, err := toDatum(v)
+               if err != nil {
+                       return nil, err
+               }
+               datums[i] = d.inner
+       }
+       createFn := ffiVar.symbol(t.ctx)
+       cCol := append([]byte(column), 0)
+       var datumsPtr unsafe.Pointer
+       if len(datums) > 0 {
+               datumsPtr = unsafe.Pointer(&datums[0])
+       }
+       inner, err := createFn(t.inner, &cCol[0], datumsPtr, 
uintptr(len(datums)))
+       runtime.KeepAlive(cCol)
+       runtime.KeepAlive(datums)
+       runtime.KeepAlive(values)
+       if err != nil {
+               return nil, err
+       }
+       t.lib.acquire()
+       return &Predicate{ctx: t.ctx, lib: t.lib, inner: inner}, nil
+}
+
+// combinePredicate is a shared helper for And/Or.
+func (p *Predicate) combinePredicate(
+       other *Predicate,
+       ffiVar *FFI[func(*paimonPredicate, *paimonPredicate) *paimonPredicate],
+) (*Predicate, error) {
+       if p == nil || p.inner == nil {
+               return nil, errConsumedPredicate
+       }
+       if other == nil || other.inner == nil {
+               return nil, errConsumedPredicate
+       }
+       if p == other {
+               return nil, fmt.Errorf("paimon: cannot combine a predicate with 
itself")
+       }
+       combineFn := ffiVar.symbol(p.ctx)
+       p.inner = combineFn(p.inner, other.inner)
+       other.inner = nil
+       other.lib.release()
+       return p, nil
+}
+
+// And combines this predicate with another using AND. Consumes both predicates
+// (callers must NOT close either after this call).
+func (p *Predicate) And(other *Predicate) (*Predicate, error) {
+       return p.combinePredicate(other, ffiPredicateAnd)
+}
+
+// Or combines this predicate with another using OR. Consumes both predicates
+// (callers must NOT close either after this call).
+func (p *Predicate) Or(other *Predicate) (*Predicate, error) {
+       return p.combinePredicate(other, ffiPredicateOr)
+}
+
+// Not negates this predicate. Consumes the input
+// (caller must NOT close it after this call).
+func (p *Predicate) Not() (*Predicate, error) {
+       if p == nil || p.inner == nil {
+               return nil, errConsumedPredicate
+       }
+       negateFn := ffiPredicateNot.symbol(p.ctx)
+       p.inner = negateFn(p.inner)
+       return p, nil
+}
+
+// FFI wrappers for predicate functions.
+
+var ffiPredicateFree = newFFI(ffiOpts{
+       sym:    "paimon_predicate_free",
+       rType:  &ffi.TypeVoid,
+       aTypes: []*ffi.Type{&ffi.TypePointer},
+}, func(_ context.Context, ffiCall ffiCall) func(p *paimonPredicate) {
+       return func(p *paimonPredicate) {
+               ffiCall(nil, unsafe.Pointer(&p))
+       }
+})
+
+var ffiPredicateEqual = newPredicateLeafFFI("paimon_predicate_equal")
+var ffiPredicateNotEqual = newPredicateLeafFFI("paimon_predicate_not_equal")
+var ffiPredicateLessThan = newPredicateLeafFFI("paimon_predicate_less_than")
+var ffiPredicateLessOrEqual = 
newPredicateLeafFFI("paimon_predicate_less_or_equal")
+var ffiPredicateGreaterThan = 
newPredicateLeafFFI("paimon_predicate_greater_than")
+var ffiPredicateGreaterOrEqual = 
newPredicateLeafFFI("paimon_predicate_greater_or_equal")
+
+// newPredicateLeafFFI creates an FFI wrapper for comparison predicate 
functions
+// with signature: (table, column, datum) -> result_predicate.
+func newPredicateLeafFFI(sym string) *FFI[func(*paimonTable, *byte, 
paimonDatumC) (*paimonPredicate, error)] {
+       return newFFI(ffiOpts{
+               sym:    contextKey(sym),
+               rType:  &typeResultPredicate,
+               aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer, 
&typePaimonDatum},
+       }, func(ctx context.Context, ffiCall ffiCall) func(*paimonTable, *byte, 
paimonDatumC) (*paimonPredicate, error) {
+               return func(table *paimonTable, column *byte, datum 
paimonDatumC) (*paimonPredicate, error) {
+                       var result resultPredicate
+                       ffiCall(
+                               unsafe.Pointer(&result),
+                               unsafe.Pointer(&table),
+                               unsafe.Pointer(&column),
+                               unsafe.Pointer(&datum),
+                       )
+                       if result.error != nil {
+                               return nil, parseError(ctx, result.error)
+                       }
+                       return result.predicate, nil
+               }
+       })
+}
+
+var ffiPredicateIsNull = newPredicateNullFFI("paimon_predicate_is_null")
+var ffiPredicateIsNotNull = newPredicateNullFFI("paimon_predicate_is_not_null")
+
+// newPredicateNullFFI creates an FFI wrapper for null-check predicate 
functions
+// with signature: (table, column) -> result_predicate.
+func newPredicateNullFFI(sym string) *FFI[func(*paimonTable, *byte) 
(*paimonPredicate, error)] {
+       return newFFI(ffiOpts{
+               sym:    contextKey(sym),
+               rType:  &typeResultPredicate,
+               aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer},
+       }, func(ctx context.Context, ffiCall ffiCall) func(*paimonTable, *byte) 
(*paimonPredicate, error) {
+               return func(table *paimonTable, column *byte) 
(*paimonPredicate, error) {
+                       var result resultPredicate
+                       ffiCall(
+                               unsafe.Pointer(&result),
+                               unsafe.Pointer(&table),
+                               unsafe.Pointer(&column),
+                       )
+                       if result.error != nil {
+                               return nil, parseError(ctx, result.error)
+                       }
+                       return result.predicate, nil
+               }
+       })
+}
+
+var ffiPredicateIsIn = newPredicateInFFI("paimon_predicate_is_in")
+var ffiPredicateIsNotIn = newPredicateInFFI("paimon_predicate_is_not_in")
+
+// newPredicateInFFI creates an FFI wrapper for IN/NOT IN predicate functions
+// with signature: (table, column, datums, datums_len) -> result_predicate.
+func newPredicateInFFI(sym string) *FFI[func(*paimonTable, *byte, 
unsafe.Pointer, uintptr) (*paimonPredicate, error)] {
+       return newFFI(ffiOpts{
+               sym:    contextKey(sym),
+               rType:  &typeResultPredicate,
+               aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer, 
&ffi.TypePointer, &ffi.TypePointer},
+       }, func(ctx context.Context, ffiCall ffiCall) func(*paimonTable, *byte, 
unsafe.Pointer, uintptr) (*paimonPredicate, error) {
+               return func(table *paimonTable, column *byte, datums 
unsafe.Pointer, datumsLen uintptr) (*paimonPredicate, error) {
+                       var result resultPredicate
+                       ffiCall(
+                               unsafe.Pointer(&result),
+                               unsafe.Pointer(&table),
+                               unsafe.Pointer(&column),
+                               unsafe.Pointer(&datums),
+                               unsafe.Pointer(&datumsLen),
+                       )
+                       if result.error != nil {
+                               return nil, parseError(ctx, result.error)
+                       }
+                       return result.predicate, nil
+               }
+       })
+}
+
+var ffiPredicateAnd = newFFI(ffiOpts{
+       sym:    "paimon_predicate_and",
+       rType:  &ffi.TypePointer,
+       aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer},
+}, func(_ context.Context, ffiCall ffiCall) func(*paimonPredicate, 
*paimonPredicate) *paimonPredicate {
+       return func(a, b *paimonPredicate) *paimonPredicate {
+               var result *paimonPredicate
+               ffiCall(
+                       unsafe.Pointer(&result),
+                       unsafe.Pointer(&a),
+                       unsafe.Pointer(&b),
+               )
+               return result
+       }
+})
+
+var ffiPredicateOr = newFFI(ffiOpts{
+       sym:    "paimon_predicate_or",
+       rType:  &ffi.TypePointer,
+       aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer},
+}, func(_ context.Context, ffiCall ffiCall) func(*paimonPredicate, 
*paimonPredicate) *paimonPredicate {
+       return func(a, b *paimonPredicate) *paimonPredicate {
+               var result *paimonPredicate
+               ffiCall(
+                       unsafe.Pointer(&result),
+                       unsafe.Pointer(&a),
+                       unsafe.Pointer(&b),
+               )
+               return result
+       }
+})
+
+var ffiPredicateNot = newFFI(ffiOpts{
+       sym:    "paimon_predicate_not",
+       rType:  &ffi.TypePointer,
+       aTypes: []*ffi.Type{&ffi.TypePointer},
+}, func(_ context.Context, ffiCall ffiCall) func(*paimonPredicate) 
*paimonPredicate {
+       return func(p *paimonPredicate) *paimonPredicate {
+               var result *paimonPredicate
+               ffiCall(
+                       unsafe.Pointer(&result),
+                       unsafe.Pointer(&p),
+               )
+               return result
+       }
+})
diff --git a/bindings/go/predicate/predicate.go 
b/bindings/go/predicate/predicate.go
new file mode 100644
index 0000000..046fa78
--- /dev/null
+++ b/bindings/go/predicate/predicate.go
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Package predicate provides convenience functions for combining Paimon 
filter predicates.
+package predicate
+
+import (
+       "fmt"
+
+       paimon "github.com/apache/paimon-rust/bindings/go"
+)
+
+// Predicate is an alias for paimon.Predicate.
+type Predicate = paimon.Predicate
+
+// And combines two or more predicates with AND. Consumes all inputs
+// (callers must NOT close them after this call).
+func And(preds ...*Predicate) (*Predicate, error) {
+       if len(preds) < 2 {
+               return nil, fmt.Errorf("predicate.And requires at least 2 
predicates, got %d", len(preds))
+       }
+       result := preds[0]
+       for _, p := range preds[1:] {
+               r, err := result.And(p)
+               if err != nil {
+                       return nil, err
+               }
+               result = r
+       }
+       return result, nil
+}
+
+// Or combines two or more predicates with OR. Consumes all inputs
+// (callers must NOT close them after this call).
+func Or(preds ...*Predicate) (*Predicate, error) {
+       if len(preds) < 2 {
+               return nil, fmt.Errorf("predicate.Or requires at least 2 
predicates, got %d", len(preds))
+       }
+       result := preds[0]
+       for _, p := range preds[1:] {
+               r, err := result.Or(p)
+               if err != nil {
+                       return nil, err
+               }
+               result = r
+       }
+       return result, nil
+}
+
+// Not negates a predicate. Consumes the input
+// (caller must NOT close it after this call).
+func Not(p *Predicate) (*Predicate, error) {
+       return p.Not()
+}
diff --git a/bindings/go/read_builder.go b/bindings/go/read_builder.go
index fde6bf5..177c595 100644
--- a/bindings/go/read_builder.go
+++ b/bindings/go/read_builder.go
@@ -56,6 +56,47 @@ func (rb *ReadBuilder) WithProjection(columns []string) 
error {
        return projFn(rb.inner, columns)
 }
 
+// WithFilter sets a filter predicate for scan planning and read-side pruning.
+//
+// The predicate is used in two phases:
+//   - Scan planning: prunes partitions, buckets, and data files based on
+//     file-level statistics (min/max). This is conservative — files whose
+//     statistics are inconclusive are kept.
+//   - Read-side: applies row-level filtering via Parquet native row filters
+//     for supported leaf predicates (Eq, NotEq, Lt, Le, Gt, Ge, IsNull,
+//     IsNotNull, In, NotIn).
+//
+// Row-level filtering is exact for most common types (Bool, Int, Long, Float,
+// Double, String, Date, Decimal, Binary). However, the following cases are NOT
+// filtered at the row level and may return non-matching rows:
+//   - Compound predicates (And/Or/Not) — not yet implemented for row-level 
filtering.
+//   - Time, Timestamp, and LocalZonedTimestamp columns (not yet implemented).
+//   - Schema-evolution: the predicate column does not exist in older data 
files.
+//   - Data-evolution mode (data-evolution.enabled = true).
+//
+// In these cases callers should apply residual filtering on the returned 
records.
+//
+// The predicate is consumed (ownership transferred to the read builder);
+// the caller must NOT close it after this call.
+// Passing nil is a no-op.
+func (rb *ReadBuilder) WithFilter(p *Predicate) error {
+       if rb.inner == nil {
+               return ErrClosed
+       }
+       if p == nil {
+               return nil
+       }
+       if p.inner == nil {
+               return errConsumedPredicate
+       }
+       filterFn := ffiReadBuilderWithFilter.symbol(rb.ctx)
+       err := filterFn(rb.inner, p.inner)
+       // Ownership transferred; prevent double-free.
+       p.inner = nil
+       p.lib.release()
+       return err
+}
+
 // NewScan creates a TableScan for planning which data files to read.
 func (rb *ReadBuilder) NewScan() (*TableScan, error) {
        if rb.inner == nil {
@@ -136,6 +177,25 @@ var ffiReadBuilderWithProjection = newFFI(ffiOpts{
        }
 })
 
+var ffiReadBuilderWithFilter = newFFI(ffiOpts{
+       sym:    "paimon_read_builder_with_filter",
+       rType:  &ffi.TypePointer,
+       aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer},
+}, func(ctx context.Context, ffiCall ffiCall) func(rb *paimonReadBuilder, p 
*paimonPredicate) error {
+       return func(rb *paimonReadBuilder, p *paimonPredicate) error {
+               var errPtr *paimonError
+               ffiCall(
+                       unsafe.Pointer(&errPtr),
+                       unsafe.Pointer(&rb),
+                       unsafe.Pointer(&p),
+               )
+               if errPtr != nil {
+                       return parseError(ctx, errPtr)
+               }
+               return nil
+       }
+})
+
 var ffiReadBuilderNewScan = newFFI(ffiOpts{
        sym:    "paimon_read_builder_new_scan",
        rType:  &typeResultTableScan,
diff --git a/bindings/go/table.go b/bindings/go/table.go
index 17ff7af..e6008a2 100644
--- a/bindings/go/table.go
+++ b/bindings/go/table.go
@@ -44,6 +44,11 @@ func (t *Table) Close() {
        })
 }
 
+// PredicateBuilder returns a builder for creating filter predicates on this 
table.
+func (t *Table) PredicateBuilder() *PredicateBuilder {
+       return &PredicateBuilder{table: t}
+}
+
 // NewReadBuilder creates a ReadBuilder for this table.
 func (t *Table) NewReadBuilder() (*ReadBuilder, error) {
        if t.inner == nil {
diff --git a/bindings/go/tests/paimon_test.go b/bindings/go/tests/paimon_test.go
index e83f151..e9ee196 100644
--- a/bindings/go/tests/paimon_test.go
+++ b/bindings/go/tests/paimon_test.go
@@ -30,41 +30,14 @@ import (
        paimon "github.com/apache/paimon-rust/bindings/go"
 )
 
-// TestReadLogTable reads the test table and verifies the data matches 
expected values.
-//
-// The table was populated by Docker provisioning with:
-//
-//     (1, 'alice'), (2, 'bob'), (3, 'carol')
-func TestReadLogTable(t *testing.T) {
-       warehouse := os.Getenv("PAIMON_TEST_WAREHOUSE")
-       if warehouse == "" {
-               warehouse = "/tmp/paimon-warehouse"
-       }
-
-       if _, err := os.Stat(warehouse); os.IsNotExist(err) {
-               t.Skipf("Skipping: warehouse %s does not exist (run 'make 
docker-up' first)", warehouse)
-       }
-
-       // Use NewCatalog with options
-       catalog, err := paimon.NewCatalog(map[string]string{
-               "warehouse": warehouse,
-       })
-       if err != nil {
-               t.Fatalf("Failed to create catalog: %v", err)
-       }
-       defer catalog.Close()
+type row struct {
+       id   int32
+       name string
+}
 
-       table, err := catalog.GetTable(paimon.NewIdentifier("default", 
"simple_log_table"))
-       if err != nil {
-               t.Fatalf("Failed to get table: %v", err)
-       }
-       defer table.Close()
-
-       rb, err := table.NewReadBuilder()
-       if err != nil {
-               t.Fatalf("Failed to create read builder: %v", err)
-       }
-       defer rb.Close()
+// readRows scans and reads all (id, name) rows from a ReadBuilder.
+func readRows(t *testing.T, rb *paimon.ReadBuilder) []row {
+       t.Helper()
 
        scan, err := rb.NewScan()
        if err != nil {
@@ -80,7 +53,7 @@ func TestReadLogTable(t *testing.T) {
 
        splits := plan.Splits()
        if len(splits) == 0 {
-               t.Fatal("Expected at least one split")
+               return nil
        }
 
        read, err := rb.NewRead()
@@ -95,13 +68,6 @@ func TestReadLogTable(t *testing.T) {
        }
        defer reader.Close()
 
-       // Import Arrow batches via C Data Interface and collect rows.
-       // Strings are copied before Release because arrow-go's String.Value()
-       // returns zero-copy references into the Arrow buffer.
-       type row struct {
-               id   int32
-               name string
-       }
        var rows []row
        batchIdx := 0
        for {
@@ -132,58 +98,130 @@ func TestReadLogTable(t *testing.T) {
                record.Release()
                batchIdx++
        }
-
-       if len(rows) == 0 {
-               t.Fatal("Expected at least one row, got 0")
-       }
-
-       sort.Slice(rows, func(i, j int) bool {
-               return rows[i].id < rows[j].id
-       })
-
-       expected := []row{
-               {1, "alice"},
-               {2, "bob"},
-               {3, "carol"},
-       }
-
-       if len(rows) != len(expected) {
-               t.Fatalf("Expected %d rows, got %d: %v", len(expected), 
len(rows), rows)
-       }
-
-       for i, exp := range expected {
-               if rows[i] != exp {
-                       t.Errorf("Row %d: expected %v, got %v", i, exp, rows[i])
-               }
-       }
+       return rows
 }
 
-// TestReadWithProjection reads only the "id" column via WithProjection and
-// verifies that only the projected column is returned with correct values.
-func TestReadWithProjection(t *testing.T) {
+// openTestTable creates a catalog, opens the simple_log_table, and returns
+// the table. Closing the catalog and table is registered via t.Cleanup.
+// Skips the test if the warehouse does not exist.
+func openTestTable(t *testing.T) *paimon.Table {
+       t.Helper()
+
        warehouse := os.Getenv("PAIMON_TEST_WAREHOUSE")
        if warehouse == "" {
                warehouse = "/tmp/paimon-warehouse"
        }
-
        if _, err := os.Stat(warehouse); os.IsNotExist(err) {
                t.Skipf("Skipping: warehouse %s does not exist (run 'make docker-up' first)", warehouse)
        }
 
-       // Use NewCatalog with options
        catalog, err := paimon.NewCatalog(map[string]string{
                "warehouse": warehouse,
        })
        if err != nil {
                t.Fatalf("Failed to create catalog: %v", err)
        }
-       defer catalog.Close()
+       t.Cleanup(func() { catalog.Close() })
 
        table, err := catalog.GetTable(paimon.NewIdentifier("default", "simple_log_table"))
        if err != nil {
                t.Fatalf("Failed to get table: %v", err)
        }
-       defer table.Close()
+       t.Cleanup(func() { table.Close() })
+
+       return table
+}
+
+// TestReadLogTable reads the test table and verifies the data matches expected values.
+//
+// The table was populated by Docker provisioning with:
+//
+//     (1, 'alice'), (2, 'bob'), (3, 'carol')
+func TestReadLogTable(t *testing.T) {
+       table := openTestTable(t)
+
+       rb, err := table.NewReadBuilder()
+       if err != nil {
+               t.Fatalf("Failed to create read builder: %v", err)
+       }
+       defer rb.Close()
+
+       rows := readRows(t, rb)
+       if len(rows) == 0 {
+               t.Fatal("Expected at least one row, got 0")
+       }
+
+       sort.Slice(rows, func(i, j int) bool { return rows[i].id < rows[j].id })
+
+       expected := []row{{1, "alice"}, {2, "bob"}, {3, "carol"}}
+       if len(rows) != len(expected) {
+               t.Fatalf("Expected %d rows, got %d: %v", len(expected), len(rows), rows)
+       }
+       for i, exp := range expected {
+               if rows[i] != exp {
+                       t.Errorf("Row %d: expected %v, got %v", i, exp, rows[i])
+               }
+       }
+}
+
+// TestReadWithFilter exercises filter push-down through several sub-tests.
+func TestReadWithFilter(t *testing.T) {
+       table := openTestTable(t)
+
+       t.Run("EqualById", func(t *testing.T) {
+               rb, err := table.NewReadBuilder()
+               if err != nil {
+                       t.Fatalf("Failed to create read builder: %v", err)
+               }
+               defer rb.Close()
+
+               // id = 1
+               pb := table.PredicateBuilder()
+               pred, err := pb.Eq("id", 1)
+               if err != nil {
+                       t.Fatalf("Failed to create predicate: %v", err)
+               }
+               if err := rb.WithFilter(pred); err != nil {
+                       t.Fatalf("Failed to set filter: %v", err)
+               }
+
+               rows := readRows(t, rb)
+               expected := []row{{1, "alice"}}
+               if len(rows) != len(expected) {
+                       t.Fatalf("Expected %d rows, got %d: %v", len(expected), len(rows), rows)
+               }
+               if rows[0] != expected[0] {
+                       t.Errorf("Expected %v, got %v", expected[0], rows[0])
+               }
+       })
+
+       t.Run("EmptyStringEqual", func(t *testing.T) {
+               rb, err := table.NewReadBuilder()
+               if err != nil {
+                       t.Fatalf("Failed to create read builder: %v", err)
+               }
+               defer rb.Close()
+
+               pb := table.PredicateBuilder()
+               pred, err := pb.Eq("name", "")
+               if err != nil {
+                       t.Fatalf("Eq with empty string failed: %v", err)
+               }
+               if err := rb.WithFilter(pred); err != nil {
+                       t.Fatalf("WithFilter failed: %v", err)
+               }
+
+               rows := readRows(t, rb)
+               if len(rows) != 0 {
+                       t.Fatalf("Expected 0 rows for empty string filter, got %d: %v", len(rows), rows)
+               }
+       })
+}
+
+// TestReadWithProjection reads only the "id" column via WithProjection and
+// verifies that only the projected column is returned with correct values.
+func TestReadWithProjection(t *testing.T) {
+       table := openTestTable(t)
 
        rb, err := table.NewReadBuilder()
        if err != nil {
@@ -191,7 +229,6 @@ func TestReadWithProjection(t *testing.T) {
        }
        defer rb.Close()
 
-       // Set projection to only read "id" column
        if err := rb.WithProjection([]string{"id"}); err != nil {
                t.Fatalf("Failed to set projection: %v", err)
        }
@@ -236,7 +273,6 @@ func TestReadWithProjection(t *testing.T) {
                        t.Fatalf("Batch %d: failed to read next record: %v", 
batchIdx, err)
                }
 
-               // Verify schema only contains the projected column
                schema := record.Schema()
                if schema.NumFields() != 1 {
                        record.Release()
diff --git a/bindings/go/types.go b/bindings/go/types.go
index fdc8990..7d943cb 100644
--- a/bindings/go/types.go
+++ b/bindings/go/types.go
@@ -120,6 +120,34 @@ var (
                }[0],
        }
 
+       // paimon_result_predicate { predicate: *paimon_predicate, error: *paimon_error }
+       typeResultPredicate = ffi.Type{
+               Type: ffi.Struct,
+               Elements: &[]*ffi.Type{
+                       &ffi.TypePointer,
+                       &ffi.TypePointer,
+                       nil,
+               }[0],
+       }
+
+       // paimon_datum { tag: i32, int_val: i64, double_val: f64, str_data: *u8, str_len: usize,
+       //                int_val2: i64, uint_val: u32, uint_val2: u32 }
+       typePaimonDatum = ffi.Type{
+               Type: ffi.Struct,
+               Elements: &[]*ffi.Type{
+                       &ffi.TypeSint32,  // tag
+                       &ffi.TypeSint32,  // padding
+                       &ffi.TypeSint64,  // int_val
+                       &ffi.TypeDouble,  // double_val
+                       &ffi.TypePointer, // str_data
+                       &ffi.TypePointer, // str_len (usize)
+                       &ffi.TypeSint64,  // int_val2
+                       &ffi.TypeUint32,  // uint_val
+                       &ffi.TypeUint32,  // uint_val2
+                       nil,
+               }[0],
+       }
+
        // paimon_result_next_batch { batch: paimon_arrow_batch, error: 
*paimon_error }
        typeResultNextBatch = ffi.Type{
                Type: ffi.Struct,
@@ -153,6 +181,7 @@ type paimonTableScan struct{}
 type paimonTableRead struct{}
 type paimonPlan struct{}
 type paimonRecordBatchReader struct{}
+type paimonPredicate struct{}
 
 // Result types matching the C repr structs
 type resultCatalogNew struct {
@@ -195,6 +224,24 @@ type resultRecordBatchReader struct {
        error  *paimonError
 }
 
+type resultPredicate struct {
+       predicate *paimonPredicate
+       error     *paimonError
+}
+
+// paimonDatumC mirrors the C paimon_datum struct.
+type paimonDatumC struct {
+       tag      int32
+       _pad0    [4]byte // padding for alignment
+       intVal   int64
+       dblVal   float64
+       strData  *byte
+       strLen   uintptr
+       intVal2  int64
+       uintVal  uint32
+       uintVal2 uint32
+}
+
 // arrowBatch holds a single Arrow record batch via the Arrow C Data Interface.
 type arrowBatch struct {
        ctx      context.Context
diff --git a/crates/paimon/src/table/read_builder.rs 
b/crates/paimon/src/table/read_builder.rs
index c9be32c..c8c3d86 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -272,6 +272,11 @@ impl<'a> TableRead<'a> {
         &self.read_type
     }
 
+    /// Data predicates for read-side pruning.
+    pub fn data_predicates(&self) -> &[Predicate] {
+        &self.data_predicates
+    }
+
     /// Table for this read.
     pub fn table(&self) -> &Table {
         self.table
diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml
index 37d95e2..2b756e2 100644
--- a/docs/mkdocs.yml
+++ b/docs/mkdocs.yml
@@ -49,6 +49,7 @@ nav:
   - Home: index.md
   - Getting Started: getting-started.md
   - DataFusion Integration: datafusion.md
+  - Go Integration: go-binding.md
   - Architecture: architecture.md
   - Releases: releases.md
   - Contributing: contributing.md
diff --git a/docs/src/go-binding.md b/docs/src/go-binding.md
new file mode 100644
index 0000000..4157111
--- /dev/null
+++ b/docs/src/go-binding.md
@@ -0,0 +1,397 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Go Integration
+
+The Go integration is a binding built on top of Apache Paimon Rust, allowing 
you to access Paimon tables from Go programs. It uses the [Arrow C Data 
Interface](https://arrow.apache.org/docs/format/CDataInterface.html) for 
zero-copy data transfer.
+
+## Prerequisites
+
+- Go 1.22.4 or later
+- Supported platforms: Linux (amd64, arm64), macOS (amd64, arm64)
+
+## Installation
+
+```bash
+go get github.com/apache/paimon-rust/bindings/go
+```
+
+The pre-built native library is embedded in the package and automatically 
loaded at runtime — no manual build step is needed.
+
+## Creating a Catalog
+
+Use `NewCatalog` with a map of options to create a catalog. The catalog type 
is determined by the `metastore` option (default: `filesystem`).
+
+```go
+import paimon "github.com/apache/paimon-rust/bindings/go"
+
+// Local filesystem
+catalog, err := paimon.NewCatalog(map[string]string{
+    "warehouse": "/path/to/warehouse",
+})
+if err != nil {
+    log.Fatal(err)
+}
+defer catalog.Close()
+```
+
+### Alibaba Cloud OSS
+
+```go
+catalog, err := paimon.NewCatalog(map[string]string{
+    "warehouse":              "oss://bucket/warehouse",
+    "fs.oss.accessKeyId":     "your-access-key-id",
+    "fs.oss.accessKeySecret": "your-access-key-secret",
+    "fs.oss.endpoint":        "oss-cn-hangzhou.aliyuncs.com",
+})
+```
+
+### REST Catalog
+
+```go
+catalog, err := paimon.NewCatalog(map[string]string{
+    "metastore": "rest",
+    "uri":       "http://localhost:8080",
+    "warehouse": "my_warehouse",
+})
+```
+
+## Reading a Table
+
+Paimon Go uses a **scan-then-read** pattern: first scan the table to produce 
splits, then read data from those splits as Arrow RecordBatches.
+
+```go
+import (
+    "errors"
+    "fmt"
+    "io"
+    "log"
+
+    paimon "github.com/apache/paimon-rust/bindings/go"
+)
+
+// Get a table from the catalog
+table, err := catalog.GetTable(paimon.NewIdentifier("default", "my_table"))
+if err != nil {
+    log.Fatal(err)
+}
+defer table.Close()
+
+// Create a read builder
+rb, err := table.NewReadBuilder()
+if err != nil {
+    log.Fatal(err)
+}
+defer rb.Close()
+
+// Step 1: Scan — produces a Plan containing DataSplits
+scan, err := rb.NewScan()
+if err != nil {
+    log.Fatal(err)
+}
+defer scan.Close()
+
+plan, err := scan.Plan()
+if err != nil {
+    log.Fatal(err)
+}
+defer plan.Close()
+
+splits := plan.Splits()
+
+// Step 2: Read — consumes splits and returns Arrow RecordBatches
+read, err := rb.NewRead()
+if err != nil {
+    log.Fatal(err)
+}
+defer read.Close()
+
+reader, err := read.NewRecordBatchReader(splits)
+if err != nil {
+    log.Fatal(err)
+}
+defer reader.Close()
+
+for {
+    record, err := reader.NextRecord()
+    if errors.Is(err, io.EOF) {
+        break
+    }
+    if err != nil {
+        log.Fatal(err)
+    }
+    fmt.Println(record)
+    record.Release()
+}
+```
+
+## Column Projection
+
+Use `WithProjection` to select specific columns. Only the requested columns 
are read, reducing I/O.
+
+```go
+rb, err := table.NewReadBuilder()
+if err != nil {
+    log.Fatal(err)
+}
+defer rb.Close()
+
+// Only read the "id" and "name" columns
+if err := rb.WithProjection([]string{"id", "name"}); err != nil {
+    log.Fatal(err)
+}
+
+// Continue with scan-then-read as above...
+```
+
+## Filter Push-Down
+
+Filter push-down prunes data at two levels:
+
+1. **Scan planning** — skips partitions, buckets, and data files based on 
file-level statistics (min/max).
+2. **Read-side** — applies row-level filtering via Parquet native row filters 
for leaf predicates.
+
+!!! warning
+    Filter push-down is a **best-effort** optimization. The returned results may still contain rows that do not satisfy the filter condition. Callers should always apply residual filtering on the returned records to ensure correctness.
+
+### Building Predicates
+
+Create predicates through the `PredicateBuilder` obtained from a table:
+
+```go
+pb := table.PredicateBuilder()
+
+// Comparison predicates
+pred, err := pb.Eq("id", 1)           // id = 1
+pred, err := pb.NotEq("name", "bob")  // name != "bob"
+pred, err := pb.Lt("id", 3)           // id < 3
+pred, err := pb.Le("id", 2)           // id <= 2
+pred, err := pb.Gt("id", 1)           // id > 1
+pred, err := pb.Ge("id", 2)           // id >= 2
+
+// Null checks
+pred, err := pb.IsNull("name")        // name IS NULL
+pred, err := pb.IsNotNull("name")     // name IS NOT NULL
+
+// IN / NOT IN
+pred, err := pb.In("id", 1, 2, 3)          // id IN (1, 2, 3)
+pred, err := pb.NotIn("name", "x", "y")    // name NOT IN ("x", "y")
+```
+
+### Applying Filters
+
+Pass a predicate to `WithFilter` on the `ReadBuilder`:
+
+```go
+rb, err := table.NewReadBuilder()
+if err != nil {
+    log.Fatal(err)
+}
+defer rb.Close()
+
+pb := table.PredicateBuilder()
+pred, err := pb.Eq("id", 1)
+if err != nil {
+    log.Fatal(err)
+}
+
+// Ownership of pred is transferred — do NOT close it after this call
+if err := rb.WithFilter(pred); err != nil {
+    log.Fatal(err)
+}
+
+// Continue with scan-then-read...
+```
+
+### Compound Predicates
+
+Combine predicates with `And`, `Or`, and `Not`. The `predicate` sub-package 
provides variadic helpers:
+
+```go
+import (
+    paimon "github.com/apache/paimon-rust/bindings/go"
+    "github.com/apache/paimon-rust/bindings/go/predicate"
+)
+
+pb := table.PredicateBuilder()
+
+p1, _ := pb.Ge("id", 1)
+p2, _ := pb.Le("id", 3)
+p3, _ := pb.Eq("name", "alice")
+
+// id >= 1 AND id <= 3
+combined, err := predicate.And(p1, p2)
+
+// (id >= 1 AND id <= 3) OR name = "alice"
+combined, err = predicate.Or(combined, p3)
+
+// NOT (...)
+negated, err := predicate.Not(combined)
+```
+
+!!! note "Predicate Ownership"
+    Predicates follow a **move** ownership model. After passing a predicate to `WithFilter`, `And`, `Or`, or `Not`, the predicate is consumed and must NOT be closed or reused by the caller.
+
+### Supported Datum Types
+
+Predicate values are automatically converted from Go types:
+
+| Go Type                     | Paimon Type          |
+|-----------------------------|----------------------|
+| `bool`                      | Bool                 |
+| `int8`                      | TinyInt              |
+| `int16`                     | SmallInt             |
+| `int32`                     | Int                  |
+| `int` / `int64`             | Int or Long          |
+| `float32`                   | Float                |
+| `float64`                   | Double               |
+| `string`                    | String               |
+| `paimon.Date`               | Date (epoch days)    |
+| `paimon.Time`               | Time (millis)        |
+| `paimon.Timestamp`          | Timestamp            |
+| `paimon.LocalZonedTimestamp` | LocalZonedTimestamp |
+| `paimon.Decimal`            | Decimal              |
+| `paimon.Bytes`              | Binary               |
+
+For special types, use the dedicated constructors:
+
+```go
+// Date as epoch days since 1970-01-01
+pred, _ := pb.Eq("dt", paimon.Date(19000))
+
+// Decimal(123.45) as DECIMAL(10,2)
+pred, _ := pb.Eq("amount", paimon.NewDecimal(12345, 10, 2))
+
+// Timestamp
+pred, _ := pb.Eq("ts", paimon.Timestamp{Millis: 1700000000000, Nanos: 0})
+```
+
+## Resource Management
+
+All Paimon objects (`Catalog`, `Table`, `ReadBuilder`, `TableScan`, `Plan`, 
`TableRead`, `RecordBatchReader`) hold native resources and must be closed when 
no longer needed. Use `defer` to ensure cleanup:
+
+```go
+catalog, err := paimon.NewCatalog(opts)
+if err != nil { log.Fatal(err) }
+defer catalog.Close()
+
+table, err := catalog.GetTable(id)
+if err != nil { log.Fatal(err) }
+defer table.Close()
+
+// ... and so on for ReadBuilder, TableScan, Plan, TableRead, RecordBatchReader
+```
+
+All `Close()` methods are safe to call multiple times.
+
+## Complete Example
+
+```go
+package main
+
+import (
+    "errors"
+    "fmt"
+    "io"
+    "log"
+
+    "github.com/apache/arrow-go/v18/arrow/array"
+    paimon "github.com/apache/paimon-rust/bindings/go"
+)
+
+func main() {
+    // 1. Open catalog and table
+    catalog, err := paimon.NewCatalog(map[string]string{
+        "warehouse": "/tmp/paimon-warehouse",
+    })
+    if err != nil {
+        log.Fatal(err)
+    }
+    defer catalog.Close()
+
+    table, err := catalog.GetTable(paimon.NewIdentifier("default", "my_table"))
+    if err != nil {
+        log.Fatal(err)
+    }
+    defer table.Close()
+
+    // 2. Configure read: projection + filter
+    rb, err := table.NewReadBuilder()
+    if err != nil {
+        log.Fatal(err)
+    }
+    defer rb.Close()
+
+    if err := rb.WithProjection([]string{"id", "name"}); err != nil {
+        log.Fatal(err)
+    }
+
+    pb := table.PredicateBuilder()
+    pred, err := pb.Gt("id", 0)
+    if err != nil {
+        log.Fatal(err)
+    }
+    if err := rb.WithFilter(pred); err != nil {
+        log.Fatal(err)
+    }
+
+    // 3. Scan
+    scan, err := rb.NewScan()
+    if err != nil {
+        log.Fatal(err)
+    }
+    defer scan.Close()
+
+    plan, err := scan.Plan()
+    if err != nil {
+        log.Fatal(err)
+    }
+    defer plan.Close()
+
+    // 4. Read
+    read, err := rb.NewRead()
+    if err != nil {
+        log.Fatal(err)
+    }
+    defer read.Close()
+
+    reader, err := read.NewRecordBatchReader(plan.Splits())
+    if err != nil {
+        log.Fatal(err)
+    }
+    defer reader.Close()
+
+    for {
+        record, err := reader.NextRecord()
+        if errors.Is(err, io.EOF) {
+            break
+        }
+        if err != nil {
+            log.Fatal(err)
+        }
+
+        idCol := record.Column(0).(*array.Int32)
+        nameCol := record.Column(1).(*array.String)
+        for i := 0; i < int(record.NumRows()); i++ {
+            fmt.Printf("id=%d name=%s\n", idCol.Value(i), nameCol.Value(i))
+        }
+        record.Release()
+    }
+}
+```

Reply via email to