This is an automated email from the ASF dual-hosted git repository.
jerryjing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c08827bd0f [python] Introduce Full Text Search and Tantivy index in
Python (#7560)
c08827bd0f is described below
commit c08827bd0facf5b28c4aff4e704921c3ec250f74
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Apr 1 18:14:14 2026 +0800
[python] Introduce Full Text Search and Tantivy index in Python (#7560)
---
.github/workflows/paimon-python-checks.yml | 25 ++
docs/content/append-table/global-index.md | 21 ++
docs/content/pypaimon/cli.md | 42 +++
paimon-python/dev/run_mixed_tests.sh | 54 +++-
paimon-python/pypaimon/cli/cli_table.py | 114 ++++++++
paimon-python/pypaimon/globalindex/__init__.py | 4 +
.../pypaimon/globalindex/full_text_search.py | 53 ++++
.../pypaimon/globalindex/global_index_reader.py | 4 +
.../globalindex/offset_global_index_reader.py | 102 +++++++
.../pypaimon/globalindex/{ => tantivy}/__init__.py | 28 +-
.../tantivy_full_text_global_index_reader.py | 294 +++++++++++++++++++++
.../pypaimon/globalindex/vector_search_result.py | 31 +++
.../pypaimon/read/scanner/file_scanner.py | 8 +-
paimon-python/pypaimon/read/table_scan.py | 4 +
paimon-python/pypaimon/table/file_store_table.py | 4 +
.../pypaimon/table/format/format_table.py | 3 +
.../pypaimon/table/iceberg/iceberg_table.py | 3 +
.../pypaimon/table/object/object_table.py | 5 +
.../pypaimon/table/source/full_text_read.py | 120 +++++++++
.../pypaimon/table/source/full_text_scan.py | 93 +++++++
.../table/source/full_text_search_builder.py | 100 +++++++
.../table/source/full_text_search_split.py | 53 ++++
paimon-python/pypaimon/table/table.py | 5 +
.../pypaimon/tests/e2e/java_py_read_write_test.py | 68 +++++
paimon-tantivy/paimon-tantivy-index/pom.xml | 103 ++++++++
.../paimon/tantivy/index/JavaPyTantivyE2ETest.java | 226 ++++++++++++++++
26 files changed, 1539 insertions(+), 28 deletions(-)
diff --git a/.github/workflows/paimon-python-checks.yml
b/.github/workflows/paimon-python-checks.yml
index 722f37fa05..a797257e06 100755
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -79,6 +79,20 @@ jobs:
java -version
mvn -version
+ - name: Install Rust toolchain
+ uses: dtolnay/rust-toolchain@stable
+
+ - name: Build Tantivy native library
+ run: |
+ cd paimon-tantivy/paimon-tantivy-jni/rust
+ cargo build --release
+
+ - name: Copy Tantivy native library to resources
+ run: |
+
RESOURCE_DIR=paimon-tantivy/paimon-tantivy-jni/src/main/resources/native/linux-amd64
+ mkdir -p ${RESOURCE_DIR}
+ cp
paimon-tantivy/paimon-tantivy-jni/rust/target/release/libtantivy_jni.so
${RESOURCE_DIR}/
+
- name: Verify Python version
run: python --version
@@ -118,6 +132,17 @@ jobs:
fi
fi
df -h
+
+ - name: Build and install tantivy-py from source
+ if: matrix.python-version != '3.6.15'
+ shell: bash
+ run: |
+ pip install maturin
+ git clone -b support_directory
https://github.com/JingsongLi/tantivy-py.git /tmp/tantivy-py
+ cd /tmp/tantivy-py
+ maturin build --release
+ pip install target/wheels/tantivy-*.whl
+
- name: Run lint-python.sh
shell: bash
run: |
diff --git a/docs/content/append-table/global-index.md
b/docs/content/append-table/global-index.md
index 172c5bedcb..d88a3348a6 100644
--- a/docs/content/append-table/global-index.md
+++ b/docs/content/append-table/global-index.md
@@ -211,4 +211,25 @@ try (RecordReader<InternalRow> reader =
readBuilder.newRead().createReader(plan)
```
{{< /tab >}}
+{{< tab "Python SDK" >}}
+```python
+table = catalog.get_table('db.my_table')
+
+# Step 1: Build full-text search
+builder = table.new_full_text_search_builder()
+builder.with_text_column('content')
+builder.with_query_text('paimon lake format')
+builder.with_limit(10)
+result = builder.execute_local()
+
+# Step 2: Read matching rows using the search result
+read_builder = table.new_read_builder()
+scan = read_builder.new_scan().with_global_index_result(result)
+plan = scan.plan()
+table_read = read_builder.new_read()
+pa_table = table_read.to_arrow(plan.splits())
+print(pa_table)
+```
+{{< /tab >}}
+
{{< /tabs >}}
diff --git a/docs/content/pypaimon/cli.md b/docs/content/pypaimon/cli.md
index 22bdbd3c18..a328fc4744 100644
--- a/docs/content/pypaimon/cli.md
+++ b/docs/content/pypaimon/cli.md
@@ -362,6 +362,48 @@ Table 'mydb.old_name' renamed to 'mydb.new_name'
successfully.
**Note:** Both filesystem and REST catalogs support table rename. For
filesystem catalogs, the rename is performed by renaming the underlying table
directory.
+### Table Full-Text Search
+
+Perform full-text search on a Paimon table with a Tantivy full-text index and
display matching rows.
+
+```shell
+paimon table full-text-search mydb.articles --column content --query "paimon
lake"
+```
+
+**Options:**
+
+- `--column, -c`: Text column to search on - **Required**
+- `--query, -q`: Query text to search for - **Required**
+- `--limit, -l`: Maximum number of results to return (default: 10)
+- `--select, -s`: Select specific columns to display (comma-separated)
+- `--format, -f`: Output format: `table` (default) or `json`
+
+**Examples:**
+
+```shell
+# Basic full-text search
+paimon table full-text-search mydb.articles -c content -q "paimon lake"
+
+# Search with limit
+paimon table full-text-search mydb.articles -c content -q "streaming data" -l
20
+
+# Search with column projection
+paimon table full-text-search mydb.articles -c content -q "paimon" -s
"id,title,content"
+
+# Output as JSON
+paimon table full-text-search mydb.articles -c content -q "paimon" -f json
+```
+
+Output:
+```
+ id content
+ 0 Apache Paimon is a streaming data lake platform
+ 2 Paimon supports real-time data ingestion and...
+ 4 Data lake platforms like Paimon handle large-...
+```
+
+**Note:** The table must have a Tantivy full-text index built on the target
column. See [Global Index]({{< ref "append-table/global-index" >}}) for how to
create full-text indexes.
+
### Table Drop
Drop a table from the catalog. This will permanently delete the table and all
its data.
diff --git a/paimon-python/dev/run_mixed_tests.sh
b/paimon-python/dev/run_mixed_tests.sh
index a7e5d5dd76..839a750780 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -138,9 +138,6 @@ run_java_read_test() {
cd "$PROJECT_ROOT"
- PYTHON_VERSION=$(python -c "import sys;
print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>/dev/null ||
echo "unknown")
- echo "Detected Python version: $PYTHON_VERSION"
-
# Run Java test for Parquet/Orc/Avro format in paimon-core
echo "Running Maven test for JavaPyE2ETest.testReadPkTable (Java Read
Parquet/Orc/Avro)..."
echo "Note: Maven may download dependencies on first run, this may take a
while..."
@@ -171,6 +168,7 @@ run_java_read_test() {
return 1
fi
}
+
run_pk_dv_test() {
echo -e "${YELLOW}=== Step 5: Running Primary Key & Deletion Vector Test
(testPKDeletionVectorWriteRead) ===${NC}"
@@ -244,6 +242,30 @@ run_compressed_text_test() {
fi
}
+# Function to run Tantivy full-text index test (Java write index, Python read
and search)
+run_tantivy_fulltext_test() {
+ echo -e "${YELLOW}=== Step 8: Running Tantivy Full-Text Index Test (Java
Write, Python Read) ===${NC}"
+
+ cd "$PROJECT_ROOT"
+
+ echo "Running Maven test for
JavaPyTantivyE2ETest.testTantivyFullTextIndexWrite..."
+ if mvn test
-Dtest=org.apache.paimon.tantivy.index.JavaPyTantivyE2ETest#testTantivyFullTextIndexWrite
-pl paimon-tantivy/paimon-tantivy-index -q -Drun.e2e.tests=true; then
+ echo -e "${GREEN}✓ Java test completed successfully${NC}"
+ else
+ echo -e "${RED}✗ Java test failed${NC}"
+ return 1
+ fi
+ cd "$PAIMON_PYTHON_DIR"
+ echo "Running Python test for
JavaPyReadWriteTest.test_read_tantivy_full_text_index..."
+ if python -m pytest
java_py_read_write_test.py::JavaPyReadWriteTest::test_read_tantivy_full_text_index
-v; then
+ echo -e "${GREEN}✓ Python test completed successfully${NC}"
+ return 0
+ else
+ echo -e "${RED}✗ Python test failed${NC}"
+ return 1
+ fi
+}
+
# Main execution
main() {
local java_write_result=0
@@ -253,6 +275,12 @@ main() {
local pk_dv_result=0
local btree_index_result=0
local compressed_text_result=0
+ local tantivy_fulltext_result=0
+
+ # Detect Python version
+ PYTHON_VERSION=$(python -c "import sys;
print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>/dev/null ||
echo "unknown")
+ PYTHON_MINOR=$(python -c "import sys; print(sys.version_info.minor)"
2>/dev/null || echo "0")
+ echo "Detected Python version: $PYTHON_VERSION"
echo -e "${YELLOW}Starting mixed language test execution...${NC}"
echo ""
@@ -311,6 +339,18 @@ main() {
echo ""
+ # Run Tantivy full-text index test (requires Python >= 3.10)
+ if [[ "$PYTHON_MINOR" -ge 10 ]]; then
+ if ! run_tantivy_fulltext_test; then
+ tantivy_fulltext_result=1
+ fi
+ else
+ echo -e "${YELLOW}⏭ Skipping Tantivy Full-Text Index Test (requires
Python >= 3.10, current: $PYTHON_VERSION)${NC}"
+ tantivy_fulltext_result=0
+ fi
+
+ echo ""
+
echo -e "${YELLOW}=== Test Results Summary ===${NC}"
if [[ $java_write_result -eq 0 ]]; then
@@ -355,12 +395,18 @@ main() {
echo -e "${RED}✗ Compressed Text Test (Java Write, Python Read):
FAILED${NC}"
fi
+ if [[ $tantivy_fulltext_result -eq 0 ]]; then
+ echo -e "${GREEN}✓ Tantivy Full-Text Index Test (Java Write, Python
Read): PASSED${NC}"
+ else
+ echo -e "${RED}✗ Tantivy Full-Text Index Test (Java Write, Python
Read): FAILED${NC}"
+ fi
+
echo ""
# Clean up warehouse directory after all tests
cleanup_warehouse
- if [[ $java_write_result -eq 0 && $python_read_result -eq 0 &&
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 &&
$btree_index_result -eq 0 && $compressed_text_result -eq 0 ]]; then
+ if [[ $java_write_result -eq 0 && $python_read_result -eq 0 &&
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 &&
$btree_index_result -eq 0 && $compressed_text_result -eq 0 &&
$tantivy_fulltext_result -eq 0 ]]; then
echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability
verified.${NC}"
return 0
else
diff --git a/paimon-python/pypaimon/cli/cli_table.py
b/paimon-python/pypaimon/cli/cli_table.py
index 6a7c383bb9..0f829f4553 100644
--- a/paimon-python/pypaimon/cli/cli_table.py
+++ b/paimon-python/pypaimon/cli/cli_table.py
@@ -147,6 +147,83 @@ def cmd_table_read(args):
print(df.to_string(index=False))
+def cmd_table_full_text_search(args):
+ """
+ Execute the 'table full-text-search' command.
+
+ Performs full-text search on a Paimon table and displays matching rows.
+
+ Args:
+ args: Parsed command line arguments.
+ """
+ from pypaimon.cli.cli import load_catalog_config, create_catalog
+
+ config_path = args.config
+ config = load_catalog_config(config_path)
+ catalog = create_catalog(config)
+
+ table_identifier = args.table
+ parts = table_identifier.split('.')
+ if len(parts) != 2:
+ print(f"Error: Invalid table identifier '{table_identifier}'. "
+ f"Expected format: 'database.table'", file=sys.stderr)
+ sys.exit(1)
+
+ database_name, table_name = parts
+
+ try:
+ table = catalog.get_table(f"{database_name}.{table_name}")
+ except Exception as e:
+ print(f"Error: Failed to get table '{table_identifier}': {e}",
file=sys.stderr)
+ sys.exit(1)
+
+ # Build full-text search
+ text_column = args.column
+ query_text = args.query
+ limit = args.limit
+
+ try:
+ builder = table.new_full_text_search_builder()
+ builder.with_text_column(text_column)
+ builder.with_query_text(query_text)
+ builder.with_limit(limit)
+ result = builder.execute_local()
+ except Exception as e:
+ print(f"Error: Full-text search failed: {e}", file=sys.stderr)
+ sys.exit(1)
+
+ if result.is_empty():
+ print("No matching rows found.")
+ return
+
+ # Read matching rows using global index result
+ read_builder = table.new_read_builder()
+
+ select_columns = args.select
+ if select_columns:
+ projection = [col.strip() for col in select_columns.split(',')]
+ available_fields = set(field.name for field in
table.table_schema.fields)
+ invalid_columns = [col for col in projection if col not in
available_fields]
+ if invalid_columns:
+ print(f"Error: Column(s) {invalid_columns} do not exist in table
'{table_identifier}'.",
+ file=sys.stderr)
+ sys.exit(1)
+ read_builder = read_builder.with_projection(projection)
+
+ scan = read_builder.new_scan().with_global_index_result(result)
+ plan = scan.plan()
+ splits = plan.splits()
+ read = read_builder.new_read()
+ df = read.to_pandas(splits)
+
+ output_format = getattr(args, 'format', 'table')
+ if output_format == 'json':
+ import json
+ print(json.dumps(df.to_dict(orient='records'), ensure_ascii=False))
+ else:
+ print(df.to_string(index=False))
+
+
def cmd_table_get(args):
"""
Execute the 'table get' command.
@@ -773,6 +850,43 @@ def add_table_subcommands(table_parser):
# table rename command
rename_parser = table_subparsers.add_parser('rename', help='Rename a
table')
+
+ # table full-text-search command
+ fts_parser = table_subparsers.add_parser('full-text-search',
help='Full-text search on a table')
+ fts_parser.add_argument(
+ 'table',
+ help='Table identifier in format: database.table'
+ )
+ fts_parser.add_argument(
+ '--column', '-c',
+ required=True,
+ help='Text column to search on'
+ )
+ fts_parser.add_argument(
+ '--query', '-q',
+ required=True,
+ help='Query text to search for'
+ )
+ fts_parser.add_argument(
+ '--limit', '-l',
+ type=int,
+ default=10,
+ help='Maximum number of results to return (default: 10)'
+ )
+ fts_parser.add_argument(
+ '--select', '-s',
+ type=str,
+ default=None,
+ help='Select specific columns to display (comma-separated, e.g.,
"id,name,content")'
+ )
+ fts_parser.add_argument(
+ '--format', '-f',
+ type=str,
+ choices=['table', 'json'],
+ default='table',
+ help='Output format: table (default) or json'
+ )
+ fts_parser.set_defaults(func=cmd_table_full_text_search)
rename_parser.add_argument(
'table',
help='Source table identifier in format: database.table'
diff --git a/paimon-python/pypaimon/globalindex/__init__.py
b/paimon-python/pypaimon/globalindex/__init__.py
index e96d2d6a80..39be69490a 100644
--- a/paimon-python/pypaimon/globalindex/__init__.py
+++ b/paimon-python/pypaimon/globalindex/__init__.py
@@ -19,6 +19,7 @@
from pypaimon.globalindex.global_index_result import GlobalIndexResult
from pypaimon.globalindex.global_index_reader import GlobalIndexReader,
FieldRef
from pypaimon.globalindex.vector_search import VectorSearch
+from pypaimon.globalindex.full_text_search import FullTextSearch
from pypaimon.globalindex.vector_search_result import (
ScoredGlobalIndexResult,
DictBasedScoredIndexResult,
@@ -27,6 +28,7 @@ from pypaimon.globalindex.vector_search_result import (
from pypaimon.globalindex.global_index_meta import GlobalIndexMeta,
GlobalIndexIOMeta
from pypaimon.globalindex.global_index_evaluator import GlobalIndexEvaluator
from pypaimon.globalindex.global_index_scanner import GlobalIndexScanner
+from pypaimon.globalindex.offset_global_index_reader import
OffsetGlobalIndexReader
from pypaimon.utils.range import Range
__all__ = [
@@ -34,6 +36,7 @@ __all__ = [
'GlobalIndexReader',
'FieldRef',
'VectorSearch',
+ 'FullTextSearch',
'ScoredGlobalIndexResult',
'DictBasedScoredIndexResult',
'ScoreGetter',
@@ -41,5 +44,6 @@ __all__ = [
'GlobalIndexIOMeta',
'GlobalIndexEvaluator',
'GlobalIndexScanner',
+ 'OffsetGlobalIndexReader',
'Range',
]
diff --git a/paimon-python/pypaimon/globalindex/full_text_search.py
b/paimon-python/pypaimon/globalindex/full_text_search.py
new file mode 100644
index 0000000000..463941a317
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/full_text_search.py
@@ -0,0 +1,53 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""FullTextSearch for performing full-text search on a text column."""
+
+from dataclasses import dataclass
+from typing import Optional
+
+
+@dataclass
+class FullTextSearch:
+ """
+ FullTextSearch to perform full-text search on a text column.
+
+ Attributes:
+ query_text: The query text to search
+ limit: Maximum number of results to return
+ field_name: Name of the text field to search
+ """
+
+ query_text: str
+ limit: int
+ field_name: str
+
+ def __post_init__(self):
+ if not self.query_text:
+ raise ValueError("Query text cannot be None or empty")
+ if self.limit <= 0:
+ raise ValueError(f"Limit must be positive, got: {self.limit}")
+ if not self.field_name:
+ raise ValueError("Field name cannot be null or empty")
+
+ def visit(self, visitor: 'GlobalIndexReader') ->
Optional['ScoredGlobalIndexResult']:
+ """Visit the global index reader with this full-text search."""
+ return visitor.visit_full_text_search(self)
+
+ def __repr__(self) -> str:
+ return f"FullTextSearch(field={self.field_name},
query='{self.query_text}', limit={self.limit})"
diff --git a/paimon-python/pypaimon/globalindex/global_index_reader.py
b/paimon-python/pypaimon/globalindex/global_index_reader.py
index cb9760ed2d..e5ba29e19b 100644
--- a/paimon-python/pypaimon/globalindex/global_index_reader.py
+++ b/paimon-python/pypaimon/globalindex/global_index_reader.py
@@ -42,6 +42,10 @@ class GlobalIndexReader(ABC):
"""Visit a vector search query."""
raise NotImplementedError("Vector search not supported by this reader")
+ def visit_full_text_search(self, full_text_search: 'FullTextSearch') ->
Optional['GlobalIndexResult']:
+ """Visit a full-text search query."""
+ raise NotImplementedError("Full-text search not supported by this
reader")
+
def visit_equal(self, field_ref: FieldRef, literal: object) ->
Optional['GlobalIndexResult']:
"""Visit an equality predicate."""
return None
diff --git a/paimon-python/pypaimon/globalindex/offset_global_index_reader.py
b/paimon-python/pypaimon/globalindex/offset_global_index_reader.py
new file mode 100644
index 0000000000..acd9a6be95
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/offset_global_index_reader.py
@@ -0,0 +1,102 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""A GlobalIndexReader that wraps another reader and applies an offset to all
row IDs."""
+
+from typing import List, Optional
+
+from pypaimon.globalindex.global_index_reader import GlobalIndexReader,
FieldRef
+from pypaimon.globalindex.global_index_result import GlobalIndexResult
+
+
+class OffsetGlobalIndexReader(GlobalIndexReader):
+ """
+ A GlobalIndexReader that wraps another reader and applies an offset
+ to all row IDs in the results.
+ """
+
+ def __init__(self, wrapped: GlobalIndexReader, offset: int, to: int):
+ self._wrapped = wrapped
+ self._offset = offset
+ self._to = to
+
+ def visit_vector_search(self, vector_search) ->
Optional[GlobalIndexResult]:
+ result = self._wrapped.visit_vector_search(
+ vector_search.offset_range(self._offset, self._to))
+ if result is not None:
+ return result.offset(self._offset)
+ return None
+
+ def visit_full_text_search(self, full_text_search) ->
Optional[GlobalIndexResult]:
+ result = self._wrapped.visit_full_text_search(full_text_search)
+ if result is not None:
+ return result.offset(self._offset)
+ return None
+
+ def visit_equal(self, field_ref: FieldRef, literal: object) ->
Optional[GlobalIndexResult]:
+ return self._apply_offset(self._wrapped.visit_equal(field_ref,
literal))
+
+ def visit_not_equal(self, field_ref: FieldRef, literal: object) ->
Optional[GlobalIndexResult]:
+ return self._apply_offset(self._wrapped.visit_not_equal(field_ref,
literal))
+
+ def visit_less_than(self, field_ref: FieldRef, literal: object) ->
Optional[GlobalIndexResult]:
+ return self._apply_offset(self._wrapped.visit_less_than(field_ref,
literal))
+
+ def visit_less_or_equal(self, field_ref: FieldRef, literal: object) ->
Optional[GlobalIndexResult]:
+ return self._apply_offset(self._wrapped.visit_less_or_equal(field_ref,
literal))
+
+ def visit_greater_than(self, field_ref: FieldRef, literal: object) ->
Optional[GlobalIndexResult]:
+ return self._apply_offset(self._wrapped.visit_greater_than(field_ref,
literal))
+
+ def visit_greater_or_equal(self, field_ref: FieldRef, literal: object) ->
Optional[GlobalIndexResult]:
+ return
self._apply_offset(self._wrapped.visit_greater_or_equal(field_ref, literal))
+
+ def visit_is_null(self, field_ref: FieldRef) ->
Optional[GlobalIndexResult]:
+ return self._apply_offset(self._wrapped.visit_is_null(field_ref))
+
+ def visit_is_not_null(self, field_ref: FieldRef) ->
Optional[GlobalIndexResult]:
+ return self._apply_offset(self._wrapped.visit_is_not_null(field_ref))
+
+ def visit_in(self, field_ref: FieldRef, literals: List[object]) ->
Optional[GlobalIndexResult]:
+ return self._apply_offset(self._wrapped.visit_in(field_ref, literals))
+
+ def visit_not_in(self, field_ref: FieldRef, literals: List[object]) ->
Optional[GlobalIndexResult]:
+ return self._apply_offset(self._wrapped.visit_not_in(field_ref,
literals))
+
+ def visit_starts_with(self, field_ref: FieldRef, literal: object) ->
Optional[GlobalIndexResult]:
+ return self._apply_offset(self._wrapped.visit_starts_with(field_ref,
literal))
+
+ def visit_ends_with(self, field_ref: FieldRef, literal: object) ->
Optional[GlobalIndexResult]:
+ return self._apply_offset(self._wrapped.visit_ends_with(field_ref,
literal))
+
+ def visit_contains(self, field_ref: FieldRef, literal: object) ->
Optional[GlobalIndexResult]:
+ return self._apply_offset(self._wrapped.visit_contains(field_ref,
literal))
+
+ def visit_like(self, field_ref: FieldRef, literal: object) ->
Optional[GlobalIndexResult]:
+ return self._apply_offset(self._wrapped.visit_like(field_ref, literal))
+
+ def visit_between(self, field_ref: FieldRef, min_v: object, max_v: object)
-> Optional[GlobalIndexResult]:
+ return self._apply_offset(self._wrapped.visit_between(field_ref,
min_v, max_v))
+
+ def _apply_offset(self, result: Optional[GlobalIndexResult]) ->
Optional[GlobalIndexResult]:
+ if result is not None:
+ return result.offset(self._offset)
+ return None
+
+ def close(self) -> None:
+ self._wrapped.close()
diff --git a/paimon-python/pypaimon/globalindex/__init__.py
b/paimon-python/pypaimon/globalindex/tantivy/__init__.py
similarity index 51%
copy from paimon-python/pypaimon/globalindex/__init__.py
copy to paimon-python/pypaimon/globalindex/tantivy/__init__.py
index e96d2d6a80..91054c7984 100644
--- a/paimon-python/pypaimon/globalindex/__init__.py
+++ b/paimon-python/pypaimon/globalindex/tantivy/__init__.py
@@ -16,30 +16,12 @@
# limitations under the License.
################################################################################
-from pypaimon.globalindex.global_index_result import GlobalIndexResult
-from pypaimon.globalindex.global_index_reader import GlobalIndexReader,
FieldRef
-from pypaimon.globalindex.vector_search import VectorSearch
-from pypaimon.globalindex.vector_search_result import (
- ScoredGlobalIndexResult,
- DictBasedScoredIndexResult,
- ScoreGetter,
+from pypaimon.globalindex.tantivy.tantivy_full_text_global_index_reader import
(
+ TantivyFullTextGlobalIndexReader,
+ TANTIVY_FULLTEXT_IDENTIFIER,
)
-from pypaimon.globalindex.global_index_meta import GlobalIndexMeta,
GlobalIndexIOMeta
-from pypaimon.globalindex.global_index_evaluator import GlobalIndexEvaluator
-from pypaimon.globalindex.global_index_scanner import GlobalIndexScanner
-from pypaimon.utils.range import Range
__all__ = [
- 'GlobalIndexResult',
- 'GlobalIndexReader',
- 'FieldRef',
- 'VectorSearch',
- 'ScoredGlobalIndexResult',
- 'DictBasedScoredIndexResult',
- 'ScoreGetter',
- 'GlobalIndexMeta',
- 'GlobalIndexIOMeta',
- 'GlobalIndexEvaluator',
- 'GlobalIndexScanner',
- 'Range',
+ 'TantivyFullTextGlobalIndexReader',
+ 'TANTIVY_FULLTEXT_IDENTIFIER',
]
diff --git
a/paimon-python/pypaimon/globalindex/tantivy/tantivy_full_text_global_index_reader.py
b/paimon-python/pypaimon/globalindex/tantivy/tantivy_full_text_global_index_reader.py
new file mode 100644
index 0000000000..c8b1711b59
--- /dev/null
+++
b/paimon-python/pypaimon/globalindex/tantivy/tantivy_full_text_global_index_reader.py
@@ -0,0 +1,294 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Full-text global index reader using Tantivy.
+
+Reads the archive header to get file layout, then opens a Tantivy searcher
+backed by a stream-based Directory. No temp files are created on disk.
+"""
+
+import os
+import struct
+import threading
+from typing import Dict, List, Optional
+
+from pypaimon.globalindex.global_index_reader import GlobalIndexReader,
FieldRef
+from pypaimon.globalindex.global_index_result import GlobalIndexResult
+from pypaimon.globalindex.vector_search_result import (
+ ScoredGlobalIndexResult,
+ DictBasedScoredIndexResult,
+)
+from pypaimon.globalindex.global_index_meta import GlobalIndexIOMeta
+
+TANTIVY_FULLTEXT_IDENTIFIER = "tantivy-fulltext"
+
+
+class StreamDirectory:
+ """Directory backed by a seekable input stream.
+
+ Implements the Python Directory protocol expected by tantivy-py's
+ support_directory branch. Reads file data on demand via seek+read
+ on the underlying stream, avoiding loading the entire archive into memory.
+
+ Thread-safe: all seek+read pairs are serialized by a lock.
+ """
+
+ def __init__(self, stream, file_names: List[str],
+ file_offsets: List[int], file_lengths: List[int]):
+ self._stream = stream
+ self._lock = threading.Lock()
+ self._layout: Dict[str, tuple] = {}
+ for name, offset, length in zip(file_names, file_offsets,
file_lengths):
+ self._layout[name] = (offset, length)
+ # In-memory store for files written by tantivy (e.g. lock files, meta)
+ self._mem_files: Dict[str, bytes] = {}
+ # In-memory writers: writer_id -> (path, buffer)
+ self._writers: Dict[int, tuple] = {}
+ self._next_writer_id = 0
+
+ def get_file_handle(self, path: str) -> bytes:
+ if path in self._mem_files:
+ return self._mem_files[path]
+ if path not in self._layout:
+ raise FileNotFoundError(path)
+ offset, length = self._layout[path]
+ with self._lock:
+ self._stream.seek(offset)
+ return self._read_fully(length)
+
+ def exists(self, path: str) -> bool:
+ return path in self._layout or path in self._mem_files
+
+ def atomic_read(self, path: str) -> bytes:
+ if path in self._mem_files:
+ return self._mem_files[path]
+ if path not in self._layout:
+ raise FileNotFoundError(path)
+ offset, length = self._layout[path]
+ with self._lock:
+ self._stream.seek(offset)
+ return self._read_fully(length)
+
+ def atomic_write(self, path: str, data: bytes) -> None:
+ self._mem_files[path] = data
+
+ def open_write(self, path: str) -> int:
+ writer_id = self._next_writer_id
+ self._next_writer_id += 1
+ self._writers[writer_id] = (path, bytearray())
+ return writer_id
+
+ def delete(self, path: str) -> None:
+ self._mem_files.pop(path, None)
+
+ def write(self, writer_id: int, data: bytes) -> None:
+ self._writers[writer_id][1].extend(data)
+
+ def flush(self, writer_id: int) -> None:
+ pass
+
+ def terminate(self, writer_id: int) -> None:
+ path, buf = self._writers.pop(writer_id)
+ self._mem_files[path] = bytes(buf)
+
+ def sync_directory(self) -> None:
+ pass
+
+ def close(self, writer_id: int) -> None:
+ """Called when a writer is dropped (e.g., lock files dropped without
terminate)."""
+ self._writers.pop(writer_id, None)
+
+ def _read_fully(self, length: int) -> bytes:
+ buf = bytearray()
+ remaining = length
+ while remaining > 0:
+ chunk = self._stream.read(remaining)
+ if not chunk:
+ raise IOError("Unexpected end of stream")
+ buf.extend(chunk)
+ remaining -= len(chunk)
+ return bytes(buf)
+
+
+class TantivyFullTextGlobalIndexReader(GlobalIndexReader):
+ """Full-text global index reader using Tantivy.
+
+ Reads the archive header to get file layout, then opens a Tantivy searcher
+ backed by stream-based callbacks. No temp files are created.
+
+ Archive format (big-endian):
+ [fileCount(4)] then for each file: [nameLen(4)] [name(utf8)] [dataLen(8)]
[data]
+ """
+
+ def __init__(self, file_io, index_path: str, io_metas:
List[GlobalIndexIOMeta]):
+ assert len(io_metas) == 1, "Expected exactly one index file per shard"
+ self._file_io = file_io
+ self._index_path = index_path
+ self._io_meta = io_metas[0]
+ self._searcher = None
+ self._index = None
+ self._stream = None
+
+ def visit_full_text_search(self, full_text_search) ->
Optional[ScoredGlobalIndexResult]:
+ self._ensure_loaded()
+
+ query_text = full_text_search.query_text
+ limit = full_text_search.limit
+
+ searcher = self._searcher
+ query = self._index.parse_query(query_text, ["text"])
+ results = searcher.search(query, limit)
+
+ id_to_scores: Dict[int, float] = {}
+ for score, doc_address in results.hits:
+ doc = searcher.doc(doc_address)
+ row_id = doc["row_id"][0]
+ id_to_scores[row_id] = score
+
+ return DictBasedScoredIndexResult(id_to_scores)
+
+ def _ensure_loaded(self):
+ if self._searcher is not None:
+ return
+
+ import tantivy
+
+ # Open the archive stream
+ file_path = os.path.join(self._index_path, self._io_meta.file_name)
+ stream = self._file_io.new_input_stream(file_path)
+ try:
+ # Parse archive header to get file layout
+ file_names, file_offsets, file_lengths =
self._parse_archive_header(stream)
+ directory = StreamDirectory(stream, file_names, file_offsets,
file_lengths)
+
+ # Open tantivy index from stream-backed directory
+ schema_builder = tantivy.SchemaBuilder()
+ schema_builder.add_unsigned_field("row_id", stored=True,
indexed=True, fast=True)
+ schema_builder.add_text_field("text", stored=False)
+ schema = schema_builder.build()
+
+ self._index = tantivy.Index(schema, directory=directory)
+ self._index.reload()
+ self._searcher = self._index.searcher()
+ self._stream = stream
+ except Exception:
+ stream.close()
+ raise
+
+ @staticmethod
+ def _parse_archive_header(stream):
+ """Parse the archive header to extract file names, offsets, and
lengths.
+
+ Only reads the header; does not load file data into memory.
+ Computes the absolute byte offset of each file's data within the
stream.
+ """
+ file_count = _read_int(stream)
+ file_names = []
+ file_offsets = []
+ file_lengths = []
+
+ for _ in range(file_count):
+ name_len = _read_int(stream)
+ name_bytes = _read_fully(stream, name_len)
+ file_names.append(name_bytes.decode('utf-8'))
+
+ data_len = _read_long(stream)
+ data_offset = stream.tell()
+ file_offsets.append(data_offset)
+ file_lengths.append(data_len)
+
+ # Skip past the file data
+ stream.seek(data_offset + data_len)
+
+ return file_names, file_offsets, file_lengths
+
+ # =================== unsupported =====================
+
+ def visit_equal(self, field_ref: FieldRef, literal: object) ->
Optional[GlobalIndexResult]:
+ return None
+
+ def visit_not_equal(self, field_ref: FieldRef, literal: object) ->
Optional[GlobalIndexResult]:
+ return None
+
+ def visit_less_than(self, field_ref: FieldRef, literal: object) ->
Optional[GlobalIndexResult]:
+ return None
+
+ def visit_less_or_equal(self, field_ref: FieldRef, literal: object) ->
Optional[GlobalIndexResult]:
+ return None
+
+ def visit_greater_than(self, field_ref: FieldRef, literal: object) ->
Optional[GlobalIndexResult]:
+ return None
+
+ def visit_greater_or_equal(self, field_ref: FieldRef, literal: object) ->
Optional[GlobalIndexResult]:
+ return None
+
+ def visit_is_null(self, field_ref: FieldRef) ->
Optional[GlobalIndexResult]:
+ return None
+
+ def visit_is_not_null(self, field_ref: FieldRef) ->
Optional[GlobalIndexResult]:
+ return None
+
+ def visit_in(self, field_ref: FieldRef, literals: List[object]) ->
Optional[GlobalIndexResult]:
+ return None
+
+ def visit_not_in(self, field_ref: FieldRef, literals: List[object]) ->
Optional[GlobalIndexResult]:
+ return None
+
+ def visit_starts_with(self, field_ref: FieldRef, literal: object) ->
Optional[GlobalIndexResult]:
+ return None
+
+ def visit_ends_with(self, field_ref: FieldRef, literal: object) ->
Optional[GlobalIndexResult]:
+ return None
+
+ def visit_contains(self, field_ref: FieldRef, literal: object) ->
Optional[GlobalIndexResult]:
+ return None
+
+ def visit_like(self, field_ref: FieldRef, literal: object) ->
Optional[GlobalIndexResult]:
+ return None
+
+ def visit_between(self, field_ref: FieldRef, min_v: object, max_v: object)
-> Optional[GlobalIndexResult]:
+ return None
+
+ def close(self) -> None:
+ self._searcher = None
+ self._index = None
+ if self._stream is not None:
+ self._stream.close()
+ self._stream = None
+
+
+def _read_int(stream) -> int:
+ data = _read_fully(stream, 4)
+ return struct.unpack('>i', data)[0]
+
+
+def _read_long(stream) -> int:
+ data = _read_fully(stream, 8)
+ return struct.unpack('>q', data)[0]
+
+
+def _read_fully(stream, length: int) -> bytes:
+ buf = bytearray()
+ remaining = length
+ while remaining > 0:
+ chunk = stream.read(remaining)
+ if not chunk:
+ raise IOError("Unexpected end of stream")
+ buf.extend(chunk)
+ remaining -= len(chunk)
+ return bytes(buf)
diff --git a/paimon-python/pypaimon/globalindex/vector_search_result.py
b/paimon-python/pypaimon/globalindex/vector_search_result.py
index 60afe331a9..a93247e690 100644
--- a/paimon-python/pypaimon/globalindex/vector_search_result.py
+++ b/paimon-python/pypaimon/globalindex/vector_search_result.py
@@ -78,6 +78,37 @@ class ScoredGlobalIndexResult(GlobalIndexResult):
return SimpleScoredGlobalIndexResult(result_or, combined_score_getter)
+ def top_k(self, k: int) -> 'ScoredGlobalIndexResult':
+ """Return the top-k results by score."""
+ import heapq
+
+ row_ids = self.results()
+ if row_ids.cardinality() <= k:
+ return self
+
+ score_getter_fn = self.score_getter()
+ # Use a min-heap of size k to find top-k scores in O(n log k)
+ heap = []
+ for row_id in row_ids:
+ score = score_getter_fn(row_id)
+ if score is None:
+ score = 0.0
+ if len(heap) < k:
+ heapq.heappush(heap, (score, row_id))
+ elif score > heap[0][0]:
+ heapq.heapreplace(heap, (score, row_id))
+
+ top_k_bitmap = RoaringBitmap64()
+ for _, row_id in heap:
+ top_k_bitmap.add(row_id)
+
+ return SimpleScoredGlobalIndexResult(top_k_bitmap, score_getter_fn)
+
+ @staticmethod
+ def create_empty() -> 'ScoredGlobalIndexResult':
+ """Returns an empty ScoredGlobalIndexResult."""
+ return SimpleScoredGlobalIndexResult(RoaringBitmap64(), lambda row_id:
0.0)
+
@staticmethod
def create(
supplier: Callable[[], RoaringBitmap64],
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py
b/paimon-python/pypaimon/read/scanner/file_scanner.py
index aea709669e..b0293dac41 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -199,6 +199,7 @@ class FileScanner:
self.only_read_real_buckets = options.bucket() ==
BucketMode.POSTPONE_BUCKET.value
self.data_evolution = options.data_evolution_enabled()
self.deletion_vectors_enabled = options.deletion_vectors_enabled()
+ self._global_index_result = None
def schema_fields_func(schema_id: int):
return self.table.schema_manager.get_schema(schema_id).fields
@@ -262,7 +263,8 @@ class FileScanner:
def _create_data_evolution_split_generator(self):
row_ranges = None
score_getter = None
- global_index_result = self._eval_global_index()
+ global_index_result = self._global_index_result if
self._global_index_result is not None \
+ else self._eval_global_index()
if global_index_result is not None:
row_ranges = global_index_result.results().to_range_list()
if isinstance(global_index_result, ScoredGlobalIndexResult):
@@ -347,6 +349,10 @@ class FileScanner:
self.end_pos_of_this_subtask = end_pos
return self
    def with_global_index_result(self, result) -> 'FileScanner':
        """Inject a precomputed global-index result (e.g. from a full-text
        search) so the scan uses it instead of evaluating the index itself.

        Returns self for chaining.
        """
        self._global_index_result = result
        return self
+
def _apply_push_down_limit(self, splits: List[DataSplit]) ->
List[DataSplit]:
if self.limit is None:
return splits
diff --git a/paimon-python/pypaimon/read/table_scan.py
b/paimon-python/pypaimon/read/table_scan.py
index 416ef1717d..9579f875cb 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -131,3 +131,7 @@ class TableScan:
def with_slice(self, start_pos, end_pos) -> 'TableScan':
self.file_scanner.with_slice(start_pos, end_pos)
return self
+
    def with_global_index_result(self, result) -> 'TableScan':
        """Forward a precomputed global-index result to the file scanner.

        Returns self for chaining.
        """
        self.file_scanner.with_global_index_result(result)
        return self
diff --git a/paimon-python/pypaimon/table/file_store_table.py
b/paimon-python/pypaimon/table/file_store_table.py
index 9115c8e885..61a1455f36 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -369,6 +369,10 @@ class FileStoreTable(Table):
def new_stream_write_builder(self) -> StreamWriteBuilder:
return StreamWriteBuilder(self)
+ def new_full_text_search_builder(self) -> 'FullTextSearchBuilder':
+ from pypaimon.table.source.full_text_search_builder import
FullTextSearchBuilderImpl
+ return FullTextSearchBuilderImpl(self)
+
def create_row_key_extractor(self) -> RowKeyExtractor:
bucket_mode = self.bucket_mode()
if bucket_mode == BucketMode.HASH_FIXED:
diff --git a/paimon-python/pypaimon/table/format/format_table.py
b/paimon-python/pypaimon/table/format/format_table.py
index 8c7e82415e..41767e1da8 100644
--- a/paimon-python/pypaimon/table/format/format_table.py
+++ b/paimon-python/pypaimon/table/format/format_table.py
@@ -102,3 +102,6 @@ class FormatTable(Table):
def new_stream_write_builder(self):
raise NotImplementedError("Format table does not support stream
write.")
+
+ def new_full_text_search_builder(self):
+ raise NotImplementedError("Format table does not support full text
search.")
diff --git a/paimon-python/pypaimon/table/iceberg/iceberg_table.py
b/paimon-python/pypaimon/table/iceberg/iceberg_table.py
index 137b7c8622..8e0ddfa07b 100644
--- a/paimon-python/pypaimon/table/iceberg/iceberg_table.py
+++ b/paimon-python/pypaimon/table/iceberg/iceberg_table.py
@@ -107,3 +107,6 @@ class IcebergTable(Table):
raise NotImplementedError(
"IcebergTable does not support stream write operation in
paimon-python yet."
)
+
+ def new_full_text_search_builder(self):
+ raise NotImplementedError("IcebergTable does not support full text
search.")
diff --git a/paimon-python/pypaimon/table/object/object_table.py
b/paimon-python/pypaimon/table/object/object_table.py
index f3dc44b761..df6d6f2f84 100644
--- a/paimon-python/pypaimon/table/object/object_table.py
+++ b/paimon-python/pypaimon/table/object/object_table.py
@@ -101,3 +101,8 @@ class ObjectTable(Table):
raise NotImplementedError(
"ObjectTable is read-only and does not support stream write."
)
+
+ def new_full_text_search_builder(self):
+ raise NotImplementedError(
+ "ObjectTable is read-only and does not support full text search."
+ )
diff --git a/paimon-python/pypaimon/table/source/full_text_read.py
b/paimon-python/pypaimon/table/source/full_text_read.py
new file mode 100644
index 0000000000..9be1c2e4ca
--- /dev/null
+++ b/paimon-python/pypaimon/table/source/full_text_read.py
@@ -0,0 +1,120 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Full-text read to read index files."""
+
+from abc import ABC, abstractmethod
+from typing import List, Optional
+
+from pypaimon.globalindex.full_text_search import FullTextSearch
+from pypaimon.globalindex.global_index_meta import GlobalIndexIOMeta
+from pypaimon.globalindex.global_index_result import GlobalIndexResult
+from pypaimon.globalindex.offset_global_index_reader import
OffsetGlobalIndexReader
+from pypaimon.globalindex.vector_search_result import ScoredGlobalIndexResult
+from pypaimon.table.source.full_text_search_split import FullTextSearchSplit
+from pypaimon.table.source.full_text_scan import FullTextScanPlan
+
+
class FullTextRead(ABC):
    """Full-text read to read index files."""

    def read_plan(self, plan: FullTextScanPlan) -> GlobalIndexResult:
        """Convenience wrapper: evaluate the search over all splits of a plan."""
        return self.read(plan.splits())

    @abstractmethod
    def read(self, splits: List[FullTextSearchSplit]) -> GlobalIndexResult:
        """Evaluate the search against the given splits and return the matches."""
        pass
+
+
class FullTextReadImpl(FullTextRead):
    """Implementation for FullTextRead.

    Evaluates the query against each split's global index files and merges
    the scored per-split results, keeping the top-k overall.
    """

    def __init__(
        self,
        table: 'FileStoreTable',
        limit: int,
        text_column: 'DataField',
        query_text: str
    ):
        self._table = table
        self._limit = limit  # top-k cap applied to the merged result
        self._text_column = text_column
        self._query_text = query_text

    def read(self, splits: List[FullTextSearchSplit]) -> GlobalIndexResult:
        """Run the query over all splits, union the results, keep top-k."""
        if not splits:
            return GlobalIndexResult.create_empty()

        result = ScoredGlobalIndexResult.create_empty()
        for split in splits:
            split_result = self._eval(
                split.row_range_start, split.row_range_end,
                split.full_text_index_files
            )
            if split_result is not None:
                result = result.or_(split_result)

        return result.top_k(self._limit)

    def _eval(self, row_range_start, row_range_end, full_text_index_files
              ) -> Optional[ScoredGlobalIndexResult]:
        """Evaluate the query against one split's index files.

        Builds IO metadata for every index file, opens a reader for the
        split's index type and runs the search offset by the split's row
        range.
        """
        index_io_meta_list = []
        for index_file in full_text_index_files:
            meta = index_file.global_index_meta
            assert meta is not None
            index_io_meta_list.append(
                GlobalIndexIOMeta(
                    file_name=index_file.file_name,
                    file_size=index_file.file_size,
                    metadata=meta.index_meta
                )
            )

        index_type = full_text_index_files[0].index_type
        index_path = self._table.path_factory().global_index_path_factory().index_path()
        file_io = self._table.file_io

        # Build the query object BEFORE opening the reader: the original code
        # constructed it between reader creation and the try/finally, so a
        # failure there would leak an open reader.
        full_text_search = FullTextSearch(
            query_text=self._query_text,
            limit=self._limit,
            field_name=self._text_column.name
        )

        reader = _create_full_text_reader(
            index_type, file_io, index_path,
            index_io_meta_list
        )
        try:
            offset_reader = OffsetGlobalIndexReader(reader, row_range_start, row_range_end)
            return offset_reader.visit_full_text_search(full_text_search)
        finally:
            # Always release the reader, even if the search fails.
            reader.close()
+
+
def _create_full_text_reader(index_type, file_io, index_path, index_io_meta_list):
    """Create a global index reader for full-text search."""
    # Imported lazily so the tantivy extra is only required when used.
    from pypaimon.globalindex.tantivy.tantivy_full_text_global_index_reader import (
        TANTIVY_FULLTEXT_IDENTIFIER,
        TantivyFullTextGlobalIndexReader,
    )

    if index_type != TANTIVY_FULLTEXT_IDENTIFIER:
        raise ValueError(f"Unsupported full-text index type: '{index_type}'")
    return TantivyFullTextGlobalIndexReader(file_io, index_path, index_io_meta_list)
diff --git a/paimon-python/pypaimon/table/source/full_text_scan.py
b/paimon-python/pypaimon/table/source/full_text_scan.py
new file mode 100644
index 0000000000..06f72405b3
--- /dev/null
+++ b/paimon-python/pypaimon/table/source/full_text_scan.py
@@ -0,0 +1,93 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Full-text scan to scan index files."""
+
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from typing import List
+
+from pypaimon.table.source.full_text_search_split import FullTextSearchSplit
+from pypaimon.utils.range import Range
+
+
class FullTextScanPlan:
    """Plan of full-text scan.

    Holder for the splits produced by a FullTextScan, one split per
    distinct row range.
    """

    def __init__(self, splits: List[FullTextSearchSplit]):
        self._splits = splits

    def splits(self) -> List[FullTextSearchSplit]:
        """The splits to be read by a FullTextRead."""
        return self._splits
+
+
class FullTextScan(ABC):
    """Full-text scan to scan index files."""

    @abstractmethod
    def scan(self) -> FullTextScanPlan:
        """Scan the table's global index files and build the splits to read."""
        pass
+
+
class FullTextScanImpl(FullTextScan):
    """Implementation for FullTextScan."""

    def __init__(self, table: 'FileStoreTable', text_column: 'DataField'):
        self._table = table
        self._text_column = text_column

    def scan(self) -> FullTextScanPlan:
        """Collect the global-index files for the text column, grouped by row range.

        Reads from the latest snapshot unless time-travel options on the table
        redirect the scan to a different snapshot.
        """
        from pypaimon.index.index_file_handler import IndexFileHandler
        from pypaimon.snapshot.snapshot_manager import SnapshotManager

        text_column = self._text_column
        snapshot = SnapshotManager(self._table).get_latest_snapshot()

        # Honor configured time-travel options, if any.
        from pypaimon.snapshot.time_travel_util import TimeTravelUtil
        from pypaimon.common.options.options import Options
        travel_snapshot = TimeTravelUtil.try_travel_to_snapshot(
            Options(self._table.table_schema.options),
            self._table.tag_manager()
        )
        if travel_snapshot is not None:
            snapshot = travel_snapshot

        index_file_handler = IndexFileHandler(table=self._table)

        def index_file_filter(entry):
            # Keep only global-index files built on the target text column.
            global_index_meta = entry.index_file.global_index_meta
            if global_index_meta is None:
                return False
            return text_column.id == global_index_meta.index_field_id

        entries = index_file_handler.scan(snapshot, index_file_filter)
        all_index_files = [entry.index_file for entry in entries]

        # Group full-text index files by (rowRangeStart, rowRangeEnd)
        by_range = defaultdict(list)
        for index_file in all_index_files:
            meta = index_file.global_index_meta
            assert meta is not None
            range_key = Range(meta.row_range_start, meta.row_range_end)
            by_range[range_key].append(index_file)

        # One split per distinct row range.
        splits = []
        for range_key, files in by_range.items():
            splits.append(FullTextSearchSplit(range_key.from_, range_key.to, files))

        return FullTextScanPlan(splits)
diff --git a/paimon-python/pypaimon/table/source/full_text_search_builder.py
b/paimon-python/pypaimon/table/source/full_text_search_builder.py
new file mode 100644
index 0000000000..70bdb28a46
--- /dev/null
+++ b/paimon-python/pypaimon/table/source/full_text_search_builder.py
@@ -0,0 +1,100 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Builder to build full-text search."""
+
+from abc import ABC, abstractmethod
+from typing import Optional
+
+from pypaimon.globalindex.global_index_result import GlobalIndexResult
+from pypaimon.table.source.full_text_read import FullTextRead, FullTextReadImpl
+from pypaimon.table.source.full_text_scan import FullTextScan, FullTextScanImpl
+
+
class FullTextSearchBuilder(ABC):
    """Builder to build full-text search.

    Configure the limit, text column and query text, then either run
    everything in-process via execute_local() or drive scan and read
    separately.
    """

    @abstractmethod
    def with_limit(self, limit: int) -> 'FullTextSearchBuilder':
        """The top k results to return."""
        pass

    @abstractmethod
    def with_text_column(self, name: str) -> 'FullTextSearchBuilder':
        """The text column to search."""
        pass

    @abstractmethod
    def with_query_text(self, query_text: str) -> 'FullTextSearchBuilder':
        """The query text to search."""
        pass

    @abstractmethod
    def new_full_text_scan(self) -> FullTextScan:
        """Create full-text scan to scan index files."""
        pass

    @abstractmethod
    def new_full_text_read(self) -> FullTextRead:
        """Create full-text read to read index files."""
        pass

    def execute_local(self) -> GlobalIndexResult:
        """Execute full-text index search in local.

        Scans the index files and reads them in one process, returning the
        matched row ids.
        """
        return self.new_full_text_read().read_plan(self.new_full_text_scan().scan())
+
+
class FullTextSearchBuilderImpl(FullTextSearchBuilder):
    """Default FullTextSearchBuilder backed by a FileStoreTable."""

    def __init__(self, table: 'FileStoreTable'):
        self._table = table
        self._limit: int = 0
        self._text_column: Optional['DataField'] = None
        self._query_text: Optional[str] = None

    def with_limit(self, limit: int) -> 'FullTextSearchBuilder':
        self._limit = limit
        return self

    def with_text_column(self, name: str) -> 'FullTextSearchBuilder':
        # Resolve the column by name against the table schema.
        match = next((f for f in self._table.fields if f.name == name), None)
        if match is None:
            raise ValueError(f"Text column '{name}' not found in table schema")
        self._text_column = match
        return self

    def with_query_text(self, query_text: str) -> 'FullTextSearchBuilder':
        self._query_text = query_text
        return self

    def new_full_text_scan(self) -> FullTextScan:
        self._require_text_column()
        return FullTextScanImpl(self._table, self._text_column)

    def new_full_text_read(self) -> FullTextRead:
        if self._limit <= 0:
            raise ValueError("Limit must be positive, set via with_limit()")
        self._require_text_column()
        if self._query_text is None:
            raise ValueError("Query text must be set via with_query_text()")
        return FullTextReadImpl(
            self._table, self._limit, self._text_column, self._query_text
        )

    def _require_text_column(self) -> None:
        # Shared guard: both scan and read need the text column configured.
        if self._text_column is None:
            raise ValueError("Text column must be set via with_text_column()")
diff --git a/paimon-python/pypaimon/table/source/full_text_search_split.py
b/paimon-python/pypaimon/table/source/full_text_search_split.py
new file mode 100644
index 0000000000..b3ce272aa1
--- /dev/null
+++ b/paimon-python/pypaimon/table/source/full_text_search_split.py
@@ -0,0 +1,53 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Split of full-text search."""
+
+from dataclasses import dataclass
+from typing import List
+
+from pypaimon.index.index_file_meta import IndexFileMeta
+
+
@dataclass
class FullTextSearchSplit:
    """Split of full-text search.

    Groups the full-text index files that cover one row range of the table.
    """

    row_range_start: int
    row_range_end: int
    full_text_index_files: List[IndexFileMeta]

    # __eq__ is kept hand-written: the dataclass-generated one requires an
    # exact class match, while this one accepts subclass instances.
    def __eq__(self, other):
        if not isinstance(other, FullTextSearchSplit):
            return False
        return (
            self.row_range_start == other.row_range_start
            and self.row_range_end == other.row_range_end
            and self.full_text_index_files == other.full_text_index_files
        )

    # __hash__ must stay hand-written: a generated hash would try to hash the
    # (unhashable) list field directly, so the list is converted to a tuple.
    def __hash__(self):
        return hash((self.row_range_start, self.row_range_end,
                     tuple(self.full_text_index_files)))

    # The hand-written __repr__ was removed: @dataclass generates a repr with
    # the same "ClassName(field=value, ...)" format, field for field.
diff --git a/paimon-python/pypaimon/table/table.py
b/paimon-python/pypaimon/table/table.py
index dfae232810..68ad3a9d13 100644
--- a/paimon-python/pypaimon/table/table.py
+++ b/paimon-python/pypaimon/table/table.py
@@ -20,6 +20,7 @@ from abc import ABC, abstractmethod
from pypaimon.read.read_builder import ReadBuilder
from pypaimon.read.stream_read_builder import StreamReadBuilder
+from pypaimon.table.source.full_text_search_builder import
FullTextSearchBuilder
from pypaimon.write.write_builder import BatchWriteBuilder, StreamWriteBuilder
@@ -41,3 +42,7 @@ class Table(ABC):
@abstractmethod
def new_stream_write_builder(self) -> StreamWriteBuilder:
"""Returns a builder for building stream table write and table
commit."""
+
+ @abstractmethod
+ def new_full_text_search_builder(self) -> FullTextSearchBuilder:
+ """Returns a new full-text search builder."""
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 4d888c3cb4..61cd48ca67 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -496,3 +496,71 @@ class JavaPyReadWriteTest(unittest.TestCase):
table_read.to_arrow(splits)
self.assertIn(file_format, str(ctx.exception))
self.assertIn("not yet supported", str(ctx.exception))
+
+ def test_read_tantivy_full_text_index(self):
+ """Test reading a Tantivy full-text index built by Java."""
+ table = self.catalog.get_table('default.test_tantivy_fulltext')
+
+ # Use FullTextSearchBuilder to search
+ builder = table.new_full_text_search_builder()
+ builder.with_text_column('content')
+ builder.with_query_text('paimon')
+ builder.with_limit(10)
+
+ result = builder.execute_local()
+ # Row 0, 2, 4 mention "paimon"
+ row_ids = sorted(list(result.results()))
+ print(f"Tantivy full-text search for 'paimon': row_ids={row_ids}")
+ self.assertEqual(row_ids, [0, 2, 4])
+
+ # Read matching rows using withGlobalIndexResult
+ read_builder = table.new_read_builder()
+ scan = read_builder.new_scan().with_global_index_result(result)
+ plan = scan.plan()
+ table_read = read_builder.new_read()
+ pa_table = table_read.to_arrow(plan.splits())
+ pa_table = table_sort_by(pa_table, 'id')
+ self.assertEqual(pa_table.num_rows, 3)
+ ids = pa_table.column('id').to_pylist()
+ self.assertEqual(ids, [0, 2, 4])
+
+ # Search for "tantivy" - only row 1
+ builder2 = table.new_full_text_search_builder()
+ builder2.with_text_column('content')
+ builder2.with_query_text('tantivy')
+ builder2.with_limit(10)
+
+ result2 = builder2.execute_local()
+ row_ids2 = sorted(list(result2.results()))
+ print(f"Tantivy full-text search for 'tantivy': row_ids={row_ids2}")
+ self.assertEqual(row_ids2, [1])
+
+ # Read matching rows
+ read_builder2 = table.new_read_builder()
+ scan2 = read_builder2.new_scan().with_global_index_result(result2)
+ plan2 = scan2.plan()
+ pa_table2 = read_builder2.new_read().to_arrow(plan2.splits())
+ self.assertEqual(pa_table2.num_rows, 1)
+ self.assertEqual(pa_table2.column('id').to_pylist(), [1])
+
+ # Search for "full-text search" - rows 1, 3
+ builder3 = table.new_full_text_search_builder()
+ builder3.with_text_column('content')
+ builder3.with_query_text('full-text search')
+ builder3.with_limit(10)
+
+ result3 = builder3.execute_local()
+ row_ids3 = sorted(list(result3.results()))
+ print(f"Tantivy full-text search for 'full-text search':
row_ids={row_ids3}")
+ self.assertIn(1, row_ids3)
+ self.assertIn(3, row_ids3)
+
+ # Read matching rows
+ read_builder3 = table.new_read_builder()
+ scan3 = read_builder3.new_scan().with_global_index_result(result3)
+ plan3 = scan3.plan()
+ pa_table3 = read_builder3.new_read().to_arrow(plan3.splits())
+ pa_table3 = table_sort_by(pa_table3, 'id')
+ ids3 = pa_table3.column('id').to_pylist()
+ self.assertIn(1, ids3)
+ self.assertIn(3, ids3)
diff --git a/paimon-tantivy/paimon-tantivy-index/pom.xml
b/paimon-tantivy/paimon-tantivy-index/pom.xml
index b4cd732546..e416d9ef5c 100644
--- a/paimon-tantivy/paimon-tantivy-index/pom.xml
+++ b/paimon-tantivy/paimon-tantivy-index/pom.xml
@@ -60,6 +60,22 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-format</artifactId>
@@ -73,6 +89,93 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>jdk.tools</groupId>
+ <artifactId>jdk.tools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>ch.qos.reload4j</groupId>
+ <artifactId>reload4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-reload4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>jdk.tools</groupId>
+ <artifactId>jdk.tools</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
diff --git
a/paimon-tantivy/paimon-tantivy-index/src/test/java/org/apache/paimon/tantivy/index/JavaPyTantivyE2ETest.java
b/paimon-tantivy/paimon-tantivy-index/src/test/java/org/apache/paimon/tantivy/index/JavaPyTantivyE2ETest.java
new file mode 100644
index 0000000000..fa46e898c3
--- /dev/null
+++
b/paimon-tantivy/paimon-tantivy-index/src/test/java/org/apache/paimon/tantivy/index/JavaPyTantivyE2ETest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tantivy.index;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.FileIOFinder;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.globalindex.GlobalIndexBuilderUtils;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.tantivy.NativeLoader;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.paimon.CoreOptions.DATA_EVOLUTION_ENABLED;
+import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_ENABLED;
+import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.CoreOptions.ROW_TRACKING_ENABLED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
/**
 * Mixed language E2E test for Java Tantivy full-text index building and Python reading.
 *
 * <p>Java writes data and builds a Tantivy full-text index, then Python reads and searches it.
 */
public class JavaPyTantivyE2ETest {

    /**
     * Skips the whole class when the Tantivy JNI library cannot be loaded, so the test
     * suite stays green on platforms without a prebuilt native artifact.
     */
    @BeforeAll
    public static void checkNativeLibrary() {
        assumeTrue(isNativeAvailable(), "Tantivy native library not available, skipping tests");
    }

    /** Returns true iff {@link NativeLoader#loadJni()} succeeds; any failure means "unavailable". */
    private static boolean isNativeAvailable() {
        try {
            NativeLoader.loadJni();
            return true;
        } catch (Throwable t) {
            // Deliberately broad: UnsatisfiedLinkError and friends are Errors, not Exceptions.
            return false;
        }
    }

    // Shared directory inside the pypaimon test tree so the Python half of the E2E test
    // can open the same warehouse. Relative to this module's working directory — assumes
    // the test is launched from paimon-tantivy/paimon-tantivy-index (TODO confirm in CI).
    java.nio.file.Path tempDir =
            Paths.get("../../paimon-python/pypaimon/tests/e2e").toAbsolutePath();

    /** Warehouse root as a Paimon {@link Path} (file:// scheme), initialized in {@link #before()}. */
    protected Path warehouse;

    /** Creates the shared warehouse directory (idempotent) and records its file:// URI. */
    @BeforeEach
    public void before() throws Exception {
        if (!Files.exists(tempDir.resolve("warehouse"))) {
            Files.createDirectories(tempDir.resolve("warehouse"));
        }
        warehouse = new Path("file://" + tempDir.resolve("warehouse"));
    }

    /**
     * Writes five rows to an append-only table, builds a Tantivy full-text index over the
     * {@code content} column, commits the index files, and verifies the committed index
     * manifest — leaving the table on disk for the Python side to read and search.
     *
     * <p>Only runs when {@code -Drun.e2e.tests=true} is set, since it depends on the
     * native library and the on-disk handoff to Python.
     */
    @Test
    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
    public void testTantivyFullTextIndexWrite() throws Exception {
        String tableName = "test_tantivy_fulltext";
        Path tablePath = new Path(warehouse.toString() + "/default.db/" + tableName);

        RowType rowType =
                RowType.of(
                        new DataType[] {DataTypes.INT(), DataTypes.STRING()},
                        new String[] {"id", "content"});

        // Global index requires row tracking + data evolution to be enabled on the table.
        Options options = new Options();
        options.set(PATH, tablePath.toString());
        options.set(ROW_TRACKING_ENABLED, true);
        options.set(DATA_EVOLUTION_ENABLED, true);
        options.set(GLOBAL_INDEX_ENABLED, true);

        // Create the table schema directly on the filesystem (no catalog involved).
        TableSchema tableSchema =
                SchemaUtils.forceCommit(
                        new SchemaManager(LocalFileIO.create(), tablePath),
                        new Schema(
                                rowType.getFields(),
                                Collections.emptyList(),
                                Collections.emptyList(),
                                options.toMap(),
                                ""));

        AppendOnlyFileStoreTable table =
                new AppendOnlyFileStoreTable(
                        FileIOFinder.find(tablePath),
                        tablePath,
                        tableSchema,
                        CatalogEnvironment.empty());

        // Write data
        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
        try (BatchTableWrite write = writeBuilder.newWrite();
                BatchTableCommit commit = writeBuilder.newCommit()) {
            write.write(
                    GenericRow.of(
                            0,
                            BinaryString.fromString(
                                    "Apache Paimon is a streaming data lake platform")));
            write.write(
                    GenericRow.of(
                            1,
                            BinaryString.fromString(
                                    "Tantivy is a full-text search engine written in Rust")));
            write.write(
                    GenericRow.of(
                            2,
                            BinaryString.fromString(
                                    "Paimon supports real-time data ingestion and analytics")));
            write.write(
                    GenericRow.of(
                            3,
                            BinaryString.fromString(
                                    "Full-text search enables efficient text retrieval")));
            write.write(
                    GenericRow.of(
                            4,
                            BinaryString.fromString(
                                    "Data lake platforms like Paimon handle large-scale data")));
            commit.commit(write.prepareCommit());
        }

        // Build tantivy full-text index on the "content" column
        DataField contentField = table.rowType().getField("content");
        Options indexOptions = table.coreOptions().toConfiguration();

        GlobalIndexSingletonWriter writer =
                (GlobalIndexSingletonWriter)
                        GlobalIndexBuilderUtils.createIndexWriter(
                                table,
                                TantivyFullTextGlobalIndexerFactory.IDENTIFIER,
                                contentField,
                                indexOptions);

        // Write the same text data to the index.
        // NOTE(review): index entries are written in the same order as the table rows —
        // presumably row ids 0..4 must line up with the Range below; confirm against the
        // GlobalIndexSingletonWriter contract.
        writer.write(BinaryString.fromString("Apache Paimon is a streaming data lake platform"));
        writer.write(
                BinaryString.fromString("Tantivy is a full-text search engine written in Rust"));
        writer.write(
                BinaryString.fromString("Paimon supports real-time data ingestion and analytics"));
        writer.write(BinaryString.fromString("Full-text search enables efficient text retrieval"));
        writer.write(
                BinaryString.fromString("Data lake platforms like Paimon handle large-scale data"));

        // finish() flushes the index; expect a single result entry covering all 5 rows.
        List<ResultEntry> entries = writer.finish();
        assertThat(entries).hasSize(1);
        assertThat(entries.get(0).rowCount()).isEqualTo(5);

        // Range is inclusive of both endpoints here (0..4 covers the 5 rows written above).
        Range rowRange = new Range(0, 4);
        List<IndexFileMeta> indexFiles =
                GlobalIndexBuilderUtils.toIndexFileMetas(
                        table.fileIO(),
                        table.store().pathFactory().globalIndexFileFactory(),
                        table.coreOptions(),
                        rowRange,
                        contentField.id(),
                        TantivyFullTextGlobalIndexerFactory.IDENTIFIER,
                        entries);

        // Commit the index as an index-only increment (no data files in this commit).
        DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFiles);
        CommitMessage message =
                new CommitMessageImpl(
                        BinaryRow.EMPTY_ROW,
                        0,
                        null,
                        dataIncrement,
                        CompactIncrement.emptyIncrement());
        try (BatchTableCommit commit = writeBuilder.newCommit()) {
            commit.commit(Collections.singletonList(message));
        }

        // Verify the index was committed: the latest snapshot's index manifest must contain
        // exactly one entry whose type is the Tantivy full-text identifier.
        List<org.apache.paimon.manifest.IndexManifestEntry> indexEntries =
                table.indexManifestFileReader().read(table.latestSnapshot().get().indexManifest());
        assertThat(indexEntries).hasSize(1);
        assertThat(indexEntries.get(0).indexFile().indexType())
                .isEqualTo(TantivyFullTextGlobalIndexerFactory.IDENTIFIER);
    }
}