This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new cc13352 feat(go): add filter push-down and predicate API for Go
binding (#216)
cc13352 is described below
commit cc1335207d5755f4b20e71c08c0209ea13135b88
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Apr 8 12:13:33 2026 +0800
feat(go): add filter push-down and predicate API for Go binding (#216)
Add complete predicate support to the Go binding, enabling filter
push-down for table scans. This includes:
- C FFI layer: predicate construction functions (equal, not_equal,
less_than, greater_than, is_null, is_in, etc.), combinators
(and/or/not), with_filter on ReadBuilder, and fix scan to
propagate filters via TableScanState
- Go binding: Predicate type, Go-native value types (Date, Time,
Timestamp, LocalZonedTimestamp, Decimal, Bytes), automatic
type inference from Go literals (int, string, bool, etc.),
and ReadBuilder.WithFilter method
- Tests: TestReadWithFilter with shared test helpers
---
bindings/c/src/result.rs | 6 +
bindings/c/src/table.rs | 586 ++++++++++++++++++++++++++++++-
bindings/c/src/types.rs | 61 +++-
bindings/go/predicate.go | 595 ++++++++++++++++++++++++++++++++
bindings/go/predicate/predicate.go | 70 ++++
bindings/go/read_builder.go | 60 ++++
bindings/go/table.go | 5 +
bindings/go/tests/paimon_test.go | 186 ++++++----
bindings/go/types.go | 47 +++
crates/paimon/src/table/read_builder.rs | 5 +
docs/mkdocs.yml | 1 +
docs/src/go-binding.md | 397 +++++++++++++++++++++
12 files changed, 1925 insertions(+), 94 deletions(-)
diff --git a/bindings/c/src/result.rs b/bindings/c/src/result.rs
index a4fd62b..19d523c 100644
--- a/bindings/c/src/result.rs
+++ b/bindings/c/src/result.rs
@@ -66,6 +66,12 @@ pub struct paimon_result_record_batch_reader {
pub error: *mut paimon_error,
}
+/// Result of a predicate constructor, returned by value across the FFI.
+///
+/// The constructors in `table.rs` fill exactly one side: on success
+/// `predicate` is a heap-allocated handle and `error` is null; on failure
+/// `predicate` is null and `error` points to the error.
+#[repr(C)]
+pub struct paimon_result_predicate {
+ pub predicate: *mut paimon_predicate,
+ pub error: *mut paimon_error,
+}
+
#[repr(C)]
pub struct paimon_result_next_batch {
pub batch: paimon_arrow_batch,
diff --git a/bindings/c/src/table.rs b/bindings/c/src/table.rs
index 6f6637d..c6496f6 100644
--- a/bindings/c/src/table.rs
+++ b/bindings/c/src/table.rs
@@ -20,23 +20,18 @@ use std::ffi::c_void;
use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
use arrow_array::{Array, StructArray};
use futures::StreamExt;
+use paimon::spec::{DataField, DataType, Datum, Predicate, PredicateBuilder};
use paimon::table::{ArrowRecordBatchStream, Table};
use paimon::Plan;
-use crate::error::{check_non_null, paimon_error};
+use crate::error::{check_non_null, paimon_error, validate_cstr,
PaimonErrorCode};
use crate::result::{
- paimon_result_new_read, paimon_result_next_batch, paimon_result_plan,
+ paimon_result_new_read, paimon_result_next_batch, paimon_result_plan,
paimon_result_predicate,
paimon_result_read_builder, paimon_result_record_batch_reader,
paimon_result_table_scan,
};
use crate::runtime;
use crate::types::*;
-// Helper to box a Table clone into a wrapper struct and return a raw pointer.
-unsafe fn box_table_wrapper<T>(table: &Table, make: impl FnOnce(*mut c_void)
-> T) -> *mut T {
- let inner = Box::into_raw(Box::new(table.clone())) as *mut c_void;
- Box::into_raw(Box::new(make(inner)))
-}
-
// Helper to free a wrapper struct that contains a Table clone.
unsafe fn free_table_wrapper<T>(ptr: *mut T, get_inner: impl FnOnce(&T) ->
*mut c_void) {
if !ptr.is_null() {
@@ -89,6 +84,7 @@ pub unsafe extern "C" fn paimon_table_new_read_builder(
let state = ReadBuilderState {
table: table_ref.clone(),
projected_columns: None,
+ filter: None,
};
paimon_result_read_builder {
read_builder: box_read_builder_state(state),
@@ -157,6 +153,36 @@ pub unsafe extern "C" fn
paimon_read_builder_with_projection(
std::ptr::null_mut()
}
+/// Set a filter predicate for scan planning.
+///
+/// The predicate is consumed (ownership transferred to the read builder);
+/// the caller must not use or free it afterwards. Pass null to clear any
+/// previously set filter.
+///
+/// Returns null on success, or an error when `rb` is null or when the
+/// predicate handle wraps a null inner pointer. If `rb` is null the predicate
+/// is left untouched (not consumed). If the handle wraps a null inner pointer
+/// the handle itself is still released and the existing filter is kept.
+///
+/// # Safety
+/// `rb` must be a valid pointer from `paimon_table_new_read_builder`, or null
+/// (returns error).
+/// `predicate` must be a valid pointer from a `paimon_predicate_*` function,
+/// or null.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_read_builder_with_filter(
+    rb: *mut paimon_read_builder,
+    predicate: *mut paimon_predicate,
+) -> *mut paimon_error {
+    if let Err(e) = check_non_null(rb, "rb") {
+        return e;
+    }
+
+    let state = &mut *((*rb).inner as *mut ReadBuilderState);
+
+    if predicate.is_null() {
+        state.filter = None;
+        return std::ptr::null_mut();
+    }
+
+    // Reclaim the wrapper first so it is released on every path below.
+    let pred_wrapper = Box::from_raw(predicate);
+    if pred_wrapper.inner.is_null() {
+        // `paimon_predicate_free` tolerates a null inner pointer, so treat it
+        // as invalid input here instead of handing null to `Box::from_raw`.
+        return paimon_error::new(
+            PaimonErrorCode::InvalidInput,
+            "predicate handle has a null inner pointer".to_string(),
+        );
+    }
+    let pred = Box::from_raw(pred_wrapper.inner as *mut Predicate);
+    state.filter = Some(*pred);
+    std::ptr::null_mut()
+}
+
/// Create a new TableScan from a ReadBuilder.
///
/// # Safety
@@ -172,8 +198,13 @@ pub unsafe extern "C" fn paimon_read_builder_new_scan(
};
}
let state = &*((*rb).inner as *const ReadBuilderState);
+ let scan_state = TableScanState {
+ table: state.table.clone(),
+ filter: state.filter.clone(),
+ };
+ let inner = Box::into_raw(Box::new(scan_state)) as *mut c_void;
paimon_result_table_scan {
- scan: box_table_wrapper(&state.table, |inner| paimon_table_scan {
inner }),
+ scan: Box::into_raw(Box::new(paimon_table_scan { inner })),
error: std::ptr::null_mut(),
}
}
@@ -201,11 +232,17 @@ pub unsafe extern "C" fn paimon_read_builder_new_read(
rb_rust.with_projection(&col_refs);
}
+ // Apply filter if set
+ if let Some(ref filter) = state.filter {
+ rb_rust.with_filter(filter.clone());
+ }
+
match rb_rust.new_read() {
Ok(table_read) => {
let read_state = TableReadState {
table: state.table.clone(),
read_type: table_read.read_type().to_vec(),
+ data_predicates: table_read.data_predicates().to_vec(),
};
paimon_result_new_read {
read: box_table_read_state(read_state),
@@ -227,7 +264,12 @@ pub unsafe extern "C" fn paimon_read_builder_new_read(
/// Only call with a scan returned from `paimon_read_builder_new_scan`.
#[no_mangle]
pub unsafe extern "C" fn paimon_table_scan_free(scan: *mut paimon_table_scan) {
- free_table_wrapper(scan, |s| s.inner);
+ if !scan.is_null() {
+ let wrapper = Box::from_raw(scan);
+ if !wrapper.inner.is_null() {
+ drop(Box::from_raw(wrapper.inner as *mut TableScanState));
+ }
+ }
}
/// Execute a scan plan to get splits.
@@ -244,8 +286,11 @@ pub unsafe extern "C" fn paimon_table_scan_plan(
error: e,
};
}
- let table = &*((*scan).inner as *const Table);
- let rb = table.new_read_builder();
+ let scan_state = &*((*scan).inner as *const TableScanState);
+ let mut rb = scan_state.table.new_read_builder();
+ if let Some(ref filter) = scan_state.filter {
+ rb.with_filter(filter.clone());
+ }
let table_scan = rb.new_scan();
match runtime().block_on(table_scan.plan()) {
@@ -349,10 +394,11 @@ pub unsafe extern "C" fn paimon_table_read_to_arrow(
let end = (offset.saturating_add(length)).min(all_splits.len());
let selected = &all_splits[start..end];
- // C bindings currently persist only the projection, so reconstructing the
- // read uses an empty predicate set.
- let table_read =
- paimon::table::TableRead::new(&state.table, state.read_type.clone(),
Vec::new());
+ let table_read = paimon::table::TableRead::new(
+ &state.table,
+ state.read_type.clone(),
+ state.data_predicates.clone(),
+ );
match table_read.to_arrow(selected) {
Ok(stream) => {
@@ -476,3 +522,511 @@ pub unsafe extern "C" fn paimon_arrow_batch_free(batch:
paimon_arrow_batch) {
drop(Box::from_raw(batch.schema as *mut FFI_ArrowSchema));
}
}
+
+// ======================= Predicate ===============================
+
+/// Convert a C datum to a Rust Datum.
+///
+/// `d.tag` selects the variant (see the tag table on `paimon_datum`). Integer
+/// payloads travel as `i64`; a payload that does not fit the tagged type
+/// (TinyInt, SmallInt, Int, Date, Time, timestamp nanos) is rejected with
+/// `InvalidInput` instead of being silently truncated. An unknown tag, a null
+/// data pointer with a non-zero length, or invalid UTF-8 in a string datum is
+/// also reported as `InvalidInput`.
+///
+/// # Safety
+/// For String and Bytes datums with `str_len > 0`, `str_data` must point to
+/// `str_len` readable bytes for the duration of the call.
+unsafe fn datum_from_c(d: &paimon_datum) -> Result<Datum, *mut paimon_error> {
+    /// Range-checked narrowing of an `i64` FFI payload; `what` names the
+    /// target in the error message.
+    fn narrow<T: TryFrom<i64>>(value: i64, what: &str) -> Result<T, *mut paimon_error> {
+        T::try_from(value).map_err(|_| {
+            paimon_error::new(
+                PaimonErrorCode::InvalidInput,
+                format!("value {value} out of range for {what} datum"),
+            )
+        })
+    }
+
+    match d.tag {
+        0 => Ok(Datum::Bool(d.int_val != 0)),
+        1 => narrow(d.int_val, "TinyInt").map(Datum::TinyInt),
+        2 => narrow(d.int_val, "SmallInt").map(Datum::SmallInt),
+        3 => narrow(d.int_val, "Int").map(Datum::Int),
+        4 => Ok(Datum::Long(d.int_val)),
+        // Float values are carried in the f64 slot; rounding to f32 is intended.
+        5 => Ok(Datum::Float(d.double_val as f32)),
+        6 => Ok(Datum::Double(d.double_val)),
+        7 => {
+            if d.str_len == 0 {
+                return Ok(Datum::String(String::new()));
+            }
+            if d.str_data.is_null() {
+                return Err(paimon_error::new(
+                    PaimonErrorCode::InvalidInput,
+                    "null string data in datum with non-zero length".to_string(),
+                ));
+            }
+            let bytes = std::slice::from_raw_parts(d.str_data, d.str_len);
+            let s = std::str::from_utf8(bytes).map_err(|e| {
+                paimon_error::new(
+                    PaimonErrorCode::InvalidInput,
+                    format!("invalid UTF-8 in datum string: {e}"),
+                )
+            })?;
+            Ok(Datum::String(s.to_string()))
+        }
+        8 => narrow(d.int_val, "Date").map(Datum::Date),
+        9 => narrow(d.int_val, "Time").map(Datum::Time),
+        10 => Ok(Datum::Timestamp {
+            millis: d.int_val,
+            nanos: narrow(d.int_val2, "Timestamp nanos")?,
+        }),
+        11 => Ok(Datum::LocalZonedTimestamp {
+            millis: d.int_val,
+            nanos: narrow(d.int_val2, "LocalZonedTimestamp nanos")?,
+        }),
+        12 => {
+            // `int_val2` holds the high 64 bits (sign-extended) and `int_val`
+            // the low 64 bits (reinterpreted as unsigned) of the i128.
+            let unscaled = ((d.int_val2 as i128) << 64) | (d.int_val as u64 as i128);
+            Ok(Datum::Decimal {
+                unscaled,
+                precision: d.uint_val,
+                scale: d.uint_val2,
+            })
+        }
+        13 => {
+            if d.str_data.is_null() && d.str_len > 0 {
+                return Err(paimon_error::new(
+                    PaimonErrorCode::InvalidInput,
+                    "null bytes data in datum".to_string(),
+                ));
+            }
+            let bytes = if d.str_len > 0 {
+                std::slice::from_raw_parts(d.str_data, d.str_len).to_vec()
+            } else {
+                Vec::new()
+            };
+            Ok(Datum::Bytes(bytes))
+        }
+        _ => Err(paimon_error::new(
+            PaimonErrorCode::InvalidInput,
+            format!("unknown datum tag: {}", d.tag),
+        )),
+    }
+}
+
+/// Align an integer-family datum with the integer type of the target column.
+///
+/// FFI callers (e.g. Go) may hand over an integer literal whose width differs
+/// from the column's (for instance an Int literal for a BigInt column). The
+/// literal is widened freely and narrowed only when the value fits; a value
+/// that does not fit yields an `InvalidInput` error.
+///
+/// The datum is returned unchanged when it is not an integer, when the column
+/// is not an integer column, or when the column does not exist (so that
+/// `PredicateBuilder` can report the missing column itself).
+fn coerce_integer_datum(
+    datum: Datum,
+    fields: &[DataField],
+    column: &str,
+) -> Result<Datum, *mut paimon_error> {
+    // Lift every integer variant to i64; anything else passes straight through.
+    let wide: i64 = match &datum {
+        Datum::TinyInt(v) => i64::from(*v),
+        Datum::SmallInt(v) => i64::from(*v),
+        Datum::Int(v) => i64::from(*v),
+        Datum::Long(v) => *v,
+        _ => return Ok(datum),
+    };
+
+    let target = match fields.iter().find(|f| f.name() == column) {
+        Some(field) => field.data_type(),
+        // Column not found; let PredicateBuilder produce the proper error.
+        None => return Ok(datum),
+    };
+
+    let out_of_range = |kind: &str| {
+        paimon_error::new(
+            PaimonErrorCode::InvalidInput,
+            format!("value {wide} out of range for {kind} column '{column}'"),
+        )
+    };
+
+    // A datum that already has the column's type always fits, so the checked
+    // conversion reproduces it unchanged.
+    match target {
+        DataType::TinyInt(_) => i8::try_from(wide)
+            .map(Datum::TinyInt)
+            .map_err(|_| out_of_range("TinyInt")),
+        DataType::SmallInt(_) => i16::try_from(wide)
+            .map(Datum::SmallInt)
+            .map_err(|_| out_of_range("SmallInt")),
+        DataType::Int(_) => i32::try_from(wide)
+            .map(Datum::Int)
+            .map_err(|_| out_of_range("Int")),
+        DataType::BigInt(_) => Ok(Datum::Long(wide)),
+        _ => Ok(datum),
+    }
+}
+
+/// Shared implementation of the leaf predicates that compare a column with a
+/// single literal.
+///
+/// In order: rejects a null `table`, validates `column`, decodes the C datum,
+/// aligns integer literals with the column type, then passes the column name
+/// and literal to `build_fn`. Any failure yields a null handle plus the error;
+/// success yields a freshly boxed predicate handle and a null error.
+unsafe fn build_leaf_predicate_datum(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: &paimon_datum,
+    build_fn: impl FnOnce(&PredicateBuilder, &str, Datum) -> paimon::Result<Predicate>,
+) -> paimon_result_predicate {
+    // Every failure path has the same shape: no handle, just the error.
+    let fail = |error: *mut paimon_error| paimon_result_predicate {
+        predicate: std::ptr::null_mut(),
+        error,
+    };
+
+    if let Err(err) = check_non_null(table, "table") {
+        return fail(err);
+    }
+    let col_name = match validate_cstr(column, "column") {
+        Ok(name) => name,
+        Err(err) => return fail(err),
+    };
+    let literal = match datum_from_c(datum) {
+        Ok(value) => value,
+        Err(err) => return fail(err),
+    };
+
+    let table_ref = &*((*table).inner as *const Table);
+    let fields = table_ref.schema().fields();
+    let literal = match coerce_integer_datum(literal, fields, &col_name) {
+        Ok(value) => value,
+        Err(err) => return fail(err),
+    };
+
+    let builder = PredicateBuilder::new(fields);
+    match build_fn(&builder, &col_name, literal) {
+        Ok(pred) => {
+            let handle = paimon_predicate {
+                inner: Box::into_raw(Box::new(pred)) as *mut c_void,
+            };
+            paimon_result_predicate {
+                predicate: Box::into_raw(Box::new(handle)),
+                error: std::ptr::null_mut(),
+            }
+        }
+        Err(err) => fail(paimon_error::from_paimon(err)),
+    }
+}
+
+/// Shared implementation of the leaf predicates that take no literal
+/// (IS NULL / IS NOT NULL).
+///
+/// Rejects a null `table`, validates `column`, then lets `build_fn` create the
+/// predicate from a `PredicateBuilder` over the table's schema fields. Any
+/// failure yields a null handle plus the error; success yields a freshly
+/// boxed predicate handle and a null error.
+unsafe fn build_leaf_predicate(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    build_fn: impl FnOnce(&PredicateBuilder, &str) -> paimon::Result<Predicate>,
+) -> paimon_result_predicate {
+    let fail = |error: *mut paimon_error| paimon_result_predicate {
+        predicate: std::ptr::null_mut(),
+        error,
+    };
+
+    if let Err(err) = check_non_null(table, "table") {
+        return fail(err);
+    }
+    let col_name = match validate_cstr(column, "column") {
+        Ok(name) => name,
+        Err(err) => return fail(err),
+    };
+
+    let table_ref = &*((*table).inner as *const Table);
+    let builder = PredicateBuilder::new(table_ref.schema().fields());
+    match build_fn(&builder, &col_name) {
+        Ok(pred) => {
+            let handle = paimon_predicate {
+                inner: Box::into_raw(Box::new(pred)) as *mut c_void,
+            };
+            paimon_result_predicate {
+                predicate: Box::into_raw(Box::new(handle)),
+                error: std::ptr::null_mut(),
+            }
+        }
+        Err(err) => fail(paimon_error::from_paimon(err)),
+    }
+}
+
+/// Build the predicate `column = datum`.
+///
+/// A null `table` or a `column` rejected by validation is reported through
+/// the `error` field of the result rather than dereferenced.
+///
+/// # Safety
+/// `table`, when non-null, must point to a live `paimon_table`, and `column`
+/// must be a valid C string pointer. For String/Bytes datums `str_data` must
+/// reference `str_len` readable bytes.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_equal(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: paimon_datum,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datum(table, column, &datum, |builder, name, literal| {
+        builder.equal(name, literal)
+    })
+}
+
+/// Build the predicate `column != datum`.
+///
+/// A null `table` or a `column` rejected by validation is reported through
+/// the `error` field of the result rather than dereferenced.
+///
+/// # Safety
+/// `table`, when non-null, must point to a live `paimon_table`, and `column`
+/// must be a valid C string pointer. For String/Bytes datums `str_data` must
+/// reference `str_len` readable bytes.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_not_equal(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: paimon_datum,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datum(table, column, &datum, |builder, name, literal| {
+        builder.not_equal(name, literal)
+    })
+}
+
+/// Build the predicate `column < datum`.
+///
+/// A null `table` or a `column` rejected by validation is reported through
+/// the `error` field of the result rather than dereferenced.
+///
+/// # Safety
+/// `table`, when non-null, must point to a live `paimon_table`, and `column`
+/// must be a valid C string pointer. For String/Bytes datums `str_data` must
+/// reference `str_len` readable bytes.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_less_than(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: paimon_datum,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datum(table, column, &datum, |builder, name, literal| {
+        builder.less_than(name, literal)
+    })
+}
+
+/// Build the predicate `column <= datum`.
+///
+/// A null `table` or a `column` rejected by validation is reported through
+/// the `error` field of the result rather than dereferenced.
+///
+/// # Safety
+/// `table`, when non-null, must point to a live `paimon_table`, and `column`
+/// must be a valid C string pointer. For String/Bytes datums `str_data` must
+/// reference `str_len` readable bytes.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_less_or_equal(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: paimon_datum,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datum(table, column, &datum, |builder, name, literal| {
+        builder.less_or_equal(name, literal)
+    })
+}
+
+/// Build the predicate `column > datum`.
+///
+/// A null `table` or a `column` rejected by validation is reported through
+/// the `error` field of the result rather than dereferenced.
+///
+/// # Safety
+/// `table`, when non-null, must point to a live `paimon_table`, and `column`
+/// must be a valid C string pointer. For String/Bytes datums `str_data` must
+/// reference `str_len` readable bytes.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_greater_than(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: paimon_datum,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datum(table, column, &datum, |builder, name, literal| {
+        builder.greater_than(name, literal)
+    })
+}
+
+/// Build the predicate `column >= datum`.
+///
+/// A null `table` or a `column` rejected by validation is reported through
+/// the `error` field of the result rather than dereferenced.
+///
+/// # Safety
+/// `table`, when non-null, must point to a live `paimon_table`, and `column`
+/// must be a valid C string pointer. For String/Bytes datums `str_data` must
+/// reference `str_len` readable bytes.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_greater_or_equal(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datum: paimon_datum,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datum(table, column, &datum, |builder, name, literal| {
+        builder.greater_or_equal(name, literal)
+    })
+}
+
+/// Build the predicate `column IS NULL`.
+///
+/// A null `table` or a `column` rejected by validation is reported through
+/// the `error` field of the result rather than dereferenced.
+///
+/// # Safety
+/// `table`, when non-null, must point to a live `paimon_table`, and `column`
+/// must be a valid C string pointer.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_is_null(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+) -> paimon_result_predicate {
+    build_leaf_predicate(table, column, |builder, name| builder.is_null(name))
+}
+
+/// Build the predicate `column IS NOT NULL`.
+///
+/// A null `table` or a `column` rejected by validation is reported through
+/// the `error` field of the result rather than dereferenced.
+///
+/// # Safety
+/// `table`, when non-null, must point to a live `paimon_table`, and `column`
+/// must be a valid C string pointer.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_is_not_null(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+) -> paimon_result_predicate {
+    build_leaf_predicate(table, column, |builder, name| builder.is_not_null(name))
+}
+
+/// Build the predicate `column IN (datum1, datum2, ...)`.
+///
+/// A null `table`, a `column` rejected by validation, or a null `datums`
+/// pointer combined with a non-zero `datums_len` is reported through the
+/// `error` field of the result.
+///
+/// # Safety
+/// `table`, when non-null, must point to a live `paimon_table`, and `column`
+/// must be a valid C string pointer. `datums`, when non-null, must point to
+/// `datums_len` readable `paimon_datum` values.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_is_in(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datums: *const paimon_datum,
+    datums_len: usize,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datums(table, column, datums, datums_len, |builder, name, literals| {
+        builder.is_in(name, literals)
+    })
+}
+
+/// Build the predicate `column NOT IN (datum1, datum2, ...)`.
+///
+/// A null `table`, a `column` rejected by validation, or a null `datums`
+/// pointer combined with a non-zero `datums_len` is reported through the
+/// `error` field of the result.
+///
+/// # Safety
+/// `table`, when non-null, must point to a live `paimon_table`, and `column`
+/// must be a valid C string pointer. `datums`, when non-null, must point to
+/// `datums_len` readable `paimon_datum` values.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_is_not_in(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datums: *const paimon_datum,
+    datums_len: usize,
+) -> paimon_result_predicate {
+    build_leaf_predicate_datums(table, column, datums, datums_len, |builder, name, literals| {
+        builder.is_not_in(name, literals)
+    })
+}
+
+/// Shared implementation of the IN / NOT IN predicates, which take an array
+/// of literals.
+///
+/// In order: rejects a null `table`, validates `column`, rejects a null
+/// `datums` pointer with a non-zero length, decodes every C datum, aligns
+/// every integer literal with the column type, then passes the literals to
+/// `build_fn`. The first failure encountered is returned with a null handle.
+unsafe fn build_leaf_predicate_datums(
+    table: *const paimon_table,
+    column: *const std::ffi::c_char,
+    datums: *const paimon_datum,
+    datums_len: usize,
+    build_fn: impl FnOnce(&PredicateBuilder, &str, Vec<Datum>) -> paimon::Result<Predicate>,
+) -> paimon_result_predicate {
+    let fail = |error: *mut paimon_error| paimon_result_predicate {
+        predicate: std::ptr::null_mut(),
+        error,
+    };
+
+    if let Err(err) = check_non_null(table, "table") {
+        return fail(err);
+    }
+    let col_name = match validate_cstr(column, "column") {
+        Ok(name) => name,
+        Err(err) => return fail(err),
+    };
+
+    if datums_len > 0 && datums.is_null() {
+        return fail(paimon_error::new(
+            PaimonErrorCode::InvalidInput,
+            "null datums pointer with non-zero length".to_string(),
+        ));
+    }
+
+    // A zero length never touches the pointer, so null is fine in that case.
+    let raw: &[paimon_datum] = if datums_len > 0 {
+        std::slice::from_raw_parts(datums, datums_len)
+    } else {
+        &[]
+    };
+
+    // First pass: decode all datums, stopping at the first invalid one.
+    let mut decoded = Vec::with_capacity(raw.len());
+    for item in raw {
+        match datum_from_c(item) {
+            Ok(value) => decoded.push(value),
+            Err(err) => return fail(err),
+        }
+    }
+
+    let table_ref = &*((*table).inner as *const Table);
+    let fields = table_ref.schema().fields();
+
+    // Second pass: align integer literals with the column's integer type.
+    let mut values = Vec::with_capacity(decoded.len());
+    for value in decoded {
+        match coerce_integer_datum(value, fields, &col_name) {
+            Ok(coerced) => values.push(coerced),
+            Err(err) => return fail(err),
+        }
+    }
+
+    let builder = PredicateBuilder::new(fields);
+    match build_fn(&builder, &col_name, values) {
+        Ok(pred) => {
+            let handle = paimon_predicate {
+                inner: Box::into_raw(Box::new(pred)) as *mut c_void,
+            };
+            paimon_result_predicate {
+                predicate: Box::into_raw(Box::new(handle)),
+                error: std::ptr::null_mut(),
+            }
+        }
+        Err(err) => fail(paimon_error::from_paimon(err)),
+    }
+}
+
+/// Combine two predicates with AND. Consumes both inputs.
+///
+/// Both handles are always consumed, even on failure; the caller must not use
+/// or free them afterwards. Returns null when either input is null or wraps a
+/// null inner pointer, instead of dereferencing it.
+///
+/// # Safety
+/// `a` and `b` must each be null or a valid pointer from a predicate
+/// function, and must not be the same handle.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_and(
+    a: *mut paimon_predicate,
+    b: *mut paimon_predicate,
+) -> *mut paimon_predicate {
+    /// Reclaims a handle and moves its predicate out; `None` for a null
+    /// handle or a null inner pointer.
+    unsafe fn take(p: *mut paimon_predicate) -> Option<Predicate> {
+        if p.is_null() {
+            return None;
+        }
+        let inner = Box::from_raw(p).inner as *mut Predicate;
+        if inner.is_null() {
+            None
+        } else {
+            Some(*Box::from_raw(inner))
+        }
+    }
+
+    // Take both operands before checking either, so a valid operand is never
+    // leaked when the other one is invalid.
+    let (lhs, rhs) = (take(a), take(b));
+    let (Some(lhs), Some(rhs)) = (lhs, rhs) else {
+        return std::ptr::null_mut();
+    };
+
+    let combined = Predicate::and(vec![lhs, rhs]);
+    let inner = Box::into_raw(Box::new(combined)) as *mut c_void;
+    Box::into_raw(Box::new(paimon_predicate { inner }))
+}
+
+/// Combine two predicates with OR. Consumes both inputs.
+///
+/// Both handles are always consumed, even on failure; the caller must not use
+/// or free them afterwards. Returns null when either input is null or wraps a
+/// null inner pointer, instead of dereferencing it.
+///
+/// # Safety
+/// `a` and `b` must each be null or a valid pointer from a predicate
+/// function, and must not be the same handle.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_or(
+    a: *mut paimon_predicate,
+    b: *mut paimon_predicate,
+) -> *mut paimon_predicate {
+    /// Reclaims a handle and moves its predicate out; `None` for a null
+    /// handle or a null inner pointer.
+    unsafe fn take(p: *mut paimon_predicate) -> Option<Predicate> {
+        if p.is_null() {
+            return None;
+        }
+        let inner = Box::from_raw(p).inner as *mut Predicate;
+        if inner.is_null() {
+            None
+        } else {
+            Some(*Box::from_raw(inner))
+        }
+    }
+
+    // Take both operands before checking either, so a valid operand is never
+    // leaked when the other one is invalid.
+    let (lhs, rhs) = (take(a), take(b));
+    let (Some(lhs), Some(rhs)) = (lhs, rhs) else {
+        return std::ptr::null_mut();
+    };
+
+    let combined = Predicate::or(vec![lhs, rhs]);
+    let inner = Box::into_raw(Box::new(combined)) as *mut c_void;
+    Box::into_raw(Box::new(paimon_predicate { inner }))
+}
+
+/// Negate a predicate with NOT. Consumes the input.
+///
+/// Returns null when `p` is null or wraps a null inner pointer, instead of
+/// dereferencing it. A non-null handle is consumed in either case; the caller
+/// must not use or free it afterwards.
+///
+/// # Safety
+/// `p` must be null or a valid pointer from a predicate function.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_not(p: *mut paimon_predicate) -> *mut paimon_predicate {
+    if p.is_null() {
+        return std::ptr::null_mut();
+    }
+    // Reclaiming the wrapper releases the handle even when `inner` is null.
+    let raw = Box::from_raw(p).inner as *mut Predicate;
+    if raw.is_null() {
+        return std::ptr::null_mut();
+    }
+
+    let negated = Predicate::negate(*Box::from_raw(raw));
+    let inner = Box::into_raw(Box::new(negated)) as *mut c_void;
+    Box::into_raw(Box::new(paimon_predicate { inner }))
+}
+
+/// Release a predicate handle together with the predicate it owns.
+///
+/// A null handle is ignored, and a handle whose inner pointer is null only
+/// has its wrapper released.
+///
+/// # Safety
+/// Only call with a predicate returned from paimon predicate functions, and
+/// at most once per handle.
+#[no_mangle]
+pub unsafe extern "C" fn paimon_predicate_free(p: *mut paimon_predicate) {
+    if p.is_null() {
+        return;
+    }
+    // The temporary box is dropped at the end of this statement, which
+    // releases the wrapper allocation.
+    let inner = Box::from_raw(p).inner as *mut Predicate;
+    if !inner.is_null() {
+        drop(Box::from_raw(inner));
+    }
+}
diff --git a/bindings/c/src/types.rs b/bindings/c/src/types.rs
index 6f09042..4df28de 100644
--- a/bindings/c/src/types.rs
+++ b/bindings/c/src/types.rs
@@ -17,7 +17,7 @@
use std::ffi::c_void;
-use paimon::spec::DataField;
+use paimon::spec::{DataField, Predicate};
use paimon::table::Table;
/// C-compatible key-value pair for options.
@@ -78,10 +78,17 @@ pub struct paimon_read_builder {
pub inner: *mut c_void,
}
-/// Internal state for ReadBuilder that stores table and projection columns.
+/// Internal state for ReadBuilder that stores table, projection columns, and
filter.
pub(crate) struct ReadBuilderState {
pub table: Table,
pub projected_columns: Option<Vec<String>>,
+ pub filter: Option<Predicate>,
+}
+
+/// Internal state for TableScan that stores table and filter.
+///
+/// Created by `paimon_read_builder_new_scan` from clones of the read
+/// builder's table and filter, so the scan keeps working independently of the
+/// builder. `paimon_table_scan_plan` re-applies `filter` to a fresh read
+/// builder before planning.
+pub(crate) struct TableScanState {
+ pub table: Table,
+ pub filter: Option<Predicate>,
+}
#[repr(C)]
@@ -94,10 +101,11 @@ pub struct paimon_table_read {
pub inner: *mut c_void,
}
-/// Internal state for TableRead that stores table and projected read type.
+/// Internal state for TableRead that stores table, projected read type, and
data predicates.
pub(crate) struct TableReadState {
pub table: Table,
pub read_type: Vec<DataField>,
+ pub data_predicates: Vec<Predicate>,
}
#[repr(C)]
@@ -110,6 +118,53 @@ pub struct paimon_record_batch_reader {
pub inner: *mut c_void,
}
+/// Opaque wrapper around a Predicate.
+///
+/// `inner` is a type-erased pointer to a heap-allocated `Predicate` created
+/// with `Box::into_raw`. The handle is released by `paimon_predicate_free`,
+/// or consumed by the combinators and `paimon_read_builder_with_filter`.
+#[repr(C)]
+pub struct paimon_predicate {
+ pub inner: *mut c_void,
+}
+
+/// A typed literal value for predicate comparison, passed across FFI.
+///
+/// # Design
+///
+/// We use a tagged flat struct instead of opaque heap-allocated handles
+/// (like DuckDB's `duckdb_value`). The trade-off:
+///
+/// - **Pro**: Zero allocation — the entire datum is passed by value on the
+/// stack, with no heap round-trips or free calls needed. This keeps the
+/// FFI surface minimal and the Go/C caller simple.
+/// - **Con**: The struct is larger than any single variant needs, wasting
+/// some bytes per datum (currently ~56 bytes vs. ~16 for the largest
+/// single variant).
+///
+/// Since datums are only used for predicate construction (not a hot path),
+/// the extra size is acceptable.
+///
+/// # Tags
+///
+/// - 0: Bool, 1: TinyInt, 2: SmallInt, 3: Int, 4: Long
+/// - 5: Float, 6: Double, 7: String, 8: Date, 9: Time
+/// - 10: Timestamp, 11: LocalZonedTimestamp, 12: Decimal, 13: Bytes
+///
+/// `tag` determines which value fields are valid:
+/// - `Bool`/`TinyInt`/`SmallInt`/`Int`/`Long`/`Date`/`Time` → `int_val`
+/// - `Float`/`Double` → `double_val`
+/// - `String`/`Bytes` → `str_data` + `str_len`
+/// - `Timestamp`/`LocalZonedTimestamp` → `int_val` (millis) + `int_val2`
+/// (nanos)
+/// - `Decimal` → `int_val` + `int_val2` (unscaled i128) + `uint_val`
+/// (precision) + `uint_val2` (scale)
+///
+/// For `Decimal`, `int_val` carries the low 64 bits and `int_val2` the high
+/// 64 bits of the unscaled value. For `String`/`Bytes`, `str_data` is only
+/// read when `str_len` is non-zero and must then point to `str_len` readable
+/// bytes; the bytes are copied during conversion.
+#[repr(C)]
+pub struct paimon_datum {
+ pub tag: i32,
+ pub int_val: i64,
+ pub double_val: f64,
+ pub str_data: *const u8,
+ pub str_len: usize,
+ pub int_val2: i64,
+ pub uint_val: u32,
+ pub uint_val2: u32,
+}
+
/// A single Arrow record batch exported via the Arrow C Data Interface.
///
/// `array` and `schema` point to heap-allocated ArrowArray and ArrowSchema
diff --git a/bindings/go/predicate.go b/bindings/go/predicate.go
new file mode 100644
index 0000000..d39b684
--- /dev/null
+++ b/bindings/go/predicate.go
@@ -0,0 +1,595 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package paimon
+
+import (
+ "context"
+ "fmt"
+ "math"
+ "runtime"
+ "sync"
+ "unsafe"
+
+ "github.com/jupiterrider/ffi"
+)
+
+// Datum type tags (must match paimon-c datum_from_c).
+const (
+ datumTagBool int32 = 0
+ datumTagTinyInt int32 = 1
+ datumTagSmallInt int32 = 2
+ datumTagInt int32 = 3
+ datumTagLong int32 = 4
+ datumTagFloat int32 = 5
+ datumTagDouble int32 = 6
+ datumTagString int32 = 7
+ datumTagDate int32 = 8
+ datumTagTime int32 = 9
+ datumTagTimestamp int32 = 10
+ datumTagLocalZonedTimestamp int32 = 11
+ datumTagDecimal int32 = 12
+ datumTagBytes int32 = 13
+)
+
+// Datum is a typed literal value for predicate comparison.
+// The internal representation is hidden to allow future changes
+// (e.g. switching to opaque handles) without breaking callers.
+type Datum struct {
+ inner paimonDatumC
+}
+
+// BoolDatum creates a boolean datum.
+func BoolDatum(v bool) Datum {
+ var iv int64
+ if v {
+ iv = 1
+ }
+ return Datum{inner: paimonDatumC{tag: datumTagBool, intVal: iv}}
+}
+
+// TinyIntDatum creates a tinyint datum.
+func TinyIntDatum(v int8) Datum {
+ return Datum{inner: paimonDatumC{tag: datumTagTinyInt, intVal:
int64(v)}}
+}
+
+// SmallIntDatum creates a smallint datum.
+func SmallIntDatum(v int16) Datum {
+ return Datum{inner: paimonDatumC{tag: datumTagSmallInt, intVal:
int64(v)}}
+}
+
+// IntDatum creates an int datum.
+func IntDatum(v int32) Datum {
+ return Datum{inner: paimonDatumC{tag: datumTagInt, intVal: int64(v)}}
+}
+
+// LongDatum creates a long (bigint) datum.
+func LongDatum(v int64) Datum {
+ return Datum{inner: paimonDatumC{tag: datumTagLong, intVal: v}}
+}
+
+// FloatDatum creates a float datum.
+func FloatDatum(v float32) Datum {
+ return Datum{inner: paimonDatumC{tag: datumTagFloat, dblVal:
float64(v)}}
+}
+
+// DoubleDatum creates a double datum.
+func DoubleDatum(v float64) Datum {
+ return Datum{inner: paimonDatumC{tag: datumTagDouble, dblVal: v}}
+}
+
+// StringDatum creates a string datum.
+func StringDatum(v string) Datum {
+ b := []byte(v)
+ d := paimonDatumC{tag: datumTagString, strLen: uintptr(len(b))}
+ if len(b) > 0 {
+ d.strData = &b[0]
+ }
+ return Datum{inner: d}
+}
+
+// Date represents a date value as epoch days since 1970-01-01.
+// Usage: table.PredicateBuilder().Eq("dt", paimon.Date(19000))
+type Date int32
+
+// Time represents a time-of-day value as milliseconds since midnight.
+// Usage: table.PredicateBuilder().Eq("t", paimon.Time(3600000))
+type Time int32
+
+// Timestamp represents a timestamp without timezone (millis + sub-millis
nanos).
+// Usage: table.PredicateBuilder().Eq("ts", paimon.Timestamp{Millis: 1700000000000, Nanos: 0})
+type Timestamp struct {
+ Millis int64
+ Nanos int32
+}
+
+// LocalZonedTimestamp represents a timestamp with local timezone semantics.
+// Usage: table.PredicateBuilder().Eq("lzts", paimon.LocalZonedTimestamp{Millis: 1700000000000, Nanos: 0})
+type LocalZonedTimestamp struct {
+ Millis int64
+ Nanos int32
+}
+
+// Decimal represents a fixed-precision decimal value up to DECIMAL(38, s).
+//
+// The unscaled value is stored as a little-endian i128 split into two int64
+// halves: Lo (low 64 bits, unsigned interpretation) and Hi (high 64 bits,
+// sign-extended). For values that fit in int64, use [NewDecimal].
+//
+// Usage:
+//
+// paimon.NewDecimal(12345, 10, 2) // 123.45 as DECIMAL(10,2)
+// paimon.Decimal{Lo: lo, Hi: hi, ...} // full i128
+type Decimal struct {
+ Lo int64 // low 64 bits of unscaled i128 (unsigned
interpretation)
+ Hi int64 // high 64 bits of unscaled i128 (sign extension)
+ Precision uint32
+ Scale uint32
+}
+
+// NewDecimal creates a Decimal from an int64 unscaled value.
+// For unscaled values that exceed int64 range, construct Decimal directly
+// with Lo/Hi fields.
+func NewDecimal(unscaled int64, precision, scale uint32) Decimal {
+ hi := int64(0)
+ if unscaled < 0 {
+ hi = -1
+ }
+ return Decimal{Lo: unscaled, Hi: hi, Precision: precision, Scale: scale}
+}
+
+// Bytes represents a binary value.
+// Usage: table.PredicateBuilder().Eq("data", paimon.Bytes(someSlice))
+type Bytes []byte
+
+// toDatum converts a Go value to a Datum for predicate comparison.
+//
+// Supported Go types and their Paimon mappings:
+// - bool → Bool
+// - int8 → TinyInt
+// - int16 → SmallInt
+// - int32 → Int
+// - int → Int (if fits int32) or Long
+// - int64 → Long
+// - float32 → Float
+// - float64 → Double
+// - string → String
+// - Date → Date
+// - Time → Time
+// - Timestamp → Timestamp
+// - LocalZonedTimestamp → LocalZonedTimestamp
+// - Decimal → Decimal
+// - Bytes → Bytes
+// - Datum → passed through
+func toDatum(v any) (Datum, error) {
+ switch val := v.(type) {
+ case bool:
+ return BoolDatum(val), nil
+ case int8:
+ return TinyIntDatum(val), nil
+ case int16:
+ return SmallIntDatum(val), nil
+ case int32:
+ return IntDatum(val), nil
+ case int:
+ if val >= math.MinInt32 && val <= math.MaxInt32 {
+ return IntDatum(int32(val)), nil
+ }
+ return LongDatum(int64(val)), nil
+ case int64:
+ return LongDatum(val), nil
+ case float32:
+ return FloatDatum(val), nil
+ case float64:
+ return DoubleDatum(val), nil
+ case string:
+ return StringDatum(val), nil
+ case Date:
+ return Datum{inner: paimonDatumC{tag: datumTagDate, intVal:
int64(val)}}, nil
+ case Time:
+ return Datum{inner: paimonDatumC{tag: datumTagTime, intVal:
int64(val)}}, nil
+ case Timestamp:
+ return Datum{inner: paimonDatumC{tag: datumTagTimestamp,
intVal: val.Millis, intVal2: int64(val.Nanos)}}, nil
+ case LocalZonedTimestamp:
+ return Datum{inner: paimonDatumC{tag:
datumTagLocalZonedTimestamp, intVal: val.Millis, intVal2: int64(val.Nanos)}},
nil
+ case Decimal:
+ return Datum{inner: paimonDatumC{
+ tag: datumTagDecimal, intVal: val.Lo, intVal2: val.Hi,
+ uintVal: val.Precision, uintVal2: val.Scale,
+ }}, nil
+ case Bytes:
+ d := paimonDatumC{tag: datumTagBytes, strLen: uintptr(len(val))}
+ if len(val) > 0 {
+ d.strData = &val[0]
+ }
+ return Datum{inner: d}, nil
+ case Datum:
+ return val, nil
+ default:
+ return Datum{}, fmt.Errorf("unsupported datum type: %T", v)
+ }
+}
+
+// Predicate is an opaque filter predicate for scan planning.
+type Predicate struct {
+ ctx context.Context
+ lib *libRef
+ inner *paimonPredicate
+ closeOnce sync.Once
+}
+
+// Close releases the predicate resources. Safe to call multiple times.
+// Note: a predicate passed to WithFilter, or as the argument of And/Or, is
+// consumed; calling Close on it afterwards is a no-op.
+func (p *Predicate) Close() {
+ p.closeOnce.Do(func() {
+ if p.inner != nil {
+ ffiPredicateFree.symbol(p.ctx)(p.inner)
+ p.inner = nil
+ p.lib.release()
+ }
+ })
+}
+
+// errConsumedPredicate is returned when a consumed or nil predicate is reused.
+var errConsumedPredicate = fmt.Errorf("paimon: predicate already consumed or
nil")
+
+// PredicateBuilder creates filter predicates for a table.
+// It holds a Go-level reference to the Table and does not own any C resources,
+// so there is no Close() method.
+type PredicateBuilder struct {
+ table *Table
+}
+
+// Eq creates an equality predicate: column = value.
+func (pb *PredicateBuilder) Eq(column string, value any) (*Predicate, error) {
+ datum, err := toDatum(value)
+ if err != nil {
+ return nil, err
+ }
+ return pb.buildLeafPredicate(ffiPredicateEqual, column, datum)
+}
+
+// NotEq creates a not-equal predicate: column != value.
+func (pb *PredicateBuilder) NotEq(column string, value any) (*Predicate,
error) {
+ datum, err := toDatum(value)
+ if err != nil {
+ return nil, err
+ }
+ return pb.buildLeafPredicate(ffiPredicateNotEqual, column, datum)
+}
+
+// Lt creates a less-than predicate: column < value.
+func (pb *PredicateBuilder) Lt(column string, value any) (*Predicate, error) {
+ datum, err := toDatum(value)
+ if err != nil {
+ return nil, err
+ }
+ return pb.buildLeafPredicate(ffiPredicateLessThan, column, datum)
+}
+
+// Le creates a less-or-equal predicate: column <= value.
+func (pb *PredicateBuilder) Le(column string, value any) (*Predicate, error) {
+ datum, err := toDatum(value)
+ if err != nil {
+ return nil, err
+ }
+ return pb.buildLeafPredicate(ffiPredicateLessOrEqual, column, datum)
+}
+
+// Gt creates a greater-than predicate: column > value.
+func (pb *PredicateBuilder) Gt(column string, value any) (*Predicate, error) {
+ datum, err := toDatum(value)
+ if err != nil {
+ return nil, err
+ }
+ return pb.buildLeafPredicate(ffiPredicateGreaterThan, column, datum)
+}
+
+// Ge creates a greater-or-equal predicate: column >= value.
+func (pb *PredicateBuilder) Ge(column string, value any) (*Predicate, error) {
+ datum, err := toDatum(value)
+ if err != nil {
+ return nil, err
+ }
+ return pb.buildLeafPredicate(ffiPredicateGreaterOrEqual, column, datum)
+}
+
+// IsNull creates an IS NULL predicate.
+func (pb *PredicateBuilder) IsNull(column string) (*Predicate, error) {
+ return pb.buildNullPredicate(ffiPredicateIsNull, column)
+}
+
+// IsNotNull creates an IS NOT NULL predicate.
+func (pb *PredicateBuilder) IsNotNull(column string) (*Predicate, error) {
+ return pb.buildNullPredicate(ffiPredicateIsNotNull, column)
+}
+
+// In creates an IN predicate: column IN (values...).
+func (pb *PredicateBuilder) In(column string, values ...any) (*Predicate,
error) {
+ return pb.buildInPredicate(ffiPredicateIsIn, column, values)
+}
+
+// NotIn creates a NOT IN predicate: column NOT IN (values...).
+func (pb *PredicateBuilder) NotIn(column string, values ...any) (*Predicate,
error) {
+ return pb.buildInPredicate(ffiPredicateIsNotIn, column, values)
+}
+
+// buildLeafPredicate is a helper for comparison predicates that take (table,
column, datum).
+func (pb *PredicateBuilder) buildLeafPredicate(
+ ffiVar *FFI[func(*paimonTable, *byte, paimonDatumC) (*paimonPredicate,
error)],
+ column string, datum Datum,
+) (*Predicate, error) {
+ t := pb.table
+ if t.inner == nil {
+ return nil, ErrClosed
+ }
+ createFn := ffiVar.symbol(t.ctx)
+ cCol := append([]byte(column), 0)
+ inner, err := createFn(t.inner, &cCol[0], datum.inner)
+ runtime.KeepAlive(cCol)
+ runtime.KeepAlive(datum)
+ if err != nil {
+ return nil, err
+ }
+ t.lib.acquire()
+ return &Predicate{ctx: t.ctx, lib: t.lib, inner: inner}, nil
+}
+
+// buildNullPredicate is a helper for IS NULL / IS NOT NULL predicates.
+func (pb *PredicateBuilder) buildNullPredicate(
+ ffiVar *FFI[func(*paimonTable, *byte) (*paimonPredicate, error)],
+ column string,
+) (*Predicate, error) {
+ t := pb.table
+ if t.inner == nil {
+ return nil, ErrClosed
+ }
+ createFn := ffiVar.symbol(t.ctx)
+ cCol := append([]byte(column), 0)
+ inner, err := createFn(t.inner, &cCol[0])
+ runtime.KeepAlive(cCol)
+ if err != nil {
+ return nil, err
+ }
+ t.lib.acquire()
+ return &Predicate{ctx: t.ctx, lib: t.lib, inner: inner}, nil
+}
+
+// buildInPredicate is a helper for IS IN / IS NOT IN predicates.
+func (pb *PredicateBuilder) buildInPredicate(
+ ffiVar *FFI[func(*paimonTable, *byte, unsafe.Pointer, uintptr)
(*paimonPredicate, error)],
+ column string, values []any,
+) (*Predicate, error) {
+ t := pb.table
+ if t.inner == nil {
+ return nil, ErrClosed
+ }
+ datums := make([]paimonDatumC, len(values))
+ for i, v := range values {
+ d, err := toDatum(v)
+ if err != nil {
+ return nil, err
+ }
+ datums[i] = d.inner
+ }
+ createFn := ffiVar.symbol(t.ctx)
+ cCol := append([]byte(column), 0)
+ var datumsPtr unsafe.Pointer
+ if len(datums) > 0 {
+ datumsPtr = unsafe.Pointer(&datums[0])
+ }
+ inner, err := createFn(t.inner, &cCol[0], datumsPtr,
uintptr(len(datums)))
+ runtime.KeepAlive(cCol)
+ runtime.KeepAlive(datums)
+ runtime.KeepAlive(values)
+ if err != nil {
+ return nil, err
+ }
+ t.lib.acquire()
+ return &Predicate{ctx: t.ctx, lib: t.lib, inner: inner}, nil
+}
+
+// combinePredicate is a shared helper for And/Or.
+func (p *Predicate) combinePredicate(
+ other *Predicate,
+ ffiVar *FFI[func(*paimonPredicate, *paimonPredicate) *paimonPredicate],
+) (*Predicate, error) {
+ if p == nil || p.inner == nil {
+ return nil, errConsumedPredicate
+ }
+ if other == nil || other.inner == nil {
+ return nil, errConsumedPredicate
+ }
+ if p == other {
+ return nil, fmt.Errorf("paimon: cannot combine a predicate with
itself")
+ }
+ combineFn := ffiVar.symbol(p.ctx)
+ p.inner = combineFn(p.inner, other.inner)
+ other.inner = nil
+ other.lib.release()
+ return p, nil
+}
+
+// And combines this predicate with another using AND. The receiver is updated
+// in place and returned; other is consumed and must not be reused.
+func (p *Predicate) And(other *Predicate) (*Predicate, error) {
+ return p.combinePredicate(other, ffiPredicateAnd)
+}
+
+// Or combines this predicate with another using OR. The receiver is updated
+// in place and returned; other is consumed and must not be reused.
+func (p *Predicate) Or(other *Predicate) (*Predicate, error) {
+ return p.combinePredicate(other, ffiPredicateOr)
+}
+
+// Not negates this predicate. The receiver is updated in place and returned
+// (it stays the live handle, so it is the one to close or pass to WithFilter).
+func (p *Predicate) Not() (*Predicate, error) {
+ if p == nil || p.inner == nil {
+ return nil, errConsumedPredicate
+ }
+ negateFn := ffiPredicateNot.symbol(p.ctx)
+ p.inner = negateFn(p.inner)
+ return p, nil
+}
+
+// FFI wrappers for predicate functions.
+
+var ffiPredicateFree = newFFI(ffiOpts{
+ sym: "paimon_predicate_free",
+ rType: &ffi.TypeVoid,
+ aTypes: []*ffi.Type{&ffi.TypePointer},
+}, func(_ context.Context, ffiCall ffiCall) func(p *paimonPredicate) {
+ return func(p *paimonPredicate) {
+ ffiCall(nil, unsafe.Pointer(&p))
+ }
+})
+
+var ffiPredicateEqual = newPredicateLeafFFI("paimon_predicate_equal")
+var ffiPredicateNotEqual = newPredicateLeafFFI("paimon_predicate_not_equal")
+var ffiPredicateLessThan = newPredicateLeafFFI("paimon_predicate_less_than")
+var ffiPredicateLessOrEqual =
newPredicateLeafFFI("paimon_predicate_less_or_equal")
+var ffiPredicateGreaterThan =
newPredicateLeafFFI("paimon_predicate_greater_than")
+var ffiPredicateGreaterOrEqual =
newPredicateLeafFFI("paimon_predicate_greater_or_equal")
+
+// newPredicateLeafFFI creates an FFI wrapper for comparison predicate
functions
+// with signature: (table, column, datum) -> result_predicate.
+func newPredicateLeafFFI(sym string) *FFI[func(*paimonTable, *byte,
paimonDatumC) (*paimonPredicate, error)] {
+ return newFFI(ffiOpts{
+ sym: contextKey(sym),
+ rType: &typeResultPredicate,
+ aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer,
&typePaimonDatum},
+ }, func(ctx context.Context, ffiCall ffiCall) func(*paimonTable, *byte,
paimonDatumC) (*paimonPredicate, error) {
+ return func(table *paimonTable, column *byte, datum
paimonDatumC) (*paimonPredicate, error) {
+ var result resultPredicate
+ ffiCall(
+ unsafe.Pointer(&result),
+ unsafe.Pointer(&table),
+ unsafe.Pointer(&column),
+ unsafe.Pointer(&datum),
+ )
+ if result.error != nil {
+ return nil, parseError(ctx, result.error)
+ }
+ return result.predicate, nil
+ }
+ })
+}
+
+var ffiPredicateIsNull = newPredicateNullFFI("paimon_predicate_is_null")
+var ffiPredicateIsNotNull = newPredicateNullFFI("paimon_predicate_is_not_null")
+
+// newPredicateNullFFI creates an FFI wrapper for null-check predicate
functions
+// with signature: (table, column) -> result_predicate.
+func newPredicateNullFFI(sym string) *FFI[func(*paimonTable, *byte)
(*paimonPredicate, error)] {
+ return newFFI(ffiOpts{
+ sym: contextKey(sym),
+ rType: &typeResultPredicate,
+ aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer},
+ }, func(ctx context.Context, ffiCall ffiCall) func(*paimonTable, *byte)
(*paimonPredicate, error) {
+ return func(table *paimonTable, column *byte)
(*paimonPredicate, error) {
+ var result resultPredicate
+ ffiCall(
+ unsafe.Pointer(&result),
+ unsafe.Pointer(&table),
+ unsafe.Pointer(&column),
+ )
+ if result.error != nil {
+ return nil, parseError(ctx, result.error)
+ }
+ return result.predicate, nil
+ }
+ })
+}
+
+var ffiPredicateIsIn = newPredicateInFFI("paimon_predicate_is_in")
+var ffiPredicateIsNotIn = newPredicateInFFI("paimon_predicate_is_not_in")
+
+// newPredicateInFFI creates an FFI wrapper for IN/NOT IN predicate functions
+// with signature: (table, column, datums, datums_len) -> result_predicate.
+func newPredicateInFFI(sym string) *FFI[func(*paimonTable, *byte,
unsafe.Pointer, uintptr) (*paimonPredicate, error)] {
+ return newFFI(ffiOpts{
+ sym: contextKey(sym),
+ rType: &typeResultPredicate,
+ aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer,
&ffi.TypePointer, &ffi.TypePointer},
+ }, func(ctx context.Context, ffiCall ffiCall) func(*paimonTable, *byte,
unsafe.Pointer, uintptr) (*paimonPredicate, error) {
+ return func(table *paimonTable, column *byte, datums
unsafe.Pointer, datumsLen uintptr) (*paimonPredicate, error) {
+ var result resultPredicate
+ ffiCall(
+ unsafe.Pointer(&result),
+ unsafe.Pointer(&table),
+ unsafe.Pointer(&column),
+ unsafe.Pointer(&datums),
+ unsafe.Pointer(&datumsLen),
+ )
+ if result.error != nil {
+ return nil, parseError(ctx, result.error)
+ }
+ return result.predicate, nil
+ }
+ })
+}
+
+var ffiPredicateAnd = newFFI(ffiOpts{
+ sym: "paimon_predicate_and",
+ rType: &ffi.TypePointer,
+ aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer},
+}, func(_ context.Context, ffiCall ffiCall) func(*paimonPredicate,
*paimonPredicate) *paimonPredicate {
+ return func(a, b *paimonPredicate) *paimonPredicate {
+ var result *paimonPredicate
+ ffiCall(
+ unsafe.Pointer(&result),
+ unsafe.Pointer(&a),
+ unsafe.Pointer(&b),
+ )
+ return result
+ }
+})
+
+var ffiPredicateOr = newFFI(ffiOpts{
+ sym: "paimon_predicate_or",
+ rType: &ffi.TypePointer,
+ aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer},
+}, func(_ context.Context, ffiCall ffiCall) func(*paimonPredicate,
*paimonPredicate) *paimonPredicate {
+ return func(a, b *paimonPredicate) *paimonPredicate {
+ var result *paimonPredicate
+ ffiCall(
+ unsafe.Pointer(&result),
+ unsafe.Pointer(&a),
+ unsafe.Pointer(&b),
+ )
+ return result
+ }
+})
+
+var ffiPredicateNot = newFFI(ffiOpts{
+ sym: "paimon_predicate_not",
+ rType: &ffi.TypePointer,
+ aTypes: []*ffi.Type{&ffi.TypePointer},
+}, func(_ context.Context, ffiCall ffiCall) func(*paimonPredicate)
*paimonPredicate {
+ return func(p *paimonPredicate) *paimonPredicate {
+ var result *paimonPredicate
+ ffiCall(
+ unsafe.Pointer(&result),
+ unsafe.Pointer(&p),
+ )
+ return result
+ }
+})
diff --git a/bindings/go/predicate/predicate.go
b/bindings/go/predicate/predicate.go
new file mode 100644
index 0000000..046fa78
--- /dev/null
+++ b/bindings/go/predicate/predicate.go
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Package predicate provides convenience functions for combining Paimon
filter predicates.
+package predicate
+
+import (
+ "fmt"
+
+ paimon "github.com/apache/paimon-rust/bindings/go"
+)
+
+// Predicate is an alias for paimon.Predicate.
+type Predicate = paimon.Predicate
+
+// And combines two or more predicates with AND. preds[0] is updated in place
+// and returned; the other inputs are consumed and must not be reused.
+func And(preds ...*Predicate) (*Predicate, error) {
+ if len(preds) < 2 {
+ return nil, fmt.Errorf("predicate.And requires at least 2
predicates, got %d", len(preds))
+ }
+ result := preds[0]
+ for _, p := range preds[1:] {
+ r, err := result.And(p)
+ if err != nil {
+ return nil, err
+ }
+ result = r
+ }
+ return result, nil
+}
+
+// Or combines two or more predicates with OR. preds[0] is updated in place
+// and returned; the other inputs are consumed and must not be reused.
+func Or(preds ...*Predicate) (*Predicate, error) {
+ if len(preds) < 2 {
+ return nil, fmt.Errorf("predicate.Or requires at least 2
predicates, got %d", len(preds))
+ }
+ result := preds[0]
+ for _, p := range preds[1:] {
+ r, err := result.Or(p)
+ if err != nil {
+ return nil, err
+ }
+ result = r
+ }
+ return result, nil
+}
+
+// Not negates a predicate. p is updated in place and returned, so the result
+// is the same handle as the input.
+func Not(p *Predicate) (*Predicate, error) {
+ return p.Not()
+}
diff --git a/bindings/go/read_builder.go b/bindings/go/read_builder.go
index fde6bf5..177c595 100644
--- a/bindings/go/read_builder.go
+++ b/bindings/go/read_builder.go
@@ -56,6 +56,47 @@ func (rb *ReadBuilder) WithProjection(columns []string)
error {
return projFn(rb.inner, columns)
}
+// WithFilter sets a filter predicate for scan planning and read-side pruning.
+//
+// The predicate is used in two phases:
+// - Scan planning: prunes partitions, buckets, and data files based on
+// file-level statistics (min/max). This is conservative — files whose
+// statistics are inconclusive are kept.
+// - Read-side: applies row-level filtering via Parquet native row filters
+// for supported leaf predicates (Eq, NotEq, Lt, Le, Gt, Ge, IsNull,
+// IsNotNull, In, NotIn).
+//
+// Row-level filtering is exact for most common types (Bool, Int, Long, Float,
+// Double, String, Date, Decimal, Binary). However, the following cases are NOT
+// filtered at the row level and may return non-matching rows:
+// - Compound predicates (And/Or/Not) — not yet implemented for row-level
filtering.
+// - Time, Timestamp, and LocalZonedTimestamp columns (not yet implemented).
+// - Schema-evolution: the predicate column does not exist in older data
files.
+// - Data-evolution mode (data-evolution.enabled = true).
+//
+// In these cases callers should apply residual filtering on the returned
records.
+//
+// The predicate is consumed (ownership transferred to the read builder);
+// the caller must NOT close it after this call.
+// Passing nil is a no-op.
+func (rb *ReadBuilder) WithFilter(p *Predicate) error {
+ if rb.inner == nil {
+ return ErrClosed
+ }
+ if p == nil {
+ return nil
+ }
+ if p.inner == nil {
+ return errConsumedPredicate
+ }
+ filterFn := ffiReadBuilderWithFilter.symbol(rb.ctx)
+ err := filterFn(rb.inner, p.inner)
+ // Ownership transferred; prevent double-free.
+ p.inner = nil
+ p.lib.release()
+ return err
+}
+
// NewScan creates a TableScan for planning which data files to read.
func (rb *ReadBuilder) NewScan() (*TableScan, error) {
if rb.inner == nil {
@@ -136,6 +177,25 @@ var ffiReadBuilderWithProjection = newFFI(ffiOpts{
}
})
+var ffiReadBuilderWithFilter = newFFI(ffiOpts{
+ sym: "paimon_read_builder_with_filter",
+ rType: &ffi.TypePointer,
+ aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer},
+}, func(ctx context.Context, ffiCall ffiCall) func(rb *paimonReadBuilder, p
*paimonPredicate) error {
+ return func(rb *paimonReadBuilder, p *paimonPredicate) error {
+ var errPtr *paimonError
+ ffiCall(
+ unsafe.Pointer(&errPtr),
+ unsafe.Pointer(&rb),
+ unsafe.Pointer(&p),
+ )
+ if errPtr != nil {
+ return parseError(ctx, errPtr)
+ }
+ return nil
+ }
+})
+
var ffiReadBuilderNewScan = newFFI(ffiOpts{
sym: "paimon_read_builder_new_scan",
rType: &typeResultTableScan,
diff --git a/bindings/go/table.go b/bindings/go/table.go
index 17ff7af..e6008a2 100644
--- a/bindings/go/table.go
+++ b/bindings/go/table.go
@@ -44,6 +44,11 @@ func (t *Table) Close() {
})
}
+// PredicateBuilder returns a builder for creating filter predicates on this
table.
+func (t *Table) PredicateBuilder() *PredicateBuilder {
+ return &PredicateBuilder{table: t}
+}
+
// NewReadBuilder creates a ReadBuilder for this table.
func (t *Table) NewReadBuilder() (*ReadBuilder, error) {
if t.inner == nil {
diff --git a/bindings/go/tests/paimon_test.go b/bindings/go/tests/paimon_test.go
index e83f151..e9ee196 100644
--- a/bindings/go/tests/paimon_test.go
+++ b/bindings/go/tests/paimon_test.go
@@ -30,41 +30,14 @@ import (
paimon "github.com/apache/paimon-rust/bindings/go"
)
-// TestReadLogTable reads the test table and verifies the data matches
expected values.
-//
-// The table was populated by Docker provisioning with:
-//
-// (1, 'alice'), (2, 'bob'), (3, 'carol')
-func TestReadLogTable(t *testing.T) {
- warehouse := os.Getenv("PAIMON_TEST_WAREHOUSE")
- if warehouse == "" {
- warehouse = "/tmp/paimon-warehouse"
- }
-
- if _, err := os.Stat(warehouse); os.IsNotExist(err) {
- t.Skipf("Skipping: warehouse %s does not exist (run 'make
docker-up' first)", warehouse)
- }
-
- // Use NewCatalog with options
- catalog, err := paimon.NewCatalog(map[string]string{
- "warehouse": warehouse,
- })
- if err != nil {
- t.Fatalf("Failed to create catalog: %v", err)
- }
- defer catalog.Close()
+type row struct {
+ id int32
+ name string
+}
- table, err := catalog.GetTable(paimon.NewIdentifier("default",
"simple_log_table"))
- if err != nil {
- t.Fatalf("Failed to get table: %v", err)
- }
- defer table.Close()
-
- rb, err := table.NewReadBuilder()
- if err != nil {
- t.Fatalf("Failed to create read builder: %v", err)
- }
- defer rb.Close()
+// readRows scans and reads all (id, name) rows from a ReadBuilder.
+func readRows(t *testing.T, rb *paimon.ReadBuilder) []row {
+ t.Helper()
scan, err := rb.NewScan()
if err != nil {
@@ -80,7 +53,7 @@ func TestReadLogTable(t *testing.T) {
splits := plan.Splits()
if len(splits) == 0 {
- t.Fatal("Expected at least one split")
+ return nil
}
read, err := rb.NewRead()
@@ -95,13 +68,6 @@ func TestReadLogTable(t *testing.T) {
}
defer reader.Close()
- // Import Arrow batches via C Data Interface and collect rows.
- // Strings are copied before Release because arrow-go's String.Value()
- // returns zero-copy references into the Arrow buffer.
- type row struct {
- id int32
- name string
- }
var rows []row
batchIdx := 0
for {
@@ -132,58 +98,130 @@ func TestReadLogTable(t *testing.T) {
record.Release()
batchIdx++
}
-
- if len(rows) == 0 {
- t.Fatal("Expected at least one row, got 0")
- }
-
- sort.Slice(rows, func(i, j int) bool {
- return rows[i].id < rows[j].id
- })
-
- expected := []row{
- {1, "alice"},
- {2, "bob"},
- {3, "carol"},
- }
-
- if len(rows) != len(expected) {
- t.Fatalf("Expected %d rows, got %d: %v", len(expected),
len(rows), rows)
- }
-
- for i, exp := range expected {
- if rows[i] != exp {
- t.Errorf("Row %d: expected %v, got %v", i, exp, rows[i])
- }
- }
+ return rows
}
-// TestReadWithProjection reads only the "id" column via WithProjection and
-// verifies that only the projected column is returned with correct values.
-func TestReadWithProjection(t *testing.T) {
+// openTestTable creates a catalog and opens the simple_log_table, registering
+// Close calls for both via t.Cleanup. Skips the test if the warehouse
+// does not exist.
+func openTestTable(t *testing.T) *paimon.Table {
+ t.Helper()
+
warehouse := os.Getenv("PAIMON_TEST_WAREHOUSE")
if warehouse == "" {
warehouse = "/tmp/paimon-warehouse"
}
-
if _, err := os.Stat(warehouse); os.IsNotExist(err) {
t.Skipf("Skipping: warehouse %s does not exist (run 'make
docker-up' first)", warehouse)
}
- // Use NewCatalog with options
catalog, err := paimon.NewCatalog(map[string]string{
"warehouse": warehouse,
})
if err != nil {
t.Fatalf("Failed to create catalog: %v", err)
}
- defer catalog.Close()
+ t.Cleanup(func() { catalog.Close() })
table, err := catalog.GetTable(paimon.NewIdentifier("default",
"simple_log_table"))
if err != nil {
t.Fatalf("Failed to get table: %v", err)
}
- defer table.Close()
+ t.Cleanup(func() { table.Close() })
+
+ return table
+}
+
+// TestReadLogTable reads the test table and verifies the data matches
expected values.
+//
+// The table was populated by Docker provisioning with:
+//
+// (1, 'alice'), (2, 'bob'), (3, 'carol')
+func TestReadLogTable(t *testing.T) {
+ table := openTestTable(t)
+
+ rb, err := table.NewReadBuilder()
+ if err != nil {
+ t.Fatalf("Failed to create read builder: %v", err)
+ }
+ defer rb.Close()
+
+ rows := readRows(t, rb)
+ if len(rows) == 0 {
+ t.Fatal("Expected at least one row, got 0")
+ }
+
+ sort.Slice(rows, func(i, j int) bool { return rows[i].id < rows[j].id })
+
+ expected := []row{{1, "alice"}, {2, "bob"}, {3, "carol"}}
+ if len(rows) != len(expected) {
+ t.Fatalf("Expected %d rows, got %d: %v", len(expected),
len(rows), rows)
+ }
+ for i, exp := range expected {
+ if rows[i] != exp {
+ t.Errorf("Row %d: expected %v, got %v", i, exp, rows[i])
+ }
+ }
+}
+
+// TestReadWithFilter exercises filter push-down through several sub-tests.
+func TestReadWithFilter(t *testing.T) {
+ table := openTestTable(t)
+
+ t.Run("EqualById", func(t *testing.T) {
+ rb, err := table.NewReadBuilder()
+ if err != nil {
+ t.Fatalf("Failed to create read builder: %v", err)
+ }
+ defer rb.Close()
+
+ // id = 1
+ pb := table.PredicateBuilder()
+ pred, err := pb.Eq("id", 1)
+ if err != nil {
+ t.Fatalf("Failed to create predicate: %v", err)
+ }
+ if err := rb.WithFilter(pred); err != nil {
+ t.Fatalf("Failed to set filter: %v", err)
+ }
+
+ rows := readRows(t, rb)
+ expected := []row{{1, "alice"}}
+ if len(rows) != len(expected) {
+ t.Fatalf("Expected %d rows, got %d: %v", len(expected),
len(rows), rows)
+ }
+ if rows[0] != expected[0] {
+ t.Errorf("Expected %v, got %v", expected[0], rows[0])
+ }
+ })
+
+ t.Run("EmptyStringEqual", func(t *testing.T) {
+ rb, err := table.NewReadBuilder()
+ if err != nil {
+ t.Fatalf("Failed to create read builder: %v", err)
+ }
+ defer rb.Close()
+
+ pb := table.PredicateBuilder()
+ pred, err := pb.Eq("name", "")
+ if err != nil {
+ t.Fatalf("Eq with empty string failed: %v", err)
+ }
+ if err := rb.WithFilter(pred); err != nil {
+ t.Fatalf("WithFilter failed: %v", err)
+ }
+
+ rows := readRows(t, rb)
+ if len(rows) != 0 {
+ t.Fatalf("Expected 0 rows for empty string filter, got
%d: %v", len(rows), rows)
+ }
+ })
+}
+
+// TestReadWithProjection reads only the "id" column via WithProjection and
+// verifies that only the projected column is returned with correct values.
+func TestReadWithProjection(t *testing.T) {
+ table := openTestTable(t)
rb, err := table.NewReadBuilder()
if err != nil {
@@ -191,7 +229,6 @@ func TestReadWithProjection(t *testing.T) {
}
defer rb.Close()
- // Set projection to only read "id" column
if err := rb.WithProjection([]string{"id"}); err != nil {
t.Fatalf("Failed to set projection: %v", err)
}
@@ -236,7 +273,6 @@ func TestReadWithProjection(t *testing.T) {
t.Fatalf("Batch %d: failed to read next record: %v",
batchIdx, err)
}
- // Verify schema only contains the projected column
schema := record.Schema()
if schema.NumFields() != 1 {
record.Release()
diff --git a/bindings/go/types.go b/bindings/go/types.go
index fdc8990..7d943cb 100644
--- a/bindings/go/types.go
+++ b/bindings/go/types.go
@@ -120,6 +120,34 @@ var (
}[0],
}
+ // paimon_result_predicate { predicate: *paimon_predicate, error:
*paimon_error }
+ typeResultPredicate = ffi.Type{
+ Type: ffi.Struct,
+ Elements: &[]*ffi.Type{
+ &ffi.TypePointer,
+ &ffi.TypePointer,
+ nil,
+ }[0],
+ }
+
+ // paimon_datum { tag: i32, int_val: i64, double_val: f64, str_data:
*u8, str_len: usize,
+ // int_val2: i64, uint_val: u32, uint_val2: u32 }
+ typePaimonDatum = ffi.Type{
+ Type: ffi.Struct,
+ Elements: &[]*ffi.Type{
+ &ffi.TypeSint32, // tag
+ &ffi.TypeSint32, // padding
+ &ffi.TypeSint64, // int_val
+ &ffi.TypeDouble, // double_val
+ &ffi.TypePointer, // str_data
+ &ffi.TypePointer, // str_len (usize)
+ &ffi.TypeSint64, // int_val2
+ &ffi.TypeUint32, // uint_val
+ &ffi.TypeUint32, // uint_val2
+ nil,
+ }[0],
+ }
+
// paimon_result_next_batch { batch: paimon_arrow_batch, error:
*paimon_error }
typeResultNextBatch = ffi.Type{
Type: ffi.Struct,
@@ -153,6 +181,7 @@ type paimonTableScan struct{}
type paimonTableRead struct{}
type paimonPlan struct{}
type paimonRecordBatchReader struct{}
+type paimonPredicate struct{}
// Result types matching the C repr structs
type resultCatalogNew struct {
@@ -195,6 +224,24 @@ type resultRecordBatchReader struct {
error *paimonError
}
+type resultPredicate struct {
+ predicate *paimonPredicate
+ error *paimonError
+}
+
+// paimonDatumC mirrors the C paimon_datum struct.
+type paimonDatumC struct {
+ tag int32
+ _pad0 [4]byte // padding for alignment
+ intVal int64
+ dblVal float64
+ strData *byte
+ strLen uintptr
+ intVal2 int64
+ uintVal uint32
+ uintVal2 uint32
+}
+
// arrowBatch holds a single Arrow record batch via the Arrow C Data Interface.
type arrowBatch struct {
ctx context.Context
diff --git a/crates/paimon/src/table/read_builder.rs
b/crates/paimon/src/table/read_builder.rs
index c9be32c..c8c3d86 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -272,6 +272,11 @@ impl<'a> TableRead<'a> {
&self.read_type
}
+ /// Data predicates for read-side pruning.
+ pub fn data_predicates(&self) -> &[Predicate] {
+ &self.data_predicates
+ }
+
/// Table for this read.
pub fn table(&self) -> &Table {
self.table
diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml
index 37d95e2..2b756e2 100644
--- a/docs/mkdocs.yml
+++ b/docs/mkdocs.yml
@@ -49,6 +49,7 @@ nav:
- Home: index.md
- Getting Started: getting-started.md
- DataFusion Integration: datafusion.md
+ - Go Integration: go-binding.md
- Architecture: architecture.md
- Releases: releases.md
- Contributing: contributing.md
diff --git a/docs/src/go-binding.md b/docs/src/go-binding.md
new file mode 100644
index 0000000..4157111
--- /dev/null
+++ b/docs/src/go-binding.md
@@ -0,0 +1,397 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Go Integration
+
+The Go integration is a binding built on top of Apache Paimon Rust, allowing
you to access Paimon tables from Go programs. It uses the [Arrow C Data
Interface](https://arrow.apache.org/docs/format/CDataInterface.html) for
zero-copy data transfer.
+
+## Prerequisites
+
+- Go 1.22.4 or later
+- Supported platforms: Linux (amd64, arm64), macOS (amd64, arm64)
+
+## Installation
+
+```bash
+go get github.com/apache/paimon-rust/bindings/go
+```
+
+The pre-built native library is embedded in the package and automatically
loaded at runtime — no manual build step is needed.
+
+## Creating a Catalog
+
+Use `NewCatalog` with a map of options to create a catalog. The catalog type
is determined by the `metastore` option (default: `filesystem`).
+
+```go
+import paimon "github.com/apache/paimon-rust/bindings/go"
+
+// Local filesystem
+catalog, err := paimon.NewCatalog(map[string]string{
+ "warehouse": "/path/to/warehouse",
+})
+if err != nil {
+ log.Fatal(err)
+}
+defer catalog.Close()
+```
+
+### Alibaba Cloud OSS
+
+```go
+catalog, err := paimon.NewCatalog(map[string]string{
+ "warehouse": "oss://bucket/warehouse",
+ "fs.oss.accessKeyId": "your-access-key-id",
+ "fs.oss.accessKeySecret": "your-access-key-secret",
+ "fs.oss.endpoint": "oss-cn-hangzhou.aliyuncs.com",
+})
+```
+
+### REST Catalog
+
+```go
+catalog, err := paimon.NewCatalog(map[string]string{
+ "metastore": "rest",
+ "uri": "http://localhost:8080",
+ "warehouse": "my_warehouse",
+})
+```
+
+## Reading a Table
+
+Paimon Go uses a **scan-then-read** pattern: first scan the table to produce
splits, then read data from those splits as Arrow RecordBatches.
+
+```go
+import (
+ "errors"
+ "fmt"
+ "io"
+ "log"
+
+ paimon "github.com/apache/paimon-rust/bindings/go"
+)
+
+// Get a table from the catalog
+table, err := catalog.GetTable(paimon.NewIdentifier("default", "my_table"))
+if err != nil {
+ log.Fatal(err)
+}
+defer table.Close()
+
+// Create a read builder
+rb, err := table.NewReadBuilder()
+if err != nil {
+ log.Fatal(err)
+}
+defer rb.Close()
+
+// Step 1: Scan — produces a Plan containing DataSplits
+scan, err := rb.NewScan()
+if err != nil {
+ log.Fatal(err)
+}
+defer scan.Close()
+
+plan, err := scan.Plan()
+if err != nil {
+ log.Fatal(err)
+}
+defer plan.Close()
+
+splits := plan.Splits()
+
+// Step 2: Read — consumes splits and returns Arrow RecordBatches
+read, err := rb.NewRead()
+if err != nil {
+ log.Fatal(err)
+}
+defer read.Close()
+
+reader, err := read.NewRecordBatchReader(splits)
+if err != nil {
+ log.Fatal(err)
+}
+defer reader.Close()
+
+for {
+ record, err := reader.NextRecord()
+ if errors.Is(err, io.EOF) {
+ break
+ }
+ if err != nil {
+ log.Fatal(err)
+ }
+ fmt.Println(record)
+ record.Release()
+}
+```
+
+## Column Projection
+
+Use `WithProjection` to select specific columns. Only the requested columns
are read, reducing I/O.
+
+```go
+rb, err := table.NewReadBuilder()
+if err != nil {
+ log.Fatal(err)
+}
+defer rb.Close()
+
+// Only read the "id" and "name" columns
+if err := rb.WithProjection([]string{"id", "name"}); err != nil {
+ log.Fatal(err)
+}
+
+// Continue with scan-then-read as above...
+```
+
+## Filter Push-Down
+
+Filter push-down prunes data at two levels:
+
+1. **Scan planning** — skips partitions, buckets, and data files based on
file-level statistics (min/max).
+2. **Read-side** — applies row-level filtering via Parquet native row filters
for leaf predicates.
+
+!!! warning
+ Filter push-down is a **best-effort** optimization. The returned results
may still contain rows that do not satisfy the filter condition. Callers should
always apply residual filtering on the returned records to ensure correctness.
+
+### Building Predicates
+
+Create predicates through the `PredicateBuilder` obtained from a table:
+
+```go
+pb := table.PredicateBuilder()
+
+// Comparison predicates
+pred, err := pb.Eq("id", 1) // id = 1
+pred, err := pb.NotEq("name", "bob") // name != "bob"
+pred, err := pb.Lt("id", 3) // id < 3
+pred, err := pb.Le("id", 2) // id <= 2
+pred, err := pb.Gt("id", 1) // id > 1
+pred, err := pb.Ge("id", 2) // id >= 2
+
+// Null checks
+pred, err := pb.IsNull("name") // name IS NULL
+pred, err := pb.IsNotNull("name") // name IS NOT NULL
+
+// IN / NOT IN
+pred, err := pb.In("id", 1, 2, 3) // id IN (1, 2, 3)
+pred, err := pb.NotIn("name", "x", "y") // name NOT IN ("x", "y")
+```
+
+### Applying Filters
+
+Pass a predicate to `WithFilter` on the `ReadBuilder`:
+
+```go
+rb, err := table.NewReadBuilder()
+if err != nil {
+ log.Fatal(err)
+}
+defer rb.Close()
+
+pb := table.PredicateBuilder()
+pred, err := pb.Eq("id", 1)
+if err != nil {
+ log.Fatal(err)
+}
+
+// Ownership of pred is transferred — do NOT close it after this call
+if err := rb.WithFilter(pred); err != nil {
+ log.Fatal(err)
+}
+
+// Continue with scan-then-read...
+```
+
+### Compound Predicates
+
+Combine predicates with `And`, `Or`, and `Not`. The `predicate` sub-package
provides variadic helpers:
+
+```go
+import (
+ paimon "github.com/apache/paimon-rust/bindings/go"
+ "github.com/apache/paimon-rust/bindings/go/predicate"
+)
+
+pb := table.PredicateBuilder()
+
+p1, _ := pb.Ge("id", 1)
+p2, _ := pb.Le("id", 3)
+p3, _ := pb.Eq("name", "alice")
+
+// id >= 1 AND id <= 3
+combined, err := predicate.And(p1, p2)
+
+// (id >= 1 AND id <= 3) OR name = "alice"
+combined, err = predicate.Or(combined, p3)
+
+// NOT (...)
+negated, err := predicate.Not(combined)
+```
+
+!!! note "Predicate Ownership"
+ Predicates follow a **move** ownership model. After passing a predicate to
`WithFilter`, `And`, `Or`, or `Not`, the predicate is consumed and must NOT be
closed or reused by the caller.
+
+### Supported Datum Types
+
+Predicate values are automatically converted from Go types:
+
+| Go Type | Paimon Type |
+|-----------------------------|----------------------|
+| `bool` | Bool |
+| `int8` | TinyInt |
+| `int16` | SmallInt |
+| `int32` | Int |
+| `int` / `int64` | Int or Long |
+| `float32` | Float |
+| `float64` | Double |
+| `string` | String |
+| `paimon.Date` | Date (epoch days) |
+| `paimon.Time` | Time (millis) |
+| `paimon.Timestamp` | Timestamp |
+| `paimon.LocalZonedTimestamp` | LocalZonedTimestamp |
+| `paimon.Decimal` | Decimal |
+| `paimon.Bytes` | Binary |
+
+For special types, wrap the value in the corresponding Paimon type (or use its constructor, where one is provided):
+
+```go
+// Date as epoch days since 1970-01-01
+pred, _ := pb.Eq("dt", paimon.Date(19000))
+
+// 123.45 as DECIMAL(10,2): unscaled value 12345, precision 10, scale 2
+pred, _ := pb.Eq("amount", paimon.NewDecimal(12345, 10, 2))
+
+// Timestamp
+pred, _ := pb.Eq("ts", paimon.Timestamp{Millis: 1700000000000, Nanos: 0})
+```
+
+## Resource Management
+
+All Paimon objects (`Catalog`, `Table`, `ReadBuilder`, `TableScan`, `Plan`,
`TableRead`, `RecordBatchReader`) hold native resources and must be closed when
no longer needed. Use `defer` to ensure cleanup:
+
+```go
+catalog, err := paimon.NewCatalog(opts)
+if err != nil { log.Fatal(err) }
+defer catalog.Close()
+
+table, err := catalog.GetTable(id)
+if err != nil { log.Fatal(err) }
+defer table.Close()
+
+// ... and so on for ReadBuilder, TableScan, Plan, TableRead, RecordBatchReader
+```
+
+All `Close()` methods are safe to call multiple times.
+
+## Complete Example
+
+```go
+package main
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "log"
+
+ "github.com/apache/arrow-go/v18/arrow/array"
+ paimon "github.com/apache/paimon-rust/bindings/go"
+)
+
+func main() {
+ // 1. Open catalog and table
+ catalog, err := paimon.NewCatalog(map[string]string{
+ "warehouse": "/tmp/paimon-warehouse",
+ })
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer catalog.Close()
+
+ table, err := catalog.GetTable(paimon.NewIdentifier("default", "my_table"))
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer table.Close()
+
+ // 2. Configure read: projection + filter
+ rb, err := table.NewReadBuilder()
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer rb.Close()
+
+ if err := rb.WithProjection([]string{"id", "name"}); err != nil {
+ log.Fatal(err)
+ }
+
+ pb := table.PredicateBuilder()
+ pred, err := pb.Gt("id", 0)
+ if err != nil {
+ log.Fatal(err)
+ }
+ if err := rb.WithFilter(pred); err != nil {
+ log.Fatal(err)
+ }
+
+ // 3. Scan
+ scan, err := rb.NewScan()
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer scan.Close()
+
+ plan, err := scan.Plan()
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer plan.Close()
+
+ // 4. Read
+ read, err := rb.NewRead()
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer read.Close()
+
+ reader, err := read.NewRecordBatchReader(plan.Splits())
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer reader.Close()
+
+ for {
+ record, err := reader.NextRecord()
+ if errors.Is(err, io.EOF) {
+ break
+ }
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ idCol := record.Column(0).(*array.Int32)
+ nameCol := record.Column(1).(*array.String)
+ for i := 0; i < int(record.NumRows()); i++ {
+ fmt.Printf("id=%d name=%s\n", idCol.Value(i), nameCol.Value(i))
+ }
+ record.Release()
+ }
+}
+```