This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new e0d1f69  feat(tantivy): Add Tantivy full-text search with on-demand archive reading (#231)
e0d1f69 is described below

commit e0d1f6913ed6ddbacdf4a24dd6c599e1266b193e
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Apr 9 19:56:37 2026 +0800

    feat(tantivy): Add Tantivy full-text search with on-demand archive reading (#231)
    
    Introduce a complete Tantivy-based full-text search pipeline for global
    indexes, with on-demand I/O throughout:
    
    - ArchiveDirectory: reads only the archive header eagerly; file data is
      loaded via async FileRead when Tantivy requests it (sync-to-async
      bridge using std::thread::scope).
    - TantivyFullTextWriter: streams the packed archive directly to an
      OutputFile instead of buffering in memory.
    - TantivyFullTextReader: opens from InputFile/FileRead, never loads the
      full archive into memory.
    - FullTextSearchBuilder: self-contained builder on Table that reads the
      index manifest, evaluates searches against multiple Tantivy indexes
      in parallel (try_join_all), and returns ScoredGlobalIndexResult.
    - ScoredGlobalIndexResult + bitmap_to_ranges moved to table/source.rs
      (alongside RowRange) so vector search can reuse them later.
    - TableScan.with_row_ranges(): generic row-range filtering, decoupled
      from full-text specifics.
    - DataFusion full_text_search UDTF integration with test data.
---
 .github/workflows/ci.yml                           |   6 +-
 crates/integrations/datafusion/Cargo.toml          |   6 +
 .../datafusion/src/full_text_search.rs             | 256 +++++++++++++
 crates/integrations/datafusion/src/lib.rs          |   4 +
 crates/integrations/datafusion/src/table/mod.rs    |  92 ++---
 .../testdata/test_tantivy_fulltext.tar.gz          | Bin 0 -> 6121 bytes
 .../integrations/datafusion/tests/read_tables.rs   | 123 +++++++
 crates/paimon/Cargo.toml                           |   5 +-
 crates/paimon/src/lib.rs                           |   2 +
 .../paimon/src/table/full_text_search_builder.rs   | 207 +++++++++++
 crates/paimon/src/table/mod.rs                     |  12 +
 crates/paimon/src/table/table_scan.rs              |  13 +
 crates/paimon/src/tantivy/directory.rs             | 394 +++++++++++++++++++++
 crates/paimon/src/tantivy/full_text_search.rs      | 176 +++++++++
 crates/paimon/src/{lib.rs => tantivy/mod.rs}       |  32 +-
 crates/paimon/src/tantivy/reader.rs                | 198 +++++++++++
 crates/paimon/src/tantivy/writer.rs                | 242 +++++++++++++
 17 files changed, 1697 insertions(+), 71 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index ad308be..4ff2528 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -53,7 +53,7 @@ jobs:
         run: cargo fmt --all -- --check
 
       - name: Clippy
-        run: cargo clippy --all-targets --workspace -- -D warnings
+        run: cargo clippy --all-targets --workspace --features fulltext -- -D 
warnings
 
   build:
     runs-on: ${{ matrix.os }}
@@ -66,7 +66,7 @@ jobs:
     steps:
       - uses: actions/checkout@v6
       - name: Build
-        run: cargo build
+        run: cargo build --features fulltext
 
   unit:
     runs-on: ${{ matrix.os }}
@@ -80,7 +80,7 @@ jobs:
       - uses: actions/checkout@v6
 
       - name: Test
-        run: cargo test -p paimon --all-targets
+        run: cargo test -p paimon --all-targets --features fulltext
         env:
           RUST_LOG: DEBUG
           RUST_BACKTRACE: full
diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml
index 370279c..ddfa2c0 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -19,6 +19,7 @@
 name = "paimon-datafusion"
 edition.workspace = true
 version.workspace = true
+exclude = ["testdata/"]
 license.workspace = true
 homepage = "https://paimon.apache.org/docs/rust/datafusion/"
 documentation = "https://docs.rs/paimon-datafusion"
@@ -26,6 +27,9 @@ description = "Apache Paimon DataFusion Integration"
 categories = ["database"]
 keywords = ["paimon", "datafusion", "integrations"]
 
+[features]
+fulltext = ["paimon/fulltext"]
+
 [dependencies]
 async-trait = "0.1"
 chrono = "0.4"
@@ -37,8 +41,10 @@ tokio = { workspace = true, features = ["rt", "time", "fs"] }
 [dev-dependencies]
 arrow-array = { workspace = true }
 arrow-schema = { workspace = true }
+flate2 = "1"
 parquet = { workspace = true }
 serde = "1"
 serde_json = "1"
+tar = "0.4"
 tempfile = "3"
 tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
diff --git a/crates/integrations/datafusion/src/full_text_search.rs b/crates/integrations/datafusion/src/full_text_search.rs
new file mode 100644
index 0000000..8d4c037
--- /dev/null
+++ b/crates/integrations/datafusion/src/full_text_search.rs
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! `full_text_search` table-valued function for DataFusion.
+//!
+//! Usage:
+//! ```sql
+//! SELECT * FROM full_text_search('table_name', 'column_name', 'query text', 10)
+//! ```
+//!
+//! Reference: [PaimonTableValuedFunctions.scala](https://github.com/apache/paimon/blob/master/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonTableValuedFunctions.scala)
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::catalog::Session;
+use datafusion::catalog::TableFunctionImpl;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::SessionContext;
+use paimon::catalog::{Catalog, Identifier};
+
+use crate::error::to_datafusion_error;
+use crate::runtime::{await_with_runtime, block_on_with_runtime};
+use crate::table::{build_paimon_scan, PaimonTableProvider};
+
+/// Register the `full_text_search` table-valued function on a [`SessionContext`].
+pub fn register_full_text_search(
+    ctx: &SessionContext,
+    catalog: Arc<dyn Catalog>,
+    default_database: &str,
+) {
+    ctx.register_udtf(
+        "full_text_search",
+        Arc::new(FullTextSearchFunction::new(catalog, default_database)),
+    );
+}
+
+/// Table function that performs full-text search on a Paimon table.
+///
+/// Arguments: `(table_name STRING, column_name STRING, query_text STRING, limit INT)`
+pub struct FullTextSearchFunction {
+    catalog: Arc<dyn Catalog>,
+    default_database: String,
+}
+
+impl Debug for FullTextSearchFunction {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("FullTextSearchFunction")
+            .field("default_database", &self.default_database)
+            .finish()
+    }
+}
+
+impl FullTextSearchFunction {
+    pub fn new(catalog: Arc<dyn Catalog>, default_database: &str) -> Self {
+        Self {
+            catalog,
+            default_database: default_database.to_string(),
+        }
+    }
+}
+
+impl TableFunctionImpl for FullTextSearchFunction {
+    fn call(&self, args: &[Expr]) -> DFResult<Arc<dyn TableProvider>> {
+        if args.len() != 4 {
+            return Err(datafusion::error::DataFusionError::Plan(
+                "full_text_search requires 4 arguments: (table_name, 
column_name, query_text, limit)".to_string(),
+            ));
+        }
+
+        let table_name = extract_string_literal(&args[0], "table_name")?;
+        let column_name = extract_string_literal(&args[1], "column_name")?;
+        let query_text = extract_string_literal(&args[2], "query_text")?;
+        let limit = extract_int_literal(&args[3], "limit")?;
+
+        if limit <= 0 {
+            return Err(datafusion::error::DataFusionError::Plan(
+                "full_text_search: limit must be positive".to_string(),
+            ));
+        }
+
+        let identifier = parse_table_identifier(&table_name, 
&self.default_database)?;
+
+        let catalog = Arc::clone(&self.catalog);
+        let table = block_on_with_runtime(
+            async move { catalog.get_table(&identifier).await },
+            "full_text_search: catalog access thread panicked",
+        )
+        .map_err(to_datafusion_error)?;
+
+        let inner = PaimonTableProvider::try_new(table)?;
+
+        Ok(Arc::new(FullTextSearchTableProvider {
+            inner,
+            column_name,
+            query_text,
+            limit: limit as usize,
+        }))
+    }
+}
+
+/// A wrapper around [`PaimonTableProvider`] that injects full-text search
+/// row filtering into the scan path.
+#[derive(Debug)]
+struct FullTextSearchTableProvider {
+    inner: PaimonTableProvider,
+    column_name: String,
+    query_text: String,
+    limit: usize,
+}
+
+#[async_trait]
+impl TableProvider for FullTextSearchTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> ArrowSchemaRef {
+        self.inner.schema()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        let table = self.inner.table();
+
+        // Use FullTextSearchBuilder to execute the search.
+        let row_ranges = await_with_runtime(async {
+            let mut builder = table.new_full_text_search_builder();
+            builder
+                .with_text_column(&self.column_name)
+                .with_query_text(&self.query_text)
+                .with_limit(self.limit);
+            builder.execute().await.map_err(to_datafusion_error)
+        })
+        .await?;
+
+        // Convert search results to row ranges and inject into the scan.
+        let mut read_builder = table.new_read_builder();
+        if let Some(limit) = limit {
+            read_builder.with_limit(limit);
+        }
+        let scan = if row_ranges.is_empty() {
+            read_builder.new_scan()
+        } else {
+            read_builder.new_scan().with_row_ranges(row_ranges)
+        };
+        let plan = await_with_runtime(scan.plan())
+            .await
+            .map_err(to_datafusion_error)?;
+
+        let target = state.config_options().execution.target_partitions;
+        build_paimon_scan(
+            table,
+            &self.schema(),
+            &plan,
+            projection,
+            None,
+            limit,
+            target,
+        )
+    }
+
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
+        Ok(vec![
+            TableProviderFilterPushDown::Unsupported;
+            filters.len()
+        ])
+    }
+}
+
+fn extract_string_literal(expr: &Expr, name: &str) -> DFResult<String> {
+    match expr {
+        Expr::Literal(scalar, _) => {
+            let s = scalar.try_as_str().flatten().ok_or_else(|| {
+                datafusion::error::DataFusionError::Plan(format!(
+                    "full_text_search: {name} must be a string literal, got: 
{expr}"
+                ))
+            })?;
+            Ok(s.to_string())
+        }
+        _ => Err(datafusion::error::DataFusionError::Plan(format!(
+            "full_text_search: {name} must be a literal, got: {expr}"
+        ))),
+    }
+}
+
+fn extract_int_literal(expr: &Expr, name: &str) -> DFResult<i64> {
+    use datafusion::common::ScalarValue;
+    match expr {
+        Expr::Literal(scalar, _) => match scalar {
+            ScalarValue::Int8(Some(v)) => Ok(*v as i64),
+            ScalarValue::Int16(Some(v)) => Ok(*v as i64),
+            ScalarValue::Int32(Some(v)) => Ok(*v as i64),
+            ScalarValue::Int64(Some(v)) => Ok(*v),
+            ScalarValue::UInt8(Some(v)) => Ok(*v as i64),
+            ScalarValue::UInt16(Some(v)) => Ok(*v as i64),
+            ScalarValue::UInt32(Some(v)) => Ok(*v as i64),
+            ScalarValue::UInt64(Some(v)) => i64::try_from(*v).map_err(|_| {
+                datafusion::error::DataFusionError::Plan(format!(
+                    "full_text_search: {name} value {v} exceeds i64 range"
+                ))
+            }),
+            _ => Err(datafusion::error::DataFusionError::Plan(format!(
+                "full_text_search: {name} must be an integer literal, got: 
{expr}"
+            ))),
+        },
+        _ => Err(datafusion::error::DataFusionError::Plan(format!(
+            "full_text_search: {name} must be a literal, got: {expr}"
+        ))),
+    }
+}
+
+fn parse_table_identifier(name: &str, default_database: &str) -> 
DFResult<Identifier> {
+    let parts: Vec<&str> = name.split('.').collect();
+    match parts.len() {
+        1 => Ok(Identifier::new(default_database, parts[0])),
+        2 => Ok(Identifier::new(parts[0], parts[1])),
+        // 3-part name: catalog.database.table — ignore catalog prefix
+        3 => Ok(Identifier::new(parts[1], parts[2])),
+        _ => Err(datafusion::error::DataFusionError::Plan(format!(
+            "full_text_search: invalid table name '{name}', expected 'table', 
'database.table', or 'catalog.database.table'"
+        ))),
+    }
+}
diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs
index 8454bf7..abcf744 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -39,6 +39,8 @@
 mod catalog;
 mod error;
 mod filter_pushdown;
+#[cfg(feature = "fulltext")]
+mod full_text_search;
 mod physical_plan;
 mod relation_planner;
 pub mod runtime;
@@ -46,6 +48,8 @@ mod table;
 
 pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider};
 pub use error::to_datafusion_error;
+#[cfg(feature = "fulltext")]
+pub use full_text_search::{register_full_text_search, FullTextSearchFunction};
 pub use physical_plan::PaimonTableScan;
 pub use relation_planner::PaimonRelationPlanner;
 pub use table::PaimonTableProvider;
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index fdc4f0e..65eb07f 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -74,7 +74,7 @@ impl PaimonTableProvider {
 }
 
 /// Distribute `items` into `num_buckets` groups using round-robin assignment.
-fn bucket_round_robin<T>(items: Vec<T>, num_buckets: usize) -> Vec<Vec<T>> {
+pub(crate) fn bucket_round_robin<T>(items: Vec<T>, num_buckets: usize) -> 
Vec<Vec<T>> {
     let mut buckets: Vec<Vec<T>> = (0..num_buckets).map(|_| 
Vec::new()).collect();
     for (i, item) in items.into_iter().enumerate() {
         buckets[i % num_buckets].push(item);
@@ -82,6 +82,49 @@ fn bucket_round_robin<T>(items: Vec<T>, num_buckets: usize) -> Vec<Vec<T>> {
     buckets
 }
 
+/// Build a [`PaimonTableScan`] from a planned [`paimon::table::Plan`].
+///
+/// Shared by [`PaimonTableProvider`] and the full-text search UDTF to avoid
+/// duplicating projection, partition distribution, and scan construction.
+pub(crate) fn build_paimon_scan(
+    table: &Table,
+    schema: &ArrowSchemaRef,
+    plan: &paimon::table::Plan,
+    projection: Option<&Vec<usize>>,
+    pushed_predicate: Option<paimon::spec::Predicate>,
+    limit: Option<usize>,
+    target_partitions: usize,
+) -> DFResult<Arc<dyn ExecutionPlan>> {
+    let (projected_schema, projected_columns) = if let Some(indices) = 
projection {
+        let fields: Vec<Field> = indices.iter().map(|&i| 
schema.field(i).clone()).collect();
+        let column_names: Vec<String> = fields.iter().map(|f| 
f.name().clone()).collect();
+        (Arc::new(Schema::new(fields)), Some(column_names))
+    } else {
+        let column_names: Vec<String> = schema.fields().iter().map(|f| 
f.name().clone()).collect();
+        (schema.clone(), Some(column_names))
+    };
+
+    let splits = plan.splits().to_vec();
+    let planned_partitions: Vec<Arc<[_]>> = if splits.is_empty() {
+        vec![Arc::from(Vec::new())]
+    } else {
+        let num_partitions = splits.len().min(target_partitions.max(1));
+        bucket_round_robin(splits, num_partitions)
+            .into_iter()
+            .map(Arc::from)
+            .collect()
+    };
+
+    Ok(Arc::new(PaimonTableScan::new(
+        projected_schema,
+        table.clone(),
+        projected_columns,
+        pushed_predicate,
+        planned_partitions,
+        limit,
+    )))
+}
+
 #[async_trait]
 impl TableProvider for PaimonTableProvider {
     fn as_any(&self) -> &dyn Any {
@@ -103,23 +146,6 @@ impl TableProvider for PaimonTableProvider {
         filters: &[Expr],
         limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        let (projected_schema, projected_columns) = if let Some(indices) = 
projection {
-            let fields: Vec<Field> = indices
-                .iter()
-                .map(|&i| self.schema.field(i).clone())
-                .collect();
-            let column_names: Vec<String> = fields.iter().map(|f| 
f.name().clone()).collect();
-            (Arc::new(Schema::new(fields)), Some(column_names))
-        } else {
-            let column_names: Vec<String> = self
-                .schema
-                .fields()
-                .iter()
-                .map(|f| f.name().clone())
-                .collect();
-            (self.schema.clone(), Some(column_names))
-        };
-
         // Plan splits eagerly so we know partition count upfront.
         let pushed_predicate = build_pushed_predicate(filters, 
self.table.schema().fields());
         let mut read_builder = self.table.new_read_builder();
@@ -140,30 +166,16 @@ impl TableProvider for PaimonTableProvider {
             .await
             .map_err(to_datafusion_error)?;
 
-        // Distribute splits across DataFusion partitions, capped by the
-        // session's target_partitions to avoid over-sharding with many small splits.
-        // Each partition's splits are wrapped in Arc to avoid deep-cloning in execute().
-        let splits = plan.splits().to_vec();
-        let planned_partitions: Vec<Arc<[_]>> = if splits.is_empty() {
-            // Empty plans get a single empty partition to avoid 0-partition edge cases.
-            vec![Arc::from(Vec::new())]
-        } else {
-            let target = state.config_options().execution.target_partitions;
-            let num_partitions = splits.len().min(target.max(1));
-            bucket_round_robin(splits, num_partitions)
-                .into_iter()
-                .map(Arc::from)
-                .collect()
-        };
-
-        Ok(Arc::new(PaimonTableScan::new(
-            projected_schema,
-            self.table.clone(),
-            projected_columns,
+        let target = state.config_options().execution.target_partitions;
+        build_paimon_scan(
+            &self.table,
+            &self.schema,
+            &plan,
+            projection,
             pushed_predicate,
-            planned_partitions,
             limit,
-        )))
+            target,
+        )
     }
 
     fn supports_filters_pushdown(
diff --git a/crates/integrations/datafusion/testdata/test_tantivy_fulltext.tar.gz b/crates/integrations/datafusion/testdata/test_tantivy_fulltext.tar.gz
new file mode 100644
index 0000000..3fb2f07
Binary files /dev/null and b/crates/integrations/datafusion/testdata/test_tantivy_fulltext.tar.gz differ
diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs
index 5a20712..4fc26d6 100644
--- a/crates/integrations/datafusion/tests/read_tables.rs
+++ b/crates/integrations/datafusion/tests/read_tables.rs
@@ -754,3 +754,126 @@ async fn test_filter_row_id_from_data_evolution_table() {
         }
     }
 }
+
+// ======================= Full-Text Search Tests =======================
+
+#[cfg(feature = "fulltext")]
+mod fulltext_tests {
+    use std::sync::Arc;
+
+    use datafusion::arrow::array::{Int32Array, StringArray};
+    use datafusion::prelude::SessionContext;
+    use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options};
+    use paimon_datafusion::{register_full_text_search, PaimonCatalogProvider};
+
+    /// Extract the bundled tar.gz into a temp dir and return (tempdir, warehouse_path).
+    fn extract_test_warehouse() -> (tempfile::TempDir, String) {
+        let archive_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
+            .join("testdata/test_tantivy_fulltext.tar.gz");
+        let file = std::fs::File::open(&archive_path)
+            .unwrap_or_else(|e| panic!("Failed to open {}: {e}", 
archive_path.display()));
+        let decoder = flate2::read::GzDecoder::new(file);
+        let mut archive = tar::Archive::new(decoder);
+
+        let tmp = tempfile::tempdir().expect("Failed to create temp dir");
+        let db_dir = tmp.path().join("default.db");
+        std::fs::create_dir_all(&db_dir).unwrap();
+        archive.unpack(&db_dir).unwrap();
+
+        let warehouse = format!("file://{}", tmp.path().display());
+        (tmp, warehouse)
+    }
+
+    async fn create_fulltext_context() -> (SessionContext, tempfile::TempDir) {
+        let (tmp, warehouse) = extract_test_warehouse();
+        let mut options = Options::new();
+        options.set(CatalogOptions::WAREHOUSE, warehouse);
+        let catalog = FileSystemCatalog::new(options).expect("Failed to create 
catalog");
+        let catalog: Arc<dyn Catalog> = Arc::new(catalog);
+
+        let ctx = SessionContext::new();
+        ctx.register_catalog(
+            "paimon",
+            Arc::new(PaimonCatalogProvider::new(Arc::clone(&catalog))),
+        );
+        register_full_text_search(&ctx, catalog, "default");
+        (ctx, tmp)
+    }
+
+    fn extract_id_content_rows(
+        batches: &[datafusion::arrow::record_batch::RecordBatch],
+    ) -> Vec<(i32, String)> {
+        let mut rows = Vec::new();
+        for batch in batches {
+            let id_array = batch
+                .column_by_name("id")
+                .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+                .expect("Expected Int32Array for id");
+            let content_array = batch
+                .column_by_name("content")
+                .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+                .expect("Expected StringArray for content");
+            for i in 0..batch.num_rows() {
+                rows.push((id_array.value(i), 
content_array.value(i).to_string()));
+            }
+        }
+        rows.sort_by_key(|(id, _)| *id);
+        rows
+    }
+
+    /// Search for 'paimon' — rows 0, 2, 4 mention "paimon".
+    #[tokio::test]
+    async fn test_full_text_search_paimon() {
+        let (ctx, _tmp) = create_fulltext_context().await;
+        let batches = ctx
+            .sql("SELECT id, content FROM 
full_text_search('paimon.default.test_tantivy_fulltext', 'content', 'paimon', 
10)")
+            .await
+            .expect("SQL should parse")
+            .collect()
+            .await
+            .expect("query should execute");
+
+        let rows = extract_id_content_rows(&batches);
+        let ids: Vec<i32> = rows.iter().map(|(id, _)| *id).collect();
+        assert_eq!(
+            ids,
+            vec![0, 2, 4],
+            "Searching 'paimon' should match rows 0, 2, 4"
+        );
+    }
+
+    /// Search for 'tantivy' — only row 1.
+    #[tokio::test]
+    async fn test_full_text_search_tantivy() {
+        let (ctx, _tmp) = create_fulltext_context().await;
+        let batches = ctx
+            .sql("SELECT id, content FROM 
full_text_search('paimon.default.test_tantivy_fulltext', 'content', 'tantivy', 
10)")
+            .await
+            .expect("SQL should parse")
+            .collect()
+            .await
+            .expect("query should execute");
+
+        let rows = extract_id_content_rows(&batches);
+        let ids: Vec<i32> = rows.iter().map(|(id, _)| *id).collect();
+        assert_eq!(ids, vec![1], "Searching 'tantivy' should match row 1");
+    }
+
+    /// Search for 'search' — rows 1, 3 mention "full-text search".
+    #[tokio::test]
+    async fn test_full_text_search_search() {
+        let (ctx, _tmp) = create_fulltext_context().await;
+        let batches = ctx
+            .sql("SELECT id, content FROM 
full_text_search('paimon.default.test_tantivy_fulltext', 'content', 'search', 
10)")
+            .await
+            .expect("SQL should parse")
+            .collect()
+            .await
+            .expect("query should execute");
+
+        let rows = extract_id_content_rows(&batches);
+        let ids: Vec<i32> = rows.iter().map(|(id, _)| *id).collect();
+        assert!(ids.contains(&1), "Searching 'search' should match row 1");
+        assert!(ids.contains(&3), "Searching 'search' should match row 3");
+    }
+}
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 682adab..10e5fa0 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -20,6 +20,7 @@ categories = ["database"]
 description = "The rust implementation of Apache Paimon"
 documentation = "https://docs.rs/paimon"
 name = "paimon"
+exclude = ["testdata/"]
 
 homepage.workspace = true
 repository.workspace = true
@@ -30,6 +31,7 @@ version.workspace = true
 [features]
 default = ["storage-memory", "storage-fs", "storage-oss"]
 storage-all = ["storage-memory", "storage-fs", "storage-oss", "storage-s3"]
+fulltext = ["tantivy", "tempfile"]
 
 storage-memory = ["opendal/services-memory"]
 storage-fs = ["opendal/services-fs"]
@@ -77,9 +79,10 @@ md-5 = "0.10"
 regex = "1"
 uuid = { version = "1", features = ["v4"] }
 urlencoding = "2.1"
+tantivy = { version = "0.22", optional = true }
+tempfile = { version = "3", optional = true }
 
 [dev-dependencies]
 axum = { version = "0.7", features = ["macros", "tokio", "http1", "http2"] }
 rand = "0.8.5"
-
 tempfile = "3"
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs
index f2deafe..a68ba6b 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/lib.rs
@@ -34,6 +34,8 @@ pub mod io;
 mod predicate_stats;
 pub mod spec;
 pub mod table;
+#[cfg(feature = "fulltext")]
+pub mod tantivy;
 
 pub use catalog::Catalog;
 pub use catalog::CatalogFactory;
diff --git a/crates/paimon/src/table/full_text_search_builder.rs b/crates/paimon/src/table/full_text_search_builder.rs
new file mode 100644
index 0000000..783bd57
--- /dev/null
+++ b/crates/paimon/src/table/full_text_search_builder.rs
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Full-text search builder for Paimon tables.
+//!
+//! Reference: [FullTextSearchBuilderImpl.java](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchBuilderImpl.java)
+
+use crate::spec::{DataField, FileKind, IndexManifest};
+use crate::table::snapshot_manager::SnapshotManager;
+use crate::table::{RowRange, Table};
+use crate::tantivy::full_text_search::{FullTextSearch, SearchResult};
+use crate::tantivy::reader::TantivyFullTextReader;
+
+const INDEX_DIR: &str = "index";
+const TANTIVY_FULLTEXT_INDEX_TYPE: &str = "tantivy-fulltext";
+
+/// Builder for executing full-text search on a Paimon table.
+///
+/// Usage:
+/// ```ignore
+/// let result = table.new_full_text_search_builder()
+///     .with_text_column("content")
+///     .with_query_text("hello world")
+///     .with_limit(10)
+///     .execute()
+///     .await?;
+/// ```
+///
+/// Reference: `org.apache.paimon.table.source.FullTextSearchBuilder`
+pub struct FullTextSearchBuilder<'a> {
+    table: &'a Table,
+    text_column: Option<String>,
+    query_text: Option<String>,
+    limit: Option<usize>,
+}
+
+impl<'a> FullTextSearchBuilder<'a> {
+    pub(crate) fn new(table: &'a Table) -> Self {
+        Self {
+            table,
+            text_column: None,
+            query_text: None,
+            limit: None,
+        }
+    }
+
+    /// Set the text column to search.
+    pub fn with_text_column(&mut self, name: &str) -> &mut Self {
+        self.text_column = Some(name.to_string());
+        self
+    }
+
+    /// Set the query text to search for.
+    pub fn with_query_text(&mut self, query: &str) -> &mut Self {
+        self.query_text = Some(query.to_string());
+        self
+    }
+
+    /// Set the top-k limit for results.
+    pub fn with_limit(&mut self, limit: usize) -> &mut Self {
+        self.limit = Some(limit);
+        self
+    }
+
+    /// Execute the full-text search and return row ranges.
+    ///
+    /// This reads the latest snapshot, loads the index manifest, and evaluates
+    /// the search against Tantivy indexes.
+    ///
+    /// Reference: `FullTextSearchBuilder.executeLocal()`
+    pub async fn execute(&self) -> crate::Result<Vec<RowRange>> {
+        let text_column =
+            self.text_column
+                .as_deref()
+                .ok_or_else(|| crate::Error::ConfigInvalid {
+                    message: "Text column must be set via 
with_text_column()".to_string(),
+                })?;
+        let query_text = self
+            .query_text
+            .as_deref()
+            .ok_or_else(|| crate::Error::ConfigInvalid {
+                message: "Query text must be set via 
with_query_text()".to_string(),
+            })?;
+        let limit = self.limit.ok_or_else(|| crate::Error::ConfigInvalid {
+            message: "Limit must be set via with_limit()".to_string(),
+        })?;
+
+        let search = FullTextSearch::new(query_text.to_string(), limit, 
text_column.to_string())?;
+
+        let snapshot_manager = SnapshotManager::new(
+            self.table.file_io().clone(),
+            self.table.location().to_string(),
+        );
+
+        let snapshot = match snapshot_manager.get_latest_snapshot().await? {
+            Some(s) => s,
+            None => return Ok(Vec::new()),
+        };
+
+        let index_manifest_name = match snapshot.index_manifest() {
+            Some(name) => name.to_string(),
+            None => return Ok(Vec::new()),
+        };
+
+        let manifest_path = format!(
+            "{}/manifest/{}",
+            self.table.location().trim_end_matches('/'),
+            index_manifest_name
+        );
+        let index_entries = IndexManifest::read(self.table.file_io(), 
&manifest_path).await?;
+
+        evaluate_full_text_search(
+            self.table.file_io(),
+            self.table.location(),
+            &index_entries,
+            &search,
+            self.table.schema().fields(),
+        )
+        .await
+    }
+}
+
+/// Evaluate a full-text search query against Tantivy indexes found in the index manifest.
+async fn evaluate_full_text_search(
+    file_io: &crate::io::FileIO,
+    table_path: &str,
+    index_entries: &[crate::spec::IndexManifestEntry],
+    search: &FullTextSearch,
+    schema_fields: &[DataField],
+) -> crate::Result<Vec<RowRange>> {
+    let table_path = table_path.trim_end_matches('/');
+
+    let field_id = match find_field_id_by_name(schema_fields, 
&search.field_name) {
+        Some(id) => id,
+        None => return Ok(Vec::new()),
+    };
+
+    // Collect tantivy fulltext entries for the target field.
+    let fulltext_entries: Vec<_> = index_entries
+        .iter()
+        .filter(|e| {
+            e.kind == FileKind::Add
+                && e.index_file.index_type == TANTIVY_FULLTEXT_INDEX_TYPE
+                && e.index_file
+                    .global_index_meta
+                    .as_ref()
+                    .is_some_and(|m| m.index_field_id == field_id)
+        })
+        .collect();
+
+    if fulltext_entries.is_empty() {
+        return Ok(Vec::new());
+    }
+
+    let futures: Vec<_> = fulltext_entries
+        .into_iter()
+        .map(|entry| {
+            let global_meta = 
entry.index_file.global_index_meta.as_ref().unwrap();
+            let path = format!("{table_path}/{INDEX_DIR}/{}", 
entry.index_file.file_name);
+            let file_name = entry.index_file.file_name.clone();
+            let query_text = search.query_text.clone();
+            let limit = search.limit;
+            let row_range_start = global_meta.row_range_start;
+            let input = file_io.new_input(&path);
+            async move {
+                let input = input?;
+                let reader = TantivyFullTextReader::from_input_file(&input)
+                    .await
+                    .map_err(|e| crate::Error::UnexpectedError {
+                        message: format!(
+                            "Failed to open Tantivy full-text index '{}': {}",
+                            file_name, e
+                        ),
+                        source: None,
+                    })?;
+                let result = reader.search(&query_text, limit)?;
+                Ok::<_, crate::Error>(result.offset(row_range_start))
+            }
+        })
+        .collect();
+
+    let results = futures::future::try_join_all(futures).await?;
+    let mut merged = SearchResult::empty();
+    for r in &results {
+        merged = merged.or(r);
+    }
+
+    Ok(merged.top_k(search.limit).to_row_ranges())
+}
+
+fn find_field_id_by_name(fields: &[DataField], name: &str) -> Option<i32> {
+    fields.iter().find(|f| f.name() == name).map(|f| f.id())
+}
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index 9b1d6f7..1b34324 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -19,6 +19,8 @@
 
 pub(crate) mod bin_pack;
 mod bucket_filter;
+#[cfg(feature = "fulltext")]
+mod full_text_search_builder;
 pub(crate) mod global_index_scanner;
 mod read_builder;
 pub(crate) mod row_id_predicate;
@@ -31,6 +33,8 @@ mod tag_manager;
 
 use crate::Result;
 use arrow_array::RecordBatch;
+#[cfg(feature = "fulltext")]
+pub use full_text_search_builder::FullTextSearchBuilder;
 use futures::stream::BoxStream;
 pub use read_builder::{ReadBuilder, TableRead};
 pub use schema_manager::SchemaManager;
@@ -106,6 +110,14 @@ impl Table {
         ReadBuilder::new(self)
     }
 
+    /// Create a full-text search builder.
+    ///
+    /// The returned builder borrows this table. This method is only compiled
+    /// when the `fulltext` feature is enabled.
+    ///
+    /// Reference: 
[FullTextSearchBuilderImpl](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchBuilderImpl.java)
+    #[cfg(feature = "fulltext")]
+    pub fn new_full_text_search_builder(&self) -> FullTextSearchBuilder<'_> {
+        FullTextSearchBuilder::new(self)
+    }
+
     /// Create a copy of this table with extra options merged into the schema.
     pub fn copy_with_options(&self, extra: HashMap<String, String>) -> Self {
         Self {
diff --git a/crates/paimon/src/table/table_scan.rs 
b/crates/paimon/src/table/table_scan.rs
index 4e38958..e21f00b 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -329,6 +329,19 @@ impl<'a> TableScan<'a> {
         }
     }
 
+    /// Restrict the scan to the given row ranges.
+    ///
+    /// Any previously configured row ranges are discarded. Passing an empty
+    /// vector clears the restriction (stores `None`) instead of storing an
+    /// empty list. Typically used to inject results from global index lookups
+    /// (e.g. full-text search).
+    pub fn with_row_ranges(mut self, ranges: Vec<RowRange>) -> Self {
+        self.row_ranges = Some(ranges).filter(|kept| !kept.is_empty());
+        self
+    }
+
     /// Plan the full scan: resolve snapshot (via options or latest), then 
read manifests and build DataSplits.
     ///
     /// Time travel is resolved from table options:
diff --git a/crates/paimon/src/tantivy/directory.rs 
b/crates/paimon/src/tantivy/directory.rs
new file mode 100644
index 0000000..32c3058
--- /dev/null
+++ b/crates/paimon/src/tantivy/directory.rs
@@ -0,0 +1,394 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Archive-backed Tantivy Directory implementation.
+//!
+//! Reads a Tantivy index packed into a single archive file. The archive format
+//! (Big-Endian, compatible with Java Paimon):
+//!
+//! ```text
+//! [fileCount: 4 bytes BE]
+//! for each file:
+//!   [nameLen: 4 bytes BE]
+//!   [name: nameLen bytes UTF-8]
+//!   [dataLen: 8 bytes BE]
+//!   [data: dataLen bytes]
+//! ```
+//!
+//! Reference: 
`org.apache.paimon.tantivy.index.TantivyFullTextGlobalIndexWriter.packIndex()`
+//! Reference: 
`org.apache.paimon.tantivy.index.TantivyFullTextGlobalIndexReader.parseArchiveHeader()`
+
+use bytes::Bytes;
+use std::collections::HashMap;
+use std::fmt;
+use std::io;
+use std::ops::Range;
+use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex};
+use tantivy::directory::error::{DeleteError, LockError, OpenReadError, 
OpenWriteError};
+use tantivy::directory::Directory;
+use tantivy::directory::{
+    AntiCallToken, DirectoryLock, FileHandle, Lock, OwnedBytes, 
TerminatingWrite, WatchCallback,
+    WatchHandle, WritePtr,
+};
+use tantivy::HasLen;
+
+use crate::io::FileRead;
+
+/// Metadata for a single file within the archive.
+#[derive(Clone, Debug)]
+struct FileMeta {
+    /// Absolute byte offset of the file data within the archive.
+    offset: u64,
+    /// Length of the file data in bytes.
+    length: usize,
+}
+
+/// A read-only Tantivy `Directory` backed by an archive file.
+///
+/// Only the archive header (file names, offsets, lengths) is parsed eagerly.
+/// Actual file data is read on demand via `FileRead`.
+///
+/// Cloning is cheap: all fields are `Arc`s, so clones share the same file
+/// table, reader and in-memory `atomic_write` storage.
+#[derive(Clone)]
+pub struct ArchiveDirectory {
+    /// File table parsed from the archive header: path -> data location.
+    files: Arc<HashMap<PathBuf, FileMeta>>,
+    /// Shared reader over the whole archive, used for on-demand data reads.
+    reader: Arc<dyn FileRead>,
+    /// In-memory storage for atomic_write (used by Tantivy for meta.json).
+    atomic_data: Arc<Mutex<HashMap<PathBuf, Vec<u8>>>>,
+}
+
+impl ArchiveDirectory {
+    /// Create an `ArchiveDirectory` from an async `FileRead`.
+    ///
+    /// Only the archive header (file names, offsets, lengths) is read eagerly.
+    /// Actual file data is read on demand when Tantivy requests it.
+    ///
+    /// `file_size` must be the total size of the archive in bytes. Every
+    /// header field and every data region is validated against it, so a
+    /// truncated or corrupt archive is rejected here instead of panicking on
+    /// an out-of-range slice or failing later during a search.
+    ///
+    /// # Errors
+    ///
+    /// * `InvalidData` - the archive is shorter than 4 bytes, a count or
+    ///   length field is negative, a file name is not valid UTF-8, or an
+    ///   entry extends past `file_size`.
+    /// * `UnexpectedEof` - the reader returned fewer bytes than requested.
+    /// * Any failure of the underlying reader, wrapped via `io::Error::other`.
+    pub async fn from_reader(reader: impl FileRead, file_size: u64) -> io::Result<Self> {
+        let reader: Arc<dyn FileRead> = Arc::new(reader);
+
+        if file_size < 4 {
+            return Err(invalid_archive("Archive too small to contain file count"));
+        }
+
+        // Read file count (4 bytes).
+        let buf = read_header_bytes(&reader, 0..4).await?;
+        let file_count = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
+        if file_count < 0 {
+            return Err(invalid_archive(format!(
+                "Negative file count in archive: {file_count}"
+            )));
+        }
+        let file_count = file_count as usize;
+
+        // Every entry needs at least 12 header bytes (nameLen + dataLen), so a
+        // corrupt count must not be allowed to drive a huge pre-allocation.
+        let max_entries = usize::try_from((file_size - 4) / 12).unwrap_or(usize::MAX);
+        let mut files = HashMap::with_capacity(file_count.min(max_entries));
+        let mut pos: u64 = 4;
+
+        for _ in 0..file_count {
+            // Read name_len (4 bytes).
+            let name_len_end = checked_region_end(pos, 4, file_size)?;
+            let buf = read_header_bytes(&reader, pos..name_len_end).await?;
+            let name_len = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
+            if name_len < 0 {
+                return Err(invalid_archive(format!(
+                    "Negative name length in archive: {name_len}"
+                )));
+            }
+            let name_len = name_len as usize;
+            pos = name_len_end;
+
+            // Read name + data_len together in a single IO call.
+            let meta_end = checked_region_end(pos, name_len as u64 + 8, file_size)?;
+            let meta_buf = read_header_bytes(&reader, pos..meta_end).await?;
+            let (name_bytes, data_len_bytes) = meta_buf.split_at(name_len);
+
+            let name = String::from_utf8(name_bytes.to_vec()).map_err(|e| {
+                invalid_archive(format!("Invalid UTF-8 in file name: {}", e))
+            })?;
+
+            // `read_header_bytes` returns exactly `name_len + 8` bytes, so the
+            // tail is exactly the 8-byte big-endian data length.
+            let mut data_len_be = [0u8; 8];
+            data_len_be.copy_from_slice(data_len_bytes);
+            let data_len = i64::from_be_bytes(data_len_be);
+            if data_len < 0 {
+                return Err(invalid_archive(format!(
+                    "Negative data length in archive: {data_len}"
+                )));
+            }
+            let data_len = data_len as u64;
+            pos = meta_end;
+
+            // The data region must lie inside the archive and be addressable
+            // on this platform (no silent truncation on 32-bit targets).
+            let data_end = checked_region_end(pos, data_len, file_size)?;
+            let length = usize::try_from(data_len).map_err(|_| {
+                invalid_archive(format!(
+                    "Data length {data_len} of '{name}' does not fit in memory on this platform"
+                ))
+            })?;
+
+            files.insert(
+                PathBuf::from(name),
+                FileMeta {
+                    offset: pos,
+                    length,
+                },
+            );
+
+            // Skip past file data — do NOT read it.
+            pos = data_end;
+        }
+
+        Ok(Self {
+            files: Arc::new(files),
+            reader,
+            atomic_data: Arc::new(Mutex::new(HashMap::new())),
+        })
+    }
+}
+
+/// Build an `InvalidData` error describing a malformed archive header.
+fn invalid_archive(message: impl Into<String>) -> io::Error {
+    io::Error::new(io::ErrorKind::InvalidData, message.into())
+}
+
+/// Return `pos + len`, failing if the sum overflows or passes `file_size`.
+fn checked_region_end(pos: u64, len: u64, file_size: u64) -> io::Result<u64> {
+    pos.checked_add(len)
+        .filter(|end| *end <= file_size)
+        .ok_or_else(|| {
+            invalid_archive(format!(
+                "Archive region at offset {pos} with length {len} exceeds file size {file_size}"
+            ))
+        })
+}
+
+/// Read `range` from `reader`, failing if fewer bytes come back than requested.
+///
+/// The returned buffer is exactly `range.end - range.start` bytes long.
+async fn read_header_bytes(reader: &Arc<dyn FileRead>, range: Range<u64>) -> io::Result<Bytes> {
+    let expected = usize::try_from(range.end - range.start)
+        .map_err(|_| invalid_archive("Archive header field too large for this platform"))?;
+    let start = range.start;
+    let buf = reader
+        .read(range)
+        .await
+        .map_err(|e| io::Error::other(e.to_string()))?;
+    if buf.len() < expected {
+        return Err(io::Error::new(
+            io::ErrorKind::UnexpectedEof,
+            format!(
+                "Short read in archive header at offset {start}: expected {expected} bytes, got {}",
+                buf.len()
+            ),
+        ));
+    }
+    Ok(buf.slice(..expected))
+}
+
+/// Bridge sync Tantivy `FileHandle::read_bytes` to async `FileRead::read`.
+///
+/// Uses `block_in_place` on multi-threaded tokio runtimes (no thread spawn
+/// overhead). Falls back to a scoped thread for current-thread runtimes.
+///
+/// # Errors
+///
+/// Returns an error (instead of panicking) when the calling thread is not
+/// inside a Tokio runtime context, when the underlying read fails, or when
+/// the helper thread panics.
+///
+/// NOTE(review): per the Tokio docs, `Handle::block_on` on a current-thread
+/// runtime cannot drive the I/O and timer drivers itself. The fallback below
+/// blocks the runtime thread while waiting, so readers that depend on those
+/// drivers may stall there — confirm against the `FileRead` implementations
+/// in use.
+fn block_on_read(reader: &Arc<dyn FileRead>, range: Range<u64>) -> io::Result<Bytes> {
+    // `Handle::current()` panics outside a runtime; surface that as an error
+    // so Tantivy callers on foreign threads get a normal I/O failure.
+    let handle = tokio::runtime::Handle::try_current().map_err(|e| {
+        io::Error::other(format!("archive read requires a Tokio runtime: {e}"))
+    })?;
+
+    match handle.runtime_flavor() {
+        tokio::runtime::RuntimeFlavor::MultiThread => tokio::task::block_in_place(|| {
+            handle
+                .block_on(reader.read(range))
+                .map_err(|e| io::Error::other(e.to_string()))
+        }),
+        _ => {
+            // Current-thread runtime: block_in_place is not available,
+            // fall back to a scoped thread.
+            let reader = Arc::clone(reader);
+            std::thread::scope(|s| {
+                s.spawn(move || {
+                    handle
+                        .block_on(reader.read(range))
+                        .map_err(|e| io::Error::other(e.to_string()))
+                })
+                .join()
+                .map_err(|_| io::Error::other("reader thread panicked"))?
+            })
+        }
+    }
+}
+
+impl fmt::Debug for ArchiveDirectory {
+    /// Print only the names of the archived files; the reader and the
+    /// in-memory `atomic_write` storage are omitted.
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let file_names: Vec<_> = self.files.keys().collect();
+        let mut out = f.debug_struct("ArchiveDirectory");
+        out.field("files", &file_names);
+        out.finish()
+    }
+}
+
+/// A `FileHandle` for a single file within the archive.
+#[derive(Clone)]
+struct ArchiveFileHandle {
+    /// Shared reader over the whole archive.
+    reader: Arc<dyn FileRead>,
+    /// Absolute byte offset of this file's data within the archive.
+    file_offset: u64,
+    /// Length of this file's data in bytes.
+    file_length: usize,
+}
+
+impl fmt::Debug for ArchiveFileHandle {
+    /// Print the file's offset and length; the reader is omitted.
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let mut out = f.debug_struct("ArchiveFileHandle");
+        out.field("offset", &self.file_offset);
+        out.field("length", &self.file_length);
+        out.finish()
+    }
+}
+
+impl HasLen for ArchiveFileHandle {
+    /// Length in bytes of the archived file, as recorded when the handle was
+    /// created; no I/O is performed.
+    fn len(&self) -> usize {
+        self.file_length
+    }
+}
+
+impl FileHandle for ArchiveFileHandle {
+    /// Read `range` (relative to the start of this file) from the archive.
+    ///
+    /// The range is translated to absolute archive offsets and fetched through
+    /// the shared async reader via `block_on_read`. An empty range returns an
+    /// empty buffer without performing any I/O.
+    ///
+    /// # Errors
+    ///
+    /// Returns `InvalidInput` when the range is inverted or extends past the
+    /// end of the file, and propagates any error from the underlying read.
+    fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
+        if range.start > range.end {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!("Read range {:?} is inverted", range),
+            ));
+        }
+        if range.end > self.file_length {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!(
+                    "Read range {:?} exceeds file length {}",
+                    range, self.file_length
+                ),
+            ));
+        }
+        if range.is_empty() {
+            return Ok(OwnedBytes::empty());
+        }
+
+        let abs_start = self.file_offset + range.start as u64;
+        let abs_end = self.file_offset + range.end as u64;
+        let data = block_on_read(&self.reader, abs_start..abs_end)?;
+        Ok(OwnedBytes::new(data.to_vec()))
+    }
+}
+
+/// Read-only `Directory` implementation: reads are served from the archive,
+/// `atomic_write` is kept in memory, and every other mutating operation is a
+/// successful no-op.
+impl Directory for ArchiveDirectory {
+    /// Return a handle for a file listed in the archive header.
+    ///
+    /// No data is read here; the handle only records the file's offset and
+    /// length. Only the archive file table is consulted, so paths stored via
+    /// `atomic_write` are not visible through this method. Unknown paths yield
+    /// `FileDoesNotExist`.
+    fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, 
OpenReadError> {
+        let meta = self
+            .files
+            .get(path)
+            .ok_or_else(|| 
OpenReadError::FileDoesNotExist(path.to_path_buf()))?;
+
+        Ok(Arc::new(ArchiveFileHandle {
+            reader: self.reader.clone(),
+            file_offset: meta.offset,
+            file_length: meta.length,
+        }))
+    }
+
+    /// True if `path` is in the archive or was stored via `atomic_write`.
+    ///
+    /// Panics if the `atomic_data` mutex is poisoned.
+    fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
+        Ok(self.files.contains_key(path) || 
self.atomic_data.lock().unwrap().contains_key(path))
+    }
+
+    /// Read a whole file into memory.
+    ///
+    /// Data stored via `atomic_write` takes precedence over the archive copy;
+    /// otherwise the file's full byte range is read from the archive.
+    fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
+        if let Some(data) = self.atomic_data.lock().unwrap().get(path) {
+            return Ok(data.clone());
+        }
+        let meta = self
+            .files
+            .get(path)
+            .ok_or_else(|| 
OpenReadError::FileDoesNotExist(path.to_path_buf()))?;
+
+        let data = block_on_read(&self.reader, meta.offset..meta.offset + 
meta.length as u64)
+            .map_err(|e| OpenReadError::wrap_io_error(e, path.to_path_buf()))?;
+        Ok(data.to_vec())
+    }
+
+    /// Store `data` in memory under `path`; the archive itself is not modified.
+    fn atomic_write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
+        self.atomic_data
+            .lock()
+            .unwrap()
+            .insert(path.to_path_buf(), data.to_vec());
+        Ok(())
+    }
+
+    /// No-op: nothing is deleted, and success is always reported.
+    fn delete(&self, _path: &Path) -> Result<(), DeleteError> {
+        Ok(())
+    }
+
+    /// Return a writer over a throwaway in-memory buffer.
+    ///
+    /// The written bytes are not recorded in this directory, so they cannot be
+    /// read back through it.
+    fn open_write(&self, _path: &Path) -> Result<WritePtr, OpenWriteError> {
+        Ok(io::BufWriter::new(Box::new(
+            VecTerminatingWrite(Vec::new()),
+        )))
+    }
+
+    /// No-op: there is nothing to sync.
+    fn sync_directory(&self) -> io::Result<()> {
+        Ok(())
+    }
+
+    /// Always succeeds; the returned lock wraps `()` and guards nothing.
+    fn acquire_lock(&self, _lock: &Lock) -> Result<DirectoryLock, LockError> {
+        Ok(DirectoryLock::from(Box::new(())))
+    }
+
+    /// Return an empty watch handle; the callback is dropped without being
+    /// registered.
+    fn watch(&self, _watch_callback: WatchCallback) -> 
tantivy::Result<WatchHandle> {
+        Ok(WatchHandle::empty())
+    }
+}
+
+/// Dummy writer for lock file support.
+///
+/// Bytes are appended to the wrapped `Vec<u8>`, which is never read back in
+/// this file.
+struct VecTerminatingWrite(Vec<u8>);
+
+impl io::Write for VecTerminatingWrite {
+    /// Append `buf` to the in-memory buffer; always accepts the whole slice.
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.0.extend_from_slice(buf);
+        Ok(buf.len())
+    }
+
+    /// No-op: there is no underlying sink to flush to.
+    fn flush(&mut self) -> io::Result<()> {
+        Ok(())
+    }
+}
+
+impl TerminatingWrite for VecTerminatingWrite {
+    /// No-op termination: nothing is persisted.
+    fn terminate_ref(&mut self, _token: AntiCallToken) -> io::Result<()> {
+        Ok(())
+    }
+}
+
+// Unit tests: build a small archive in the in-memory file system and read it
+// back through `ArchiveDirectory`.
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::io::FileIOBuilder;
+    use crate::tantivy::writer::TantivyFullTextWriter;
+
+    /// Write a two-document index to an in-memory archive and open it.
+    async fn make_test_dir() -> ArchiveDirectory {
+        let file_io = FileIOBuilder::new("memory").build().unwrap();
+
+        let mut writer = TantivyFullTextWriter::new().unwrap();
+        writer.add_document(0, Some("hello")).unwrap();
+        writer.add_document(1, Some("world")).unwrap();
+        let output = file_io.new_output("/test_archive.bin").unwrap();
+        writer.finish(&output).await.unwrap();
+
+        let input = output.to_input_file();
+        let metadata = input.metadata().await.unwrap();
+        let reader = input.reader().await.unwrap();
+        ArchiveDirectory::from_reader(reader, metadata.size)
+            .await
+            .unwrap()
+    }
+
+    /// The header parser must find at least one file in a freshly written archive.
+    #[tokio::test]
+    async fn test_parse_archive() {
+        let dir = make_test_dir().await;
+        // Tantivy index files should be present.
+        assert!(!dir.files.is_empty());
+    }
+
+    /// Reading a file's full range returns exactly `len()` bytes.
+    #[tokio::test]
+    async fn test_read_file_from_archive() {
+        let dir = make_test_dir().await;
+        // Read any file from the archive and verify it's non-empty.
+        let first_path = dir.files.keys().next().unwrap().clone();
+        let handle = dir.get_file_handle(&first_path).unwrap();
+        assert!(handle.len() > 0);
+        let data = handle.read_bytes(0..handle.len()).unwrap();
+        assert_eq!(data.len(), handle.len());
+    }
+
+    /// Data stored via `atomic_write` is returned by `atomic_read`.
+    #[tokio::test]
+    async fn test_atomic_read_write() {
+        let dir = make_test_dir().await;
+
+        // atomic_write + atomic_read
+        dir.atomic_write(Path::new("meta.json"), b"{}").unwrap();
+        let data = dir.atomic_read(Path::new("meta.json")).unwrap();
+        assert_eq!(&data, b"{}");
+    }
+
+    /// Requesting a path that is not in the archive is an error.
+    #[tokio::test]
+    async fn test_file_not_found() {
+        let dir = make_test_dir().await;
+        assert!(dir.get_file_handle(Path::new("missing.txt")).is_err());
+    }
+}
diff --git a/crates/paimon/src/tantivy/full_text_search.rs 
b/crates/paimon/src/tantivy/full_text_search.rs
new file mode 100644
index 0000000..77bcd03
--- /dev/null
+++ b/crates/paimon/src/tantivy/full_text_search.rs
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Full-text search types for global index.
+//!
+//! Reference: 
[org.apache.paimon.predicate.FullTextSearch](https://github.com/apache/paimon/blob/master/paimon-common/src/main/java/org/apache/paimon/predicate/FullTextSearch.java)
+
+/// Full-text search predicate.
+///
+/// Reference: `org.apache.paimon.predicate.FullTextSearch`
+#[derive(Debug, Clone)]
+pub struct FullTextSearch {
+    /// Query string; never empty when built through [`FullTextSearch::new`].
+    pub query_text: String,
+    /// Name of the target field; never empty when built through `new`.
+    pub field_name: String,
+    /// Result limit; always positive when built through `new`.
+    pub limit: usize,
+}
+
+impl FullTextSearch {
+    /// Build a validated search predicate.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Error::ConfigInvalid` when `query_text` is empty, `limit` is
+    /// zero, or `field_name` is empty. The checks run in that order and the
+    /// first failing one determines the message.
+    pub fn new(query_text: String, limit: usize, field_name: String) -> crate::Result<Self> {
+        let violation = if query_text.is_empty() {
+            Some("Query text cannot be empty")
+        } else if limit == 0 {
+            Some("Limit must be positive")
+        } else if field_name.is_empty() {
+            Some("Field name cannot be empty")
+        } else {
+            None
+        };
+
+        match violation {
+            Some(message) => Err(crate::Error::ConfigInvalid {
+                message: message.to_string(),
+            }),
+            None => Ok(Self {
+                query_text,
+                field_name,
+                limit,
+            }),
+        }
+    }
+}
+
+/// Search result containing parallel arrays of row IDs and scores.
+///
+/// `row_ids[i]` and `scores[i]` describe the same hit.
+#[derive(Debug, Clone)]
+pub struct SearchResult {
+    /// Row IDs of the hits.
+    pub row_ids: Vec<u64>,
+    /// Relevance scores, parallel to `row_ids`.
+    pub scores: Vec<f32>,
+}
+
+impl SearchResult {
+    /// Build a result from parallel vectors.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `row_ids` and `scores` differ in length.
+    pub fn new(row_ids: Vec<u64>, scores: Vec<f32>) -> Self {
+        assert_eq!(row_ids.len(), scores.len());
+        Self { row_ids, scores }
+    }
+
+    /// Result with no hits.
+    pub fn empty() -> Self {
+        Self {
+            row_ids: Vec::new(),
+            scores: Vec::new(),
+        }
+    }
+
+    /// Number of hits.
+    pub fn len(&self) -> usize {
+        self.row_ids.len()
+    }
+
+    /// True when there are no hits.
+    pub fn is_empty(&self) -> bool {
+        self.row_ids.is_empty()
+    }
+
+    /// Apply an offset to all row IDs.
+    ///
+    /// The shift saturates at `0` and `u64::MAX` instead of wrapping. Scores
+    /// are copied unchanged.
+    pub fn offset(&self, offset: i64) -> Self {
+        if offset == 0 {
+            return self.clone();
+        }
+        let magnitude = offset.unsigned_abs();
+        let shift = |id: u64| {
+            if offset > 0 {
+                id.saturating_add(magnitude)
+            } else {
+                id.saturating_sub(magnitude)
+            }
+        };
+        Self {
+            row_ids: self.row_ids.iter().copied().map(shift).collect(),
+            scores: self.scores.clone(),
+        }
+    }
+
+    /// Merge two search results.
+    ///
+    /// Hits of `other` are appended after the hits of `self`; duplicates are
+    /// not removed.
+    pub fn or(&self, other: &SearchResult) -> Self {
+        let mut row_ids = self.row_ids.clone();
+        let mut scores = self.scores.clone();
+        row_ids.extend_from_slice(&other.row_ids);
+        scores.extend_from_slice(&other.scores);
+        Self { row_ids, scores }
+    }
+
+    /// Return top-k results by score (descending).
+    ///
+    /// When there are at most `k` hits the result is returned unchanged (and
+    /// unsorted). NaN scores rank below every real score. Ties keep their
+    /// original relative order.
+    pub fn top_k(&self, k: usize) -> Self {
+        if self.row_ids.len() <= k {
+            return self.clone();
+        }
+        let mut indices: Vec<usize> = (0..self.row_ids.len()).collect();
+        // `partial_cmp` is not a total order once NaN is involved, which
+        // `sort_by` is allowed to reject with a panic. Order NaNs last and
+        // compare everything else with the total order on floats.
+        indices.sort_by(|&a, &b| {
+            let (score_a, score_b) = (self.scores[a], self.scores[b]);
+            score_a
+                .is_nan()
+                .cmp(&score_b.is_nan())
+                .then_with(|| score_b.total_cmp(&score_a))
+        });
+        indices.truncate(k);
+        let row_ids = indices.iter().map(|&i| self.row_ids[i]).collect();
+        let scores = indices.iter().map(|&i| self.scores[i]).collect();
+        Self { row_ids, scores }
+    }
+
+    /// Convert to sorted, merged row ranges.
+    ///
+    /// Row IDs are sorted and de-duplicated, then runs of consecutive IDs are
+    /// collapsed into one inclusive range each. Adjacency is computed on the
+    /// unsigned IDs so it cannot overflow; IDs above `i64::MAX` (not
+    /// representable in a `RowRange`) saturate to `i64::MAX` instead of
+    /// wrapping to negative values.
+    pub fn to_row_ranges(&self) -> Vec<crate::table::RowRange> {
+        let mut sorted: Vec<u64> = self.row_ids.clone();
+        sorted.sort_unstable();
+        sorted.dedup();
+
+        let Some((&first, rest)) = sorted.split_first() else {
+            return Vec::new();
+        };
+        let to_i64 = |id: u64| i64::try_from(id).unwrap_or(i64::MAX);
+
+        let mut ranges = Vec::new();
+        let (mut start, mut end) = (first, first);
+        for &id in rest {
+            // `sorted` is strictly increasing, so `id - end` cannot underflow.
+            if id - end == 1 {
+                end = id;
+            } else {
+                ranges.push(crate::table::RowRange::new(to_i64(start), to_i64(end)));
+                start = id;
+                end = id;
+            }
+        }
+        ranges.push(crate::table::RowRange::new(to_i64(start), to_i64(end)));
+        ranges
+    }
+}
+
+// Unit tests for `FullTextSearch::new` validation.
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Valid arguments are stored unchanged in the matching fields.
+    #[test]
+    fn test_full_text_search_new() {
+        let fts = FullTextSearch::new("hello".into(), 10, 
"text".into()).unwrap();
+        assert_eq!(fts.query_text, "hello");
+        assert_eq!(fts.limit, 10);
+        assert_eq!(fts.field_name, "text");
+    }
+
+    /// An empty query string is rejected.
+    #[test]
+    fn test_full_text_search_empty_query() {
+        let result = FullTextSearch::new("".into(), 10, "text".into());
+        assert!(result.is_err());
+    }
+}
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/tantivy/mod.rs
similarity index 57%
copy from crates/paimon/src/lib.rs
copy to crates/paimon/src/tantivy/mod.rs
index f2deafe..0922937 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/tantivy/mod.rs
@@ -15,31 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod error;
-pub use error::Error;
-pub use error::Result;
+//! Tantivy-based full-text search for global index.
 
-pub mod common;
-pub use common::{CatalogOptions, Options};
-
-pub mod api;
-pub use api::rest_api::RESTApi;
-
-pub mod arrow;
-pub mod btree;
-pub mod catalog;
-mod deletion_vector;
-pub mod file_index;
-pub mod io;
-mod predicate_stats;
-pub mod spec;
-pub mod table;
-
-pub use catalog::Catalog;
-pub use catalog::CatalogFactory;
-pub use catalog::FileSystemCatalog;
-
-pub use table::{
-    DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, 
ReadBuilder, RowRange,
-    SnapshotManager, Table, TableRead, TableScan, TagManager,
-};
+pub(crate) mod directory;
+pub mod full_text_search;
+pub mod reader;
+pub mod writer;
diff --git a/crates/paimon/src/tantivy/reader.rs 
b/crates/paimon/src/tantivy/reader.rs
new file mode 100644
index 0000000..1748850
--- /dev/null
+++ b/crates/paimon/src/tantivy/reader.rs
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tantivy full-text index reader.
+//!
+//! Reads a Tantivy index from an archive (packed by `TantivyFullTextWriter`)
+//! and performs full-text search queries.
+//!
+//! Reference: 
`org.apache.paimon.tantivy.index.TantivyFullTextGlobalIndexReader`
+
+use crate::io::{FileRead, InputFile};
+use crate::tantivy::directory::ArchiveDirectory;
+use crate::tantivy::full_text_search::SearchResult;
+use tantivy::collector::TopDocs;
+use tantivy::query::QueryParser;
+use tantivy::{Index, IndexReader, ReloadPolicy};
+
+/// Reader for a Tantivy full-text index stored in archive format.
+pub struct TantivyFullTextReader {
+    /// Index reader built with `ReloadPolicy::Manual`; source of searchers.
+    reader: IndexReader,
+    /// The opened index; provides the schema and the query parser.
+    index: Index,
+}
+
+impl TantivyFullTextReader {
+    /// Open a reader from an `InputFile` (on-demand reading, no full load).
+    ///
+    /// # Errors
+    ///
+    /// Fails when the file metadata or reader cannot be obtained, or when
+    /// [`Self::from_reader`] fails.
+    pub async fn from_input_file(input: &InputFile) -> crate::Result<Self> {
+        let metadata = input.metadata().await?;
+        let reader = input.reader().await?;
+        Self::from_reader(reader, metadata.size).await
+    }
+
+    /// Open a reader from an async `FileRead` and file size.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Error::UnexpectedError` when the archive header cannot be
+    /// parsed, the Tantivy index cannot be opened from it, or the index
+    /// reader cannot be created.
+    pub async fn from_reader(reader: impl FileRead, file_size: u64) -> crate::Result<Self> {
+        let directory = ArchiveDirectory::from_reader(reader, file_size)
+            .await
+            .map_err(|e| crate::Error::UnexpectedError {
+                message: format!("Failed to parse Tantivy archive: {}", e),
+                source: None,
+            })?;
+
+        let index = Index::open(directory).map_err(|e| crate::Error::UnexpectedError {
+            message: format!("Failed to open Tantivy index from archive: {}", e),
+            source: None,
+        })?;
+
+        let reader = index
+            .reader_builder()
+            .reload_policy(ReloadPolicy::Manual)
+            .try_into()
+            .map_err(|e| crate::Error::UnexpectedError {
+                message: format!("Failed to create Tantivy reader: {}", e),
+                source: None,
+            })?;
+
+        Ok(Self { reader, index })
+    }
+
+    /// Search the index and return top-N results ranked by score.
+    ///
+    /// `query_text` is parsed with Tantivy's query parser against the `text`
+    /// field; at most `limit` hits are returned. Each hit's row ID is read
+    /// from the `row_id` fast field of its segment.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Error::UnexpectedError` when the schema has no `text` field,
+    /// the query cannot be parsed, the search fails, or a hit has no `row_id`.
+    pub fn search(&self, query_text: &str, limit: usize) -> crate::Result<SearchResult> {
+        let schema = self.index.schema();
+        let text_field = schema
+            .get_field("text")
+            .map_err(|_| crate::Error::UnexpectedError {
+                message: "Tantivy schema missing 'text' field".to_string(),
+                source: None,
+            })?;
+
+        let searcher = self.reader.searcher();
+        let query_parser = QueryParser::for_index(&self.index, vec![text_field]);
+        let query =
+            query_parser
+                .parse_query(query_text)
+                .map_err(|e| crate::Error::UnexpectedError {
+                    message: format!("Failed to parse query '{}': {}", query_text, e),
+                    source: None,
+                })?;
+
+        let top_docs = searcher
+            .search(&query, &TopDocs::with_limit(limit))
+            .map_err(|e| crate::Error::UnexpectedError {
+                message: format!("Tantivy search failed: {}", e),
+                source: None,
+            })?;
+
+        let mut row_ids = Vec::with_capacity(top_docs.len());
+        let mut scores = Vec::with_capacity(top_docs.len());
+
+        // Open the `row_id` column once per segment rather than once per hit:
+        // opening a column goes back to the archive-backed directory.
+        let mut row_id_columns = std::collections::HashMap::new();
+
+        for (score, doc_address) in &top_docs {
+            let column = match row_id_columns.entry(doc_address.segment_ord) {
+                std::collections::hash_map::Entry::Occupied(slot) => slot.into_mut(),
+                std::collections::hash_map::Entry::Vacant(slot) => {
+                    let column = searcher
+                        .segment_reader(doc_address.segment_ord)
+                        .fast_fields()
+                        .u64("row_id")
+                        .map_err(|e| crate::Error::UnexpectedError {
+                            message: format!("Failed to get row_id fast field: {}", e),
+                            source: None,
+                        })?;
+                    slot.insert(column)
+                }
+            };
+            let row_id = column.first(doc_address.doc_id).ok_or_else(|| {
+                crate::Error::UnexpectedError {
+                    message: format!(
+                        "Missing row_id for doc_id {} in segment {}",
+                        doc_address.doc_id, doc_address.segment_ord
+                    ),
+                    source: None,
+                }
+            })?;
+            row_ids.push(row_id);
+            scores.push(*score);
+        }
+
+        Ok(SearchResult::new(row_ids, scores))
+    }
+}
+
+// Unit tests: write a five-document index to an in-memory archive and search it.
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::io::FileIOBuilder;
+    use crate::tantivy::writer::TantivyFullTextWriter;
+
+    /// Build a reader over five documents with row IDs 0 through 4.
+    async fn create_test_reader() -> TantivyFullTextReader {
+        let file_io = FileIOBuilder::new("memory").build().unwrap();
+        let mut writer = TantivyFullTextWriter::new().unwrap();
+        writer
+            .add_document(0, Some("the quick brown fox jumps over the lazy 
dog"))
+            .unwrap();
+        writer
+            .add_document(1, Some("rust programming language"))
+            .unwrap();
+        writer
+            .add_document(2, Some("apache paimon data lake"))
+            .unwrap();
+        writer
+            .add_document(3, Some("full text search with tantivy"))
+            .unwrap();
+        writer
+            .add_document(4, Some("the fox is quick and brown"))
+            .unwrap();
+        let output = file_io.new_output("/test_reader_index.archive").unwrap();
+        writer.finish(&output).await.unwrap();
+        TantivyFullTextReader::from_input_file(&output.to_input_file())
+            .await
+            .unwrap()
+    }
+
+    /// "fox" appears in rows 0 and 4 only.
+    #[tokio::test]
+    async fn test_search_basic() {
+        let reader = create_test_reader().await;
+        let result = reader.search("fox", 10).unwrap();
+        assert_eq!(result.len(), 2);
+        assert!(result.row_ids.contains(&0));
+        assert!(result.row_ids.contains(&4));
+    }
+
+    /// The limit caps the number of returned hits.
+    #[tokio::test]
+    async fn test_search_limit() {
+        let reader = create_test_reader().await;
+        let result = reader.search("fox", 1).unwrap();
+        assert_eq!(result.len(), 1);
+    }
+
+    /// A term that occurs nowhere yields an empty result, not an error.
+    #[tokio::test]
+    async fn test_search_no_match() {
+        let reader = create_test_reader().await;
+        let result = reader.search("nonexistent", 10).unwrap();
+        assert!(result.is_empty());
+    }
+
+    /// A matching hit carries a positive score.
+    #[tokio::test]
+    async fn test_search_scored() {
+        let reader = create_test_reader().await;
+        let result = reader.search("tantivy", 10).unwrap();
+        assert_eq!(result.len(), 1);
+        assert!(result.row_ids.contains(&3));
+        assert!(result.scores[0] > 0.0);
+    }
+
+    /// `offset` shifts every returned row ID by the given amount.
+    #[tokio::test]
+    async fn test_search_with_offset() {
+        let reader = create_test_reader().await;
+        let result = reader.search("fox", 10).unwrap();
+        let offset_result = result.offset(1000);
+        assert!(offset_result.row_ids.contains(&1000));
+        assert!(offset_result.row_ids.contains(&1004));
+    }
+}
diff --git a/crates/paimon/src/tantivy/writer.rs 
b/crates/paimon/src/tantivy/writer.rs
new file mode 100644
index 0000000..a12ed30
--- /dev/null
+++ b/crates/paimon/src/tantivy/writer.rs
@@ -0,0 +1,242 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tantivy full-text index writer.
+//!
+//! Writes documents (rowId + text) to a local Tantivy index, then packs the
+//! index directory into a single archive file (Big-Endian, Java-compatible)
+//! and writes it to an `OutputFile`.
+//!
+//! Reference: `org.apache.paimon.tantivy.index.TantivyFullTextGlobalIndexWriter`
+
+use bytes::Bytes;
+use std::io::Read;
+
+use crate::io::OutputFile;
+use tantivy::schema::{Field, NumericOptions, Schema, TEXT};
+use tantivy::{Index, IndexWriter, TantivyDocument};
+
+/// Constructs the fixed two-field schema used by the full-text index.
+///
+/// Returns the schema together with handles to its fields, in this order:
+/// - `row_id`: a u64 field configured as stored, indexed and fast;
+/// - `text`: a text field registered with tantivy's `TEXT` options.
+fn build_schema() -> (Schema, Field, Field) {
+    let row_id_options = NumericOptions::default()
+        .set_stored()
+        .set_indexed()
+        .set_fast();
+
+    let mut schema_builder = Schema::builder();
+    let row_id = schema_builder.add_u64_field("row_id", row_id_options);
+    let text = schema_builder.add_text_field("text", TEXT);
+
+    (schema_builder.build(), row_id, text)
+}
+
+/// Writer for creating a Tantivy full-text index and packing it into an archive.
+///
+/// Documents are added with `add_document`; `finish` commits the index and
+/// streams the packed index files to an `OutputFile`.
+pub struct TantivyFullTextWriter {
+    /// Tantivy writer that receives the documents.
+    writer: IndexWriter,
+    /// Handle of the `row_id` schema field.
+    row_id_field: Field,
+    /// Handle of the `text` schema field.
+    text_field: Field,
+    /// Temporary directory the index is built in; `finish` reads the index files from it.
+    temp_dir: tempfile::TempDir,
+    /// Number of rows passed to `add_document` so far, including rows without text.
+    row_count: u64,
+}
+
+impl TantivyFullTextWriter {
+    /// Creates a writer whose Tantivy index lives in a freshly created temporary directory.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Error::UnexpectedError` if the temporary directory, the index, or the
+    /// underlying `IndexWriter` cannot be created.
+    pub fn new() -> crate::Result<Self> {
+        // Value handed to `Index::writer` (tantivy's writer memory budget, in bytes).
+        const WRITER_MEMORY_BUDGET: usize = 50_000_000;
+
+        // All failures here are reported the same way: a message and no source error.
+        let unexpected = |message: String| crate::Error::UnexpectedError {
+            message,
+            source: None,
+        };
+
+        let temp_dir = tempfile::tempdir().map_err(|e| {
+            unexpected(format!(
+                "Failed to create temp directory for Tantivy index: {}",
+                e
+            ))
+        })?;
+
+        let (schema, row_id_field, text_field) = build_schema();
+        let index = Index::create_in_dir(temp_dir.path(), schema)
+            .map_err(|e| unexpected(format!("Failed to create Tantivy index: {}", e)))?;
+        let writer = index
+            .writer(WRITER_MEMORY_BUDGET)
+            .map_err(|e| unexpected(format!("Failed to create Tantivy writer: {}", e)))?;
+
+        Ok(Self {
+            writer,
+            row_id_field,
+            text_field,
+            temp_dir,
+            row_count: 0,
+        })
+    }
+
+    /// Records one row, indexing its text when present.
+    ///
+    /// When `text` is `Some`, a document holding `row_id` and the text is handed to the
+    /// Tantivy writer. When `text` is `None` (a null value) nothing is indexed. In both
+    /// cases the processed-row counter is incremented once the call succeeds.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Error::UnexpectedError` if Tantivy rejects the document; the row counter
+    /// is left unchanged in that case.
+    pub fn add_document(&mut self, row_id: u64, text: Option<&str>) -> crate::Result<()> {
+        match text {
+            Some(content) => {
+                let mut document = TantivyDocument::new();
+                document.add_u64(self.row_id_field, row_id);
+                document.add_text(self.text_field, content);
+                if let Err(e) = self.writer.add_document(document) {
+                    return Err(crate::Error::UnexpectedError {
+                        message: format!("Failed to add document: {}", e),
+                        source: None,
+                    });
+                }
+            }
+            // Null text: nothing to index, but the row is still counted below.
+            None => {}
+        }
+
+        self.row_count += 1;
+        Ok(())
+    }
+
+    /// Commit, pack the index into an archive, and write it to the given 
`OutputFile`.
+    ///
+    /// Returns `false` if no documents were written (nothing is written to 
the file).
+    ///
+    /// Reference: `TantivyFullTextGlobalIndexWriter.packIndex()`
+    pub async fn finish(mut self, output: &OutputFile) -> crate::Result<bool> {
+        if self.row_count == 0 {
+            return Ok(false);
+        }
+
+        self.writer
+            .commit()
+            .map_err(|e| crate::Error::UnexpectedError {
+                message: format!("Failed to commit Tantivy index: {}", e),
+                source: None,
+            })?;
+
+        // Drop the writer to release file locks before packing.
+        drop(self.writer);
+
+        // Stream the archive directly to the OutputFile.
+        let mut file_writer = output.writer().await?;
+
+        // Collect file entries from the temp directory.
+        let mut entries: Vec<(String, std::path::PathBuf)> = Vec::new();
+        for entry in
+            std::fs::read_dir(self.temp_dir.path()).map_err(|e| 
crate::Error::UnexpectedError {
+                message: format!("Failed to read Tantivy index directory: {}", 
e),
+                source: None,
+            })?
+        {
+            let entry = entry.map_err(|e| crate::Error::UnexpectedError {
+                message: format!("Failed to read directory entry: {}", e),
+                source: None,
+            })?;
+            if entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
+                let name = entry.file_name().to_string_lossy().to_string();
+                entries.push((name, entry.path()));
+            }
+        }
+
+        // Write file count (4 bytes BE).
+        file_writer
+            .write(Bytes::from((entries.len() as i32).to_be_bytes().to_vec()))
+            .await?;
+
+        // Write each file: name_len + name + data_len + data (chunked).
+        const CHUNK_SIZE: usize = 4 * 1024 * 1024; // 4 MiB
+        for (name, path) in &entries {
+            let name_bytes = name.as_bytes();
+            file_writer
+                .write(Bytes::from(
+                    (name_bytes.len() as i32).to_be_bytes().to_vec(),
+                ))
+                .await?;
+            file_writer.write(Bytes::from(name_bytes.to_vec())).await?;
+
+            let file_len = std::fs::metadata(path)
+                .map_err(|e| crate::Error::UnexpectedError {
+                    message: format!("Failed to stat index file '{}': {}", 
name, e),
+                    source: None,
+                })?
+                .len();
+            file_writer
+                .write(Bytes::from((file_len as i64).to_be_bytes().to_vec()))
+                .await?;
+
+            let mut file =
+                std::fs::File::open(path).map_err(|e| 
crate::Error::UnexpectedError {
+                    message: format!("Failed to open index file '{}': {}", 
name, e),
+                    source: None,
+                })?;
+            let mut buf = vec![0u8; CHUNK_SIZE];
+            loop {
+                let n = file
+                    .read(&mut buf)
+                    .map_err(|e| crate::Error::UnexpectedError {
+                        message: format!("Failed to read index file '{}': {}", 
name, e),
+                        source: None,
+                    })?;
+                if n == 0 {
+                    break;
+                }
+                file_writer.write(Bytes::copy_from_slice(&buf[..n])).await?;
+            }
+        }
+
+        file_writer.close().await?;
+        Ok(true)
+    }
+
+    /// Number of rows processed (including nulls).
+    ///
+    /// Counts every successful `add_document` call, whether or not text was supplied.
+    pub fn row_count(&self) -> u64 {
+        self.row_count
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::io::FileIOBuilder;
+    use crate::tantivy::reader::TantivyFullTextReader;
+
+    /// Documents written through the writer are searchable after reading the archive back.
+    #[tokio::test]
+    async fn test_write_and_read_roundtrip() {
+        let file_io = FileIOBuilder::new("memory").build().unwrap();
+        let archive = file_io.new_output("/test_index.archive").unwrap();
+
+        // Row 2 has no text: it is counted as a row but not indexed.
+        let rows = [
+            (0, Some("hello world")),
+            (1, Some("foo bar baz")),
+            (2, None),
+            (3, Some("hello again")),
+        ];
+        let mut index_writer = TantivyFullTextWriter::new().unwrap();
+        for (row_id, text) in rows {
+            index_writer.add_document(row_id, text).unwrap();
+        }
+
+        assert!(index_writer.finish(&archive).await.unwrap());
+
+        let archive_input = archive.to_input_file();
+        let index_reader = TantivyFullTextReader::from_input_file(&archive_input)
+            .await
+            .unwrap();
+        let hits = index_reader.search("hello", 10).unwrap();
+        assert_eq!(hits.len(), 2);
+        for expected_row in [0, 3] {
+            assert!(hits.row_ids.contains(&expected_row));
+        }
+    }
+
+    /// Finishing a writer that received no rows reports `false` and creates no file.
+    #[tokio::test]
+    async fn test_empty_writer() {
+        let file_io = FileIOBuilder::new("memory").build().unwrap();
+        let archive = file_io.new_output("/empty_index.archive").unwrap();
+
+        let index_writer = TantivyFullTextWriter::new().unwrap();
+        let was_written = index_writer.finish(&archive).await.unwrap();
+
+        assert!(!was_written);
+        let archive_exists = archive.exists().await.unwrap();
+        assert!(!archive_exists);
+    }
+}

Reply via email to