This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new e0d1f69 feat(tantivy): Add Tantivy full-text search with on-demand
archive reading (#231)
e0d1f69 is described below
commit e0d1f6913ed6ddbacdf4a24dd6c599e1266b193e
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Apr 9 19:56:37 2026 +0800
feat(tantivy): Add Tantivy full-text search with on-demand archive reading
(#231)
Introduce a complete Tantivy-based full-text search pipeline for global
indexes, with on-demand I/O throughout:
- ArchiveDirectory: reads only the archive header eagerly; file data is
loaded via async FileRead when Tantivy requests it (sync-to-async
bridge using std::thread::scope).
- TantivyFullTextWriter: streams the packed archive directly to an
OutputFile instead of buffering in memory.
- TantivyFullTextReader: opens from InputFile/FileRead, never loads the
full archive into memory.
- FullTextSearchBuilder: self-contained builder on Table that reads the
index manifest, evaluates searches against multiple Tantivy indexes
in parallel (try_join_all), and returns ScoredGlobalIndexResult.
- ScoredGlobalIndexResult + bitmap_to_ranges moved to table/source.rs
(alongside RowRange) so vector search can reuse them later.
- TableScan.with_row_ranges(): generic row-range filtering, decoupled
from full-text specifics.
- DataFusion full_text_search UDTF integration with test data.
---
.github/workflows/ci.yml | 6 +-
crates/integrations/datafusion/Cargo.toml | 6 +
.../datafusion/src/full_text_search.rs | 256 +++++++++++++
crates/integrations/datafusion/src/lib.rs | 4 +
crates/integrations/datafusion/src/table/mod.rs | 92 ++---
.../testdata/test_tantivy_fulltext.tar.gz | Bin 0 -> 6121 bytes
.../integrations/datafusion/tests/read_tables.rs | 123 +++++++
crates/paimon/Cargo.toml | 5 +-
crates/paimon/src/lib.rs | 2 +
.../paimon/src/table/full_text_search_builder.rs | 207 +++++++++++
crates/paimon/src/table/mod.rs | 12 +
crates/paimon/src/table/table_scan.rs | 13 +
crates/paimon/src/tantivy/directory.rs | 394 +++++++++++++++++++++
crates/paimon/src/tantivy/full_text_search.rs | 176 +++++++++
crates/paimon/src/{lib.rs => tantivy/mod.rs} | 32 +-
crates/paimon/src/tantivy/reader.rs | 198 +++++++++++
crates/paimon/src/tantivy/writer.rs | 242 +++++++++++++
17 files changed, 1697 insertions(+), 71 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index ad308be..4ff2528 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -53,7 +53,7 @@ jobs:
run: cargo fmt --all -- --check
- name: Clippy
- run: cargo clippy --all-targets --workspace -- -D warnings
+ run: cargo clippy --all-targets --workspace --features fulltext -- -D
warnings
build:
runs-on: ${{ matrix.os }}
@@ -66,7 +66,7 @@ jobs:
steps:
- uses: actions/checkout@v6
- name: Build
- run: cargo build
+ run: cargo build --features fulltext
unit:
runs-on: ${{ matrix.os }}
@@ -80,7 +80,7 @@ jobs:
- uses: actions/checkout@v6
- name: Test
- run: cargo test -p paimon --all-targets
+ run: cargo test -p paimon --all-targets --features fulltext
env:
RUST_LOG: DEBUG
RUST_BACKTRACE: full
diff --git a/crates/integrations/datafusion/Cargo.toml
b/crates/integrations/datafusion/Cargo.toml
index 370279c..ddfa2c0 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -19,6 +19,7 @@
name = "paimon-datafusion"
edition.workspace = true
version.workspace = true
+exclude = ["testdata/"]
license.workspace = true
homepage = "https://paimon.apache.org/docs/rust/datafusion/"
documentation = "https://docs.rs/paimon-datafusion"
@@ -26,6 +27,9 @@ description = "Apache Paimon DataFusion Integration"
categories = ["database"]
keywords = ["paimon", "datafusion", "integrations"]
+[features]
+fulltext = ["paimon/fulltext"]
+
[dependencies]
async-trait = "0.1"
chrono = "0.4"
@@ -37,8 +41,10 @@ tokio = { workspace = true, features = ["rt", "time", "fs"] }
[dev-dependencies]
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
+flate2 = "1"
parquet = { workspace = true }
serde = "1"
serde_json = "1"
+tar = "0.4"
tempfile = "3"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
diff --git a/crates/integrations/datafusion/src/full_text_search.rs
b/crates/integrations/datafusion/src/full_text_search.rs
new file mode 100644
index 0000000..8d4c037
--- /dev/null
+++ b/crates/integrations/datafusion/src/full_text_search.rs
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! `full_text_search` table-valued function for DataFusion.
+//!
+//! Usage:
+//! ```sql
+//! SELECT * FROM full_text_search('table_name', 'column_name', 'query text',
10)
+//! ```
+//!
+//! Reference:
[PaimonTableValuedFunctions.scala](https://github.com/apache/paimon/blob/master/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonTableValuedFunctions.scala)
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::catalog::Session;
+use datafusion::catalog::TableFunctionImpl;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::SessionContext;
+use paimon::catalog::{Catalog, Identifier};
+
+use crate::error::to_datafusion_error;
+use crate::runtime::{await_with_runtime, block_on_with_runtime};
+use crate::table::{build_paimon_scan, PaimonTableProvider};
+
+/// Register the `full_text_search` table-valued function on a
[`SessionContext`].
+pub fn register_full_text_search(
+ ctx: &SessionContext,
+ catalog: Arc<dyn Catalog>,
+ default_database: &str,
+) {
+ ctx.register_udtf(
+ "full_text_search",
+ Arc::new(FullTextSearchFunction::new(catalog, default_database)),
+ );
+}
+
+/// Table function that performs full-text search on a Paimon table.
+///
+/// Arguments: `(table_name STRING, column_name STRING, query_text STRING,
limit INT)`
+pub struct FullTextSearchFunction {
+ catalog: Arc<dyn Catalog>,
+ default_database: String,
+}
+
+impl Debug for FullTextSearchFunction {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("FullTextSearchFunction")
+ .field("default_database", &self.default_database)
+ .finish()
+ }
+}
+
+impl FullTextSearchFunction {
+ pub fn new(catalog: Arc<dyn Catalog>, default_database: &str) -> Self {
+ Self {
+ catalog,
+ default_database: default_database.to_string(),
+ }
+ }
+}
+
+impl TableFunctionImpl for FullTextSearchFunction {
+ fn call(&self, args: &[Expr]) -> DFResult<Arc<dyn TableProvider>> {
+ if args.len() != 4 {
+ return Err(datafusion::error::DataFusionError::Plan(
+ "full_text_search requires 4 arguments: (table_name,
column_name, query_text, limit)".to_string(),
+ ));
+ }
+
+ let table_name = extract_string_literal(&args[0], "table_name")?;
+ let column_name = extract_string_literal(&args[1], "column_name")?;
+ let query_text = extract_string_literal(&args[2], "query_text")?;
+ let limit = extract_int_literal(&args[3], "limit")?;
+
+ if limit <= 0 {
+ return Err(datafusion::error::DataFusionError::Plan(
+ "full_text_search: limit must be positive".to_string(),
+ ));
+ }
+
+ let identifier = parse_table_identifier(&table_name,
&self.default_database)?;
+
+ let catalog = Arc::clone(&self.catalog);
+ let table = block_on_with_runtime(
+ async move { catalog.get_table(&identifier).await },
+ "full_text_search: catalog access thread panicked",
+ )
+ .map_err(to_datafusion_error)?;
+
+ let inner = PaimonTableProvider::try_new(table)?;
+
+ Ok(Arc::new(FullTextSearchTableProvider {
+ inner,
+ column_name,
+ query_text,
+ limit: limit as usize,
+ }))
+ }
+}
+
+/// A wrapper around [`PaimonTableProvider`] that injects full-text search
+/// row filtering into the scan path.
+#[derive(Debug)]
+struct FullTextSearchTableProvider {
+ inner: PaimonTableProvider,
+ column_name: String,
+ query_text: String,
+ limit: usize,
+}
+
+#[async_trait]
+impl TableProvider for FullTextSearchTableProvider {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> ArrowSchemaRef {
+ self.inner.schema()
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
+ async fn scan(
+ &self,
+ state: &dyn Session,
+ projection: Option<&Vec<usize>>,
+ _filters: &[Expr],
+ limit: Option<usize>,
+ ) -> DFResult<Arc<dyn ExecutionPlan>> {
+ let table = self.inner.table();
+
+ // Use FullTextSearchBuilder to execute the search.
+ let row_ranges = await_with_runtime(async {
+ let mut builder = table.new_full_text_search_builder();
+ builder
+ .with_text_column(&self.column_name)
+ .with_query_text(&self.query_text)
+ .with_limit(self.limit);
+ builder.execute().await.map_err(to_datafusion_error)
+ })
+ .await?;
+
+ // Convert search results to row ranges and inject into the scan.
+ let mut read_builder = table.new_read_builder();
+ if let Some(limit) = limit {
+ read_builder.with_limit(limit);
+ }
+ let scan = if row_ranges.is_empty() {
+ read_builder.new_scan()
+ } else {
+ read_builder.new_scan().with_row_ranges(row_ranges)
+ };
+ let plan = await_with_runtime(scan.plan())
+ .await
+ .map_err(to_datafusion_error)?;
+
+ let target = state.config_options().execution.target_partitions;
+ build_paimon_scan(
+ table,
+ &self.schema(),
+ &plan,
+ projection,
+ None,
+ limit,
+ target,
+ )
+ }
+
+ fn supports_filters_pushdown(
+ &self,
+ filters: &[&Expr],
+ ) -> DFResult<Vec<TableProviderFilterPushDown>> {
+ Ok(vec![
+ TableProviderFilterPushDown::Unsupported;
+ filters.len()
+ ])
+ }
+}
+
+fn extract_string_literal(expr: &Expr, name: &str) -> DFResult<String> {
+ match expr {
+ Expr::Literal(scalar, _) => {
+ let s = scalar.try_as_str().flatten().ok_or_else(|| {
+ datafusion::error::DataFusionError::Plan(format!(
+ "full_text_search: {name} must be a string literal, got:
{expr}"
+ ))
+ })?;
+ Ok(s.to_string())
+ }
+ _ => Err(datafusion::error::DataFusionError::Plan(format!(
+ "full_text_search: {name} must be a literal, got: {expr}"
+ ))),
+ }
+}
+
+fn extract_int_literal(expr: &Expr, name: &str) -> DFResult<i64> {
+ use datafusion::common::ScalarValue;
+ match expr {
+ Expr::Literal(scalar, _) => match scalar {
+ ScalarValue::Int8(Some(v)) => Ok(*v as i64),
+ ScalarValue::Int16(Some(v)) => Ok(*v as i64),
+ ScalarValue::Int32(Some(v)) => Ok(*v as i64),
+ ScalarValue::Int64(Some(v)) => Ok(*v),
+ ScalarValue::UInt8(Some(v)) => Ok(*v as i64),
+ ScalarValue::UInt16(Some(v)) => Ok(*v as i64),
+ ScalarValue::UInt32(Some(v)) => Ok(*v as i64),
+ ScalarValue::UInt64(Some(v)) => i64::try_from(*v).map_err(|_| {
+ datafusion::error::DataFusionError::Plan(format!(
+ "full_text_search: {name} value {v} exceeds i64 range"
+ ))
+ }),
+ _ => Err(datafusion::error::DataFusionError::Plan(format!(
+ "full_text_search: {name} must be an integer literal, got:
{expr}"
+ ))),
+ },
+ _ => Err(datafusion::error::DataFusionError::Plan(format!(
+ "full_text_search: {name} must be a literal, got: {expr}"
+ ))),
+ }
+}
+
+fn parse_table_identifier(name: &str, default_database: &str) ->
DFResult<Identifier> {
+ let parts: Vec<&str> = name.split('.').collect();
+ match parts.len() {
+ 1 => Ok(Identifier::new(default_database, parts[0])),
+ 2 => Ok(Identifier::new(parts[0], parts[1])),
+ // 3-part name: catalog.database.table — ignore catalog prefix
+ 3 => Ok(Identifier::new(parts[1], parts[2])),
+ _ => Err(datafusion::error::DataFusionError::Plan(format!(
+ "full_text_search: invalid table name '{name}', expected 'table',
'database.table', or 'catalog.database.table'"
+ ))),
+ }
+}
diff --git a/crates/integrations/datafusion/src/lib.rs
b/crates/integrations/datafusion/src/lib.rs
index 8454bf7..abcf744 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -39,6 +39,8 @@
mod catalog;
mod error;
mod filter_pushdown;
+#[cfg(feature = "fulltext")]
+mod full_text_search;
mod physical_plan;
mod relation_planner;
pub mod runtime;
@@ -46,6 +48,8 @@ mod table;
pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider};
pub use error::to_datafusion_error;
+#[cfg(feature = "fulltext")]
+pub use full_text_search::{register_full_text_search, FullTextSearchFunction};
pub use physical_plan::PaimonTableScan;
pub use relation_planner::PaimonRelationPlanner;
pub use table::PaimonTableProvider;
diff --git a/crates/integrations/datafusion/src/table/mod.rs
b/crates/integrations/datafusion/src/table/mod.rs
index fdc4f0e..65eb07f 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -74,7 +74,7 @@ impl PaimonTableProvider {
}
/// Distribute `items` into `num_buckets` groups using round-robin assignment.
-fn bucket_round_robin<T>(items: Vec<T>, num_buckets: usize) -> Vec<Vec<T>> {
+pub(crate) fn bucket_round_robin<T>(items: Vec<T>, num_buckets: usize) ->
Vec<Vec<T>> {
let mut buckets: Vec<Vec<T>> = (0..num_buckets).map(|_|
Vec::new()).collect();
for (i, item) in items.into_iter().enumerate() {
buckets[i % num_buckets].push(item);
@@ -82,6 +82,49 @@ fn bucket_round_robin<T>(items: Vec<T>, num_buckets: usize)
-> Vec<Vec<T>> {
buckets
}
+/// Build a [`PaimonTableScan`] from a planned [`paimon::table::Plan`].
+///
+/// Shared by [`PaimonTableProvider`] and the full-text search UDTF to avoid
+/// duplicating projection, partition distribution, and scan construction.
+pub(crate) fn build_paimon_scan(
+ table: &Table,
+ schema: &ArrowSchemaRef,
+ plan: &paimon::table::Plan,
+ projection: Option<&Vec<usize>>,
+ pushed_predicate: Option<paimon::spec::Predicate>,
+ limit: Option<usize>,
+ target_partitions: usize,
+) -> DFResult<Arc<dyn ExecutionPlan>> {
+ let (projected_schema, projected_columns) = if let Some(indices) =
projection {
+ let fields: Vec<Field> = indices.iter().map(|&i|
schema.field(i).clone()).collect();
+ let column_names: Vec<String> = fields.iter().map(|f|
f.name().clone()).collect();
+ (Arc::new(Schema::new(fields)), Some(column_names))
+ } else {
+ let column_names: Vec<String> = schema.fields().iter().map(|f|
f.name().clone()).collect();
+ (schema.clone(), Some(column_names))
+ };
+
+ let splits = plan.splits().to_vec();
+ let planned_partitions: Vec<Arc<[_]>> = if splits.is_empty() {
+ vec![Arc::from(Vec::new())]
+ } else {
+ let num_partitions = splits.len().min(target_partitions.max(1));
+ bucket_round_robin(splits, num_partitions)
+ .into_iter()
+ .map(Arc::from)
+ .collect()
+ };
+
+ Ok(Arc::new(PaimonTableScan::new(
+ projected_schema,
+ table.clone(),
+ projected_columns,
+ pushed_predicate,
+ planned_partitions,
+ limit,
+ )))
+}
+
#[async_trait]
impl TableProvider for PaimonTableProvider {
fn as_any(&self) -> &dyn Any {
@@ -103,23 +146,6 @@ impl TableProvider for PaimonTableProvider {
filters: &[Expr],
limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
- let (projected_schema, projected_columns) = if let Some(indices) =
projection {
- let fields: Vec<Field> = indices
- .iter()
- .map(|&i| self.schema.field(i).clone())
- .collect();
- let column_names: Vec<String> = fields.iter().map(|f|
f.name().clone()).collect();
- (Arc::new(Schema::new(fields)), Some(column_names))
- } else {
- let column_names: Vec<String> = self
- .schema
- .fields()
- .iter()
- .map(|f| f.name().clone())
- .collect();
- (self.schema.clone(), Some(column_names))
- };
-
// Plan splits eagerly so we know partition count upfront.
let pushed_predicate = build_pushed_predicate(filters,
self.table.schema().fields());
let mut read_builder = self.table.new_read_builder();
@@ -140,30 +166,16 @@ impl TableProvider for PaimonTableProvider {
.await
.map_err(to_datafusion_error)?;
- // Distribute splits across DataFusion partitions, capped by the
- // session's target_partitions to avoid over-sharding with many small
splits.
- // Each partition's splits are wrapped in Arc to avoid deep-cloning in
execute().
- let splits = plan.splits().to_vec();
- let planned_partitions: Vec<Arc<[_]>> = if splits.is_empty() {
- // Empty plans get a single empty partition to avoid 0-partition
edge cases.
- vec![Arc::from(Vec::new())]
- } else {
- let target = state.config_options().execution.target_partitions;
- let num_partitions = splits.len().min(target.max(1));
- bucket_round_robin(splits, num_partitions)
- .into_iter()
- .map(Arc::from)
- .collect()
- };
-
- Ok(Arc::new(PaimonTableScan::new(
- projected_schema,
- self.table.clone(),
- projected_columns,
+ let target = state.config_options().execution.target_partitions;
+ build_paimon_scan(
+ &self.table,
+ &self.schema,
+ &plan,
+ projection,
pushed_predicate,
- planned_partitions,
limit,
- )))
+ target,
+ )
}
fn supports_filters_pushdown(
diff --git
a/crates/integrations/datafusion/testdata/test_tantivy_fulltext.tar.gz
b/crates/integrations/datafusion/testdata/test_tantivy_fulltext.tar.gz
new file mode 100644
index 0000000..3fb2f07
Binary files /dev/null and
b/crates/integrations/datafusion/testdata/test_tantivy_fulltext.tar.gz differ
diff --git a/crates/integrations/datafusion/tests/read_tables.rs
b/crates/integrations/datafusion/tests/read_tables.rs
index 5a20712..4fc26d6 100644
--- a/crates/integrations/datafusion/tests/read_tables.rs
+++ b/crates/integrations/datafusion/tests/read_tables.rs
@@ -754,3 +754,126 @@ async fn test_filter_row_id_from_data_evolution_table() {
}
}
}
+
+// ======================= Full-Text Search Tests =======================
+
+#[cfg(feature = "fulltext")]
+mod fulltext_tests {
+ use std::sync::Arc;
+
+ use datafusion::arrow::array::{Int32Array, StringArray};
+ use datafusion::prelude::SessionContext;
+ use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options};
+ use paimon_datafusion::{register_full_text_search, PaimonCatalogProvider};
+
+ /// Extract the bundled tar.gz into a temp dir and return (tempdir,
warehouse_path).
+ fn extract_test_warehouse() -> (tempfile::TempDir, String) {
+ let archive_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
+ .join("testdata/test_tantivy_fulltext.tar.gz");
+ let file = std::fs::File::open(&archive_path)
+ .unwrap_or_else(|e| panic!("Failed to open {}: {e}",
archive_path.display()));
+ let decoder = flate2::read::GzDecoder::new(file);
+ let mut archive = tar::Archive::new(decoder);
+
+ let tmp = tempfile::tempdir().expect("Failed to create temp dir");
+ let db_dir = tmp.path().join("default.db");
+ std::fs::create_dir_all(&db_dir).unwrap();
+ archive.unpack(&db_dir).unwrap();
+
+ let warehouse = format!("file://{}", tmp.path().display());
+ (tmp, warehouse)
+ }
+
+ async fn create_fulltext_context() -> (SessionContext, tempfile::TempDir) {
+ let (tmp, warehouse) = extract_test_warehouse();
+ let mut options = Options::new();
+ options.set(CatalogOptions::WAREHOUSE, warehouse);
+ let catalog = FileSystemCatalog::new(options).expect("Failed to create
catalog");
+ let catalog: Arc<dyn Catalog> = Arc::new(catalog);
+
+ let ctx = SessionContext::new();
+ ctx.register_catalog(
+ "paimon",
+ Arc::new(PaimonCatalogProvider::new(Arc::clone(&catalog))),
+ );
+ register_full_text_search(&ctx, catalog, "default");
+ (ctx, tmp)
+ }
+
+ fn extract_id_content_rows(
+ batches: &[datafusion::arrow::record_batch::RecordBatch],
+ ) -> Vec<(i32, String)> {
+ let mut rows = Vec::new();
+ for batch in batches {
+ let id_array = batch
+ .column_by_name("id")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("Expected Int32Array for id");
+ let content_array = batch
+ .column_by_name("content")
+ .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+ .expect("Expected StringArray for content");
+ for i in 0..batch.num_rows() {
+ rows.push((id_array.value(i),
content_array.value(i).to_string()));
+ }
+ }
+ rows.sort_by_key(|(id, _)| *id);
+ rows
+ }
+
+ /// Search for 'paimon' — rows 0, 2, 4 mention "paimon".
+ #[tokio::test]
+ async fn test_full_text_search_paimon() {
+ let (ctx, _tmp) = create_fulltext_context().await;
+ let batches = ctx
+ .sql("SELECT id, content FROM
full_text_search('paimon.default.test_tantivy_fulltext', 'content', 'paimon',
10)")
+ .await
+ .expect("SQL should parse")
+ .collect()
+ .await
+ .expect("query should execute");
+
+ let rows = extract_id_content_rows(&batches);
+ let ids: Vec<i32> = rows.iter().map(|(id, _)| *id).collect();
+ assert_eq!(
+ ids,
+ vec![0, 2, 4],
+ "Searching 'paimon' should match rows 0, 2, 4"
+ );
+ }
+
+ /// Search for 'tantivy' — only row 1.
+ #[tokio::test]
+ async fn test_full_text_search_tantivy() {
+ let (ctx, _tmp) = create_fulltext_context().await;
+ let batches = ctx
+ .sql("SELECT id, content FROM
full_text_search('paimon.default.test_tantivy_fulltext', 'content', 'tantivy',
10)")
+ .await
+ .expect("SQL should parse")
+ .collect()
+ .await
+ .expect("query should execute");
+
+ let rows = extract_id_content_rows(&batches);
+ let ids: Vec<i32> = rows.iter().map(|(id, _)| *id).collect();
+ assert_eq!(ids, vec![1], "Searching 'tantivy' should match row 1");
+ }
+
+ /// Search for 'search' — rows 1, 3 mention "full-text search".
+ #[tokio::test]
+ async fn test_full_text_search_search() {
+ let (ctx, _tmp) = create_fulltext_context().await;
+ let batches = ctx
+ .sql("SELECT id, content FROM
full_text_search('paimon.default.test_tantivy_fulltext', 'content', 'search',
10)")
+ .await
+ .expect("SQL should parse")
+ .collect()
+ .await
+ .expect("query should execute");
+
+ let rows = extract_id_content_rows(&batches);
+ let ids: Vec<i32> = rows.iter().map(|(id, _)| *id).collect();
+ assert!(ids.contains(&1), "Searching 'search' should match row 1");
+ assert!(ids.contains(&3), "Searching 'search' should match row 3");
+ }
+}
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 682adab..10e5fa0 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -20,6 +20,7 @@ categories = ["database"]
description = "The rust implementation of Apache Paimon"
documentation = "https://docs.rs/paimon"
name = "paimon"
+exclude = ["testdata/"]
homepage.workspace = true
repository.workspace = true
@@ -30,6 +31,7 @@ version.workspace = true
[features]
default = ["storage-memory", "storage-fs", "storage-oss"]
storage-all = ["storage-memory", "storage-fs", "storage-oss", "storage-s3"]
+fulltext = ["tantivy", "tempfile"]
storage-memory = ["opendal/services-memory"]
storage-fs = ["opendal/services-fs"]
@@ -77,9 +79,10 @@ md-5 = "0.10"
regex = "1"
uuid = { version = "1", features = ["v4"] }
urlencoding = "2.1"
+tantivy = { version = "0.22", optional = true }
+tempfile = { version = "3", optional = true }
[dev-dependencies]
axum = { version = "0.7", features = ["macros", "tokio", "http1", "http2"] }
rand = "0.8.5"
-
tempfile = "3"
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs
index f2deafe..a68ba6b 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/lib.rs
@@ -34,6 +34,8 @@ pub mod io;
mod predicate_stats;
pub mod spec;
pub mod table;
+#[cfg(feature = "fulltext")]
+pub mod tantivy;
pub use catalog::Catalog;
pub use catalog::CatalogFactory;
diff --git a/crates/paimon/src/table/full_text_search_builder.rs
b/crates/paimon/src/table/full_text_search_builder.rs
new file mode 100644
index 0000000..783bd57
--- /dev/null
+++ b/crates/paimon/src/table/full_text_search_builder.rs
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Full-text search builder for Paimon tables.
+//!
+//! Reference:
[FullTextSearchBuilderImpl.java](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchBuilderImpl.java)
+
+use crate::spec::{DataField, FileKind, IndexManifest};
+use crate::table::snapshot_manager::SnapshotManager;
+use crate::table::{RowRange, Table};
+use crate::tantivy::full_text_search::{FullTextSearch, SearchResult};
+use crate::tantivy::reader::TantivyFullTextReader;
+
+const INDEX_DIR: &str = "index";
+const TANTIVY_FULLTEXT_INDEX_TYPE: &str = "tantivy-fulltext";
+
+/// Builder for executing full-text search on a Paimon table.
+///
+/// Usage:
+/// ```ignore
+/// let result = table.new_full_text_search_builder()
+/// .with_text_column("content")
+/// .with_query_text("hello world")
+/// .with_limit(10)
+/// .execute()
+/// .await?;
+/// ```
+///
+/// Reference: `org.apache.paimon.table.source.FullTextSearchBuilder`
+pub struct FullTextSearchBuilder<'a> {
+ table: &'a Table,
+ text_column: Option<String>,
+ query_text: Option<String>,
+ limit: Option<usize>,
+}
+
+impl<'a> FullTextSearchBuilder<'a> {
+ pub(crate) fn new(table: &'a Table) -> Self {
+ Self {
+ table,
+ text_column: None,
+ query_text: None,
+ limit: None,
+ }
+ }
+
+ /// Set the text column to search.
+ pub fn with_text_column(&mut self, name: &str) -> &mut Self {
+ self.text_column = Some(name.to_string());
+ self
+ }
+
+ /// Set the query text to search for.
+ pub fn with_query_text(&mut self, query: &str) -> &mut Self {
+ self.query_text = Some(query.to_string());
+ self
+ }
+
+ /// Set the top-k limit for results.
+ pub fn with_limit(&mut self, limit: usize) -> &mut Self {
+ self.limit = Some(limit);
+ self
+ }
+
+ /// Execute the full-text search and return row ranges.
+ ///
+ /// This reads the latest snapshot, loads the index manifest, and evaluates
+ /// the search against Tantivy indexes.
+ ///
+ /// Reference: `FullTextSearchBuilder.executeLocal()`
+ pub async fn execute(&self) -> crate::Result<Vec<RowRange>> {
+ let text_column =
+ self.text_column
+ .as_deref()
+ .ok_or_else(|| crate::Error::ConfigInvalid {
+ message: "Text column must be set via
with_text_column()".to_string(),
+ })?;
+ let query_text = self
+ .query_text
+ .as_deref()
+ .ok_or_else(|| crate::Error::ConfigInvalid {
+ message: "Query text must be set via
with_query_text()".to_string(),
+ })?;
+ let limit = self.limit.ok_or_else(|| crate::Error::ConfigInvalid {
+ message: "Limit must be set via with_limit()".to_string(),
+ })?;
+
+ let search = FullTextSearch::new(query_text.to_string(), limit,
text_column.to_string())?;
+
+ let snapshot_manager = SnapshotManager::new(
+ self.table.file_io().clone(),
+ self.table.location().to_string(),
+ );
+
+ let snapshot = match snapshot_manager.get_latest_snapshot().await? {
+ Some(s) => s,
+ None => return Ok(Vec::new()),
+ };
+
+ let index_manifest_name = match snapshot.index_manifest() {
+ Some(name) => name.to_string(),
+ None => return Ok(Vec::new()),
+ };
+
+ let manifest_path = format!(
+ "{}/manifest/{}",
+ self.table.location().trim_end_matches('/'),
+ index_manifest_name
+ );
+ let index_entries = IndexManifest::read(self.table.file_io(),
&manifest_path).await?;
+
+ evaluate_full_text_search(
+ self.table.file_io(),
+ self.table.location(),
+ &index_entries,
+ &search,
+ self.table.schema().fields(),
+ )
+ .await
+ }
+}
+
+/// Evaluate a full-text search query against Tantivy indexes found in the
index manifest.
+async fn evaluate_full_text_search(
+ file_io: &crate::io::FileIO,
+ table_path: &str,
+ index_entries: &[crate::spec::IndexManifestEntry],
+ search: &FullTextSearch,
+ schema_fields: &[DataField],
+) -> crate::Result<Vec<RowRange>> {
+ let table_path = table_path.trim_end_matches('/');
+
+ let field_id = match find_field_id_by_name(schema_fields,
&search.field_name) {
+ Some(id) => id,
+ None => return Ok(Vec::new()),
+ };
+
+ // Collect tantivy fulltext entries for the target field.
+ let fulltext_entries: Vec<_> = index_entries
+ .iter()
+ .filter(|e| {
+ e.kind == FileKind::Add
+ && e.index_file.index_type == TANTIVY_FULLTEXT_INDEX_TYPE
+ && e.index_file
+ .global_index_meta
+ .as_ref()
+ .is_some_and(|m| m.index_field_id == field_id)
+ })
+ .collect();
+
+ if fulltext_entries.is_empty() {
+ return Ok(Vec::new());
+ }
+
+ let futures: Vec<_> = fulltext_entries
+ .into_iter()
+ .map(|entry| {
+ let global_meta =
entry.index_file.global_index_meta.as_ref().unwrap();
+ let path = format!("{table_path}/{INDEX_DIR}/{}",
entry.index_file.file_name);
+ let file_name = entry.index_file.file_name.clone();
+ let query_text = search.query_text.clone();
+ let limit = search.limit;
+ let row_range_start = global_meta.row_range_start;
+ let input = file_io.new_input(&path);
+ async move {
+ let input = input?;
+ let reader = TantivyFullTextReader::from_input_file(&input)
+ .await
+ .map_err(|e| crate::Error::UnexpectedError {
+ message: format!(
+ "Failed to open Tantivy full-text index '{}': {}",
+ file_name, e
+ ),
+ source: None,
+ })?;
+ let result = reader.search(&query_text, limit)?;
+ Ok::<_, crate::Error>(result.offset(row_range_start))
+ }
+ })
+ .collect();
+
+ let results = futures::future::try_join_all(futures).await?;
+ let mut merged = SearchResult::empty();
+ for r in &results {
+ merged = merged.or(r);
+ }
+
+ Ok(merged.top_k(search.limit).to_row_ranges())
+}
+
+fn find_field_id_by_name(fields: &[DataField], name: &str) -> Option<i32> {
+ fields.iter().find(|f| f.name() == name).map(|f| f.id())
+}
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index 9b1d6f7..1b34324 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -19,6 +19,8 @@
pub(crate) mod bin_pack;
mod bucket_filter;
+#[cfg(feature = "fulltext")]
+mod full_text_search_builder;
pub(crate) mod global_index_scanner;
mod read_builder;
pub(crate) mod row_id_predicate;
@@ -31,6 +33,8 @@ mod tag_manager;
use crate::Result;
use arrow_array::RecordBatch;
+#[cfg(feature = "fulltext")]
+pub use full_text_search_builder::FullTextSearchBuilder;
use futures::stream::BoxStream;
pub use read_builder::{ReadBuilder, TableRead};
pub use schema_manager::SchemaManager;
@@ -106,6 +110,14 @@ impl Table {
ReadBuilder::new(self)
}
+ /// Create a full-text search builder.
+ ///
+ /// Reference:
[FullTextSearchBuilderImpl](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchBuilderImpl.java)
+ #[cfg(feature = "fulltext")]
+ pub fn new_full_text_search_builder(&self) -> FullTextSearchBuilder<'_> {
+ FullTextSearchBuilder::new(self)
+ }
+
/// Create a copy of this table with extra options merged into the schema.
pub fn copy_with_options(&self, extra: HashMap<String, String>) -> Self {
Self {
diff --git a/crates/paimon/src/table/table_scan.rs
b/crates/paimon/src/table/table_scan.rs
index 4e38958..e21f00b 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -329,6 +329,19 @@ impl<'a> TableScan<'a> {
}
}
+ /// Set row ranges for scan-time filtering.
+ ///
+ /// This replaces any existing row_ranges. Typically used to inject
+ /// results from global index lookups (e.g. full-text search).
+ pub fn with_row_ranges(mut self, ranges: Vec<RowRange>) -> Self {
+ self.row_ranges = if ranges.is_empty() {
+ None
+ } else {
+ Some(ranges)
+ };
+ self
+ }
+
/// Plan the full scan: resolve snapshot (via options or latest), then
read manifests and build DataSplits.
///
/// Time travel is resolved from table options:
diff --git a/crates/paimon/src/tantivy/directory.rs
b/crates/paimon/src/tantivy/directory.rs
new file mode 100644
index 0000000..32c3058
--- /dev/null
+++ b/crates/paimon/src/tantivy/directory.rs
@@ -0,0 +1,394 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Archive-backed Tantivy Directory implementation.
+//!
+//! Reads a Tantivy index packed into a single archive file. The archive format
+//! (Big-Endian, compatible with Java Paimon):
+//!
+//! ```text
+//! [fileCount: 4 bytes BE]
+//! for each file:
+//! [nameLen: 4 bytes BE]
+//! [name: nameLen bytes UTF-8]
+//! [dataLen: 8 bytes BE]
+//! [data: dataLen bytes]
+//! ```
+//!
+//! Reference:
`org.apache.paimon.tantivy.index.TantivyFullTextGlobalIndexWriter.packIndex()`
+//! Reference:
`org.apache.paimon.tantivy.index.TantivyFullTextGlobalIndexReader.parseArchiveHeader()`
+
+use bytes::Bytes;
+use std::collections::HashMap;
+use std::fmt;
+use std::io;
+use std::ops::Range;
+use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex};
+use tantivy::directory::error::{DeleteError, LockError, OpenReadError,
OpenWriteError};
+use tantivy::directory::Directory;
+use tantivy::directory::{
+ AntiCallToken, DirectoryLock, FileHandle, Lock, OwnedBytes,
TerminatingWrite, WatchCallback,
+ WatchHandle, WritePtr,
+};
+use tantivy::HasLen;
+
+use crate::io::FileRead;
+
+/// Metadata for a single file within the archive.
+#[derive(Clone, Debug)]
+struct FileMeta {
+ /// Absolute byte offset of the file data within the archive.
+ offset: u64,
+ length: usize,
+}
+
+/// A read-only Tantivy `Directory` backed by an archive file.
+///
+/// Only the archive header (file names, offsets, lengths) is parsed eagerly.
+/// Actual file data is read on demand via `FileRead`.
+#[derive(Clone)]
+pub struct ArchiveDirectory {
+ files: Arc<HashMap<PathBuf, FileMeta>>,
+ reader: Arc<dyn FileRead>,
+ /// In-memory storage for atomic_write (used by Tantivy for meta.json).
+ atomic_data: Arc<Mutex<HashMap<PathBuf, Vec<u8>>>>,
+}
+
+impl ArchiveDirectory {
+ /// Create an `ArchiveDirectory` from an async `FileRead`.
+ ///
+ /// Only the archive header (file names, offsets, lengths) is read eagerly.
+ /// Actual file data is read on demand when Tantivy requests it.
+ pub async fn from_reader(reader: impl FileRead, file_size: u64) ->
io::Result<Self> {
+ let reader: Arc<dyn FileRead> = Arc::new(reader);
+
+ if file_size < 4 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "Archive too small to contain file count",
+ ));
+ }
+
+ // Read file count (4 bytes).
+ let buf = reader
+ .read(0..4)
+ .await
+ .map_err(|e| io::Error::other(e.to_string()))?;
+ let file_count = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
+ if file_count < 0 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("Negative file count in archive: {file_count}"),
+ ));
+ }
+ let file_count = file_count as usize;
+
+ let mut pos: u64 = 4;
+ let mut files = HashMap::with_capacity(file_count);
+
+ for _ in 0..file_count {
+ // Read name_len (4 bytes).
+ let buf = reader
+ .read(pos..pos + 4)
+ .await
+ .map_err(|e| io::Error::other(e.to_string()))?;
+ let name_len = i32::from_be_bytes([buf[0], buf[1], buf[2],
buf[3]]);
+ if name_len < 0 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("Negative name length in archive: {name_len}"),
+ ));
+ }
+ let name_len = name_len as u64;
+ pos += 4;
+
+ // Read name + data_len together in a single IO call.
+ let meta_buf = reader
+ .read(pos..pos + name_len + 8)
+ .await
+ .map_err(|e| io::Error::other(e.to_string()))?;
+
+ let name = String::from_utf8(meta_buf[..name_len as
usize].to_vec()).map_err(|e| {
+ io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("Invalid UTF-8 in file name: {}", e),
+ )
+ })?;
+
+ let dl = name_len as usize;
+ let data_len = i64::from_be_bytes([
+ meta_buf[dl],
+ meta_buf[dl + 1],
+ meta_buf[dl + 2],
+ meta_buf[dl + 3],
+ meta_buf[dl + 4],
+ meta_buf[dl + 5],
+ meta_buf[dl + 6],
+ meta_buf[dl + 7],
+ ]);
+ if data_len < 0 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("Negative data length in archive: {data_len}"),
+ ));
+ }
+ let data_len = data_len as u64;
+ pos += name_len + 8;
+
+ let data_offset = pos;
+ files.insert(
+ PathBuf::from(&name),
+ FileMeta {
+ offset: data_offset,
+ length: data_len as usize,
+ },
+ );
+
+ // Skip past file data — do NOT read it.
+ pos += data_len;
+ }
+
+ Ok(Self {
+ files: Arc::new(files),
+ reader,
+ atomic_data: Arc::new(Mutex::new(HashMap::new())),
+ })
+ }
+}
+
+/// Bridge sync Tantivy `FileHandle::read_bytes` to async `FileRead::read`.
+///
+/// Uses `block_in_place` on multi-threaded tokio runtimes (no thread spawn
+/// overhead). Falls back to a scoped thread for current-thread runtimes.
+fn block_on_read(reader: &Arc<dyn FileRead>, range: Range<u64>) ->
io::Result<Bytes> {
+ let handle = tokio::runtime::Handle::current();
+ let do_read = || {
+ handle
+ .block_on(reader.read(range.clone()))
+ .map_err(|e| io::Error::other(e.to_string()))
+ };
+
+ match handle.runtime_flavor() {
+ tokio::runtime::RuntimeFlavor::MultiThread =>
tokio::task::block_in_place(do_read),
+ _ => {
+ // Current-thread runtime: block_in_place is not available,
+ // fall back to a scoped thread.
+ let reader = Arc::clone(reader);
+ std::thread::scope(|s| {
+ s.spawn(move || {
+ handle
+ .block_on(reader.read(range))
+ .map_err(|e| io::Error::other(e.to_string()))
+ })
+ .join()
+ .map_err(|_| io::Error::other("reader thread panicked"))?
+ })
+ }
+ }
+}
+
+impl fmt::Debug for ArchiveDirectory {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ArchiveDirectory")
+ .field("files", &self.files.keys().collect::<Vec<_>>())
+ .finish()
+ }
+}
+
+/// A `FileHandle` for a single file within the archive.
+#[derive(Clone)]
+struct ArchiveFileHandle {
+ reader: Arc<dyn FileRead>,
+ file_offset: u64,
+ file_length: usize,
+}
+
+impl fmt::Debug for ArchiveFileHandle {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ArchiveFileHandle")
+ .field("offset", &self.file_offset)
+ .field("length", &self.file_length)
+ .finish()
+ }
+}
+
+impl HasLen for ArchiveFileHandle {
+ fn len(&self) -> usize {
+ self.file_length
+ }
+}
+
+impl FileHandle for ArchiveFileHandle {
+ fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
+ if range.end > self.file_length {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ format!(
+ "Read range {:?} exceeds file length {}",
+ range, self.file_length
+ ),
+ ));
+ }
+
+ let abs_start = self.file_offset + range.start as u64;
+ let abs_end = self.file_offset + range.end as u64;
+ let data = block_on_read(&self.reader, abs_start..abs_end)?;
+ Ok(OwnedBytes::new(data.to_vec()))
+ }
+}
+
+impl Directory for ArchiveDirectory {
+ fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>,
OpenReadError> {
+ let meta = self
+ .files
+ .get(path)
+ .ok_or_else(||
OpenReadError::FileDoesNotExist(path.to_path_buf()))?;
+
+ Ok(Arc::new(ArchiveFileHandle {
+ reader: self.reader.clone(),
+ file_offset: meta.offset,
+ file_length: meta.length,
+ }))
+ }
+
+ fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
+ Ok(self.files.contains_key(path) ||
self.atomic_data.lock().unwrap().contains_key(path))
+ }
+
+ fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
+ if let Some(data) = self.atomic_data.lock().unwrap().get(path) {
+ return Ok(data.clone());
+ }
+ let meta = self
+ .files
+ .get(path)
+ .ok_or_else(||
OpenReadError::FileDoesNotExist(path.to_path_buf()))?;
+
+ let data = block_on_read(&self.reader, meta.offset..meta.offset +
meta.length as u64)
+ .map_err(|e| OpenReadError::wrap_io_error(e, path.to_path_buf()))?;
+ Ok(data.to_vec())
+ }
+
+ fn atomic_write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
+ self.atomic_data
+ .lock()
+ .unwrap()
+ .insert(path.to_path_buf(), data.to_vec());
+ Ok(())
+ }
+
+ fn delete(&self, _path: &Path) -> Result<(), DeleteError> {
+ Ok(())
+ }
+
+ fn open_write(&self, _path: &Path) -> Result<WritePtr, OpenWriteError> {
+ Ok(io::BufWriter::new(Box::new(
+ VecTerminatingWrite(Vec::new()),
+ )))
+ }
+
+ fn sync_directory(&self) -> io::Result<()> {
+ Ok(())
+ }
+
+ fn acquire_lock(&self, _lock: &Lock) -> Result<DirectoryLock, LockError> {
+ Ok(DirectoryLock::from(Box::new(())))
+ }
+
+ fn watch(&self, _watch_callback: WatchCallback) ->
tantivy::Result<WatchHandle> {
+ Ok(WatchHandle::empty())
+ }
+}
+
+/// Dummy writer for lock file support.
+struct VecTerminatingWrite(Vec<u8>);
+
+impl io::Write for VecTerminatingWrite {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.0.extend_from_slice(buf);
+ Ok(buf.len())
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
+ }
+}
+
+impl TerminatingWrite for VecTerminatingWrite {
+ fn terminate_ref(&mut self, _token: AntiCallToken) -> io::Result<()> {
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::io::FileIOBuilder;
+ use crate::tantivy::writer::TantivyFullTextWriter;
+
+ async fn make_test_dir() -> ArchiveDirectory {
+ let file_io = FileIOBuilder::new("memory").build().unwrap();
+
+ let mut writer = TantivyFullTextWriter::new().unwrap();
+ writer.add_document(0, Some("hello")).unwrap();
+ writer.add_document(1, Some("world")).unwrap();
+ let output = file_io.new_output("/test_archive.bin").unwrap();
+ writer.finish(&output).await.unwrap();
+
+ let input = output.to_input_file();
+ let metadata = input.metadata().await.unwrap();
+ let reader = input.reader().await.unwrap();
+ ArchiveDirectory::from_reader(reader, metadata.size)
+ .await
+ .unwrap()
+ }
+
+ #[tokio::test]
+ async fn test_parse_archive() {
+ let dir = make_test_dir().await;
+ // Tantivy index files should be present.
+ assert!(!dir.files.is_empty());
+ }
+
+ #[tokio::test]
+ async fn test_read_file_from_archive() {
+ let dir = make_test_dir().await;
+ // Read any file from the archive and verify it's non-empty.
+ let first_path = dir.files.keys().next().unwrap().clone();
+ let handle = dir.get_file_handle(&first_path).unwrap();
+ assert!(handle.len() > 0);
+ let data = handle.read_bytes(0..handle.len()).unwrap();
+ assert_eq!(data.len(), handle.len());
+ }
+
+ #[tokio::test]
+ async fn test_atomic_read_write() {
+ let dir = make_test_dir().await;
+
+ // atomic_write + atomic_read
+ dir.atomic_write(Path::new("meta.json"), b"{}").unwrap();
+ let data = dir.atomic_read(Path::new("meta.json")).unwrap();
+ assert_eq!(&data, b"{}");
+ }
+
+ #[tokio::test]
+ async fn test_file_not_found() {
+ let dir = make_test_dir().await;
+ assert!(dir.get_file_handle(Path::new("missing.txt")).is_err());
+ }
+}
diff --git a/crates/paimon/src/tantivy/full_text_search.rs
b/crates/paimon/src/tantivy/full_text_search.rs
new file mode 100644
index 0000000..77bcd03
--- /dev/null
+++ b/crates/paimon/src/tantivy/full_text_search.rs
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Full-text search types for global index.
+//!
+//! Reference:
[org.apache.paimon.predicate.FullTextSearch](https://github.com/apache/paimon/blob/master/paimon-common/src/main/java/org/apache/paimon/predicate/FullTextSearch.java)
+
+/// Full-text search predicate.
+///
+/// Reference: `org.apache.paimon.predicate.FullTextSearch`
+#[derive(Debug, Clone)]
+pub struct FullTextSearch {
+ pub query_text: String,
+ pub field_name: String,
+ pub limit: usize,
+}
+
+impl FullTextSearch {
+ pub fn new(query_text: String, limit: usize, field_name: String) ->
crate::Result<Self> {
+ if query_text.is_empty() {
+ return Err(crate::Error::ConfigInvalid {
+ message: "Query text cannot be empty".to_string(),
+ });
+ }
+ if limit == 0 {
+ return Err(crate::Error::ConfigInvalid {
+ message: "Limit must be positive".to_string(),
+ });
+ }
+ if field_name.is_empty() {
+ return Err(crate::Error::ConfigInvalid {
+ message: "Field name cannot be empty".to_string(),
+ });
+ }
+ Ok(Self {
+ query_text,
+ field_name,
+ limit,
+ })
+ }
+}
+
+/// Search result containing parallel arrays of row IDs and scores.
+#[derive(Debug, Clone)]
+pub struct SearchResult {
+ pub row_ids: Vec<u64>,
+ pub scores: Vec<f32>,
+}
+
+impl SearchResult {
+ pub fn new(row_ids: Vec<u64>, scores: Vec<f32>) -> Self {
+ assert_eq!(row_ids.len(), scores.len());
+ Self { row_ids, scores }
+ }
+
+ pub fn empty() -> Self {
+ Self {
+ row_ids: Vec::new(),
+ scores: Vec::new(),
+ }
+ }
+
+ pub fn len(&self) -> usize {
+ self.row_ids.len()
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.row_ids.is_empty()
+ }
+
+ /// Apply an offset to all row IDs.
+ pub fn offset(&self, offset: i64) -> Self {
+ if offset == 0 {
+ return self.clone();
+ }
+ let row_ids = self
+ .row_ids
+ .iter()
+ .map(|&id| {
+ if offset >= 0 {
+ id.saturating_add(offset as u64)
+ } else {
+ id.saturating_sub(offset.unsigned_abs())
+ }
+ })
+ .collect();
+ Self {
+ row_ids,
+ scores: self.scores.clone(),
+ }
+ }
+
+ /// Merge two search results.
+ pub fn or(&self, other: &SearchResult) -> Self {
+ let mut row_ids = self.row_ids.clone();
+ let mut scores = self.scores.clone();
+ row_ids.extend_from_slice(&other.row_ids);
+ scores.extend_from_slice(&other.scores);
+ Self { row_ids, scores }
+ }
+
+ /// Return top-k results by score (descending).
+ pub fn top_k(&self, k: usize) -> Self {
+ if self.row_ids.len() <= k {
+ return self.clone();
+ }
+ let mut indices: Vec<usize> = (0..self.row_ids.len()).collect();
+ indices.sort_by(|&a, &b| {
+ self.scores[b]
+ .partial_cmp(&self.scores[a])
+ .unwrap_or(std::cmp::Ordering::Equal)
+ });
+ indices.truncate(k);
+ let row_ids = indices.iter().map(|&i| self.row_ids[i]).collect();
+ let scores = indices.iter().map(|&i| self.scores[i]).collect();
+ Self { row_ids, scores }
+ }
+
+ /// Convert to sorted, merged row ranges.
+ pub fn to_row_ranges(&self) -> Vec<crate::table::RowRange> {
+ if self.row_ids.is_empty() {
+ return Vec::new();
+ }
+ let mut sorted: Vec<u64> = self.row_ids.clone();
+ sorted.sort_unstable();
+ sorted.dedup();
+ let mut ranges = Vec::new();
+ let mut start = sorted[0] as i64;
+ let mut end = start;
+ for &id in &sorted[1..] {
+ let id = id as i64;
+ if id == end + 1 {
+ end = id;
+ } else {
+ ranges.push(crate::table::RowRange::new(start, end));
+ start = id;
+ end = id;
+ }
+ }
+ ranges.push(crate::table::RowRange::new(start, end));
+ ranges
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_full_text_search_new() {
+ let fts = FullTextSearch::new("hello".into(), 10,
"text".into()).unwrap();
+ assert_eq!(fts.query_text, "hello");
+ assert_eq!(fts.limit, 10);
+ assert_eq!(fts.field_name, "text");
+ }
+
+ #[test]
+ fn test_full_text_search_empty_query() {
+ let result = FullTextSearch::new("".into(), 10, "text".into());
+ assert!(result.is_err());
+ }
+}
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/tantivy/mod.rs
similarity index 57%
copy from crates/paimon/src/lib.rs
copy to crates/paimon/src/tantivy/mod.rs
index f2deafe..0922937 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/tantivy/mod.rs
@@ -15,31 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-mod error;
-pub use error::Error;
-pub use error::Result;
+//! Tantivy-based full-text search for global index.
-pub mod common;
-pub use common::{CatalogOptions, Options};
-
-pub mod api;
-pub use api::rest_api::RESTApi;
-
-pub mod arrow;
-pub mod btree;
-pub mod catalog;
-mod deletion_vector;
-pub mod file_index;
-pub mod io;
-mod predicate_stats;
-pub mod spec;
-pub mod table;
-
-pub use catalog::Catalog;
-pub use catalog::CatalogFactory;
-pub use catalog::FileSystemCatalog;
-
-pub use table::{
- DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan,
ReadBuilder, RowRange,
- SnapshotManager, Table, TableRead, TableScan, TagManager,
-};
+pub(crate) mod directory;
+pub mod full_text_search;
+pub mod reader;
+pub mod writer;
diff --git a/crates/paimon/src/tantivy/reader.rs
b/crates/paimon/src/tantivy/reader.rs
new file mode 100644
index 0000000..1748850
--- /dev/null
+++ b/crates/paimon/src/tantivy/reader.rs
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tantivy full-text index reader.
+//!
+//! Reads a Tantivy index from an archive (packed by `TantivyFullTextWriter`)
+//! and performs full-text search queries.
+//!
+//! Reference:
`org.apache.paimon.tantivy.index.TantivyFullTextGlobalIndexReader`
+
+use crate::io::{FileRead, InputFile};
+use crate::tantivy::directory::ArchiveDirectory;
+use crate::tantivy::full_text_search::SearchResult;
+use tantivy::collector::TopDocs;
+use tantivy::query::QueryParser;
+use tantivy::{Index, IndexReader, ReloadPolicy};
+
+/// Reader for a Tantivy full-text index stored in archive format.
+pub struct TantivyFullTextReader {
+ reader: IndexReader,
+ index: Index,
+}
+
+impl TantivyFullTextReader {
+ /// Open a reader from an `InputFile` (on-demand reading, no full load).
+ pub async fn from_input_file(input: &InputFile) -> crate::Result<Self> {
+ let metadata = input.metadata().await?;
+ let reader = input.reader().await?;
+ Self::from_reader(reader, metadata.size).await
+ }
+
+ /// Open a reader from an async `FileRead` and file size.
+ pub async fn from_reader(reader: impl FileRead, file_size: u64) ->
crate::Result<Self> {
+ let directory = ArchiveDirectory::from_reader(reader, file_size)
+ .await
+ .map_err(|e| crate::Error::UnexpectedError {
+ message: format!("Failed to parse Tantivy archive: {}", e),
+ source: None,
+ })?;
+
+ let index = Index::open(directory).map_err(|e|
crate::Error::UnexpectedError {
+ message: format!("Failed to open Tantivy index from archive: {}",
e),
+ source: None,
+ })?;
+
+ let reader = index
+ .reader_builder()
+ .reload_policy(ReloadPolicy::Manual)
+ .try_into()
+ .map_err(|e| crate::Error::UnexpectedError {
+ message: format!("Failed to create Tantivy reader: {}", e),
+ source: None,
+ })?;
+
+ Ok(Self { reader, index })
+ }
+
+ /// Search the index and return top-N results ranked by score.
+ pub fn search(&self, query_text: &str, limit: usize) ->
crate::Result<SearchResult> {
+ let schema = self.index.schema();
+ let text_field = schema
+ .get_field("text")
+ .map_err(|_| crate::Error::UnexpectedError {
+ message: "Tantivy schema missing 'text' field".to_string(),
+ source: None,
+ })?;
+
+ let searcher = self.reader.searcher();
+ let query_parser = QueryParser::for_index(&self.index,
vec![text_field]);
+ let query =
+ query_parser
+ .parse_query(query_text)
+ .map_err(|e| crate::Error::UnexpectedError {
+ message: format!("Failed to parse query '{}': {}",
query_text, e),
+ source: None,
+ })?;
+
+ let top_docs = searcher
+ .search(&query, &TopDocs::with_limit(limit))
+ .map_err(|e| crate::Error::UnexpectedError {
+ message: format!("Tantivy search failed: {}", e),
+ source: None,
+ })?;
+
+ let mut row_ids = Vec::with_capacity(top_docs.len());
+ let mut scores = Vec::with_capacity(top_docs.len());
+
+ for (score, doc_address) in &top_docs {
+ let segment_reader =
searcher.segment_reader(doc_address.segment_ord);
+ let fast_fields =
segment_reader.fast_fields().u64("row_id").map_err(|e| {
+ crate::Error::UnexpectedError {
+ message: format!("Failed to get row_id fast field: {}", e),
+ source: None,
+ }
+ })?;
+ let row_id = fast_fields.first(doc_address.doc_id).ok_or_else(|| {
+ crate::Error::UnexpectedError {
+ message: format!(
+ "Missing row_id for doc_id {} in segment {}",
+ doc_address.doc_id, doc_address.segment_ord
+ ),
+ source: None,
+ }
+ })?;
+ row_ids.push(row_id);
+ scores.push(*score);
+ }
+
+ Ok(SearchResult::new(row_ids, scores))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::io::FileIOBuilder;
+ use crate::tantivy::writer::TantivyFullTextWriter;
+
+ async fn create_test_reader() -> TantivyFullTextReader {
+ let file_io = FileIOBuilder::new("memory").build().unwrap();
+ let mut writer = TantivyFullTextWriter::new().unwrap();
+ writer
+ .add_document(0, Some("the quick brown fox jumps over the lazy
dog"))
+ .unwrap();
+ writer
+ .add_document(1, Some("rust programming language"))
+ .unwrap();
+ writer
+ .add_document(2, Some("apache paimon data lake"))
+ .unwrap();
+ writer
+ .add_document(3, Some("full text search with tantivy"))
+ .unwrap();
+ writer
+ .add_document(4, Some("the fox is quick and brown"))
+ .unwrap();
+ let output = file_io.new_output("/test_reader_index.archive").unwrap();
+ writer.finish(&output).await.unwrap();
+ TantivyFullTextReader::from_input_file(&output.to_input_file())
+ .await
+ .unwrap()
+ }
+
+ #[tokio::test]
+ async fn test_search_basic() {
+ let reader = create_test_reader().await;
+ let result = reader.search("fox", 10).unwrap();
+ assert_eq!(result.len(), 2);
+ assert!(result.row_ids.contains(&0));
+ assert!(result.row_ids.contains(&4));
+ }
+
+ #[tokio::test]
+ async fn test_search_limit() {
+ let reader = create_test_reader().await;
+ let result = reader.search("fox", 1).unwrap();
+ assert_eq!(result.len(), 1);
+ }
+
+ #[tokio::test]
+ async fn test_search_no_match() {
+ let reader = create_test_reader().await;
+ let result = reader.search("nonexistent", 10).unwrap();
+ assert!(result.is_empty());
+ }
+
+ #[tokio::test]
+ async fn test_search_scored() {
+ let reader = create_test_reader().await;
+ let result = reader.search("tantivy", 10).unwrap();
+ assert_eq!(result.len(), 1);
+ assert!(result.row_ids.contains(&3));
+ assert!(result.scores[0] > 0.0);
+ }
+
+ #[tokio::test]
+ async fn test_search_with_offset() {
+ let reader = create_test_reader().await;
+ let result = reader.search("fox", 10).unwrap();
+ let offset_result = result.offset(1000);
+ assert!(offset_result.row_ids.contains(&1000));
+ assert!(offset_result.row_ids.contains(&1004));
+ }
+}
diff --git a/crates/paimon/src/tantivy/writer.rs
b/crates/paimon/src/tantivy/writer.rs
new file mode 100644
index 0000000..a12ed30
--- /dev/null
+++ b/crates/paimon/src/tantivy/writer.rs
@@ -0,0 +1,242 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tantivy full-text index writer.
+//!
+//! Writes documents (rowId + text) to a local Tantivy index, then packs the
+//! index directory into a single archive file (Big-Endian, Java-compatible)
+//! and writes it to an `OutputFile`.
+//!
+//! Reference:
`org.apache.paimon.tantivy.index.TantivyFullTextGlobalIndexWriter`
+
+use bytes::Bytes;
+use std::io::Read;
+
+use crate::io::OutputFile;
+use tantivy::schema::{Field, NumericOptions, Schema, TEXT};
+use tantivy::{Index, IndexWriter, TantivyDocument};
+
+/// Builds the fixed schema: `row_id` (u64 fast+stored+indexed) + `text`
(full-text).
+fn build_schema() -> (Schema, Field, Field) {
+ let mut builder = Schema::builder();
+ let row_id_field = builder.add_u64_field(
+ "row_id",
+ NumericOptions::default()
+ .set_stored()
+ .set_indexed()
+ .set_fast(),
+ );
+ let text_field = builder.add_text_field("text", TEXT);
+ (builder.build(), row_id_field, text_field)
+}
+
+/// Writer for creating a Tantivy full-text index and packing it into an
archive.
+pub struct TantivyFullTextWriter {
+ writer: IndexWriter,
+ row_id_field: Field,
+ text_field: Field,
+ temp_dir: tempfile::TempDir,
+ row_count: u64,
+}
+
+impl TantivyFullTextWriter {
+ /// Create a new writer. The index is built in a temporary directory.
+ pub fn new() -> crate::Result<Self> {
+ let temp_dir = tempfile::tempdir().map_err(|e|
crate::Error::UnexpectedError {
+ message: format!("Failed to create temp directory for Tantivy
index: {}", e),
+ source: None,
+ })?;
+
+ let (schema, row_id_field, text_field) = build_schema();
+ let index = Index::create_in_dir(temp_dir.path(), schema).map_err(|e| {
+ crate::Error::UnexpectedError {
+ message: format!("Failed to create Tantivy index: {}", e),
+ source: None,
+ }
+ })?;
+ let writer = index
+ .writer(50_000_000)
+ .map_err(|e| crate::Error::UnexpectedError {
+ message: format!("Failed to create Tantivy writer: {}", e),
+ source: None,
+ })?;
+
+ Ok(Self {
+ writer,
+ row_id_field,
+ text_field,
+ temp_dir,
+ row_count: 0,
+ })
+ }
+
+ /// Add a document with the given row ID and text content.
+ /// If text is None, the row count is still incremented (null value).
+ pub fn add_document(&mut self, row_id: u64, text: Option<&str>) ->
crate::Result<()> {
+ if let Some(text) = text {
+ let mut doc = TantivyDocument::new();
+ doc.add_u64(self.row_id_field, row_id);
+ doc.add_text(self.text_field, text);
+ self.writer
+ .add_document(doc)
+ .map_err(|e| crate::Error::UnexpectedError {
+ message: format!("Failed to add document: {}", e),
+ source: None,
+ })?;
+ }
+ self.row_count += 1;
+ Ok(())
+ }
+
+ /// Commit, pack the index into an archive, and write it to the given
`OutputFile`.
+ ///
+ /// Returns `false` if no rows were added (nothing is written to
the file).
+ ///
+ /// Reference: `TantivyFullTextGlobalIndexWriter.packIndex()`
+ pub async fn finish(mut self, output: &OutputFile) -> crate::Result<bool> {
+ if self.row_count == 0 {
+ return Ok(false);
+ }
+
+ self.writer
+ .commit()
+ .map_err(|e| crate::Error::UnexpectedError {
+ message: format!("Failed to commit Tantivy index: {}", e),
+ source: None,
+ })?;
+
+ // Drop the writer to release file locks before packing.
+ drop(self.writer);
+
+ // Stream the archive directly to the OutputFile.
+ let mut file_writer = output.writer().await?;
+
+ // Collect file entries from the temp directory.
+ let mut entries: Vec<(String, std::path::PathBuf)> = Vec::new();
+ for entry in
+ std::fs::read_dir(self.temp_dir.path()).map_err(|e|
crate::Error::UnexpectedError {
+ message: format!("Failed to read Tantivy index directory: {}",
e),
+ source: None,
+ })?
+ {
+ let entry = entry.map_err(|e| crate::Error::UnexpectedError {
+ message: format!("Failed to read directory entry: {}", e),
+ source: None,
+ })?;
+ if entry.file_type().map(|t| t.is_file()).unwrap_or(false) {
+ let name = entry.file_name().to_string_lossy().to_string();
+ entries.push((name, entry.path()));
+ }
+ }
+
+ // Write file count (4 bytes BE).
+ file_writer
+ .write(Bytes::from((entries.len() as i32).to_be_bytes().to_vec()))
+ .await?;
+
+ // Write each file: name_len + name + data_len + data (chunked).
+ const CHUNK_SIZE: usize = 4 * 1024 * 1024; // 4 MiB
+ for (name, path) in &entries {
+ let name_bytes = name.as_bytes();
+ file_writer
+ .write(Bytes::from(
+ (name_bytes.len() as i32).to_be_bytes().to_vec(),
+ ))
+ .await?;
+ file_writer.write(Bytes::from(name_bytes.to_vec())).await?;
+
+ let file_len = std::fs::metadata(path)
+ .map_err(|e| crate::Error::UnexpectedError {
+ message: format!("Failed to stat index file '{}': {}",
name, e),
+ source: None,
+ })?
+ .len();
+ file_writer
+ .write(Bytes::from((file_len as i64).to_be_bytes().to_vec()))
+ .await?;
+
+ let mut file =
+ std::fs::File::open(path).map_err(|e|
crate::Error::UnexpectedError {
+ message: format!("Failed to open index file '{}': {}",
name, e),
+ source: None,
+ })?;
+ let mut buf = vec![0u8; CHUNK_SIZE];
+ loop {
+ let n = file
+ .read(&mut buf)
+ .map_err(|e| crate::Error::UnexpectedError {
+ message: format!("Failed to read index file '{}': {}",
name, e),
+ source: None,
+ })?;
+ if n == 0 {
+ break;
+ }
+ file_writer.write(Bytes::copy_from_slice(&buf[..n])).await?;
+ }
+ }
+
+ file_writer.close().await?;
+ Ok(true)
+ }
+
+ /// Number of rows processed (including nulls).
+ pub fn row_count(&self) -> u64 {
+ self.row_count
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::io::FileIOBuilder;
+ use crate::tantivy::reader::TantivyFullTextReader;
+
+ #[tokio::test]
+ async fn test_write_and_read_roundtrip() {
+ let file_io = FileIOBuilder::new("memory").build().unwrap();
+
+ let mut writer = TantivyFullTextWriter::new().unwrap();
+ writer.add_document(0, Some("hello world")).unwrap();
+ writer.add_document(1, Some("foo bar baz")).unwrap();
+ writer.add_document(2, None).unwrap();
+ writer.add_document(3, Some("hello again")).unwrap();
+
+ let output = file_io.new_output("/test_index.archive").unwrap();
+ let written = writer.finish(&output).await.unwrap();
+ assert!(written);
+
+ let input = output.to_input_file();
+ let reader = TantivyFullTextReader::from_input_file(&input)
+ .await
+ .unwrap();
+ let result = reader.search("hello", 10).unwrap();
+ assert_eq!(result.len(), 2);
+ assert!(result.row_ids.contains(&0));
+ assert!(result.row_ids.contains(&3));
+ }
+
+ #[tokio::test]
+ async fn test_empty_writer() {
+ let file_io = FileIOBuilder::new("memory").build().unwrap();
+ let output = file_io.new_output("/empty_index.archive").unwrap();
+
+ let writer = TantivyFullTextWriter::new().unwrap();
+ let written = writer.finish(&output).await.unwrap();
+ assert!(!written);
+ assert!(!output.exists().await.unwrap());
+ }
+}